【保存版】LaravelのJobクラスで実現する高速な非同期処理 – 実装から運用まで完全ガイド

Laravelのジョブ機能とは何か

重い処理を効率的に扱うためのジョブシステム

Webアプリケーションの開発において、ユーザーの操作に対するレスポンス速度は非常に重要です。しかし、画像処理や大量のメール送信、外部APIとの連携など、処理に時間がかかる機能を実装する必要が出てくることがあります。Laravelのジョブ機能は、このような重い処理を効率的に扱うための仕組みを提供します。

ジョブシステムの主な特徴は以下の通りです:

  • 処理の非同期実行: メインの処理フローから切り離して実行することができます
  • キューによる制御: 処理の順序や優先度を制御できます
  • 再試行機能: 失敗した処理を自動的に再実行できます
  • 分散処理: 複数のサーバーで処理を分散させることができます

例えば、ユーザー登録時にウェルカムメールを送信する処理を考えてみましょう:

// 従来の同期処理の場合
public function register(Request $request)
{
    $user = User::create($request->all());

    // メール送信が完了するまでレスポンスが返されない
    Mail::to($user->email)->send(new WelcomeMail($user));

    return response()->json(['message' => '登録完了']);
}

// ジョブを使用した非同期処理の場合
public function register(Request $request)
{
    $user = User::create($request->all());

    // メール送信をジョブとしてキューに投入
    SendWelcomeMailJob::dispatch($user);

    return response()->json(['message' => '登録完了']);
}

同期処理と非同期処理の違いを理解する

Laravelにおける同期処理と非同期処理の主な違いを理解することは、ジョブ機能を効果的に活用する上で重要です。

同期処理の特徴:

  1. リクエストを受けてから一連の処理が完了するまで、レスポンスが返されません
  2. 処理の実行順序が予測可能で、結果をすぐに取得できます
  3. エラーが発生した場合、即座に検知して対応できます
  4. サーバーリソースの使用効率が低下する可能性があります

非同期処理の特徴:

  1. 重い処理をバックグラウンドで実行し、レスポンスを即座に返すことができます
  2. 処理の実行順序を制御可能で、優先度を設定できます
  3. エラーが発生した場合の再試行機能があります
  4. サーバーリソースを効率的に使用できます

以下は、同期処理と非同期処理の処理時間の違いを示す図です:

【同期処理】
リクエスト受信 → ユーザー作成 → メール送信 → レスポンス返却
|--------------|--------------|--------------|
     0.1秒         0.1秒          2秒        = 合計2.2秒

【非同期処理】
リクエスト受信 → ユーザー作成 → レスポンス返却
|--------------|--------------|
     0.1秒         0.1秒                     = 合計0.2秒
            ↓
        メール送信(バックグラウンド)
        |--------------|
             2秒

なぜLaravelでジョブを使う必要があるのか

Laravelでジョブを使用する主なメリットは以下の通りです:

  1. アプリケーションのレスポンス向上
  • ユーザーの待ち時間を最小限に抑えることができます
  • 重い処理による画面のフリーズを防ぐことができます
  1. システムリソースの効率的な利用
  • CPU負荷の高い処理を分散できます
  • メモリ使用量をコントロールできます
  • データベース接続数を適切に管理できます
  1. スケーラビリティの確保
  • 処理量の増加に応じて、キューワーカーを増やすことができます
  • 複数のサーバーに処理を分散させることができます
  1. 信頼性の向上
   // 再試行回数とタイムアウトを指定できる
   class ProcessVideoJob implements ShouldQueue
   {
       public $tries = 3; // 失敗時に3回まで再試行
       public $timeout = 120; // 120秒でタイムアウト

       public function handle()
       {
           // 動画処理のロジック
       }

       public function failed($exception)
       {
           // 失敗時の処理
           Log::error('動画処理に失敗: ' . $exception->getMessage());
       }
   }

これらの機能により、Laravelのジョブシステムは以下のような場面で特に効果を発揮します:

  • 大量のメール送信処理
  • 画像・動画の処理
  • レポート生成
  • 外部APIとの連携
  • バッチ処理
  • データの集計・分析処理

実際の開発現場では、これらの機能を組み合わせることで、高いパフォーマンスと信頼性を両立したシステムを構築することができます。

Jobクラスの基本的な実装方法

artisanコマンドでJobクラスを生成する

Laravelでは、artisanコマンドを使用して簡単にJobクラスのひな形を生成することができます。基本的な生成コマンドは以下の通りです:

# 基本的なJobクラスの生成
php artisan make:job ProcessPodcast

# キュー投入可能なJobクラスを生成(--queueオプション)
php artisan make:job ProcessPodcast --queue

生成されるJobクラスの基本構造は以下のようになります:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * ジョブのインスタンス生成
     */
    public function __construct()
    {
        //
    }

    /**
     * ジョブの実行
     */
    public function handle()
    {
        //
    }
}

handleメソッドに処理を記述する

handleメソッドは、ジョブが実行される際に呼び出される中心的なメソッドです。具体的な実装例を見ていきましょう:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;
    protected $audioProcessor;

    /**
     * ジョブのインスタンス生成
     */
    public function __construct(Podcast $podcast)
    {
        // コンストラクタで必要なデータを受け取る
        $this->podcast = $podcast;
    }

    /**
     * ジョブの実行
     */
    public function handle(AudioProcessor $audioProcessor)
    {
        try {
            // 音声ファイルの処理
            $processedFile = $audioProcessor->process(
                $this->podcast->audio_file
            );

            // 処理結果の保存
            $this->podcast->update([
                'processed_file' => $processedFile,
                'processed_at' => now(),
                'status' => 'completed'
            ]);

            // ログの記録
            Log::info('Podcast processed successfully', [
                'podcast_id' => $this->podcast->id
            ]);
        } catch (\Exception $e) {
            // エラー処理
            Log::error('Failed to process podcast', [
                'podcast_id' => $this->podcast->id,
                'error' => $e->getMessage()
            ]);

            throw $e;
        }
    }
}

このコードの重要なポイント:

  1. 依存性の注入handleメソッドの引数でAudioProcessorを注入しています
  2. 例外処理:try-catchブロックでエラーをハンドリングしています
  3. ログ記録:処理の進行状況やエラーをログに記録しています
  4. モデルの更新:処理結果をデータベースに保存しています

ジョブをディスパッチする方法

ジョブをキューに投入(ディスパッチ)する方法は複数あります:

// 方法1: dispatchメソッドを使用
ProcessPodcast::dispatch($podcast);

// 方法2: 遅延実行を指定
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

// 方法3: 特定のキューを指定
ProcessPodcast::dispatch($podcast)->onQueue('processing');

// 方法4: チェーン実行を指定
ProcessPodcast::withChain([
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast)
])->dispatch($podcast);

// 方法5: コントローラーでの使用例
public function store(Request $request)
{
    $podcast = Podcast::create($request->validated());

    // バリデーション済みのデータを基にジョブをディスパッチ
    ProcessPodcast::dispatch($podcast)
        ->onQueue('processing')
        ->delay(now()->addMinutes(5));

    return response()->json([
        'message' => 'Podcast processing has been queued',
        'podcast_id' => $podcast->id
    ]);
}

ディスパッチ時の主なオプション:

  1. delay(): 実行を遅延させる
   ProcessPodcast::dispatch($podcast)
       ->delay(now()->addHours(2)); // 2時間後に実行
  1. onQueue(): 特定のキューを指定
   ProcessPodcast::dispatch($podcast)
       ->onQueue('high-priority'); // 優先度の高いキューで実行
  1. withChain(): 複数のジョブを連続実行
   ProcessPodcast::withChain([
       new OptimizePodcast($podcast),
       new NotifyUser($podcast->user)
   ])->dispatch($podcast);
  1. afterCommit(): データベーストランザクション後に実行
   ProcessPodcast::dispatch($podcast)
       ->afterCommit(); // トランザクション完了後に実行

これらのオプションを組み合わせることで、柔軟なジョブの実行制御が可能になります。実際の開発では、アプリケーションの要件に応じて適切なオプションを選択してください。

実践的なジョブ処理の設定方法

キューの設定とドライバーの選択

Laravelでは、複数のキュードライバーを利用できます。プロジェクトの要件に応じて適切なドライバーを選択することが重要です。

設定はconfig/queue.phpで行います:

return [
    'default' => env('QUEUE_CONNECTION', 'redis'),

    'connections' => [
        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
            'after_commit' => false,
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => env('REDIS_QUEUE', 'default'),
            'retry_after' => 90,
            'block_for' => null,
            'after_commit' => false,
        ],

        'sqs' => [
            'driver' => 'sqs',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'default'),
            'suffix' => env('SQS_SUFFIX'),
            'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
            'after_commit' => false,
        ],
    ],
];

主なドライバーの特徴比較:

ドライバー特徴適用場面注意点
Database– 設定が簡単
– 追加のサービス不要
小〜中規模のアプリケーション– スケーラビリティに制限あり
– DBの負荷増加
Redis– 高速
– スケーラブル
中〜大規模のアプリケーション– Redisサーバーの管理が必要
SQS– 高い信頼性
– 無制限のスケール
エンタープライズアプリケーション– コストが発生
– 設定がやや複雑

ジョブの優先度を制御する

ジョブの優先度制御は、効率的なリソース利用のために重要です。Laravelでは複数の方法で優先度を制御できます:

  1. キュー名による優先度付け
// キューの定義(config/queue.php)
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => ['high', 'default', 'low'],
    'retry_after' => 90,
],

// ジョブのディスパッチ
HighPriorityJob::dispatch()->onQueue('high');
RegularJob::dispatch()->onQueue('default');
LowPriorityJob::dispatch()->onQueue('low');

// キューワーカーの起動
php artisan queue:work --queue=high,default,low
  1. ジョブクラスでの優先度設定
class HighPriorityJob implements ShouldQueue
{
    public $queue = 'high';

    // キューの実行順序を制御
    public function middleware()
    {
        return [new WithoutOverlapping('high-priority-job')];
    }
}
  1. 動的な優先度設定
class ProcessOrder implements ShouldQueue
{
    protected $order;

    public function __construct(Order $order)
    {
        $this->order = $order;
    }

    public function handle()
    {
        // 注文金額に応じて優先度を変更
        if ($this->order->total > 100000) {
            $this->onQueue('high');
        }

        // 処理ロジック
    }
}

タイムアウトと再試行の設定

ジョブの信頼性を高めるために、適切なタイムアウトと再試行の設定が重要です:

class ProcessLargeFile implements ShouldQueue
{
    // 再試行回数の設定
    public $tries = 3;

    // タイムアウトの設定(秒)
    public $timeout = 120;

    // 再試行までの待機時間をカスタマイズ
    public function backoff()
    {
        return [60, 120, 180]; // 1分、2分、3分後に再試行
    }

    // 再試行の条件を制御
    public function retryUntil()
    {
        return now()->addHours(1); // 1時間まで再試行を続ける
    }

    // 失敗時の処理
    public function failed(\Throwable $exception)
    {
        Log::error('ファイル処理に失敗', [
            'exception' => $exception->getMessage(),
            'file' => $this->filePath
        ]);

        // 管理者に通知
        Notification::route('mail', 'admin@example.com')
            ->notify(new JobFailedNotification($this, $exception));
    }
}

実装のベストプラクティス:

  1. 段階的な再試行間隔の設定
public function backoff()
{
    // 指数関数的な待機時間の増加
    return [
        10,  // 最初の再試行
        30,  // 2回目の再試行
        60,  // 3回目の再試行
        120, // 4回目の再試行
    ];
}
  1. 条件付きの再試行ロジック
public function failed(\Throwable $exception)
{
    if ($exception instanceof TemporaryException) {
        // 一時的なエラーの場合は再試行
        $this->release(30); // 30秒後に再試行
    } else {
        // 永続的なエラーの場合は失敗として処理
        $this->delete();
        Log::error('永続的なエラーが発生', [
            'exception' => $exception
        ]);
    }
}
  1. リソース管理の最適化
public function handle()
{
    // メモリ制限の設定
    ini_set('memory_limit', '512M');

    // タイムアウトの動的設定
    set_time_limit(300); // 5分

    // 大きなファイルの処理
    $this->processLargeFile();
}

これらの設定を適切に組み合わせることで、信頼性の高いジョブ処理システムを構築できます。システムの規模や要件に応じて、これらの値を調整してください。

エラーハンドリングとログ管理

失敗したジョブを適切に処理する

ジョブの失敗は避けられない問題です。適切なエラーハンドリングを実装することで、システムの信頼性を高めることができます。

  1. 基本的なエラーハンドリング
class ProcessVideoJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $video;
    public $tries = 3;

    public function __construct(Video $video)
    {
        $this->video = $video;
    }

    public function handle()
    {
        try {
            // 動画処理ロジック
            $this->processVideo();
        } catch (VideoProcessingException $e) {
            // 動画処理特有のエラー処理
            $this->handleVideoProcessingError($e);
            throw $e;
        } catch (\Exception $e) {
            // その他の予期せぬエラー
            $this->handleUnexpectedError($e);
            throw $e;
        }
    }

    protected function handleVideoProcessingError($e)
    {
        // エラー状態を記録
        $this->video->update([
            'status' => 'processing_failed',
            'error_message' => $e->getMessage()
        ]);

        // 管理者に通知
        Notification::route('slack', config('notifications.slack_webhook'))
            ->notify(new VideoProcessingFailedNotification($this->video, $e));
    }

    protected function handleUnexpectedError($e)
    {
        Log::error('Unexpected error in video processing', [
            'video_id' => $this->video->id,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);
    }

    public function failed(\Throwable $exception)
    {
        // 最終的な失敗時の処理
        $this->video->update(['status' => 'permanently_failed']);

        // ユーザーに通知
        $this->video->user->notify(new VideoProcessingFailedNotification($this->video));
    }
}
  1. 条件付きの再試行ロジック
class ImportDataJob implements ShouldQueue
{
    public function handle()
    {
        try {
            $response = Http::timeout(30)->get('external-api.com/data');

            if ($response->failed()) {
                // レスポンスコードに基づく再試行判断
                if ($response->status() >= 500) {
                    // サーバーエラーの場合は再試行
                    $this->release(30); // 30秒後に再試行
                    return;
                }

                if ($response->status() === 429) {
                    // レート制限の場合は待機時間を長めに
                    $this->release(300); // 5分後に再試行
                    return;
                }
            }

            // 正常なレスポンスの処理
            $this->processResponse($response);

        } catch (\Exception $e) {
            $this->handleException($e);
        }
    }
}

ログを活用したデバッグ方法

効果的なログ管理は問題の早期発見と解決に不可欠です:

  1. 構造化ログの実装
class ProcessOrderJob implements ShouldQueue
{
    protected $order;

    public function handle()
    {
        Log::info('Starting order processing', [
            'job_id' => $this->job->getJobId(),
            'order_id' => $this->order->id,
            'customer_id' => $this->order->customer_id,
            'amount' => $this->order->total_amount,
            'items_count' => $this->order->items->count()
        ]);

        try {
            // 処理の各ステップをログ
            Log::debug('Validating order items');
            $this->validateItems();

            Log::debug('Processing payment');
            $this->processPayment();

            Log::debug('Updating inventory');
            $this->updateInventory();

            Log::info('Order processing completed', [
                'order_id' => $this->order->id,
                'processing_time' => $this->job->getTimeElapsed()
            ]);

        } catch (\Exception $e) {
            Log::error('Order processing failed', [
                'order_id' => $this->order->id,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
                'step' => $this->getCurrentStep(),
                'memory_usage' => memory_get_usage(true)
            ]);
            throw $e;
        }
    }
}
  1. カスタムログチャンネルの設定
// config/logging.php
'channels' => [
    'jobs' => [
        'driver' => 'daily',
        'path' => storage_path('logs/jobs.log'),
        'level' => 'debug',
        'days' => 14,
    ],
    'failed_jobs' => [
        'driver' => 'stack',
        'channels' => ['daily', 'slack'],
        'ignore_exceptions' => false,
    ],
],

監視システムとの連携方法

ジョブの実行状況を効果的に監視するための設定:

  1. Horizonによる監視
// config/horizon.php
return [
    'metrics' => [
        'trim_snapshots' => [
            'job' => 24,     // 24時間分のジョブメトリクスを保持
            'queue' => 24,   // 24時間分のキューメトリクスを保持
        ],
    ],

    'monitoring' => [
        'failed' => [
            ProcessOrderJob::class => [
                // 失敗回数が10回を超えたら通知
                'threshold' => 10,
                'notification' => \App\Notifications\FailedJobsNotification::class,
                'recipients' => ['slack'],
            ],
        ],
    ],
];
  1. カスタム監視の実装
class JobMonitoringService
{
    public function trackJobExecution(Job $job, float $startTime)
    {
        $executionTime = microtime(true) - $startTime;

        // メトリクスの記録
        Redis::hincrby("job_metrics:{$job->getName()}", 'total_executions', 1);
        Redis::hincrbyfloat("job_metrics:{$job->getName()}", 'total_time', $executionTime);

        // 実行時間の閾値チェック
        if ($executionTime > config("job_thresholds.{$job->getName()}", 30)) {
            $this->notifySlowJob($job, $executionTime);
        }
    }

    protected function notifySlowJob($job, $executionTime)
    {
        Notification::route('slack', config('notifications.slack_webhook'))
            ->notify(new SlowJobNotification($job, $executionTime));
    }
}
  1. メトリクス収集の自動化
class TrackJobPerformance
{
    public function handle($job, $next)
    {
        $startTime = microtime(true);
        $memoryBefore = memory_get_usage();

        try {
            $result = $next($job);

            // 成功時のメトリクス記録
            $this->recordMetrics($job, $startTime, $memoryBefore, 'success');

            return $result;
        } catch (\Exception $e) {
            // 失敗時のメトリクス記録
            $this->recordMetrics($job, $startTime, $memoryBefore, 'failed');
            throw $e;
        }
    }

    protected function recordMetrics($job, $startTime, $memoryBefore, $status)
    {
        $metrics = [
            'execution_time' => microtime(true) - $startTime,
            'memory_usage' => memory_get_usage() - $memoryBefore,
            'status' => $status,
            'queue' => $job->queue ?? 'default',
            'timestamp' => now(),
        ];

        // メトリクスの保存
        Redis::hset(
            "job_metrics:{$job->getName()}:" . now()->format('Y-m-d'),
            (string) now()->timestamp,
            json_encode($metrics)
        );
    }
}

これらの実装により、ジョブの実行状況を詳細に把握し、問題が発生した際に速やかに対応することが可能になります。

パフォーマンス最適化のベストプラクティス

キューワーカーの適切な設定

キューワーカーの設定は、ジョブ処理のパフォーマンスに直接影響を与えます。以下に、最適な設定方法を説明します。

  1. ワーカープロセスの設定
# 基本的なワーカー起動
php artisan queue:work

# 複数のキューを優先度順に処理
php artisan queue:work --queue=high,default,low

# メモリ制限とタイムアウトの設定
php artisan queue:work --memory=1024 --timeout=60

# 最大試行回数の設定
php artisan queue:work --tries=3
  1. Supervisorの設定例
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/var/log/worker.log
stopwaitsecs=3600
  1. 最適なワーカー数の計算
class WorkerOptimizer
{
    public function calculateOptimalWorkers()
    {
        $cpuCores = $this->getCPUCores();
        $availableMemory = $this->getAvailableMemory();
        $averageJobMemory = $this->getAverageJobMemory();

        // CPU使用率を考慮したワーカー数
        $workersByCPU = $cpuCores * 2;

        // メモリ使用量を考慮したワーカー数
        $workersByMemory = floor($availableMemory / ($averageJobMemory * 1.2));

        // 低い方の値を採用
        return min($workersByCPU, $workersByMemory);
    }

    private function getAverageJobMemory()
    {
        return cache()->remember('avg_job_memory', 3600, function () {
            return DB::table('job_metrics')
                ->where('created_at', '>', now()->subDay())
                ->avg('memory_peak');
        });
    }
}

バッチ処理による効率化

大量のジョブを効率的に処理するためのバッチ処理の実装方法:

  1. バッチジョブの基本実装
class ImportProductsBatch implements ShouldQueue
{
    use Batchable;

    protected $products;
    protected $chunkSize = 1000;

    public function __construct(Collection $products)
    {
        $this->products = $products;
    }

    public function handle()
    {
        // チャンク単位での処理
        $this->products->chunk($this->chunkSize)->each(function ($chunk) {
            DB::transaction(function () use ($chunk) {
                foreach ($chunk as $product) {
                    Product::create($product);
                }
            });

            // メモリ解放
            gc_collect_cycles();
        });
    }
}
  1. Bus Batchの活用
class ProductImportController
{
    public function import(Request $request)
    {
        $products = $this->parseProductsFile($request->file('products'));

        $batch = Bus::batch([])
            ->then(function (Batch $batch) {
                Log::info('All products imported successfully', [
                    'total_jobs' => $batch->totalJobs,
                    'execution_time' => $batch->finishedAt->diffInSeconds($batch->createdAt)
                ]);
            })
            ->catch(function (Batch $batch, \Throwable $e) {
                Log::error('Product import failed', [
                    'failed_jobs' => $batch->failedJobs,
                    'error' => $e->getMessage()
                ]);
            })
            ->finally(function (Batch $batch) {
                // クリーンアップ処理
                Cache::tags(['import'])->flush();
            });

        // チャンク単位でジョブを追加
        $products->chunk(1000)->each(function ($chunk) use ($batch) {
            $batch->add(new ImportProductsBatch($chunk));
        });

        return response()->json([
            'batch_id' => $batch->id,
            'total_jobs' => $batch->totalJobs
        ]);
    }
}

メモリ使用量の最適化手法

メモリリークを防ぎ、効率的なメモリ使用を実現するための手法:

  1. メモリ使用量の監視と最適化
class MemoryOptimizedJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable;

    protected $initialMemory;
    protected $peakMemory;

    public function handle()
    {
        $this->initialMemory = memory_get_usage();

        try {
            // 大量のデータを処理
            $this->processLargeDataSet();

            // メモリ使用量をログに記録
            $this->logMemoryUsage();

        } finally {
            // 明示的なメモリ解放
            $this->cleanupMemory();
        }
    }

    protected function processLargeDataSet()
    {
        // カーソルを使用して大量データを効率的に処理
        User::cursor()->each(function ($user) {
            // ユーザーごとの処理
            $this->processUser($user);

            // 定期的なメモリ解放
            if ($this->shouldCollectGarbage()) {
                $this->collectGarbage();
            }
        });
    }

    protected function shouldCollectGarbage()
    {
        $currentMemory = memory_get_usage();
        return $currentMemory > $this->initialMemory * 1.5;
    }

    protected function collectGarbage()
    {
        gc_collect_cycles();
        $this->peakMemory = max($this->peakMemory, memory_get_peak_usage());
    }

    protected function logMemoryUsage()
    {
        Log::info('Job memory usage', [
            'initial_memory' => $this->formatMemory($this->initialMemory),
            'peak_memory' => $this->formatMemory($this->peakMemory),
            'final_memory' => $this->formatMemory(memory_get_usage())
        ]);
    }

    protected function formatMemory($bytes)
    {
        return round($bytes / 1024 / 1024, 2) . 'MB';
    }
}
  1. メモリ効率の良いデータ処理
class ChunkedDataProcessor
{
    public function processLargeFile($filePath)
    {
        // ファイルを少しずつ読み込む
        $handle = fopen($filePath, 'r');
        $buffer = [];
        $count = 0;

        while (!feof($handle)) {
            $line = fgets($handle);
            $buffer[] = $this->processLine($line);
            $count++;

            // バッファが一定サイズに達したら処理
            if ($count >= 1000) {
                $this->processBatch($buffer);
                $buffer = [];
                $count = 0;
                gc_collect_cycles();
            }
        }

        // 残りのバッファを処理
        if (!empty($buffer)) {
            $this->processBatch($buffer);
        }

        fclose($handle);
    }

    protected function processBatch(array $batch)
    {
        // バッチ処理のロジック
        dispatch(new ProcessDataBatchJob($batch));
    }
}
  1. リソース管理のベストプラクティス
class ResourceEfficientJob implements ShouldQueue
{
    public function handle()
    {
        // メモリ制限の設定
        ini_set('memory_limit', '512M');

        // データベース接続の最適化
        DB::connection()->unsetEventDispatcher();

        // クエリログの無効化(本番環境)
        if (app()->environment('production')) {
            DB::disableQueryLog();
        }

        // 大きなコレクションの効率的な処理
        $this->processLargeCollection();
    }

    protected function processLargeCollection()
    {
        $query = LargeModel::query();

        // チャンク処理でメモリ使用を抑制
        $query->chunk(1000, function ($records) {
            foreach ($records as $record) {
                // レコードの処理
                $this->processRecord($record);
            }

            // チャンク処理後のクリーンアップ
            DB::connection()->disconnect();
            gc_collect_cycles();
        });
    }
}

これらの最適化手法を適切に組み合わせることで、効率的で安定したジョブ処理システムを構築できます。

本番環境での運用とスケーリング

Supervisorを使用したプロセス管理

本番環境でのジョブワーカーを安定して運用するために、Supervisorの適切な設定が重要です。

  1. Supervisorの基本設定
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work redis --sleep=3 --tries=3 --timeout=300
autostart=true
autorestart=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/var/log/supervisor/worker.log
stopwaitsecs=300

# 複数キューの設定例
[program:high-priority-worker]

process_name=%(program_name)s_%(process_num)02d command=php /var/www/html/artisan queue:work redis –queue=high –sleep=3 numprocs=4

[program:default-worker]

process_name=%(program_name)s_%(process_num)02d command=php /var/www/html/artisan queue:work redis –queue=default –sleep=3 numprocs=6

[program:low-priority-worker]

process_name=%(program_name)s_%(process_num)02d command=php /var/www/html/artisan queue:work redis –queue=low –sleep=3 numprocs=2

  1. モニタリングスクリプトの実装
class SupervisorMonitor
{
    public function checkWorkers()
    {
        $output = shell_exec('supervisorctl status');
        $workers = $this->parseWorkerStatus($output);

        foreach ($workers as $worker) {
            if ($worker['status'] !== 'RUNNING') {
                $this->notifyAdministrator($worker);
            }

            // メモリ使用量のチェック
            $this->checkWorkerMemory($worker['pid']);
        }
    }

    protected function checkWorkerMemory($pid)
    {
        $memory = shell_exec("ps -o rss= -p {$pid}");
        $memoryMB = intval($memory) / 1024;

        if ($memoryMB > 256) { // 256MB以上使用している場合
            Log::warning("Worker memory usage high", [
                'pid' => $pid,
                'memory_usage' => $memoryMB . 'MB'
            ]);
        }
    }
}

水平スケーリングの実現方法

システムの負荷に応じて処理能力を柔軟に調整する方法を解説します。

  1. 動的なワーカー管理
class QueueScaler
{
    protected $redis;
    protected $supervisor;

    public function __construct()
    {
        $this->redis = Redis::connection();
        $this->supervisor = new SupervisorClient();
    }

    public function adjustWorkers()
    {
        $queueSize = $this->getQueueSize();
        $currentWorkers = $this->getCurrentWorkers();

        // キューサイズに基づいてワーカー数を調整
        $optimalWorkers = $this->calculateOptimalWorkers($queueSize);

        if ($optimalWorkers !== $currentWorkers) {
            $this->updateWorkerCount($optimalWorkers);
        }
    }

    protected function calculateOptimalWorkers($queueSize)
    {
        // 基本的な計算ロジック
        $baseWorkers = 4;
        $workersPerThousandJobs = 2;

        $optimal = $baseWorkers + floor($queueSize / 1000) * $workersPerThousandJobs;

        // 上限と下限の設定
        return min(max($optimal, 2), 16);
    }

    protected function updateWorkerCount($count)
    {
        $command = "supervisorctl update && ";
        $command .= "supervisorctl start laravel-worker:* && ";
        $command .= "supervisorctl status";

        exec($command, $output);

        Log::info('Worker count updated', [
            'new_count' => $count,
            'supervisor_output' => $output
        ]);
    }
}
  1. 負荷分散の設定
// config/horizon.php
return [
    'environments' => [
        'production' => [
            'supervisor-1' => [
                'connection' => 'redis',
                'queue' => ['high', 'default'],
                'balance' => 'simple',
                'processes' => 10,
                'tries' => 3,
            ],
            'supervisor-2' => [
                'connection' => 'redis',
                'queue' => ['low'],
                'balance' => 'auto',
                'processes' => 5,
                'tries' => 1,
            ],
        ],
    ],
];

障害発生時の対応手順

システムの安定性を維持するための障害対応プロセスを説明します。

  1. 障害検知と自動復旧
class JobFailureHandler
{
    public function handle($event)
    {
        $job = $event->job;
        $exception = $event->exception;

        // 障害情報の記録
        $this->logFailure($job, $exception);

        // 重大度の判定
        if ($this->isCriticalFailure($exception)) {
            $this->handleCriticalFailure($job, $exception);
        } else {
            $this->handleNormalFailure($job, $exception);
        }
    }

    protected function handleCriticalFailure($job, $exception)
    {
        // 管理者への通知
        Notification::route('slack', config('notifications.slack_webhook'))
            ->notify(new CriticalJobFailureNotification($job, $exception));

        // ワーカーの再起動が必要か判断
        if ($this->shouldRestartWorker($exception)) {
            $this->restartWorker($job->getConnectionName());
        }

        // 関連ジョブの一時停止
        $this->pauseRelatedJobs($job);
    }

    protected function pauseRelatedJobs($job)
    {
        // 同じタイプのジョブを一時的に停止
        Redis::set('pause_jobs:' . $job->getConnectionName(), true, 'EX', 300);
    }
}
  1. 復旧手順の自動化
class RecoveryManager
{
    public function executeRecoveryProcedure()
    {
        // 1. 失敗したジョブの分析
        $failedJobs = $this->analyzeFailedJobs();

        // 2. リカバリー可能なジョブの選別
        $recoverableJobs = $this->filterRecoverableJobs($failedJobs);

        // 3. ジョブの再実行
        foreach ($recoverableJobs as $job) {
            $this->retryJob($job);
        }

        // 4. システム状態の確認
        $this->verifySystemState();
    }

    protected function retryJob($job)
    {
        try {
            Artisan::call('queue:retry', ['id' => $job->id]);

            Log::info('Job recovered successfully', [
                'job_id' => $job->id,
                'queue' => $job->queue
            ]);

        } catch (\Exception $e) {
            Log::error('Recovery failed', [
                'job_id' => $job->id,
                'error' => $e->getMessage()
            ]);
        }
    }

    protected function verifySystemState()
    {
        // キューの健全性チェック
        $queueHealth = $this->checkQueueHealth();

        // ワーカーの状態確認
        $workerHealth = $this->checkWorkerHealth();

        // メモリ使用量の確認
        $memoryHealth = $this->checkMemoryUsage();

        if (!$queueHealth || !$workerHealth || !$memoryHealth) {
            $this->notifySystemUnhealthy([
                'queue_health' => $queueHealth,
                'worker_health' => $workerHealth,
                'memory_health' => $memoryHealth
            ]);
        }
    }
}

これらの実装により、本番環境での安定した運用とスケーラビリティを確保することができます。システムの規模や要件に応じて、これらの設定を適切にカスタマイズしてください。