Laravel Upsertとは:基礎から実践まで
データベース操作の新標準:Upsertの仕組みと特徴
LaravelのUpsertは、既存レコードの更新と新規レコードの挿入を1回のクエリで実行できる強力な機能です。この機能により、複雑なデータベース操作を簡潔に実装できます。
基本的な構文は以下の通りです:
Model::upsert([ ['id' => 1, 'name' => 'John', 'email' => 'john@example.com'], ['id' => 2, 'name' => 'Jane', 'email' => 'jane@example.com'] ], ['email'], ['name']);
上記のコードでは:
- 第1引数:更新または挿入するデータの配列
- 第2引数:ユニーク判定に使用するカラム(この例では’email’)
- 第3引数:更新対象のカラム(この例では’name’)
Upsertが提供する主なメリットは以下の通りです:
- 処理の自動化
- 存在チェックと更新/挿入の判断を自動で行う
- 開発者が条件分岐を書く必要がない
- パフォーマンスの向上
- 単一クエリでの処理により、データベースアクセスを最小化
- 大量データ処理時の実行速度が向上
- データの整合性確保
- トランザクション単位での処理
- レースコンディションのリスクを軽減
従来の実装方法との比較:なぜUpsertが必要なのか
従来のアプローチでは、以下のような実装が一般的でした:
// 従来の方法 foreach ($records as $record) { $model = Model::firstWhere('email', $record['email']); if ($model) { $model->update([ 'name' => $record['name'] ]); } else { Model::create($record); } }
この実装方法には以下の問題点があります:
- パフォーマンスの問題
- レコードごとに個別のSQLクエリが発行される
- データ量に比例して処理時間が増加
- データベース接続のオーバーヘッドが大きい
- 信頼性の問題
- 複数のプロセスが同時に実行された場合のデータ整合性が保証されない
- エラー発生時のロールバック処理が複雑
- 実装の煩雑さ
- 条件分岐による可読性の低下
- エラーハンドリングのコードが複雑化
- メンテナンスコストの増加
これに対し、Upsertを使用した実装では:
try { Model::upsert($records, ['email'], ['name']); } catch (\Exception $e) { Log::error('Upsert failed: ' . $e->getMessage()); throw $e; }
以下のような改善が得られます:
- コードの簡潔化
- 条件分岐が不要
- エラーハンドリングが簡素化
- 保守性の向上
- 処理の効率化
- 単一のSQLクエリで完結
- バルク処理による高速化
- サーバーリソースの効率的な利用
- 信頼性の向上
- トランザクションの自動管理
- データ整合性の確保
- エラー時の一貫性保持
Upsertは特に以下のようなユースケースで効果を発揮します:
- APIを介したデータ同期処理
- バッチ処理による大量データの更新
- マスターデータのインポート処理
- 定期的なデータ更新処理
次のセクションでは、これらの基礎知識を踏まえた上で、具体的な実装手順について解説していきます。
Upsertの基本実装手順
モデルでのupsertメソッドの使い方
Eloquentモデルでupsertメソッドを実装する際の基本的な手順を説明します。具体的な実装例として、ECサイトの商品管理システムを想定したコードを見ていきましょう。
// app/Models/Product.php class Product extends Model { protected $fillable = [ 'sku', // 商品コード 'name', // 商品名 'price', // 価格 'stock', // 在庫数 'category_id' // カテゴリID ]; protected $casts = [ 'price' => 'integer', 'stock' => 'integer', ]; }
upsertメソッドの基本的な使用方法は以下の通りです:
public function updateProductInventory(array $products) { return Product::upsert( $products, // 更新・挿入するデータの配列 ['sku'], // ユニーク判定に使用するカラム ['price', 'stock'] // 更新対象のカラム ); } // 使用例 $inventoryData = [ [ 'sku' => 'PROD-001', 'name' => 'テスト商品A', 'price' => 1000, 'stock' => 100 ], [ 'sku' => 'PROD-002', 'name' => 'テスト商品B', 'price' => 2000, 'stock' => 50 ] ]; $this->updateProductInventory($inventoryData);
uniqueBy認証の正しい設定方法
uniqueBy認証は、レコードの一意性を保証する重要な要素です。以下のパターンに応じた実装方法を解説します:
- 単一カラムでの認証
// マイグレーションでの設定 public function up() { Schema::create('products', function (Blueprint $table) { $table->id(); $table->string('sku')->unique(); // 単一カラムのユニーク制約 $table->string('name'); $table->integer('price'); $table->integer('stock'); $table->timestamps(); }); } // upsertでの使用 Product::upsert($products, ['sku'], ['price', 'stock']);
- 複合キーでの認証
// マイグレーションでの設定 public function up() { Schema::create('products', function (Blueprint $table) { $table->id(); $table->string('sku'); $table->unsignedBigInteger('shop_id'); $table->string('name'); $table->integer('price'); $table->integer('stock'); $table->timestamps(); // 複合ユニーク制約 $table->unique(['sku', 'shop_id']); }); } // upsertでの使用 Product::upsert($products, ['sku', 'shop_id'], ['price', 'stock']);
更新対象カラムの指定テクニック
更新対象カラムの指定は、データの整合性とパフォーマンスに直接影響します。以下のような実装パターンを活用できます:
- 動的な更新カラムの指定
class ProductController extends Controller { public function bulkUpdate(Request $request) { $updateColumns = collect(['name', 'price', 'stock']) ->filter(fn($column) => $request->has("update_{$column}")) ->toArray(); if (empty($updateColumns)) { throw new \InvalidArgumentException('更新対象のカラムが指定されていません。'); } return Product::upsert( $request->products, ['sku'], $updateColumns ); } }
- タイムスタンプの自動管理
class ProductService { public function bulkUpsertWithTimestamps(array $products) { $now = now(); $productsWithTimestamps = array_map(function ($product) use ($now) { return array_merge($product, [ 'updated_at' => $now, 'created_at' => $now ]); }, $products); return Product::upsert( $productsWithTimestamps, ['sku'], ['name', 'price', 'stock', 'updated_at'] ); } }
実装時の重要なポイント:
- データ検証
class ProductValidator { public static function validateForUpsert(array $products): bool { $rules = [ '*.sku' => 'required|string|max:50', '*.name' => 'required|string|max:255', '*.price' => 'required|integer|min:0', '*.stock' => 'required|integer|min:0' ]; $validator = Validator::make($products, $rules); if ($validator->fails()) { throw new ValidationException($validator); } return true; } }
- エラーハンドリング
try { DB::beginTransaction(); // データ検証 ProductValidator::validateForUpsert($products); // upsert実行 Product::upsert($products, ['sku'], ['price', 'stock']); DB::commit(); } catch (ValidationException $e) { DB::rollBack(); Log::warning('Validation failed: ' . $e->getMessage()); throw $e; } catch (\Exception $e) { DB::rollBack(); Log::error('Upsert failed: ' . $e->getMessage()); throw $e; }
これらの基本実装を理解した上で、次のセクションでは実践的な活用方法について解説していきます。
実践的なUpsert活用手順
大量データ一括処理での効率的な実装方法
大規模なデータ処理では、メモリ使用量とパフォーマンスの最適化が重要です。以下の実装例では、これらの課題に対処する方法を示します。
class BulkUpsertService { private const CHUNK_SIZE = 1000; private const RETRY_COUNT = 3; public function processBulkData(array $records) { $processedCount = 0; $failedChunks = []; // チャンク単位での処理 foreach (array_chunk($records, self::CHUNK_SIZE) as $index => $chunk) { try { DB::beginTransaction(); // チャンク処理の実行 $this->processChunk($chunk); $processedCount += count($chunk); DB::commit(); // 進捗ログ Log::info("Processed chunk {$index}: {$processedCount} records completed"); } catch (\Exception $e) { DB::rollBack(); $failedChunks[] = [ 'chunk_index' => $index, 'error' => $e->getMessage() ]; Log::error("Chunk {$index} failed: " . $e->getMessage()); } } // 失敗したチャンクの再処理 if (!empty($failedChunks)) { $this->retryFailedChunks($failedChunks, $records); } return [ 'processed' => $processedCount, 'failed' => count($failedChunks) ]; } private function processChunk(array $chunk) { // メモリ使用量の最適化 $preparedData = collect($chunk) ->map(fn($record) => $this->prepareRecord($record)) ->toArray(); Product::upsert( $preparedData, ['sku'], ['name', 'price', 'stock', 'updated_at'] ); } private function retryFailedChunks(array $failedChunks, array $originalRecords) { foreach ($failedChunks as $failure) { $chunkStart = $failure['chunk_index'] * self::CHUNK_SIZE; $chunk = array_slice( $originalRecords, $chunkStart, self::CHUNK_SIZE ); // リトライ処理 $retryCount = 0; while ($retryCount < self::RETRY_COUNT) { try { DB::beginTransaction(); $this->processChunk($chunk); DB::commit(); break; } catch (\Exception $e) { DB::rollBack(); $retryCount++; if ($retryCount === self::RETRY_COUNT) { Log::error("Final retry failed for chunk {$failure['chunk_index']}"); event(new ChunkProcessingFailedEvent($chunk, $e)); } sleep(1); // リトライ間隔 } } } } }
リレーション付きデータの統合性を確立するUpsert手法
関連テーブル間のデータ整合性を保ちながらUpsertを実行する方法を示します。
class RelationalUpsertService { public function upsertProductWithCategories(array $productsData) { DB::beginTransaction(); try { // カテゴリデータの抽出と更新 $categories = collect($productsData) ->pluck('category') ->unique() ->map(fn($name) => ['name' => $name]) ->toArray(); // カテゴリのupsert Category::upsert($categories, ['name'], []); // カテゴリIDのマッピング取得 $categoryMap = Category::whereIn('name', array_column($categories, 'name')) ->pluck('id', 'name') ->toArray(); // 商品データの整形 $products = collect($productsData)->map(function ($item) use ($categoryMap) { return [ 'sku' => $item['sku'], 'name' => $item['name'], 'price' => $item['price'], 'stock' => $item['stock'], 'category_id' => $categoryMap[$item['category']] ?? null, 'updated_at' => now() ]; })->toArray(); // 商品のupsert Product::upsert( $products, ['sku'], ['name', 'price', 'stock', 'category_id', 'updated_at'] ); DB::commit(); return true; } catch (\Exception $e) { DB::rollBack(); Log::error('Relational upsert failed: ' . $e->getMessage()); throw $e; } } }
APIからのデータ同期における活用例
外部APIとのデータ同期を効率的に行うための実装例を示します。
class ApiSyncService { private $api; private $lastSyncKey = 'product_sync_timestamp'; public function __construct(ExternalApiClient $api) { $this->api = $api; } public function syncProducts() { $lastSync = Cache::get($this->lastSyncKey); $syncStart = now(); try { // 差分データの取得 $changes = $this->api->getProductChanges($lastSync); if (empty($changes)) { Log::info('No changes found since last sync'); return true; } // データの正規化 $normalizedData = $this->normalizeApiData($changes); // バッチ処理でのupsert $result = $this->processBatchUpsert($normalizedData); // 同期タイムスタンプの更新 Cache::put($this->lastSyncKey, $syncStart); event(new ProductSyncCompletedEvent($result)); return $result; } catch (ApiException $e) { Log::error('API sync failed: ' . $e->getMessage()); event(new ProductSyncFailedEvent($e)); throw $e; } } private function normalizeApiData(array $apiData): array { return collect($apiData)->map(function ($item) { return [ 'sku' => $item['product_code'], 'name' => $item['product_name'], 'price' => (int) $item['price'], 'stock' => (int) $item['inventory_count'], 'external_id' => $item['id'], 'synced_at' => now() ]; })->toArray(); } private function processBatchUpsert(array $data) { $result = [ 'total' => count($data), 'processed' => 0, 'failed' => 0 ]; foreach (array_chunk($data, 500) as $chunk) { try { DB::beginTransaction(); Product::upsert( $chunk, ['external_id'], ['sku', 'name', 'price', 'stock', 'synced_at'] ); DB::commit(); $result['processed'] += count($chunk); } catch (\Exception $e) { DB::rollBack(); $result['failed'] += count($chunk); Log::error('Batch processing failed: ' . $e->getMessage()); } } return $result; } }
これらの実装における重要なポイント:
- メモリ管理
- チャンク処理による大量データの効率的な処理
- 不要なデータの解放
- エラーハンドリング
- トランザクション管理
- リトライ機構の実装
- 適切なログ記録
- パフォーマンス最適化
- バッチサイズの適切な設定
- インデックスの活用
- キャッシュの利用
- データ整合性
- リレーションの適切な処理
- トランザクション境界の明確化
- バリデーションの実装
次のセクションでは、これらの実装をさらに最適化するためのパフォーマンスチューニングについて解説します。
Upsertのパフォーマンス最適化
インデックス設計によるUpsert処理の高速化
Upsert処理のパフォーマンスは、適切なインデックス設計に大きく依存します。以下に、効率的なインデックス設計とその実装方法を示します。
// マイグレーションでの最適なインデックス設計 public function up() { Schema::create('products', function (Blueprint $table) { $table->id(); $table->string('sku', 100); $table->string('name'); $table->integer('price'); $table->integer('stock'); $table->unsignedBigInteger('category_id'); $table->timestamps(); // 主要な検索・結合条件にインデックスを作成 $table->index('sku'); $table->index('category_id'); // 複合インデックスの作成 $table->index(['sku', 'category_id']); }); } // インデックスを活用したクエリの例 class ProductRepository { public function bulkUpsertWithIndex(array $products) { // インデックスを効果的に使用するためのソート $sortedProducts = collect($products) ->sortBy('sku') // インデックス順でソート ->values() ->toArray(); return Product::upsert( $sortedProducts, ['sku'], ['name', 'price', 'stock'] ); } }
トランザクション管理のベストプラクティス
大規模なUpsert処理では、適切なトランザクション管理が重要です。
class OptimizedUpsertService { private const CHUNK_SIZE = 1000; public function performOptimizedUpsert(array $records) { // デッドロック対策のためのロック取得 $lockKey = 'product_upsert_lock'; return Cache::lock($lockKey, 10)->block(5, function () use ($records) { DB::beginTransaction(); try { foreach (array_chunk($records, self::CHUNK_SIZE) as $chunk) { // メモリ使用量の監視 $memoryUsage = memory_get_usage(true); if ($memoryUsage > 100 * 1024 * 1024) { // 100MB以上の場合 gc_collect_cycles(); // 明示的なガベージコレクション } Product::upsert($chunk, ['sku'], [ 'name', 'price', 'stock', 'updated_at' ]); } DB::commit(); return true; } catch (\Exception $e) { DB::rollBack(); Log::error('Optimized upsert failed: ' . $e->getMessage()); throw $e; } }); } }
メモリ使用量を抑えた大規模データ処理手法
大規模データ処理時のメモリ管理と最適化テクニックを示します。
class MemoryOptimizedUpsertService { private const CHUNK_SIZE = 1000; private $progressBar; public function processLargeDataset(string $filepath) { $generator = $this->createDataGenerator($filepath); $totalProcessed = 0; foreach ($generator as $chunk) { $result = $this->processChunkWithMemoryOptimization($chunk); $totalProcessed += $result['processed']; // メモリ使用状況のログ記録 $this->logMemoryUsage(); } return $totalProcessed; } private function createDataGenerator(string $filepath): \Generator { $handle = fopen($filepath, 'r'); $headers = fgetcsv($handle); $chunk = []; while (($data = fgetcsv($handle)) !== false) { $chunk[] = array_combine($headers, $data); if (count($chunk) >= self::CHUNK_SIZE) { yield $chunk; $chunk = []; } } if (!empty($chunk)) { yield $chunk; } fclose($handle); } private function processChunkWithMemoryOptimization(array $chunk): array { $result = [ 'processed' => 0, 'failed' => 0 ]; try { DB::beginTransaction(); // データの正規化と不要なフィールドの削除 $optimizedChunk = collect($chunk) ->map(function ($record) { return array_intersect_key($record, array_flip([ 'sku', 'name', 'price', 'stock' ])); }) ->toArray(); Product::upsert( $optimizedChunk, ['sku'], ['name', 'price', 'stock'] ); DB::commit(); $result['processed'] = count($optimizedChunk); } catch (\Exception $e) { DB::rollBack(); $result['failed'] = count($chunk); Log::error('Chunk processing failed: ' . $e->getMessage()); } // 明示的なメモリ解放 unset($optimizedChunk); unset($chunk); return $result; } private function logMemoryUsage(): void { $memoryUsage = memory_get_usage(true); $peakMemory = memory_get_peak_usage(true); Log::info("Memory Usage: " . $this->formatBytes($memoryUsage) . ", Peak: " . $this->formatBytes($peakMemory)); } private function formatBytes($bytes): string { $units = ['B', 'KB', 'MB', 'GB']; $bytes = max($bytes, 0); $pow = floor(($bytes ? log($bytes) : 0) / log(1024)); $pow = min($pow, count($units) - 1); return round($bytes / (1024 ** $pow), 2) . ' ' . $units[$pow]; } }
パフォーマンス最適化における重要なポイント:
- インデックス最適化
- 適切なインデックス設計
- 複合インデックスの効果的な活用
- インデックスメンテナンスの考慮
- メモリ管理
- ジェネレータを使用した段階的なデータ読み込み
- 不要なデータの明示的な解放
- メモリ使用量のモニタリング
- トランザクション制御
- 適切なチャンクサイズの設定
- デッドロック対策
- ロック機構の活用
- 監視とロギング
- パフォーマンスメトリクスの記録
- メモリ使用状況の追跡
- エラー発生時の詳細なログ記録
これらの最適化テクニックを適切に組み合わせることで、大規模なUpsert処理でも安定したパフォーマンスを実現できます。
エラーハンドリングとデバッグ
よくあるUpsertエラーとその解決方法
Upsert処理で発生しがちなエラーとその対処法を具体的に解説します。
class UpsertErrorHandler { public function handleUpsert(array $records) { try { return $this->executeUpsert($records); } catch (QueryException $e) { // SQLエラーの詳細な解析 return $this->handleQueryException($e); } catch (ValidationException $e) { // バリデーションエラーの処理 return $this->handleValidationError($e); } catch (\Exception $e) { // その他の予期せぬエラー return $this->handleUnexpectedError($e); } } private function handleQueryException(QueryException $e) { $errorCode = $e->errorInfo[1]; switch ($errorCode) { case 1062: // Duplicate entry Log::warning('Duplicate key violation: ' . $e->getMessage()); throw new DuplicateKeyException( 'レコードが重複しています。一意制約を確認してください。', $e ); case 1452: // Foreign key constraint Log::error('Foreign key constraint violation: ' . $e->getMessage()); throw new ForeignKeyException( '関連するレコードが存在しません。', $e ); default: Log::error('Database error occurred: ' . $e->getMessage()); throw new DatabaseException( 'データベースエラーが発生しました。', $e ); } } private function handleValidationError(ValidationException $e) { $errors = collect($e->errors()) ->map(function ($messages, $field) { return [ 'field' => $field, 'messages' => $messages ]; }) ->values() ->toArray(); Log::warning('Validation failed: ' . json_encode($errors)); return [ 'status' => 'error', 'type' => 'validation', 'errors' => $errors ]; } }
デバッグに使えるログ出力設定
効果的なデバッグのためのログ設定とモニタリング方法を示します。
class UpsertLogger { private const LOG_CHANNEL = 'upsert'; public function __construct() { // カスタムログチャンネルの設定 config(['logging.channels.' . self::LOG_CHANNEL => [ 'driver' => 'daily', 'path' => storage_path('logs/upsert.log'), 'level' => 'debug', 'days' => 14, ]]); } public function logUpsertOperation(array $data, string $operation) { $context = [ 'operation' => $operation, 'record_count' => count($data), 'memory_usage' => $this->getMemoryUsage(), 'timestamp' => now()->toIso8601String() ]; Log::channel(self::LOG_CHANNEL)->info( 'Upsert operation executed', $context ); } public function logPerformanceMetrics(float $startTime, int $processedCount) { $endTime = microtime(true); $duration = $endTime - $startTime; $metrics = [ 'duration_seconds' => round($duration, 3), 'records_per_second' => round($processedCount / $duration, 2), 'total_records' => $processedCount, 'peak_memory' => $this->getMemoryUsage(true) ]; Log::channel(self::LOG_CHANNEL)->info( 'Performance metrics', $metrics ); } private function getMemoryUsage(bool $peak = false): string { $bytes = $peak ? memory_get_peak_usage(true) : memory_get_usage(true); return round($bytes / 1024 / 1024, 2) . 'MB'; } }
テストコードによる安定性の確保
Upsert処理の信頼性を高めるためのテスト実装例を示します。
class ProductUpsertTest extends TestCase { use RefreshDatabase; private $service; private $logger; protected function setUp(): void { parent::setUp(); $this->service = new ProductUpsertService(); $this->logger = new UpsertLogger(); } /** @test */ public function it_successfully_upserts_new_records() { // テストデータの準備 $products = [ [ 'sku' => 'TEST-001', 'name' => 'Test Product 1', 'price' => 1000 ], [ 'sku' => 'TEST-002', 'name' => 'Test Product 2', 'price' => 2000 ] ]; // Upsert実行 $result = $this->service->upsert($products); // アサーション $this->assertTrue($result); $this->assertDatabaseHas('products', [ 'sku' => 'TEST-001', 'name' => 'Test Product 1' ]); } /** @test */ public function it_handles_duplicate_records_correctly() { // 既存レコードの作成 Product::create([ 'sku' => 'TEST-001', 'name' => 'Original Name', 'price' => 1000 ]); // 更新データの準備 $updateData = [ [ 'sku' => 'TEST-001', 'name' => 'Updated Name', 'price' => 1500 ] ]; // Upsert実行 $this->service->upsert($updateData); // 更新の確認 $this->assertDatabaseHas('products', [ 'sku' => 'TEST-001', 'name' => 'Updated Name', 'price' => 1500 ]); } /** @test */ public function it_handles_validation_errors() { // 不正なデータの準備 $invalidProducts = [ [ 'sku' => '', // 必須項目の欠落 'name' => 'Test Product', 'price' => -100 // 不正な価格 ] ]; // 例外の発生を確認 $this->expectException(ValidationException::class); $this->service->upsert($invalidProducts); } /** @test */ public function it_processes_large_datasets_efficiently() { // 大量データの生成 $largeDataset = $this->generateLargeDataset(1000); // パフォーマンス計測 $startTime = microtime(true); $result = $this->service->upsert($largeDataset); $endTime = microtime(true); // 実行時間の検証 $duration = $endTime - $startTime; $this->assertLessThan( 10.0, // 10秒以内に完了すること $duration, 'Large dataset processing took too long' ); } private function generateLargeDataset(int $count): array { return array_map(function ($i) { return [ 'sku' => "BULK-" . str_pad($i, 6, '0', STR_PAD_LEFT), 'name' => "Bulk Product {$i}", 'price' => rand(100, 10000) ]; }, range(1, $count)); } }
デバッグとテストにおける重要なポイント:
- エラーハンドリング
- 具体的なエラーメッセージ
- エラータイプの適切な分類
- 例外の階層的な処理
- ログ記録
- 構造化されたログ形式
- パフォーマンスメトリクスの記録
- エラーコンテキストの保持
- テストカバレッジ
- 基本的なCRUD操作のテスト
- エッジケースの検証
- パフォーマンステスト
- デバッグ支援
- 詳細なエラー情報の提供
- トレース情報の記録
- 開発環境での詳細ログ
発展的なUpsert活用テクニック
条件付きUpsertの実装方法
特定の条件に基づいてUpsert処理を制御する高度な実装例を示します。
class ConditionalUpsertService { public function upsertWithConditions(array $records, array $conditions) { return DB::transaction(function () use ($records, $conditions) { $processingResults = []; foreach ($records as $record) { if ($this->shouldProcess($record, $conditions)) { $result = $this->processRecord($record); $processingResults[] = $result; } } return $processingResults; }); } private function shouldProcess(array $record, array $conditions): bool { // 価格変更率のチェック if (isset($conditions['max_price_change'])) { $existingProduct = Product::where('sku', $record['sku'])->first(); if ($existingProduct) { $priceChange = abs( ($record['price'] - $existingProduct->price) / $existingProduct->price * 100 ); if ($priceChange > $conditions['max_price_change']) { Log::warning("Price change exceeds threshold for SKU: {$record['sku']}"); return false; } } } // 在庫数の妥当性チェック if (isset($conditions['min_stock'])) { if ($record['stock'] < $conditions['min_stock']) { Log::warning("Stock below minimum threshold for SKU: {$record['sku']}"); return false; } } return true; } private function processRecord(array $record): array { try { Product::upsert( [$record], ['sku'], ['name', 'price', 'stock'] ); return [ 'sku' => $record['sku'], 'status' => 'success', 'message' => 'Record processed successfully' ]; } catch (\Exception $e) { return [ 'sku' => $record['sku'], 'status' => 'error', 'message' => $e->getMessage() ]; } } } // 使用例 $service = new ConditionalUpsertService(); $conditions = [ 'max_price_change' => 20, // 最大20%の価格変更まで許容 'min_stock' => 5 // 最小在庫数5以上が必要 ]; $results = $service->upsertWithConditions($products, $conditions);
キュー投入による非同期Upsert処理
大規模なUpsert処理を非同期で実行する実装例を示します。
// Jobクラスの定義 class ProcessUpsertJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; private $records; private $batchId; private $notifyEmail; public function __construct(array $records, string $batchId, ?string $notifyEmail) { $this->records = $records; $this->batchId = $batchId; $this->notifyEmail = $notifyEmail; } public function handle() { $processor = new BatchUpsertProcessor(); try { $result = $processor->process($this->records); // 処理結果の保存 UpsertBatch::create([ 'batch_id' => $this->batchId, 'processed_count' => $result['processed'], 'failed_count' => $result['failed'], 'status' => 'completed' ]); // 完了通知 if ($this->notifyEmail) { Mail::to($this->notifyEmail)->send( new UpsertCompletedMail($result) ); } } catch (\Exception $e) { // エラー情報の記録 UpsertBatch::create([ 'batch_id' => $this->batchId, 'status' => 'failed', 'error_message' => $e->getMessage() ]); throw $e; } } public function failed(\Throwable $exception) { Log::error("Upsert job failed: {$exception->getMessage()}"); if ($this->notifyEmail) { Mail::to($this->notifyEmail)->send( new UpsertFailedMail($this->batchId, $exception) ); } } } // キュー処理の実装 class QueuedUpsertService { private const CHUNK_SIZE = 1000; public function queueUpsertJobs(array $records, string $notifyEmail = null) { $batchId = Str::uuid()->toString(); // チャンク単位でジョブを作成 foreach (array_chunk($records, self::CHUNK_SIZE) as $chunk) { ProcessUpsertJob::dispatch($chunk, $batchId, $notifyEmail) ->onQueue('upserts') ->delay(now()->addSeconds(rand(1, 30))); // 負荷分散 } return $batchId; } public function checkBatchStatus(string $batchId): array { $batches = UpsertBatch::where('batch_id', $batchId)->get(); return [ 'total_processed' => $batches->sum('processed_count'), 'total_failed' => $batches->sum('failed_count'), 'completed_chunks' => $batches->where('status', 'completed')->count(), 'failed_chunks' => $batches->where('status', 'failed')->count(), 'is_completed' => $batches->every(fn($batch) => $batch->status === 'completed' || $batch->status === 'failed' ) ]; } }
複数のテーブルをまたぐUpsert操作の設計
関連テーブル間の整合性を保ちながらUpsert処理を行う実装例を示します。
class ComplexUpsertService { public function upsertProductWithRelations(array $productData) { return DB::transaction(function () use ($productData) { // カテゴリの処理 $category = $this->upsertCategory($productData['category']); // 製造元の処理 $manufacturer = $this->upsertManufacturer($productData['manufacturer']); // 商品の処理 $product = $this->upsertProduct(array_merge( $productData['product'], [ 'category_id' => $category->id, 'manufacturer_id' => $manufacturer->id ] )); // 価格履歴の記録 $this->recordPriceHistory($product, $productData['product']['price']); // 在庫情報の更新 $this->updateInventory($product, $productData['inventory']); return $product; }); } private function upsertCategory(array $categoryData) { Category::upsert( [$categoryData], ['name'], ['description', 'updated_at'] ); return Category::where('name', $categoryData['name'])->first(); } private function upsertManufacturer(array $manufacturerData) { Manufacturer::upsert( [$manufacturerData], ['code'], ['name', 'contact_info', 'updated_at'] ); return Manufacturer::where('code', $manufacturerData['code'])->first(); } private function upsertProduct(array $productData) { Product::upsert( [$productData], ['sku'], [ 'name', 'price', 'category_id', 'manufacturer_id', 'updated_at' ] ); return Product::where('sku', $productData['sku'])->first(); } private function recordPriceHistory(Product $product, float $price) { PriceHistory::create([ 'product_id' => $product->id, 'price' => $price, 'recorded_at' => now() ]); } private function updateInventory(Product $product, array $inventoryData) { Inventory::upsert( [ 'product_id' => $product->id, 'quantity' => $inventoryData['quantity'], 'warehouse_id' => $inventoryData['warehouse_id'] ], ['product_id', 'warehouse_id'], ['quantity', 'updated_at'] ); } } // 使用例 $service = new ComplexUpsertService(); $productData = [ 'category' => [ 'name' => 'Electronics', 'description' => 'Electronic devices and accessories' ], 'manufacturer' => [ 'code' => 'MFR-001', 'name' => 'Tech Corp', 'contact_info' => 'contact@techcorp.com' ], 'product' => [ 'sku' => 'PROD-001', 'name' => 'Smartphone X', 'price' => 89900 ], 'inventory' => [ 'quantity' => 100, 'warehouse_id' => 1 ] ]; $result = $service->upsertProductWithRelations($productData);
発展的な実装におけるポイント:
- 条件付き処理
- ビジネスルールの組み込み
- データの妥当性チェック
- 処理の柔軟な制御
- 非同期処理
- ジョブキューの活用
- バッチ処理の進捗管理
- エラーハンドリングと再試行
- 複雑なデータ構造
- リレーション管理
- トランザクション制御
- データの整合性確保
- 拡張性と保守性
- モジュール化された設計
- 再利用可能なコンポーネント
- 適切なエラー処理