【保存版】C++スレッド完全ガイド:並行処理の実装から最適化まで

C++でのスレッド実装の基礎知識

マルチスレッドプログラミングは現代のソフトウェア開発において不可欠なスキルです。C++11以降で導入された標準スレッドライブラリを使用することで、効率的な並行処理を実装できます。

std::threadクラスの特徴と基本的な使い方

std::threadは、C++の標準ライブラリで提供される基本的なスレッド管理クラスです。以下に基本的な使用方法と注意点を説明します。

#include <iostream>
#include <thread>
#include <vector>

// スレッドで実行する関数
void worker_function(int id) {
    std::cout << "Worker " << id << " starting\n";
    // 実際の処理をここに記述
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Worker " << id << " finished\n";
}

int main() {
    // スレッドの作成方法1: 関数ポインタを使用
    std::thread t1(worker_function, 1);

    // スレッドの作成方法2: ラムダ式を使用
    std::thread t2([](int id) {
        std::cout << "Lambda worker " << id << " executing\n";
    }, 2);

    // 重要: スレッドの終了を待機
    t1.join();
    t2.join();

    return 0;
}

スレッドの作成とライフサイクル管理の重要性

スレッドのライフサイクル管理は、安全なマルチスレッドプログラミングの基本です。適切な管理を怠ると、予期せぬ動作やクラッシュの原因となります。

#include <thread>
#include <stdexcept>

class ThreadGuard {
    std::thread& t;
public:
    // RAII原則に基づくスレッド管理
    explicit ThreadGuard(std::thread& t_) : t(t_) {}

    ~ThreadGuard() {
        if (t.joinable()) {
            t.join(); // スレッドの終了を確実に待機
        }
    }

    // コピー禁止
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void example_function() {
    std::thread t([]() {
        // スレッドの処理
    });

    ThreadGuard g(t); // RAIIによる自動管理

    // 例外が発生しても、ThreadGuardのデストラクタで
    // 適切にjoin()が呼ばれる
} // スコープを抜けると自動的にjoin()が呼ばれる

スレッドセーフなコード作成の原則

スレッドセーフなコードを書くための重要な原則を以下に示します:

  1. データの共有を最小限に抑える
  • 可能な限り各スレッドで独立したデータを使用
  • 必要な場合のみ共有リソースを使用
  1. 適切な同期機構の使用
  • ミューテックスによるデータ保護
  • アトミック操作の活用
  • スコープ付きロック(std::lock_guard, std::unique_lock)の使用
#include <mutex>
#include <thread>
#include <vector>

class ThreadSafeCounter {
    mutable std::mutex mutex_; // mutableでconst関数内での使用を許可
    int value_ = 0;

public:
    // スレッドセーフな値の増加
    void increment() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++value_;
    }

    // スレッドセーフな値の取得
    int get() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return value_;
    }
};

// 使用例
void safe_counter_example() {
    ThreadSafeCounter counter;
    std::vector<std::thread> threads;

    // 複数スレッドからの安全なアクセス
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back([&counter]() {
            for (int j = 0; j < 1000; ++j) {
                counter.increment();
            }
        });
    }

    // すべてのスレッドの終了を待機
    for (auto& t : threads) {
        t.join();
    }

    // 最終的な値は必ず10000になる
    std::cout << "Final value: " << counter.get() << std::endl;
}

スレッドセーフなコードを書く際の重要なポイント:

  • RAIIパターンを活用してリソース管理を自動化
  • データ競合を防ぐための適切な同期機構の選択
  • デッドロックを防ぐためのロック取得順序の一貫性維持
  • 例外安全性の確保

これらの基本原則を理解し、適切に実装することで、信頼性の高いマルチスレッドアプリケーションを開発することができます。

実践的なスレッドプログラミング手法

マルチスレッドプログラミングでは、適切な同期機構の選択と実装が重要です。このセクションでは、実践的な同期処理の実装方法について解説します。

データ競合を防ぐミューテックスの実装方法

ミューテックスは、共有リソースへの排他的アクセスを制御する基本的な同期機構です。適切な使用方法を以下に示します。

#include <mutex>
#include <thread>
#include <queue>
#include <iostream>

template<typename T>
class ThreadSafeQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;

public:
    // データの追加(スレッドセーフ)
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(value));
    }

    // データの取得と削除(スレッドセーフ)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // サイズの取得(スレッドセーフ)
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
};

// 使用例
void thread_safe_queue_example() {
    ThreadSafeQueue<int> queue;

    // 生産者スレッド
    std::thread producer([&queue]() {
        for (int i = 0; i < 10; ++i) {
            queue.push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    // 消費者スレッド
    std::thread consumer([&queue]() {
        int value;
        while (true) {
            if (queue.try_pop(value)) {
                std::cout << "Consumed: " << value << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
        }
    });

    producer.join();
    // 注意: この例では簡略化のため、consumerは強制終了します
    consumer.detach();
}

条件変数を使用した効率的な同期処理

条件変数は、スレッド間の通知と待機を効率的に実現する同期機構です。以下に実装例を示します。

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class BlockingQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    size_t capacity_;

public:
    explicit BlockingQueue(size_t capacity = 100) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        // キューが満杯の場合、空きができるまで待機
        not_full_.wait(lock, [this]() { 
            return queue_.size() < capacity_; 
        });

        queue_.push(std::move(value));
        lock.unlock();
        // 待機中の消費者に通知
        not_empty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // キューが空の場合、データが追加されるまで待機
        not_empty_.wait(lock, [this]() { 
            return !queue_.empty(); 
        });

        T value = std::move(queue_.front());
        queue_.pop();
        lock.unlock();
        // 待機中の生産者に通知
        not_full_.notify_one();
        return value;
    }
};

スレッドプールによるリソース管理の最適化

スレッドプールは、スレッドの生成・破棄コストを削減し、リソースを効率的に管理するための重要な設計パターンです。

#include <functional>
#include <future>
#include <thread>
#include <vector>

class ThreadPool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;

public:
    explicit ThreadPool(size_t threads) : stop_(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });

                        if (stop_ && tasks_.empty()) {
                            return;
                        }

                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& worker : workers_) {
            worker.join();
        }
    }
};

// 使用例
void thread_pool_example() {
    ThreadPool pool(4); // 4つのワーカースレッドを作成
    std::vector<std::future<int>> results;

    // タスクの投入
    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i * i;
            })
        );
    }

    // 結果の取得
    for (auto& result : results) {
        std::cout << result.get() << ' ';
    }
}

実践的なスレッドプログラミングにおける重要なポイント:

  1. 適切な同期機構の選択
  • ミューテックス:単純な排他制御
  • 条件変数:スレッド間通知
  • アトミック操作:軽量な同期
  1. パフォーマンスの考慮
  • ロックの範囲を最小限に
  • 細粒度のロックと粗粒度のロックのトレードオフ
  • スレッドプールによるオーバーヘッド削減
  1. エラー処理とリソース管理
  • 例外安全性の確保
  • RAIIによるリソース管理
  • デッドロック防止の設計

これらの実践的な実装パターンを適切に活用することで、効率的で信頼性の高いマルチスレッドアプリケーションを開発することができます。

高度なスレッド制御テクニック

C++11以降で導入された高度なスレッド制御機構を使用することで、より効率的で堅牢な並行処理を実現できます。

future/promiseによる非同期処理の実装

future/promiseは、非同期処理の結果を効率的に取得するための機構です。以下に詳細な実装例を示します。

#include <future>
#include <thread>
#include <iostream>
#include <vector>
#include <stdexcept>

// 非同期で実行される計算処理の例
std::vector<int> calculate_parallel(const std::vector<int>& data, 
                                  size_t start, size_t end) {
    std::vector<int> result;
    for (size_t i = start; i < end; ++i) {
        // 重い計算処理をシミュレート
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        result.push_back(data[i] * data[i]);
    }
    return result;
}

// future/promiseを使用した非同期処理の実装例
class AsyncProcessor {
    std::vector<int> data_;
    std::vector<std::future<std::vector<int>>> futures_;

public:
    explicit AsyncProcessor(const std::vector<int>& data) : data_(data) {}

    void process_async() {
        const size_t num_threads = 4;
        const size_t chunk_size = data_.size() / num_threads;

        for (size_t i = 0; i < num_threads; ++i) {
            size_t start = i * chunk_size;
            size_t end = (i == num_threads - 1) ? data_.size() : 
                                                 (i + 1) * chunk_size;

            // 非同期タスクを開始
            futures_.push_back(
                std::async(std::launch::async,
                          calculate_parallel,
                          std::ref(data_),
                          start,
                          end)
            );
        }
    }

    std::vector<int> get_results() {
        std::vector<int> results;
        for (auto& future : futures_) {
            // 各タスクの結果を取得して結合
            auto chunk_result = future.get();
            results.insert(results.end(), 
                         chunk_result.begin(), 
                         chunk_result.end());
        }
        return results;
    }
};

// promiseを使用した例外処理の例
void process_with_exception_handling() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    std::thread worker([&promise] {
        try {
            // 例外が発生する可能性のある処理
            throw std::runtime_error("Something went wrong");
            promise.set_value(42);
        }
        catch (...) {
            // 例外をfutureに転送
            promise.set_exception(std::current_exception());
        }
    });

    try {
        int result = future.get();
        std::cout << "Result: " << result << std::endl;
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }

    worker.join();
}

atomic型を活用したロックフリープログラミング

アトミック操作を使用することで、ロックを使用せずに安全な並行処理を実現できます。

#include <atomic>
#include <thread>
#include <vector>

// ロックフリーなカウンター実装
class LockFreeCounter {
    std::atomic<int> value_;

public:
    LockFreeCounter() : value_(0) {}

    void increment() {
        value_.fetch_add(1, std::memory_order_relaxed);
    }

    void decrement() {
        value_.fetch_sub(1, std::memory_order_relaxed);
    }

    int get() const {
        return value_.load(std::memory_order_relaxed);
    }
};

// ロックフリーなスタック実装
template<typename T>
class LockFreeStack {
    struct Node {
        T data;
        std::atomic<Node*> next;

        Node(const T& data_) : data(data_), next(nullptr) {}
    };

    std::atomic<Node*> head_;

public:
    LockFreeStack() : head_(nullptr) {}

    void push(const T& data) {
        Node* new_node = new Node(data);
        new_node->next = head_.load(std::memory_order_relaxed);

        while (!head_.compare_exchange_weak(
            new_node->next,
            new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    bool pop(T& result) {
        Node* old_head = head_.load(std::memory_order_relaxed);

        while (old_head && 
               !head_.compare_exchange_weak(
                   old_head,
                   old_head->next,
                   std::memory_order_acquire,
                   std::memory_order_relaxed)) {}

        if (old_head) {
            result = old_head->data;
            delete old_head;
            return true;
        }
        return false;
    }
};

メモリモデルとスレッドの相互作用の理解

C++のメモリモデルを理解し、適切な同期を行うことは、正しい並行プログラミングに不可欠です。

#include <atomic>
#include <thread>
#include <cassert>

// メモリオーダーの実践的な使用例
class MessageQueue {
    struct Message {
        int data;
        std::atomic<bool> ready;
        Message() : data(0), ready(false) {}
    };

    Message messages_[1024];
    std::atomic<int> write_index_{0};
    std::atomic<int> read_index_{0};

public:
    void send(int data) {
        int index = write_index_.fetch_add(1, std::memory_order_relaxed) % 1024;
        messages_[index].data = data;
        messages_[index].ready.store(true, std::memory_order_release);
    }

    bool receive(int& data) {
        int index = read_index_.load(std::memory_order_relaxed);
        if (!messages_[index].ready.load(std::memory_order_acquire)) {
            return false;
        }

        data = messages_[index].data;
        messages_[index].ready.store(false, std::memory_order_relaxed);
        read_index_.store((index + 1) % 1024, std::memory_order_relaxed);
        return true;
    }
};

// メモリバリアの実践的な使用例
void memory_barrier_example() {
    std::atomic<bool> flag{false};
    int shared_data = 0;

    std::thread producer([&]() {
        shared_data = 42;  // データの準備
        flag.store(true, std::memory_order_release);  // リリースバリア
    });

    std::thread consumer([&]() {
        while (!flag.load(std::memory_order_acquire)) {  // アクワイアバリア
            std::this_thread::yield();
        }
        assert(shared_data == 42);  // データが確実に見える
    });

    producer.join();
    consumer.join();
}

高度なスレッド制御における重要なポイント:

  1. future/promiseの適切な使用
  • 非同期処理結果の効率的な取得
  • 例外処理の統合
  • タスクのキャンセレーション管理
  1. アトミック操作の最適化
  • メモリオーダーの適切な選択
  • ロックフリーアルゴリズムの実装
  • パフォーマンスとスレッドセーフティのバランス
  1. メモリモデルの考慮
  • データ競合の防止
  • 適切な同期機構の選択
  • メモリバリアの効果的な使用

これらの高度なテクニックを適切に組み合わせることで、高性能で信頼性の高い並行処理システムを構築できます。ただし、これらの機能は慎重に使用する必要があり、適切な理解と十分なテストが不可欠です。

パフォーマンス最適化とデバッグ

マルチスレッドアプリケーションの開発では、パフォーマンスの最適化とデバッグが特に重要です。このセクションでは、実践的な最適化手法とデバッグテクニックを解説します。

スレッド処理のボトルネック特定と解決方法

マルチスレッドアプリケーションのパフォーマンス最適化には、適切なプロファイリングとボトルネックの特定が不可欠です。

#include <chrono>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <vector>

// パフォーマンス測定用のシンプルなプロファイラー
class ScopedTimer {
    std::chrono::high_resolution_clock::time_point start_;
    std::string operation_name_;

public:
    explicit ScopedTimer(std::string name) 
        : start_(std::chrono::high_resolution_clock::now())
        , operation_name_(std::move(name)) {}

    ~ScopedTimer() {
        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::microseconds>
                       (end - start_).count();
        std::cout << operation_name_ << " took " 
                  << duration << " microseconds\n";
    }
};

// ロックの粒度を最適化した共有データ構造の例
template<typename T>
class OptimizedCache {
    struct Entry {
        T data;
        std::atomic<uint64_t> timestamp;
    };

    std::vector<Entry> cache_;
    std::vector<std::shared_mutex> mutexes_;  // 複数のミューテックス
    static constexpr size_t MUTEX_COUNT = 32;  // ミューテックスの数

    size_t get_mutex_index(size_t index) const {
        return index % MUTEX_COUNT;
    }

public:
    OptimizedCache(size_t size) 
        : cache_(size)
        , mutexes_(MUTEX_COUNT) {}

    void update(size_t index, const T& value) {
        size_t mutex_index = get_mutex_index(index);
        {
            std::unique_lock lock(mutexes_[mutex_index]);
            cache_[index].data = value;
            cache_[index].timestamp.store(
                std::chrono::system_clock::now().time_since_epoch().count(),
                std::memory_order_release
            );
        }
    }

    std::pair<T, uint64_t> read(size_t index) {
        size_t mutex_index = get_mutex_index(index);
        std::shared_lock lock(mutexes_[mutex_index]);
        return {
            cache_[index].data,
            cache_[index].timestamp.load(std::memory_order_acquire)
        };
    }
};

デッドロック防止のためのベストプラクティス

デッドロックの防止と検出は、マルチスレッドプログラミングにおける重要な課題です。

#include <mutex>
#include <thread>
#include <iostream>

// デッドロック検出用のロックガード
template<typename... Mutexes>
class DeadlockDetector {
    std::tuple<std::unique_lock<Mutexes>...> locks_;

public:
    DeadlockDetector(Mutexes&... mutexes) {
        // try_lockを使用して全てのロックを獲得
        if (!std::try_lock(mutexes...)) {
            // ロックの獲得に失敗した場合
            throw std::runtime_error("Potential deadlock detected!");
        }

        // ロックの獲得に成功した場合、所有権を移動
        locks_ = std::make_tuple(
            std::unique_lock<Mutexes>(mutexes, std::adopt_lock)...
        );
    }
};

// デッドロック防止のための階層的ロック
class HierarchicalMutex {
    std::mutex mutex_;
    const unsigned long hierarchy_value_;
    unsigned long previous_hierarchy_value_;
    static thread_local unsigned long this_thread_hierarchy_value;

    void check_for_hierarchy_violation() {
        if (this_thread_hierarchy_value <= hierarchy_value_) {
            throw std::runtime_error("Mutex hierarchy violated");
        }
    }

    void update_hierarchy_value() {
        previous_hierarchy_value_ = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value_;
    }

public:
    explicit HierarchicalMutex(unsigned long value)
        : hierarchy_value_(value) {}

    void lock() {
        check_for_hierarchy_violation();
        mutex_.lock();
        update_hierarchy_value();
    }

    void unlock() {
        this_thread_hierarchy_value = previous_hierarchy_value_;
        mutex_.unlock();
    }

    bool try_lock() {
        check_for_hierarchy_violation();
        if (!mutex_.try_lock()) {
            return false;
        }
        update_hierarchy_value();
        return true;
    }
};

thread_local unsigned long 
    HierarchicalMutex::this_thread_hierarchy_value(ULONG_MAX);

マルチスレッドアプリケーションのデバッグ技法

効果的なデバッグには、適切なツールと手法の使用が不可欠です。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <sstream>
#include <cassert>

// デバッグ用のスレッド安全なロガー
class ThreadSafeLogger {
    std::mutex mutex_;
    std::stringstream log_;

public:
    template<typename T>
    void log(const T& message) {
        std::lock_guard<std::mutex> lock(mutex_);
        log_ << "[Thread " << std::this_thread::get_id() << "] "
             << message << std::endl;
    }

    std::string get_log() {
        std::lock_guard<std::mutex> lock(mutex_);
        return log_.str();
    }
};

// 条件変数のデバッグ支援クラス
class DebugConditionVariable {
    std::condition_variable cv_;
    std::mutex& mutex_;
    ThreadSafeLogger& logger_;
    std::string name_;

public:
    DebugConditionVariable(std::mutex& mutex, 
                          ThreadSafeLogger& logger,
                          std::string name)
        : mutex_(mutex)
        , logger_(logger)
        , name_(std::move(name)) {}

    template<typename Predicate>
    void wait(std::unique_lock<std::mutex>& lock, Predicate pred) {
        logger_.log(name_ + ": waiting");
        cv_.wait(lock, [this, pred]() {
            bool result = pred();
            logger_.log(name_ + ": predicate check returned " + 
                       std::to_string(result));
            return result;
        });
        logger_.log(name_ + ": resumed");
    }

    void notify_one() {
        logger_.log(name_ + ": notifying one");
        cv_.notify_one();
    }

    void notify_all() {
        logger_.log(name_ + ": notifying all");
        cv_.notify_all();
    }
};

// デバッグ用のアサーション
#ifdef _DEBUG
#define THREAD_ASSERT(condition, message) \
    do { \
        if (!(condition)) { \
            std::stringstream ss; \
            ss << "Assertion failed: " << message << "\n" \
               << "Thread ID: " << std::this_thread::get_id() << "\n" \
               << "File: " << __FILE__ << "\n" \
               << "Line: " << __LINE__; \
            throw std::runtime_error(ss.str()); \
        } \
    } while (0)
#else
#define THREAD_ASSERT(condition, message) ((void)0)
#endif

パフォーマンス最適化とデバッグにおける重要なポイント:

  1. パフォーマンス最適化
  • プロファイリングによるボトルネックの特定
  • ロックの粒度の最適化
  • メモリアクセスパターンの改善
  • キャッシュラインの考慮
  1. デッドロック防止
  • ロックの階層化
  • try_lockの活用
  • リソース獲得順序の一貫性維持
  • デッドロック検出機構の実装
  1. デバッグ技法
  • ロギング機構の活用
  • アサーションの適切な使用
  • デバッグビルドでの追加チェック
  • ツールの効果的な活用(Valgrind, ThreadSanitizer等)

これらの技術を適切に活用することで、高品質なマルチスレッドアプリケーションの開発が可能になります。ただし、デバッグと最適化は継続的なプロセスであり、常に新しい問題に対応できる準備が必要です。

実践的なユースケースと実装例

実際の開発現場で遭遇する具体的なシナリオに基づいて、効率的なマルチスレッド実装の例を紹介します。

並列データ処理の効率的な実装方法

大量のデータを効率的に処理する並列処理の実装例を示します。

#include <vector>
#include <thread>
#include <future>
#include <algorithm>
#include <numeric>
#include <functional>

// 並列データ処理のフレームワーク
template<typename Iterator, typename T, typename Function>
T parallel_reduce(Iterator first, Iterator last, T init, Function func) {
    const size_t length = std::distance(first, last);
    if (length == 0) {
        return init;
    }

    const size_t min_per_thread = 25;
    const size_t max_threads = 
        (length + min_per_thread - 1) / min_per_thread;
    const size_t hardware_threads = 
        std::thread::hardware_concurrency();
    const size_t num_threads = 
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    const size_t block_size = length / num_threads;

    std::vector<std::future<T>> futures(num_threads - 1);
    std::vector<std::thread> threads(num_threads - 1);

    Iterator block_start = first;
    for (size_t i = 0; i < num_threads - 1; ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);

        std::packaged_task<T(Iterator, Iterator)> task(
            [func](Iterator start, Iterator end) {
                return std::accumulate(start, end, T{}, func);
            }
        );

        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task), block_start, block_end);
        block_start = block_end;
    }

    T result = std::accumulate(block_start, last, init, func);

    for (size_t i = 0; i < num_threads - 1; ++i) {
        threads[i].join();
        result = func(result, futures[i].get());
    }

    return result;
}

// 使用例:並列データ集計
void parallel_processing_example() {
    std::vector<int> data(10000000);
    std::iota(data.begin(), data.end(), 0);  // 0から連番で初期化

    // 並列での合計計算
    auto sum = parallel_reduce(data.begin(), data.end(), 0,
        std::plus<int>());

    // 並列での最大値検索
    auto max = parallel_reduce(data.begin(), data.end(),
        std::numeric_limits<int>::min(),
        [](int a, int b) { return std::max(a, b); });
}

非同期I/O処理の最適化テクニック

I/O処理を効率的に行うための非同期処理実装例を示します。

#include <queue>
#include <thread>
#include <future>
#include <fstream>
#include <string>

// 非同期ファイル処理クラス
class AsyncFileProcessor {
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::thread worker_;
    bool stop_;

    void worker_thread() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] {
                    return !tasks_.empty() || stop_;
                });

                if (stop_ && tasks_.empty()) {
                    return;
                }

                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

public:
    AsyncFileProcessor() : stop_(false) {
        worker_ = std::thread(&AsyncFileProcessor::worker_thread, this);
    }

    ~AsyncFileProcessor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }

    // 非同期ファイル書き込み
    std::future<void> write_async(const std::string& filename,
                                 const std::string& data) {
        auto promise = std::make_shared<std::promise<void>>();
        auto future = promise->get_future();

        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push([filename, data, promise]() {
                try {
                    std::ofstream file(filename);
                    file << data;
                    promise->set_value();
                }
                catch (...) {
                    promise->set_exception(std::current_exception());
                }
            });
        }
        cv_.notify_one();
        return future;
    }

    // 非同期ファイル読み込み
    std::future<std::string> read_async(const std::string& filename) {
        auto promise = std::make_shared<std::promise<std::string>>();
        auto future = promise->get_future();

        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push([filename, promise]() {
                try {
                    std::ifstream file(filename);
                    std::stringstream buffer;
                    buffer << file.rdbuf();
                    promise->set_value(buffer.str());
                }
                catch (...) {
                    promise->set_exception(std::current_exception());
                }
            });
        }
        cv_.notify_one();
        return future;
    }
};

リアルタイムシステムでのスレッド活用事例

リアルタイムシステムにおける効率的なスレッド管理の実装例を示します。

#include <chrono>
#include <thread>
#include <atomic>
#include <vector>
#include <queue>

// リアルタイムタスクスケジューラ
class RealTimeScheduler {
    struct Task {
        std::function<void()> function;
        std::chrono::steady_clock::time_point schedule_time;
        std::chrono::milliseconds interval;
        bool recurring;

        bool operator>(const Task& other) const {
            return schedule_time > other.schedule_time;
        }
    };

    std::priority_queue<Task, std::vector<Task>, std::greater<>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> running_{true};
    std::thread scheduler_thread_;

    void scheduler_loop() {
        while (running_) {
            std::unique_lock<std::mutex> lock(mutex_);

            if (tasks_.empty()) {
                cv_.wait(lock);
                continue;
            }

            auto now = std::chrono::steady_clock::now();
            auto& task = tasks_.top();

            if (task.schedule_time <= now) {
                auto task_copy = task;
                tasks_.pop();

                if (task.recurring) {
                    task.schedule_time += task.interval;
                    tasks_.push(task);
                }

                lock.unlock();
                task_copy.function();
            } else {
                cv_.wait_until(lock, task.schedule_time);
            }
        }
    }

public:
    RealTimeScheduler() {
        scheduler_thread_ = std::thread(&RealTimeScheduler::scheduler_loop,
                                      this);
    }

    ~RealTimeScheduler() {
        running_ = false;
        cv_.notify_one();
        scheduler_thread_.join();
    }

    // 一回限りのタスクのスケジュール
    void schedule_once(std::function<void()> func,
                      std::chrono::steady_clock::time_point time) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push(Task{
            std::move(func),
            time,
            std::chrono::milliseconds(0),
            false
        });
        cv_.notify_one();
    }

    // 定期的なタスクのスケジュール
    void schedule_recurring(std::function<void()> func,
                          std::chrono::milliseconds interval) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push(Task{
            std::move(func),
            std::chrono::steady_clock::now() + interval,
            interval,
            true
        });
        cv_.notify_one();
    }
};

// 使用例
void realtime_system_example() {
    RealTimeScheduler scheduler;

    // センサーデータ収集タスク(100ms間隔)
    scheduler.schedule_recurring([]() {
        // センサーデータ収集処理
    }, std::chrono::milliseconds(100));

    // 制御ループタスク(50ms間隔)
    scheduler.schedule_recurring([]() {
        // 制御処理
    }, std::chrono::milliseconds(50));

    // 5秒後に実行される一回限りのタスク
    scheduler.schedule_once([]() {
        // 初期化処理
    }, std::chrono::steady_clock::now() + std::chrono::seconds(5));
}

実践的なマルチスレッドプログラミングにおける重要なポイント:

  1. 並列データ処理
  • データ分割の最適化
  • スレッド数の適切な選択
  • 結果の効率的な集約
  • エラーハンドリングの実装
  1. 非同期I/O処理
  • タスクキューの効率的な管理
  • リソースの適切な解放
  • エラー状態の伝播
  • キャンセレーション機能の実装
  1. リアルタイムシステム
  • タイミング制約の遵守
  • 優先度ベースのスケジューリング
  • デッドライン管理
  • リソース使用の最適化

これらの実装例は、実際の開発現場で遭遇する典型的な課題に対する解決策を提供します。ただし、各システムの要件に応じて適切にカスタマイズする必要があります。