【保存版】boto3でS3を操作する実践ガイド!基本から応用まで15のコード例で完全解説

boto3でS3を操作する基礎知識

boto3とは?AWS SDKの中の核を見極めるPythonライブラリ

boto3は、PythonからAWSのサービスを操作するための公式SDKです。このライブラリを使用することで、S3をはじめとする様々なAWSサービスをPythonコードから簡単に制御できます。

boto3の主な特徴:

  • AWS公式がメンテナンスする信頼性の高いライブラリ
  • 豊富なドキュメントとコミュニティサポート
  • 低レベルと高レベルの両方のインターフェースを提供
  • スレッドセーフな設計

boto3では、S3操作に関して以下の2つのインターフェースを提供しています:

  1. クライアント(低レベル)インターフェース
# クライアントインターフェースの作成
import boto3
s3_client = boto3.client('s3')
  1. リソース(高レベル)インターフェース
# リソースインターフェースの作成
import boto3
s3_resource = boto3.resource('s3')

S3の操作に必要な最低限の設定と認証方法

boto3でS3を操作するには、適切な認証情報とアクセス権限の設定が必要です。以下に主な設定方法を示します:

  1. 認証情報の設定
# 方法1: 環境変数を使用
# AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY を環境変数に設定

# 方法2: 認証情報ファイルを使用
# ~/.aws/credentials に以下の形式で保存
# [default]
# aws_access_key_id = YOUR_ACCESS_KEY
# aws_secret_access_key = YOUR_SECRET_KEY

# 方法3: コード内で直接指定(非推奨)
import boto3
s3 = boto3.client(
    's3',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY'
)
  1. リージョンの設定
# 方法1: 環境変数を使用
# AWS_DEFAULT_REGION を設定

# 方法2: コード内で指定
s3 = boto3.client('s3', region_name='ap-northeast-1')

セキュリティのベストプラクティス:

  • 本番環境では環境変数やIAMロールを使用する
  • アクセスキーはコード内に直接記述しない
  • 必要最小限の権限を持つIAMポリシーを使用する
  • 定期的にアクセスキーをローテーション(更新)する

boto3の設定が完了したら、以下のコードで接続テストを行えます:

import boto3

def test_s3_connection():
    try:
        s3 = boto3.client('s3')
        response = s3.list_buckets()
        print("接続成功!バケット一覧:", [bucket['Name'] for bucket in response['Buckets']])
    except Exception as e:
        print("接続エラー:", str(e))

# 接続テストの実行
test_s3_connection()

これらの基礎知識を押さえることで、以降のセクションで説明する実践的なS3操作を確実に実装できるようになります。

boto3でS3を操作する基本的な使い方

バケットの作成・一覧取得・削除方法

S3の基本操作の中で最も重要なのが、バケットの管理です。以下に主要な操作のコード例を示します:

import boto3
from botocore.exceptions import ClientError

def bucket_operations():
    s3 = boto3.client('s3')

    # バケットの作成
    def create_bucket(bucket_name):
        try:
            # 東京リージョンの場合、LocationConstraintの指定が必要
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={
                    'LocationConstraint': 'ap-northeast-1'
                }
            )
            print(f"バケット {bucket_name} を作成しました")
        except ClientError as e:
            print(f"エラー: {e}")

    # バケット一覧の取得
    def list_buckets():
        try:
            response = s3.list_buckets()
            for bucket in response['Buckets']:
                print(f"バケット名: {bucket['Name']}, 作成日時: {bucket['CreationDate']}")
        except ClientError as e:
            print(f"エラー: {e}")

    # バケットの削除(空のバケットのみ)
    def delete_bucket(bucket_name):
        try:
            s3.delete_bucket(Bucket=bucket_name)
            print(f"バケット {bucket_name} を削除しました")
        except ClientError as e:
            print(f"エラー: {e}")

オブジェクトのアップロード・ダウンロード・削除方法

S3の主な用途はファイルの保存です。以下にファイル操作の基本的な実装を示します:

def object_operations(bucket_name):
    s3 = boto3.client('s3')

    # ファイルのアップロード
    def upload_file(file_path, object_key):
        try:
            # メタデータとタグを付与する例
            s3.upload_file(
                file_path, 
                bucket_name,
                object_key,
                ExtraArgs={
                    'Metadata': {'Creator': 'Dexall'},
                    'TagSet': [{'Key': 'Project', 'Value': 'Demo'}]
                }
            )
            print(f"ファイル {file_path} をアップロードしました")
        except ClientError as e:
            print(f"エラー: {e}")

    # ファイルのダウンロード
    def download_file(object_key, file_path):
        try:
            s3.download_file(bucket_name, object_key, file_path)
            print(f"ファイル {object_key} をダウンロードしました")
        except ClientError as e:
            print(f"エラー: {e}")

    # オブジェクトの削除
    def delete_object(object_key):
        try:
            s3.delete_object(Bucket=bucket_name, Key=object_key)
            print(f"オブジェクト {object_key} を削除しました")
        except ClientError as e:
            print(f"エラー: {e}")

フォルダ構造の作成と管理方法

S3には物理的なフォルダは存在しませんが、キーにプレフィックスを使用することでフォルダのような構造を実現できます:

def folder_operations(bucket_name):
    s3 = boto3.client('s3')

    # フォルダ構造の作成
    def create_folder(folder_name):
        try:
            # フォルダを表すための空のオブジェクトを作成
            s3.put_object(
                Bucket=bucket_name,
                Key=f"{folder_name}/"
            )
            print(f"フォルダ {folder_name} を作成しました")
        except ClientError as e:
            print(f"エラー: {e}")

    # 特定のフォルダ内のオブジェクト一覧取得
    def list_folder_contents(folder_name):
        try:
            response = s3.list_objects_v2(
                Bucket=bucket_name,
                Prefix=folder_name
            )
            if 'Contents' in response:
                for obj in response['Contents']:
                    print(f"オブジェクト: {obj['Key']}, サイズ: {obj['Size']} bytes")
            else:
                print("フォルダは空です")
        except ClientError as e:
            print(f"エラー: {e}")

# 使用例
if __name__ == "__main__":
    bucket_name = "example-bucket"

    # フォルダ作成
    create_folder("documents")

    # ファイルのアップロード(フォルダ内)
    upload_file("local_file.txt", "documents/file.txt")

    # フォルダ内容の確認
    list_folder_contents("documents")

これらの基本操作を組み合わせることで、S3上でのファイル管理を効率的に行うことができます。また、各操作にはエラーハンドリングを実装することで、より堅牢なシステムを構築できます。

実践的なS3操作テクニック

大容量ファイルの効率的な転送方法

大容量ファイルを扱う場合、マルチパートアップロードを使用することで、転送の信頼性と効率を向上させることができます:

import boto3
import os
from boto3.s3.transfer import TransferConfig

def large_file_operations(bucket_name):
    s3 = boto3.client('s3')

    # マルチパートアップロードの設定
    config = TransferConfig(
        multipart_threshold=1024 * 25,  # 25MB以上でマルチパート転送を使用
        max_concurrency=10,             # 最大同時アップロードプロセス数
        multipart_chunksize=1024 * 25,  # チャンクサイズを25MBに設定
        use_threads=True                # マルチスレッドの使用
    )

    def upload_large_file(file_path, object_key):
        try:
            file_size = os.stat(file_path).st_size
            with open(file_path, 'rb') as file_data:
                s3.upload_fileobj(
                    file_data,
                    bucket_name,
                    object_key,
                    Config=config,
                    Callback=ProgressPercentage(file_path)
                )
            print(f"ファイル {file_path} ({file_size} bytes) のアップロードが完了しました")
        except Exception as e:
            print(f"エラー: {e}")

# プログレス表示用のクラス
class ProgressPercentage:
    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0

    def __call__(self, bytes_amount):
        self._seen_so_far += bytes_amount
        percentage = (self._seen_so_far / self._size) * 100
        print(f"\rProgress: {percentage:.2f}%", end='')

バケットポリシーとアクセス制御の設定

セキュアなS3の運用には、適切なアクセス制御が不可欠です:

def security_controls(bucket_name):
    s3 = boto3.client('s3')

    # パブリックアクセスのブロック
    def block_public_access():
        try:
            s3.put_public_access_block(
                Bucket=bucket_name,
                PublicAccessBlockConfiguration={
                    'BlockPublicAcls': True,
                    'IgnorePublicAcls': True,
                    'BlockPublicPolicy': True,
                    'RestrictPublicBuckets': True
                }
            )
            print("パブリックアクセスをブロックしました")
        except Exception as e:
            print(f"エラー: {e}")

    # バケットポリシーの設定
    def set_bucket_policy():
        bucket_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowSpecificIP",
                    "Effect": "Allow",
                    "Principal": "*",
                    "Action": "s3:GetObject",
                    "Resource": f"arn:aws:s3:::{bucket_name}/*",
                    "Condition": {
                        "IpAddress": {
                            "aws:SourceIp": ["192.0.2.0/24"]  # 特定のIPアドレス範囲
                        }
                    }
                }
            ]
        }
        try:
            s3.put_bucket_policy(
                Bucket=bucket_name,
                Policy=json.dumps(bucket_policy)
            )
            print("バケットポリシーを設定しました")
        except Exception as e:
            print(f"エラー: {e}")

バージョニングと世代管理の実装方法

バージョニングを使用することで、オブジェクトの変更履歴を管理できます:

def version_management(bucket_name):
    s3 = boto3.client('s3')

    # バージョニングの有効化
    def enable_versioning():
        try:
            s3.put_bucket_versioning(
                Bucket=bucket_name,
                VersioningConfiguration={'Status': 'Enabled'}
            )
            print("バージョニングを有効化しました")
        except Exception as e:
            print(f"エラー: {e}")

    # 全バージョンの一覧取得
    def list_object_versions(prefix=''):
        try:
            response = s3.list_object_versions(
                Bucket=bucket_name,
                Prefix=prefix
            )
            if 'Versions' in response:
                for version in response['Versions']:
                    print(f"キー: {version['Key']}, "
                          f"バージョンID: {version['VersionId']}, "
                          f"最終更新: {version['LastModified']}")
        except Exception as e:
            print(f"エラー: {e}")

    # 特定バージョンの復元
    def restore_version(object_key, version_id):
        try:
            s3.copy_object(
                Bucket=bucket_name,
                CopySource={
                    'Bucket': bucket_name,
                    'Key': object_key,
                    'VersionId': version_id
                },
                Key=object_key
            )
            print(f"バージョン {version_id} を復元しました")
        except Exception as e:
            print(f"エラー: {e}")

これらの実践的なテクニックを活用することで、より堅牢で効率的なS3の運用が可能になります。特に大規模なシステムや重要なデータを扱う場合には、これらの機能を適切に組み合わせることが重要です。

エラーハンドリングとデバッグ

よくある例外とその対処方法

boto3を使用したS3操作で発生しやすい例外とその対処方法を解説します:

import boto3
from botocore.exceptions import ClientError, ParamValidationError
import logging

# ロガーの設定
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class S3ErrorHandler:
    def __init__(self):
        self.s3 = boto3.client('s3')

    def handle_common_errors(self, operation_name):
        def decorator(func):
            def wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    error_message = e.response['Error']['Message']

                    if error_code == 'NoSuchBucket':
                        logger.error(f"バケットが存在しません: {error_message}")
                    elif error_code == 'NoSuchKey':
                        logger.error(f"オブジェクトが存在しません: {error_message}")
                    elif error_code == 'AccessDenied':
                        logger.error(f"アクセスが拒否されました: {error_message}")
                    elif error_code == 'InvalidBucketName':
                        logger.error(f"無効なバケット名です: {error_message}")
                    else:
                        logger.error(f"予期せぬエラーが発生しました: {error_code} - {error_message}")

                    raise
                except ParamValidationError as e:
                    logger.error(f"パラメータが無効です: {str(e)}")
                    raise
                except Exception as e:
                    logger.error(f"予期せぬエラーが発生しました: {str(e)}")
                    raise
            return wrapper
        return decorator

    @handle_common_errors("upload_file")
    def safe_upload_file(self, file_path, bucket, key):
        self.s3.upload_file(file_path, bucket, key)
        logger.info(f"ファイル {file_path} を {bucket}/{key} にアップロードしました")

ロギングとモニタリングの実装方法

効果的なデバッグと監視のためのロギング実装例:

import logging
from datetime import datetime
import json

class S3Logger:
    def __init__(self, log_level=logging.INFO):
        # ロガーの設定
        self.logger = logging.getLogger('s3_operations')
        self.logger.setLevel(log_level)

        # ファイルハンドラーの設定
        fh = logging.FileHandler('s3_operations.log')
        fh.setLevel(log_level)

        # フォーマッターの設定
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

        # CloudWatchメトリクスの設定
        self.cloudwatch = boto3.client('cloudwatch')

    def log_operation(self, operation_name, bucket_name, object_key=None, **kwargs):
        """操作のログを記録し、メトリクスを送信する"""
        # 詳細なログの記録
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'operation': operation_name,
            'bucket': bucket_name,
            'object_key': object_key,
            'additional_info': kwargs
        }
        self.logger.info(json.dumps(log_data))

        # CloudWatchメトリクスの送信
        try:
            self.cloudwatch.put_metric_data(
                Namespace='S3Operations',
                MetricData=[
                    {
                        'MetricName': operation_name,
                        'Value': 1,
                        'Unit': 'Count',
                        'Dimensions': [
                            {
                                'Name': 'Bucket',
                                'Value': bucket_name
                            }
                        ]
                    }
                ]
            )
        except Exception as e:
            self.logger.error(f"メトリクス送信エラー: {str(e)}")

# 使用例
def main():
    s3_logger = S3Logger()
    error_handler = S3ErrorHandler()

    try:
        # ファイルのアップロード(エラーハンドリング付き)
        error_handler.safe_upload_file(
            'example.txt',
            'my-bucket',
            'folder/example.txt'
        )

        # 操作のログを記録
        s3_logger.log_operation(
            'upload_file',
            'my-bucket',
            'folder/example.txt',
            file_size=1024,
            content_type='text/plain'
        )
    except Exception as e:
        print(f"操作に失敗しました: {str(e)}")

このように、適切なエラーハンドリングとロギングを実装することで、問題の早期発見と迅速な対応が可能になります。また、CloudWatchと連携することで、運用状況の可視化と監視も実現できます。

パフォーマンス最適化とベストプラクティス

並列処理による転送速度の向上方法

大量のファイルを効率的に処理するための並列処理の実装例を示します:

import boto3
from concurrent.futures import ThreadPoolExecutor
import os
from typing import List, Dict
import time

class S3ParallelProcessor:
    def __init__(self, bucket_name: str, max_workers: int = 10):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name
        self.max_workers = max_workers

    def parallel_upload(self, file_list: List[str], prefix: str = '') -> Dict[str, bool]:
        """
        複数のファイルを並列でアップロードする

        Args:
            file_list: アップロードするファイルパスのリスト
            prefix: S3上のプレフィックス

        Returns:
            結果を示す辞書 {ファイルパス: 成功/失敗}
        """
        results = {}

        def upload_single_file(file_path: str) -> bool:
            try:
                object_key = os.path.join(prefix, os.path.basename(file_path))
                self.s3.upload_file(file_path, self.bucket_name, object_key)
                return True
            except Exception as e:
                print(f"エラー ({file_path}): {str(e)}")
                return False

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(upload_single_file, file_path): file_path
                for file_path in file_list
            }

            for future in future_to_file:
                file_path = future_to_file[future]
                results[file_path] = future.result()

        return results

セッション再利用によるパフォーマンス向上

接続のオーバーヘッドを削減するためのセッション管理の実装:

class S3SessionManager:
    _instance = None
    _session = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        if self._session is None:
            self._session = boto3.Session()

        # クライアントの設定をカスタマイズ
        config = boto3.Config(
            retries = dict(
                max_attempts = 3,  # リトライ回数
            ),
            connect_timeout = 5,   # 接続タイムアウト
            read_timeout = 10      # 読み取りタイムアウト
        )

        self.s3 = self._session.client('s3', config=config)

    def get_client(self):
        return self.s3

# 使用例
def optimize_operations():
    # シングルトンパターンでセッションを再利用
    s3_manager = S3SessionManager.get_instance()
    s3 = s3_manager.get_client()

    # 以降の操作で同じセッションを再利用
    return s3

コスト最適化のためのS3ストレージクラスの利用

データのアクセスパターンに応じた最適なストレージクラスの選択と移行:

class S3StorageOptimizer:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name

    def optimize_storage_class(self, prefix: str = '', days_threshold: int = 30):
        """
        アクセス頻度に基づいてストレージクラスを最適化

        Args:
            prefix: 対象オブジェクトのプレフィックス
            days_threshold: Intelligent-Tieringに移行する日数の閾値
        """
        try:
            # オブジェクト一覧の取得
            paginator = self.s3.get_paginator('list_objects_v2')

            for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
                if 'Contents' not in page:
                    continue

                for obj in page['Contents']:
                    # 最終アクセス日時の確認
                    try:
                        response = self.s3.head_object(
                            Bucket=self.bucket_name,
                            Key=obj['Key']
                        )

                        # Intelligent-Tieringへの移行条件をチェック
                        if self._should_move_to_intelligent_tiering(response):
                            self._change_storage_class(
                                obj['Key'],
                                'INTELLIGENT_TIERING'
                            )
                    except Exception as e:
                        print(f"エラー (オブジェクト {obj['Key']}): {str(e)}")

        except Exception as e:
            print(f"ストレージ最適化エラー: {str(e)}")

    def _should_move_to_intelligent_tiering(self, object_metadata) -> bool:
        """Intelligent-Tieringへの移行判断"""
        # 現在のストレージクラスのチェック
        current_storage_class = object_metadata.get('StorageClass', 'STANDARD')
        if current_storage_class != 'STANDARD':
            return False

        # オブジェクトサイズのチェック(128KB以上が推奨)
        if object_metadata['ContentLength'] < 128 * 1024:
            return False

        return True

    def _change_storage_class(self, object_key: str, storage_class: str):
        """オブジェクトのストレージクラスを変更"""
        try:
            self.s3.copy_object(
                Bucket=self.bucket_name,
                CopySource={'Bucket': self.bucket_name, 'Key': object_key},
                Key=object_key,
                StorageClass=storage_class,
                MetadataDirective='COPY'
            )
            print(f"オブジェクト {object_key} を {storage_class} に移行しました")
        except Exception as e:
            print(f"ストレージクラス変更エラー: {str(e)}")

これらの最適化テクニックを適切に組み合わせることで、S3操作のパフォーマンスを大幅に向上させることができます。特に以下の点に注意してください:

  • 並列処理は適切なワーカー数を設定し、リソースの過負荷を避ける
  • セッションの再利用により、不要な接続オーバーヘッドを削減する
  • ストレージクラスの選択は、アクセスパターンとコストを考慮して決定する
  • 大規模なデータ転送時は、マルチパートアップロードと組み合わせて使用する

実践的なユースケース実装例

定期的なバックアップの自動化スクリプト

データベースバックアップやログファイルの自動バックアップを実装する例を示します:

import boto3
import os
import schedule
import time
from datetime import datetime
from typing import Optional, List
import logging

class S3BackupSystem:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """ロギングの設定"""
        logger = logging.getLogger('S3BackupSystem')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def backup_files(self, source_dir: str, prefix: str) -> bool:
        """
        指定ディレクトリのファイルをS3にバックアップ

        Args:
            source_dir: バックアップ対象のディレクトリパス
            prefix: S3上のプレフィックス
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

            # バックアップ対象ファイルの列挙
            files_to_backup = []
            for root, _, files in os.walk(source_dir):
                for file in files:
                    local_path = os.path.join(root, file)
                    relative_path = os.path.relpath(local_path, source_dir)
                    s3_key = f"{prefix}/{timestamp}/{relative_path}"
                    files_to_backup.append((local_path, s3_key))

            # ファイルのアップロード
            for local_path, s3_key in files_to_backup:
                self.s3.upload_file(local_path, self.bucket_name, s3_key)
                self.logger.info(f"バックアップ完了: {s3_key}")

            # 古いバックアップの削除
            self._cleanup_old_backups(prefix, retention_days=7)
            return True

        except Exception as e:
            self.logger.error(f"バックアップエラー: {str(e)}")
            return False

    def _cleanup_old_backups(self, prefix: str, retention_days: int):
        """
        指定日数より古いバックアップを削除

        Args:
            prefix: 検索対象のプレフィックス
            retention_days: 保持する日数
        """
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix
            )

            if 'Contents' in response:
                current_time = datetime.now()
                for obj in response['Contents']:
                    age = (current_time - obj['LastModified'].replace(tzinfo=None)).days
                    if age > retention_days:
                        self.s3.delete_object(
                            Bucket=self.bucket_name,
                            Key=obj['Key']
                        )
                        self.logger.info(f"古いバックアップを削除: {obj['Key']}")

        except Exception as e:
            self.logger.error(f"クリーンアップエラー: {str(e)}")

def run_scheduled_backup():
    """スケジュールされたバックアップの実行"""
    backup_system = S3BackupSystem('my-backup-bucket')

    # 日次バックアップの設定
    schedule.every().day.at("01:00").do(
        backup_system.backup_files,
        '/var/log',
        'system_logs'
    )

    # 週次バックアップの設定
    schedule.every().monday.at("02:00").do(
        backup_system.backup_files,
        '/var/www',
        'website_backup'
    )

    while True:
        schedule.run_pending()
        time.sleep(60)

セキュアなファイル共有システムの構築

一時的なアクセス用の署名付きURLを生成するファイル共有システムの実装例:

import boto3
from datetime import datetime, timedelta
from typing import Optional, Dict
import secrets
import hashlib
from dataclasses import dataclass

@dataclass
class ShareConfig:
    """ファイル共有の設定情報"""
    expiration_hours: int = 24
    max_downloads: Optional[int] = None
    require_password: bool = False

class SecureFileSharing:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name
        self._access_log = {}  # 簡易的なアクセスログ(本番では永続化が必要)

    def create_share_link(
        self,
        file_key: str,
        config: ShareConfig
    ) -> Dict[str, str]:
        """
        ファイル共有用の署名付きURLを生成

        Args:
            file_key: 共有するファイルのキー
            config: 共有の設定

        Returns:
            共有情報を含む辞書
        """
        try:
            # ファイルの存在確認
            self.s3.head_object(Bucket=self.bucket_name, Key=file_key)

            # 共有用のユニークID生成
            share_id = secrets.token_urlsafe(16)

            # パスワードの生成(必要な場合)
            password = None
            if config.require_password:
                password = secrets.token_urlsafe(8)

            # 署名付きURLの生成
            url = self.s3.generate_presigned_url(
                'get_object',
                Params={
                    'Bucket': self.bucket_name,
                    'Key': file_key
                },
                ExpiresIn=config.expiration_hours * 3600
            )

            # 共有情報の記録
            self._access_log[share_id] = {
                'file_key': file_key,
                'created_at': datetime.now(),
                'expires_at': datetime.now() + timedelta(hours=config.expiration_hours),
                'max_downloads': config.max_downloads,
                'current_downloads': 0,
                'password_hash': hashlib.sha256(password.encode()).hexdigest() if password else None
            }

            share_info = {
                'share_id': share_id,
                'url': url,
                'expires_in_hours': config.expiration_hours
            }
            if password:
                share_info['password'] = password

            return share_info

        except Exception as e:
            raise ValueError(f"共有リンクの作成に失敗: {str(e)}")

    def verify_access(self, share_id: str, password: Optional[str] = None) -> bool:
        """
        共有リンクへのアクセス検証

        Args:
            share_id: 共有ID
            password: アクセスパスワード(設定されている場合)

        Returns:
            アクセス可否
        """
        if share_id not in self._access_log:
            return False

        share_info = self._access_log[share_id]

        # 有効期限のチェック
        if datetime.now() > share_info['expires_at']:
            return False

        # ダウンロード回数のチェック
        if (share_info['max_downloads'] and 
            share_info['current_downloads'] >= share_info['max_downloads']):
            return False

        # パスワードの検証
        if share_info['password_hash']:
            if not password:
                return False
            if hashlib.sha256(password.encode()).hexdigest() != share_info['password_hash']:
                return False

        # アクセスカウントの更新
        share_info['current_downloads'] += 1
        return True

# 使用例
def example_usage():
    sharing = SecureFileSharing('my-share-bucket')

    # セキュアな共有リンクの作成
    config = ShareConfig(
        expiration_hours=48,
        max_downloads=5,
        require_password=True
    )

    share_info = sharing.create_share_link(
        'documents/confidential.pdf',
        config
    )

    print(f"共有ID: {share_info['share_id']}")
    print(f"URL: {share_info['url']}")
    print(f"パスワード: {share_info.get('password')}")

    # アクセス検証
    is_valid = sharing.verify_access(
        share_info['share_id'],
        share_info['password']
    )
    print(f"アクセス検証結果: {is_valid}")

これらの実装例は、実際のビジネスシーンで発生する要件に対応できるよう設計されています。バックアップシステムでは、定期的なバックアップとクリーンアップを自動化し、ファイル共有システムでは、セキュリティを考慮した一時的なアクセス制御を実現しています。

実運用では、これらのコードをベースに、以下のような拡張が考えられます:

  • バックアップシステム
  • バックアップの暗号化機能の追加
  • 複数の保存世代管理
  • 障害通知機能の実装
  • ファイル共有システム
  • アクセスログの永続化
  • IP制限の追加
  • ファイルの暗号化/復号化機能
  • 監査ログの実装

これらの機能は、実際のプロジェクトの要件に応じて適宜追加することができます。