はじめに
現代のJavaアプリケーション開発において、非同期処理は必須のスキルとなっています。大規模データの処理、リアルタイムな応答性の要求、マイクロサービスアーキテクチャの採用など、様々な場面で非同期処理の実装が求められています。
なぜ今、非同期処理が重要なのか
1. パフォーマンスの要求
● ユーザー体験の向上
● システムのスループット改善
● リソースの効率的な利用
2. システムの複雑化
● マイクロサービスアーキテクチャの普及
● 分散システムの一般化
● リアルタイム処理の需要増加
3. 開発生産性の向上
● モダンなJavaの機能による実装の簡素化
● 非同期処理のフレームワークやライブラリの充実
● ベストプラクティスの確立
この記事の目的
本記事では、Javaにおける非同期処理の実装について、基礎から実践まで体系的に解説します。初級者から中級者のJavaエンジニアを対象に、以下の内容を提供します。
- 非同期処理の基本概念と実装パターン
- 実践的な実装テクニックとベストプラクティス
- パフォーマンスチューニングの方法
- 実際のプロジェクトでの活用例
各セクションでは、実行可能なコード例と具体的な実装手順を提供し、実務で即座に活用できる知識の習得を目指します。
それでは、Javaにおける非同期処理の世界を詳しく見ていきましょう。
1.非同期処理の基礎知識
1.1 同期処理vs非同期処理:パフォーマンスの違い
同期処理と非同期処理の最も大きな違いは、タスクの実行方法にあります。以下の表で主な違いを比較してみましょう。
| 特性 | 同期処理 | 非同期処理 |
|---|---|---|
| 実行順序 | 順次実行(直列) | 並行実行(並列可能) |
| ブロッキング | あり(処理完了待ち) | なし(他の処理を実行可能) |
| 実装の複雑さ | シンプル | やや複雑 |
| リソース効率 | 低い | 高い |
| 適用ケース | 順序依存の処理 | I/O処理、並列可能な処理 |
パフォーマンス比較例
実際の処理時間の違いを、ファイル読み込みを例に見てみましょう。
// 同期処理の場合
public void syncProcessing() {
// 3つの重い処理を順次実行
processFile("file1.txt"); // 3秒
processFile("file2.txt"); // 3秒
processFile("file3.txt"); // 3秒
// 合計実行時間:約9秒
}
// 非同期処理の場合
public void asyncProcessing() {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> processFile("file1.txt")),
CompletableFuture.runAsync(() -> processFile("file2.txt")),
CompletableFuture.runAsync(() -> processFile("file3.txt"))
).join();
// 合計実行時間:約3秒
}
1.2 Javaで非同期処理が必要なケース5選
1. 大量データの処理
● バッチ処理
● データ変換処理
● ログ解析処理
2. 外部システムとの連携
● APIリクエスト
● データベース操作
● ファイルI/O
3. 並列計算が必要な処理
● 画像・動画処理
● 科学技術計算
● シミュレーション
4. バックグラウンド処理
● メール送信
● 定期的なデータ更新
● キャッシュの更新
5. リアルタイム処理
● Webソケット通信
● プッシュ通知
● ストリーミングデータ処理
1.3 非同期処理の基本的な仕組み
基本的な処理フロー
1. メインスレッドが非同期タスクを作成
2. 別スレッドでタスクを実行
3. 結果取得時に必要に応じてコールバック実行
// 基本的な非同期処理の実装例
public class AsyncExample {
public static void main(String[] args) {
// 1. 非同期タスクの作成
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 2. 別スレッドでの処理
try {
Thread.sleep(1000); // 重い処理をシミュレート
return "処理完了";
} catch (InterruptedException e) {
return "エラー発生";
}
});
// 3. コールバックの設定
future.thenAccept(result -> {
System.out.println("結果: " + result);
});
// メインスレッドは他の処理を続行可能
System.out.println("メイン処理は継続...");
}
}
スレッドとタスクの関係
非同期処理では、以下の要素が重要な役割を果たします。
● スレッドプール:
● タスクを実行するスレッドを管理
● リソースの効率的な利用を実現
● スレッドの再利用により、生成コストを削減
● タスクキュー:
● 実行待ちのタスクを管理
● 優先順位付けが可能
● スレッドプールと連携して処理を最適化
非同期処理の注意点
1. スレッドセーフティ
● 共有リソースへの同時アクセスに注意
● 適切な同期機構の使用が必要
2. 例外処理
● 非同期処理での例外は別スレッドで発生
● 適切なエラーハンドリングが重要
3. リソース管理
● スレッドプールのサイズ設定
● メモリ使用量の監視
● リソースのクリーンアップ
このように、非同期処理は適切に実装することで、アプリケーションのパフォーマンスを大幅に向上させることができます。次のセクションでは、より具体的な実装パターンについて詳しく見ていきましょう。
2.Javaにおける7つの非同期処理実装パターン
2.1 Thread/Runnableによる基本実装
最も基本的な非同期処理の実装方法です。直接スレッドを制御できる反面、管理が複雑になりやすい特徴があります。
public class BasicAsyncExample {
public static void main(String[] args) {
// Runnableを使用した実装
Runnable task = () -> {
try {
// 非同期で実行する処理
System.out.println("Processing in thread: " + Thread.currentThread().getName());
Thread.sleep(1000);
System.out.println("Task completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
// 新しいスレッドで実行
Thread thread = new Thread(task);
thread.start();
// メインスレッドの処理を継続
System.out.println("Main thread continues...");
}
}
- シンプルな非同期処理に適している
- スレッドの完全な制御が必要な場合に使用
- プロトタイプ開発や学習用途に適している
2.2 ExecutorServiceによるスレッドプール管理
スレッドのライフサイクルを効率的に管理し、リソースを最適化します。
public class ExecutorServiceExample {
public static void main(String[] args) {
// スレッドプールの作成
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
// 複数のタスクを提出
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executing in " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Task " + taskId + " result";
});
}
} finally {
// 正しいシャットダウン処理
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
- スレッドの再利用によるリソース効率の向上
- タスクのキューイングと実行管理
- 柔軟なスレッドプール設定
2.3 Future/CallableでJavaの非同期処理を制御する
戻り値のある非同期処理を実装する際に使用します。
public class FutureCallableExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// Callableタスクの作成
Callable<Integer> task = () -> {
Thread.sleep(2000); // 重い処理をシミュレート
return 42;
};
// Future経由で実行
Future<Integer> future = executor.submit(task);
// 他の処理を実行可能
System.out.println("Waiting for result...");
try {
// 結果の取得(ブロッキング)
Integer result = future.get(3, TimeUnit.SECONDS);
System.out.println("Result: " + result);
} catch (TimeoutException e) {
future.cancel(true);
System.out.println("Task timed out");
}
} finally {
executor.shutdownNow();
}
}
}
- 処理結果の取得が可能
- タイムアウト設定
- タスクのキャンセル機能
2.4 CompletableFutureを使用したモダンな実装
Java 8以降で導入された、より柔軟な非同期処理のAPI。関数型プログラミングスタイルでチェーン処理が可能です。
public class CompletableFutureExample {
public static void main(String[] args) {
// 非同期処理の連鎖
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 第一段階の処理
return "Hello";
})
.thenApplyAsync(s -> {
// 第二段階の処理
return s + " CompletableFuture";
})
.thenApplyAsync(String::toUpperCase);
// エラーハンドリング
future
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Error: " + throwable.getMessage());
return null;
});
// 複数の非同期処理の組み合わせ
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "First");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Second");
CompletableFuture.allOf(future1, future2)
.thenRun(() -> System.out.println("All tasks completed"));
}
}
- 非同期処理の連鎖が可能
- 宣言的なエラーハンドリング
- 複数の非同期処理の組み合わせが容易
2.5 ParallelStreamによる簡潔な非同期処理
コレクションの並列処理を簡潔に記述できます。
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 並列処理の実行
long sum = numbers.parallelStream()
.filter(n -> n % 2 == 0) // 偶数のフィルタリング
.mapToLong(n -> {
try {
Thread.sleep(100); // 重い処理をシミュレート
return n * n;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0;
}
})
.sum();
System.out.println("Sum of squares of even numbers: " + sum);
}
}
- データの並列処理
- 関数型プログラミングスタイルの処理
- 簡潔な実装が必要な場合
2.6 Spring Framework @Asyncによる宣言的非同期処理
Spring Frameworkを使用したアプリケーションでの非同期処理実装方法です。
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.initialize();
return executor;
}
}
@Service
public class AsyncService {
@Async
public CompletableFuture<String> asyncMethod() {
try {
Thread.sleep(1000);
return CompletableFuture.completedFuture("Async process completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
}
}
- 宣言的な非同期処理の実装
- Spring Frameworkの機能との統合
- AOP based implementation
2.7 ReactiveプログラミングによるノンブロッキングI/O
Project ReactorやRxJavaを使用した、リアクティブな非同期処理の実装です。
public class ReactiveExample {
public static void main(String[] args) {
// Fluxを使用したストリーム処理
Flux.just(1, 2, 3, 4, 5)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(
n -> System.out.println("Received: " + n),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
// バックプレッシャーの実装
Flux.range(1, 100)
.onBackpressureBuffer(10)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
request(1);
}
});
}
}
- バックプレッシャーのサポート
- ノンブロッキングI/O
- データストリームの宣言的な処理
- 高度なエラーハンドリング
各実装パターンには、それぞれ適した使用場面があります。以下の表で比較してみましょう。
| 実装パターン | 複雑さ | スケーラビリティ | 用途 |
|---|---|---|---|
| Thread/Runnable | 低 | 低 | シンプルな非同期処理 |
| ExecutorService | 中 | 中 | スレッドプール管理が必要な場合 |
| Future/Callable | 中 | 中 | 結果が必要な非同期処理 |
| CompletableFuture | 高 | 高 | 複雑な非同期処理チェーン |
| ParallelStream | 低 | 中 | コレクションの並列処理 |
| Spring @Async | 低 | 中 | Springアプリケーション |
| Reactive | 高 | 高 | ストリーム処理、高負荷システム |
3.実践的な非同期処理の実装テクニック
3.1 効率的なスレッドプールの設計方法
スレッドプールの適切な設計は、非同期処理のパフォーマンスに直接影響を与えます。
スレッドプール設計の基本原則
public class OptimizedThreadPoolExample {
public static ExecutorService createOptimizedThreadPool() {
// プロセッサ数の取得
int processors = Runtime.getRuntime().availableProcessors();
// スレッドプールの設定
ThreadPoolExecutor executor = new ThreadPoolExecutor(
processors, // コアスレッド数
processors * 2, // 最大スレッド数
60L, TimeUnit.SECONDS, // アイドルスレッドの生存時間
new LinkedBlockingQueue<>(1000), // タスクキューの最大サイズ
new ThreadPoolExecutor.CallerRunsPolicy() // 拒否ポリシー
);
// スレッドプールの監視設定
executor.setRejectedExecutionHandler((r, e) -> {
System.out.println("Task rejected: " + r.toString());
throw new RejectedExecutionException("Task rejected");
});
return executor;
}
// スレッドプールの使用例
public static void main(String[] args) {
ExecutorService executor = createOptimizedThreadPool();
try {
// タスクの実行
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
System.out.println("Task executing in: " +
Thread.currentThread().getName());
return "Task result";
});
}
} finally {
executor.shutdown();
}
}
}
スレッドプール設計のベストプラクティス
1. 適切なスレッド数の設定
● CPU集中型タスク:プロセッサ数 + 1
● I/O集中型タスク:プロセッサ数 * 2
● 混合タスク:プロセッサ数 * 1.5(四捨五入)
2. キューサイズの最適化
● メモリ制約を考慮
● タスクの優先順位付け
● バックプレッシャーの実装
3.2 例外処理のベストプラクティス
非同期処理における例外は、メインスレッドとは異なるコンテキストで発生するため、適切な処理が重要です。
public class AsyncExceptionHandlingExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
})
.handle((result, ex) -> {
if (ex != null) {
// 例外のログ記録
logger.error("Async operation failed", ex);
// 代替値の返却
return "Fallback value";
}
return result;
})
.whenComplete((result, ex) -> {
// 完了時の処理(成功/失敗両方で実行)
System.out.println("Operation completed with result: " + result);
});
// タイムアウト処理の実装
try {
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// タイムアウト時の処理
future.cancel(true);
}
}
// 構造化された例外処理
@Async
public CompletableFuture<String> processAsync() {
return CompletableFuture.supplyAsync(() -> {
try {
// 危険な処理
return performRiskyOperation();
} catch (Exception e) {
// ビジネス例外への変換
throw new BusinessException("Operation failed", e);
}
}).exceptionally(ex -> {
if (ex instanceof BusinessException) {
return handleBusinessException((BusinessException) ex);
} else {
return handleSystemException(ex);
}
});
}
}
例外処理のポイント
1. 例外の種類に応じた処理
● 一時的な例外:リトライ
● 永続的な例外:代替処理
● システム例外:ログ記録とモニタリング
2. グレースフルデグラデーション
● フォールバック値の提供
● 部分的な結果の許容
● ユーザーへの適切な通知
3.3 デッドロックを防ぐための設計パターン
デッドロックは非同期処理における重大な問題の一つです。以下の設計パターンで防止できます。
public class DeadlockPreventionExample {
// リソースの順序付けによるデッドロック防止
public class ResourceManager {
private final Object resource1 = new Object();
private final Object resource2 = new Object();
public void process() {
// リソースの獲得順序を固定
synchronized (resource1) {
synchronized (resource2) {
// 安全な処理
}
}
}
}
// タイムアウトによるデッドロック防止
public boolean processWithTimeout(Lock lock1, Lock lock2) {
try {
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
// 処理実行
return true;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
// 非同期処理でのデッドロック防止
public CompletableFuture<String> safeAsyncProcess() {
return CompletableFuture
.supplyAsync(() -> "Step 1")
.thenApplyAsync(result -> result + " -> Step 2")
.orTimeout(3, TimeUnit.SECONDS) // タイムアウト設定
.exceptionally(ex -> "Process failed: " + ex.getMessage());
}
}
デッドロック防止のベストプラクティス
1. リソースの順序付け
● 一貫した順序でのロック取得
● リソースの階層構造の定義
● グローバルな順序付けの維持
2. タイムアウトの活用
● ロック取得のタイムアウト設定
● 処理全体のタイムアウト設定
● デッドロック検出と解決
3. ロックの最小化
● ロックの範囲を最小限に
● 非ブロッキング処理の活用
● イミュータブルなデータ構造の使用
これらの実装テクニックを適切に組み合わせることで、堅牢な非同期処理システムを構築できます。
4.非同期処理のパフォーマンスチューニング
4.1 スレッド数の最適化手法
スレッド数の最適化は、システムのパフォーマンスを大きく左右します。
public class ThreadOptimizationExample {
public static class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;
private final AtomicLong totalTasksCompleted = new AtomicLong(0);
private final AtomicLong totalProcessingTime = new AtomicLong(0);
public ThreadPoolMonitor(ThreadPoolExecutor executor) {
this.executor = executor;
}
// スレッドプールのパフォーマンス指標を計測
public void monitorPerformance() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
System.out.println("===== Thread Pool Statistics =====");
System.out.println("Active Threads: " + executor.getActiveCount());
System.out.println("Pool Size: " + executor.getPoolSize());
System.out.println("Queue Size: " + executor.getQueue().size());
System.out.println("Completed Tasks: " + totalTasksCompleted.get());
// 平均処理時間の計算
long completedTasks = totalTasksCompleted.get();
if (completedTasks > 0) {
double avgProcessingTime =
(double) totalProcessingTime.get() / completedTasks;
System.out.println("Avg Processing Time: " +
String.format("%.2f ms", avgProcessingTime));
}
}, 0, 5, TimeUnit.SECONDS);
}
// タスク実行時間の記録
public void recordTaskCompletion(long processingTime) {
totalTasksCompleted.incrementAndGet();
totalProcessingTime.addAndGet(processingTime);
}
}
// 最適化されたスレッドプールの作成
public static ThreadPoolExecutor createOptimizedThreadPool() {
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
// タスク実行前の処理
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// タスク実行後の処理
super.afterExecute(r, t);
}
};
}
}
スレッド数最適化のポイント
1. システムリソースに基づく調整
● CPU使用率の監視
● メモリ使用量の確認
● I/O待ち時間の考慮
2. 動的なスレッド数調整
● 負荷に応じた増減
● キュー長に基づく調整
● レイテンシの監視
4.2 メモリ使用量のモニタリングと制御
非同期処理におけるメモリリークを防ぎ、効率的なメモリ使用を実現します。
public class MemoryMonitoringExample {
private static final Runtime runtime = Runtime.getRuntime();
// メモリ使用状況のモニタリング
public static void monitorMemoryUsage() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
System.out.println("===== Memory Statistics =====");
System.out.println("Used Memory: " +
formatSize(usedMemory));
System.out.println("Free Memory: " +
formatSize(freeMemory));
System.out.println("Total Memory: " +
formatSize(totalMemory));
System.out.println("Max Memory: " +
formatSize(runtime.maxMemory()));
}, 0, 10, TimeUnit.SECONDS);
}
// メモリサイズのフォーマット
private static String formatSize(long bytes) {
return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
}
// メモリリーク防止のためのWeakReferenceの使用例
public static class CacheManager<K, V> {
private final Map<K, WeakReference<V>> cache = new ConcurrentHashMap<>();
public void put(K key, V value) {
cache.put(key, new WeakReference<>(value));
}
public V get(K key) {
WeakReference<V> ref = cache.get(key);
if (ref != null) {
V value = ref.get();
if (value == null) {
cache.remove(key);
}
return value;
}
return null;
}
}
}
メモリ管理のベストプラクティス
1. リソースのクリーンアップ
● AutoCloseableの実装
● try-with-resourcesの使用
● 明示的なリソース解放
2. メモリリークの防止
● WeakReferenceの適切な使用
● キャッシュのライフサイクル管理
● 循環参照の回避
4.3 ボトルネック特定と改善方法
パフォーマンスボトルネックを特定し、効果的な改善を行います。
public class PerformanceAnalyzer {
// パフォーマンス測定用のアノテーション
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Measure {
String value() default "";
}
// アスペクトによるパフォーマンス測定
@Aspect
@Component
public static class PerformanceAspect {
private static final Logger logger =
LoggerFactory.getLogger(PerformanceAspect.class);
@Around("@annotation(measure)")
public Object measureExecutionTime(ProceedingJoinPoint joinPoint,
Measure measure) throws Throwable {
long startTime = System.nanoTime();
try {
return joinPoint.proceed();
} finally {
long duration = System.nanoTime() - startTime;
logger.info("{}: {} took {} ms",
measure.value(),
joinPoint.getSignature().getName(),
duration / 1_000_000.0);
}
}
}
// ボトルネック分析用のメトリクス収集
public static class MetricsCollector {
private final Map<String, Histogram> metrics = new ConcurrentHashMap<>();
public void recordExecutionTime(String operation, long duration) {
metrics.computeIfAbsent(operation,
k -> new Histogram(new UniformReservoir()))
.update(duration);
}
public void printStatistics() {
metrics.forEach((operation, histogram) -> {
System.out.println("===== " + operation + " Statistics =====");
System.out.println("Mean: " + histogram.getSnapshot().getMean());
System.out.println("99th Percentile: " +
histogram.getSnapshot().get99thPercentile());
System.out.println("Max: " + histogram.getSnapshot().getMax());
});
}
}
}
パフォーマンス改善のポイント
1. 測定と分析
● メトリクスの継続的な収集
● パフォーマンスプロファイリング
● ボトルネックの特定
2. 最適化戦略
● バッチ処理の活用
● キャッシュの適切な使用
● 非同期処理の粒度調整
3. 継続的なモニタリング
● アラートの設定
● トレンド分析
● 予防的な対応
これらのチューニング技術を適切に組み合わせることで、効率的な非同期処理システムを実現できます。
5.実際のプロジェクトでの活用例
5.1 大規模データ処理での非同期処理の実装例
大量のデータを効率的に処理する実装例を見ていきましょう。
public class LargeDataProcessingExample {
public class DataProcessor {
private final ExecutorService executor;
private final int batchSize;
private final ConcurrentLinkedQueue<ProcessingResult> results;
public DataProcessor(int threadCount, int batchSize) {
this.executor = Executors.newFixedThreadPool(threadCount);
this.batchSize = batchSize;
this.results = new ConcurrentLinkedQueue<>();
}
// 大規模データの非同期バッチ処理
public CompletableFuture<List<ProcessingResult>> processLargeDataset(
List<DataRecord> records) {
// データを適切なサイズのバッチに分割
List<List<DataRecord>> batches = splitIntoBatches(records, batchSize);
// 各バッチを非同期で処理
List<CompletableFuture<ProcessingResult>> futures = batches.stream()
.map(batch -> CompletableFuture.supplyAsync(
() -> processBatch(batch), executor))
.collect(Collectors.toList());
// 全バッチの処理完了を待機
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
private ProcessingResult processBatch(List<DataRecord> batch) {
ProcessingResult result = new ProcessingResult();
try {
// バッチ処理のメイン処理
for (DataRecord record : batch) {
result.addProcessedRecord(processRecord(record));
}
} catch (Exception e) {
result.setError(e);
}
return result;
}
// 進捗モニタリング機能
public class ProcessingMonitor {
private final AtomicLong totalProcessed = new AtomicLong(0);
private final long totalRecords;
public ProcessingMonitor(long totalRecords) {
this.totalRecords = totalRecords;
}
public void updateProgress(int processedCount) {
long current = totalProcessed.addAndGet(processedCount);
double progress = (current * 100.0) / totalRecords;
System.out.printf("Progress: %.2f%% (%d/%d records)%n",
progress, current, totalRecords);
}
}
}
}
大規模データ処理のポイント
1. バッチ処理の最適化
● 適切なバッチサイズの決定
● メモリ使用量の制御
● 進捗モニタリング
2. エラーハンドリング
● 部分的な失敗の許容
● リトライ機構の実装
● エラーログの集約
5.2 WebアプリケーションでのAsync Processing活用術
Webアプリケーションにおける非同期処理の実装例です。
@RestController
@RequestMapping("/api")
public class AsyncWebServiceExample {
private final AsyncTaskExecutor taskExecutor;
private final TaskRepository taskRepository;
// 非同期処理を行うAPIエンドポイント
@PostMapping("/tasks")
public ResponseEntity<TaskResponse> submitTask(
@RequestBody TaskRequest request) {
// タスクの登録
Task task = taskRepository.save(
Task.builder()
.status(TaskStatus.PENDING)
.request(request)
.build());
// 非同期処理の開始
CompletableFuture.runAsync(() -> {
try {
processTask(task);
} catch (Exception e) {
handleTaskError(task, e);
}
}, taskExecutor);
// 即座にレスポンスを返す
return ResponseEntity.accepted()
.body(new TaskResponse(task.getId(), TaskStatus.PENDING));
}
// タスクのステータス確認API
@GetMapping("/tasks/{taskId}")
public ResponseEntity<TaskStatus> getTaskStatus(
@PathVariable Long taskId) {
return taskRepository.findById(taskId)
.map(task -> ResponseEntity.ok(task.getStatus()))
.orElse(ResponseEntity.notFound().build());
}
// WebSocketによるリアルタイム通知
@MessageMapping("/tasks/{taskId}/progress")
public class TaskProgressNotifier {
private final SimpMessagingTemplate messagingTemplate;
public void notifyProgress(Long taskId, int progress) {
String destination = "/topic/tasks/" + taskId + "/progress";
messagingTemplate.convertAndSend(destination,
new ProgressUpdate(taskId, progress));
}
}
}
Webアプリケーションでの実装ポイント
1. APIの設計
● 非同期APIのインターフェース設計
● ステータス確認の仕組み
● WebSocketによる進捗通知
2. エラー処理
● タイムアウト設定
● リトライ戦略
● クライアントへの通知
5.3 マイクロサービスでの非同期通信パターン
マイクロサービスアーキテクチャにおける非同期通信の実装例です。
@Service
public class AsyncMicroserviceCommunicationExample {
// イベント発行側の実装
public class OrderService {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
private final OrderRepository orderRepository;
public CompletableFuture<OrderResult> processOrder(Order order) {
return CompletableFuture.supplyAsync(() -> {
// 注文の処理
OrderEvent event = createOrderEvent(order);
// イベントの発行
return kafkaTemplate.send("orders", event)
.thenApply(result -> {
orderRepository.save(order);
return new OrderResult(
order.getId(), OrderStatus.PROCESSING);
})
.exceptionally(ex -> {
// エラーハンドリング
order.setStatus(OrderStatus.ERROR);
orderRepository.save(order);
throw new OrderProcessingException(
"Failed to process order", ex);
});
});
}
}
// イベント受信側の実装
@Service
public class InventoryService {
@KafkaListener(topics = "orders")
public void handleOrderEvent(OrderEvent event) {
try {
// 在庫の確認と更新
updateInventory(event);
// 結果イベントの発行
publishInventoryUpdatedEvent(event);
} catch (Exception e) {
// 補償トランザクションの実行
compensateFailedInventoryUpdate(event);
}
}
}
// サーキットブレーカーパターンの実装
@Service
public class ResilientServiceCaller {
private final CircuitBreaker circuitBreaker;
public CompletableFuture<ServiceResponse> callService(
ServiceRequest request) {
return circuitBreaker.executeAsync(() ->
actualServiceCall(request))
.thenApply(this::processResponse)
.exceptionally(this::handleFailure);
}
}
}
マイクロサービス実装のポイント
1. メッセージング設計
● イベント駆動アーキテクチャ
● 非同期メッセージングパターン
● 冪等性の確保
2. 分散トランザクション
● Sagaパターンの実装
● 補償トランザクション
● データの整合性確保
3. レジリエンス
● サーキットブレーカー
● リトライポリシー
● フォールバック処理
これらの実装例は、実際のプロジェクトで活用できる具体的なパターンとテクニックを示しています。プロジェクトの要件や制約に応じて、適切なパターンを選択し、カスタマイズすることが重要です。
まとめと今後の学習方針
本記事のまとめ
1. 非同期処理の基礎
● 同期・非同期の違いを理解
● 適切なユースケースの把握
● 基本的な実装パターンの習得
2. 実装テクニック
● 7つの実装パターンの使い分け
● 効率的なスレッドプール設計
● 適切な例外処理の実装
3. パフォーマンス最適化
● スレッド数の最適化
● メモリ使用量の制御
● ボトルネックの特定と改善
4. 実践的な活用
● 大規模データ処理での実装
● Webアプリケーションでの活用
● マイクロサービスでの応用
今後の学習のために
1. 深く学ぶべき領域
● リアクティブプログラミング
● 分散システムの設計パターン
● パフォーマンスチューニング手法
2. 推奨される学習リソース
● Java Concurrency in Practice(書籍)
● Spring Framework公式ドキュメント
● Project Reactor documentation
3. 実践的なステップ
● 小規模なプロジェクトでの試験的導入
● パフォーマンス計測の習慣化
● コードレビューでの知見共有
非同期処理の実装は、継続的な学習と実践が重要です。本記事で紹介した内容を基礎として、実際のプロジェクトでの経験を積み重ねることで、より深い理解と実装スキルを身につけることができます。

