Nettyとは?基礎から理解する非同期ネットワークフレームワーク
Javaの標準ネットワークAPIの限界と課題
従来のJavaネットワークプログラミングでは、java.netパッケージを使用して以下のような実装を行っていました:
// 従来の同期型サーバー実装
public class TraditionalServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
// クライアント接続を待機(ブロッキング)
Socket clientSocket = serverSocket.accept();
// 1接続に対して1スレッドを割り当て
new Thread(() -> {
try {
handleClient(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
private static void handleClient(Socket socket) throws IOException {
// クライアント処理(入出力処理もブロッキング)
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String line;
while ((line = in.readLine()) != null) {
out.println("サーバーレスポンス: " + line);
}
}
}
この実装には以下の課題があります:
- スケーラビリティの制限
- 接続ごとにスレッドを生成
- 大量接続時のメモリ消費が増大
- コンテキストスイッチのオーバーヘッド
- ブロッキング処理による非効率性
- I/O処理中はスレッドがブロック
- リソースの効率的な利用ができない
- レスポンス時間の増大
Nettyが解決する5つの技術課題
Nettyは以下の課題を効果的に解決します:
- 非同期I/Oの実現
- イベント駆動型アーキテクチャの採用
- ノンブロッキングI/Oの実装
- 効率的なリソース利用
- メモリ管理の最適化
- ByteBufによる効率的なメモリ管理
- プーリングによるメモリ再利用
- ゼロコピー転送のサポート
- プロトコル対応の簡素化
- 豊富な組み込みプロトコル
- カスタムプロトコルの容易な実装
- プロトコル変換の柔軟性
- スレッド管理の最適化
- EventLoopによる効率的なスレッド利用
- スレッドプールの自動管理
- マルチコア活用の最適化
- エラーハンドリングの改善
- 統一的な例外処理の仕組み
- デバッグ情報の充実
- トラブルシューティングの容易さ
Nettyのアーキテクチャと主要要素
Nettyの基本的なアーキテクチャは以下の要素で構成されています:
// Nettyを使用した基本的なサーバー実装
public class NettyServer {
public static void main(String[] args) throws Exception {
// イベントループグループの設定
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// パイプラインの設定
ch.pipeline().addLast(
new StringDecoder(),
new StringEncoder(),
new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(
ChannelHandlerContext ctx, String msg) {
// メッセージ処理
ctx.writeAndFlush("応答: " + msg);
}
});
}
});
// サーバーの起動
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
// リソースの解放
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
主要コンポーネントの説明:
- EventLoopGroup
- ボスグループ:新規接続の受付
- ワーカーグループ:接続済みチャネルの処理
- Channel
- ネットワーク操作の抽象化
- 非同期I/Oの実現
- バッファ管理の効率化
- ChannelPipeline
- チャネルハンドラの管理
- データ処理の流れ制御
- プロトコル変換の実装
- ByteBuf
- 効率的なメモリ管理
- 参照カウント方式
- プーリングによる再利用
このアーキテクチャにより、Nettyは高性能で柔軟なネットワークアプリケーションの開発を可能にしています。
Netty実践入門:基本的な実装パターン
シンプルなTCPサーバーの実装手順
基本的なTCPサーバーを実装する手順を、段階的に説明します。
1. サーバー実装の基本構造
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
// イベントループグループの作成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// サーバーブートストラップの設定
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoServerHandler());
}
});
// サーバーの起動
ChannelFuture f = b.bind(port).sync();
System.out.println("サーバーが起動しました。ポート: " + port);
// シャットダウン待機
f.channel().closeFuture().sync();
} finally {
// リソースの解放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
2. ハンドラーの実装
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 受信データの処理
ByteBuf in = (ByteBuf) msg;
try {
// データの読み取りと処理
System.out.println("受信メッセージ: " + in.toString(CharsetUtil.UTF_8));
// エコーバック
ctx.write(in);
} finally {
// 明示的な解放は不要(writeで転送)
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 書き込みバッファのフラッシュ
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// エラーハンドリング
cause.printStackTrace();
ctx.close();
}
}
効率的なイベントハンドラーの設計方法
イベントハンドラーの設計では、以下のパターンを活用します:
1. パイプライン処理の実装
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// デコーダーの追加
p.addLast(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in,
List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(in.readInt());
}
});
// エンコーダーの追加
p.addLast(new MessageToByteEncoder<Integer>() {
@Override
protected void encode(ChannelHandlerContext ctx,
Integer msg,
ByteBuf out) {
out.writeInt(msg);
}
});
// ビジネスロジックハンドラーの追加
p.addLast(new BusinessLogicHandler());
}
}
2. 状態管理パターン
public class StatefulHandler extends ChannelInboundHandlerAdapter {
private enum State {
WAIT_LENGTH, WAIT_CONTENT
}
private State currentState = State.WAIT_LENGTH;
private int expectedLength;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
switch (currentState) {
case WAIT_LENGTH:
if (in.readableBytes() >= 4) {
expectedLength = in.readInt();
currentState = State.WAIT_CONTENT;
}
break;
case WAIT_CONTENT:
if (in.readableBytes() >= expectedLength) {
ByteBuf content = in.readBytes(expectedLength);
// コンテンツの処理
currentState = State.WAIT_LENGTH;
}
break;
}
}
}
ByteBufを使った効率的なデータ処理
ByteBufの効率的な使用方法を示します:
1. ByteBuf操作の基本パターン
public class ByteBufHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
// 読み取り操作
while (buf.isReadable()) {
System.out.print((char) buf.readByte());
}
System.out.println();
// 新しいバッファの作成
ByteBuf response = ctx.alloc().buffer();
response.writeBytes("応答メッセージ".getBytes());
ctx.writeAndFlush(response);
} finally {
// リソースの解放
buf.release();
}
}
}
2. 効率的なメモリ利用
public class MemoryEfficientHandler extends ChannelInboundHandlerAdapter {
private static final ByteBufAllocator POOLED_ALLOCATOR =
PooledByteBufAllocator.DEFAULT;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// プールされたバッファの使用
ByteBuf buf = POOLED_ALLOCATOR.directBuffer();
try {
// ゼロコピー転送の実装
ByteBuf input = (ByteBuf) msg;
buf.writeBytes(input, input.readerIndex(), input.readableBytes());
// データ処理
processBuffer(buf);
// 応答送信
ctx.writeAndFlush(buf.retain());
} finally {
buf.release();
}
}
private void processBuffer(ByteBuf buf) {
// バッファの処理ロジック
// 複合バッファの使用例
CompositeByteBuf compositeBuf = POOLED_ALLOCATOR.compositeBuffer();
compositeBuf.addComponent(true, buf.retain());
compositeBuf.release();
}
}
これらの実装パターンを理解し適切に使用することで、効率的で安定したNettyアプリケーションを構築できます。特に、ByteBufの適切な管理とイベントハンドラーの設計は、アプリケーションのパフォーマンスと保守性に大きく影響します。
実践的なNettyアプリケーション開発
WebSocketのサーバー実装例と解説
WebSocketサーバーの実装例を示します。このサーバーはリアルタイムな双方向通信を実現します。
public class WebSocketServer {
private final int port;
public WebSocketServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer());
Channel ch = b.bind(port).sync().channel();
System.out.println("WebSocketサーバーを起動しました。ws://localhost:" + port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// WebSocketに必要なハンドラーの設定
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
pipeline.addLast(new WebSocketFrameHandler());
}
}
class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) frame).text();
System.out.println("受信メッセージ: " + request);
// エコーレスポンスの送信
ctx.channel().writeAndFlush(
new TextWebSocketFrame("サーバーレスポンス: " + request));
} else {
String message = "未サポートのフレームタイプ: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}
}
HTTPサーバーの構築とRESTful APIの実装
HTTPサーバーとRESTful APIの実装例を示します。
public class HttpServer {
private final int port;
public HttpServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
Channel ch = b.bind(port).sync().channel();
System.out.println("HTTPサーバーを起動しました。http://localhost:" + port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new HttpServerHandler());
}
}
class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.decoderResult().isSuccess()) {
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
// URIに基づくルーティング
String uri = request.uri();
HttpMethod method = request.method();
if ("/api/users".equals(uri) && method.equals(HttpMethod.GET)) {
handleGetUsers(ctx);
} else if ("/api/users".equals(uri) && method.equals(HttpMethod.POST)) {
handleCreateUser(ctx, request);
} else {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
}
}
private void handleGetUsers(ChannelHandlerContext ctx) {
// ユーザー一覧のJSONレスポンス例
List<User> users = Arrays.asList(
new User("user1", "John"),
new User("user2", "Jane")
);
try {
String json = objectMapper.writeValueAsString(users);
sendJsonResponse(ctx, json);
} catch (JsonProcessingException e) {
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
private void sendJsonResponse(ChannelHandlerContext ctx, String json) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.copiedBuffer(json, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,
response.content().readableBytes());
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
プロトコルバッファを使用した効率的な通信
Protocol Buffersを使用した効率的な通信の実装例を示します。
// プロトコル定義(example.proto)
syntax = "proto3";
package example;
message User {
string id = 1;
string name = 2;
int32 age = 3;
}
message UserList {
repeated User users = 1;
}
public class ProtobufServer {
private final int port;
public ProtobufServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ProtobufServerInitializer());
Channel ch = b.bind(port).sync().channel();
System.out.println("Protobufサーバーを起動しました。port: " + port);
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class ProtobufServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// Protocol Buffersのエンコーダー/デコーダーを追加
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(User.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new ProtobufServerHandler());
}
}
class ProtobufServerHandler extends SimpleChannelInboundHandler<User> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) {
// Protocol Buffersメッセージの処理
System.out.println("受信ユーザー: " + user.getName());
// レスポンスの作成と送信
User response = User.newBuilder()
.setId(user.getId())
.setName("応答: " + user.getName())
.setAge(user.getAge())
.build();
ctx.writeAndFlush(response);
}
}
これらの実装例は、実際のアプリケーション開発で必要となる主要なプロトコルとの統合方法を示しています。各実装において、以下の点に注意が必要です:
- WebSocket実装のポイント:
- WebSocketハンドシェイクの自動処理
- フレームタイプに応じた適切な処理
- 継続的な接続管理
- RESTful API実装のポイント:
- HTTPメソッドとURIに基づくルーティング
- JSONシリアライゼーション/デシリアライゼーション
- 適切なステータスコードとヘッダーの設定
- Protocol Buffers実装のポイント:
- メッセージ定義の管理
- エンコード/デコードの効率化
- バージョニングの考慮
これらのプロトコル実装を適切に組み合わせることで、様々なユースケースに対応できる柔軟なアプリケーションを構築できます。
Nettyアプリケーションの性能最適化
メモリリークを防ぐためのベストプラクティス
Nettyアプリケーションでメモリリークを防ぐための主要なポイントと実装例を示します。
public class MemoryLeakPreventionHandler extends ChannelInboundHandlerAdapter {
// メッセージキューの最大サイズを制限
private static final int MAX_MESSAGES = 1000;
private final Queue<ByteBuf> messageQueue = new ConcurrentLinkedQueue<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
// メッセージキューのサイズチェック
if (messageQueue.size() >= MAX_MESSAGES) {
// 古いメッセージを破棄
ByteBuf oldMsg = messageQueue.poll();
if (oldMsg != null) {
oldMsg.release();
}
}
// メッセージの処理とキューへの追加
processMessage(buf.retain());
messageQueue.offer(buf);
} finally {
// 参照カウントの適切な管理
buf.release();
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// コネクション終了時のクリーンアップ
clearMessageQueue();
}
private void clearMessageQueue() {
ByteBuf msg;
while ((msg = messageQueue.poll()) != null) {
msg.release();
}
}
private void processMessage(ByteBuf buf) {
// メッセージ処理ロジック
}
}
メモリリーク防止のための主要なポイント:
- ByteBuf管理
- 参照カウントの適切な管理
- release()メソッドの確実な呼び出し
- try-finallyブロックの活用
- リソース管理
- キューやバッファのサイズ制限
- 適切なクリーンアップ処理
- メモリプールの活用
スレッドモデルのチューニングテクニック
Nettyのスレッドモデルを最適化するための実装例と設定方法を示します。
public class OptimizedServer {
private final int port;
private final int bossThreads;
private final int workerThreads;
public OptimizedServer(int port, int bossThreads, int workerThreads) {
this.port = port;
this.bossThreads = bossThreads;
this.workerThreads = workerThreads;
}
public void start() throws Exception {
// スレッドプールの最適化設定
EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads,
new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads,
new DefaultThreadFactory("worker"));
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new OptimizedServerInitializer());
// 高度な設定
b.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(32 * 1024, 64 * 1024));
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
スレッドモデル最適化のポイント:
- スレッド数の設定
- CPUコア数に基づく適切なスレッド数
- ワークロードに応じた調整
- モニタリングに基づく最適化
- イベントループの設定
- 適切なイベントループグループの分割
- タスク実行の優先順位付け
- バックプレッシャーの制御
パフォーマンスモニタリングと問題解決
パフォーマンスモニタリングと問題解決のための実装例を示します。
public class PerformanceMonitorHandler extends ChannelDuplexHandler {
private static final Logger logger =
LoggerFactory.getLogger(PerformanceMonitorHandler.class);
private final AtomicLong totalProcessingTime = new AtomicLong(0);
private final AtomicLong totalMessages = new AtomicLong(0);
private final Map<ChannelId, Long> processingStartTimes =
new ConcurrentHashMap<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 処理開始時間の記録
processingStartTimes.put(ctx.channel().id(), System.nanoTime());
// メッセージ処理の統計
totalMessages.incrementAndGet();
ctx.fireChannelRead(msg);
}
@Override
public void flush(ChannelHandlerContext ctx) {
Long startTime = processingStartTimes.remove(ctx.channel().id());
if (startTime != null) {
long processingTime = System.nanoTime() - startTime;
totalProcessingTime.addAndGet(processingTime);
// パフォーマンスメトリクスの記録
if (totalMessages.get() % 1000 == 0) {
logPerformanceMetrics();
}
}
ctx.flush();
}
private void logPerformanceMetrics() {
long avgProcessingTime = totalProcessingTime.get() /
Math.max(1, totalMessages.get());
logger.info("Performance Metrics:");
logger.info("Total Messages: {}", totalMessages.get());
logger.info("Average Processing Time: {} ns", avgProcessingTime);
}
}
モニタリングと問題解決のポイント:
- メトリクス収集
- スループットの測定
- レイテンシーの監視
- リソース使用率の追跡
- 問題検出と解決
- ボトルネックの特定
- メモリリークの検出
- パフォーマンス低下の原因分析
- チューニングのアプローチ
- 段階的な最適化
- A/Bテストによる検証
- 継続的なモニタリング
これらの最適化テクニックを適切に組み合わせることで、高性能で安定したNettyアプリケーションを実現できます。
実運用に向けたNettyの設計パターン
マイクロサービスでのNetty活用事例
マイクロサービスアーキテクチャでのNetty活用例を示します。
public class MicroserviceNettyServer {
private final int port;
private final ServiceRegistry serviceRegistry;
private final LoadBalancer loadBalancer;
public MicroserviceNettyServer(int port) {
this.port = port;
this.serviceRegistry = new ServiceRegistry();
this.loadBalancer = new LoadBalancer();
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MicroserviceChannelInitializer(
serviceRegistry, loadBalancer));
// サービス登録
registerService();
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} finally {
unregisterService();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class MicroserviceChannelInitializer
extends ChannelInitializer<SocketChannel> {
private final ServiceRegistry serviceRegistry;
private final LoadBalancer loadBalancer;
MicroserviceChannelInitializer(
ServiceRegistry serviceRegistry, LoadBalancer loadBalancer) {
this.serviceRegistry = serviceRegistry;
this.loadBalancer = loadBalancer;
}
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new MicroserviceHandler(
serviceRegistry, loadBalancer));
}
}
}
Spring WebFluxとNettyの統合方法
Spring WebFluxとNettyを統合する実装例を示します。
@Configuration
public class NettyWebFluxConfig {
@Bean
public HttpHandler httpHandler(WebFluxProperties properties) {
return WebHttpHandlerBuilder
.applicationContext(context)
.build();
}
@Bean
public ReactorHttpHandlerAdapter handlerAdapter(HttpHandler httpHandler) {
return new ReactorHttpHandlerAdapter(httpHandler);
}
@Bean
public DisposableServer nettyServer(
ReactorHttpHandlerAdapter handlerAdapter,
@Value("${server.port:8080}") int port) {
return HttpServer.create()
.port(port)
.handle(handlerAdapter)
.bind()
.block();
}
}
@RestController
@RequestMapping("/api")
public class ReactiveController {
@GetMapping("/data")
public Flux<DataItem> getData() {
return Flux.interval(Duration.ofMillis(100))
.map(i -> new DataItem("item-" + i))
.take(10);
}
@PostMapping("/process")
public Mono<Response> processData(@RequestBody Mono<Request> request) {
return request
.flatMap(this::processRequest)
.timeout(Duration.ofSeconds(5));
}
}
本番環境での障害対策とトラブルシューティング
本番環境での障害対策とトラブルシューティングの実装例を示します。
public class ProductionReadyServer {
private static final Logger logger =
LoggerFactory.getLogger(ProductionReadyServer.class);
private final MetricsRegistry metricsRegistry;
private final CircuitBreaker circuitBreaker;
private final RetryPolicy retryPolicy;
public ProductionReadyServer() {
this.metricsRegistry = new MetricsRegistry();
this.circuitBreaker = new CircuitBreaker();
this.retryPolicy = new RetryPolicy();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 例外の分類と適切な処理
if (cause instanceof ConnectTimeoutException) {
handleTimeout(ctx, (ConnectTimeoutException) cause);
} else if (cause instanceof MemoryLeakException) {
handleMemoryLeak(ctx, (MemoryLeakException) cause);
} else {
handleUnexpectedError(ctx, cause);
}
}
private void handleTimeout(
ChannelHandlerContext ctx, ConnectTimeoutException e) {
// タイムアウト時の処理
if (retryPolicy.shouldRetry()) {
retryOperation(ctx);
} else {
sendErrorResponse(ctx, HttpResponseStatus.GATEWAY_TIMEOUT);
}
metricsRegistry.incrementTimeoutCounter();
}
private void handleMemoryLeak(
ChannelHandlerContext ctx, MemoryLeakException e) {
// メモリリーク検出時の処理
logger.error("Memory leak detected", e);
System.gc(); // 明示的なGC呼び出し
metricsRegistry.incrementMemoryLeakCounter();
ctx.close();
}
private class HealthCheckHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HealthCheckRequest) {
// ヘルスチェックの実行
performHealthCheck(ctx);
} else {
ctx.fireChannelRead(msg);
}
}
private void performHealthCheck(ChannelHandlerContext ctx) {
HealthStatus status = new HealthStatus();
status.setMemoryUsage(getMemoryUsage());
status.setThreadStats(getThreadStats());
status.setConnectionCount(getActiveConnections());
ctx.writeAndFlush(status);
}
}
}
本番環境での運用ポイント:
- 障害検出と対策
- 自動ヘルスチェック
- サーキットブレーカーパターン
- リトライメカニズム
- モニタリングと警告
- メトリクス収集
- ログ管理
- アラート設定
- スケーラビリティ対策
- 水平スケーリング
- ロードバランシング
- セッション管理
これらの設計パターンと運用手法を適切に組み合わせることで、安定した本番環境の運用が可能になります。