スレッド間の連携を円滑に:condition_variableによる待機と通知の作法

マルチスレッドのプログラムでは、あるスレッドが特定の条件が満たされるまで処理を待機し、別のスレッドがその条件を満たしたときに通知を送る、という連携が必要になる場面が多くあります。

この記事では、そのようなスレッド間のイベント待機を効率的に実現するための仕組みである**condition_variable**について、その基本的な考え方と安全な使い方を解説します。


目次

condition_variableの仕組み

condition_variableは、非効率なビジーループ(条件を繰り返しチェックし続けること)を避けるための仕組みです。スレッドは条件が満たされるまでCPUを消費せずにスリープし、他のスレッドからの通知によって目覚めることができます。

この仕組みは、常に以下の3つの要素と連携して動作します。

  1. 共有データ: スレッド間で共有され、待機条件そのものを表す変数(例: キューのサイズ、フラグなど)。
  2. Mutex: 共有データを複数のスレッドから安全に読み書きするために保護するロック。
  3. condition_variable: スレッドをスリープさせたり、起床させたりするための通知機構。

実践:スレッドセーフなキューの実装

condition_variableの最も典型的な使用例が、生産者消費者問題です。以下のコードは、一方のスレッド(生産者)がデータをキューに追加し、もう一方のスレッド(消費者)がデータを取り出す、というスレッドセーフなキューを実装したものです。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <vector>
#include <string>

// スレッドセーフな有界キュー
template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(size_t max_size) : capacity_{max_size} {}

    // キューにデータを追加する(生産者側)
    void push(T new_item) {
        std::unique_lock<std::mutex> lock{mtx_};
        // キューが満杯でない、という条件が満たされるまで待機する
        cv_can_push_.wait(lock, [this] { return queue_.size() < capacity_; });
        
        queue_.push_back(std::move(new_item));
        
        // アイテムが追加されたことを、待機しているかもしれない消費者に通知する
        cv_can_pop_.notify_one();
    }

    // キューからデータを取り出す(消費者側)
    T pop() {
        std::unique_lock<std::mutex> lock{mtx_};
        // キューが空でない、という条件が満たされるまで待機する
        cv_can_pop_.wait(lock, [this] { return !queue_.empty(); });
        
        T item = std::move(queue_.front());
        queue_.pop_front();
        
        // キューに空きができたことを、待機しているかもしれない生産者に通知する
        cv_can_push_.notify_one();
        return item;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_can_push_; // 生産者が待機・通知を受けるための変数
    std::condition_variable cv_can_pop_;  // 消費者が待機・通知を受けるための変数
    std::deque<T> queue_;
    size_t capacity_;
};

// 生産者スレッドの処理
void producer(BlockingQueue<std::string>& bq) {
    for (int i = 0; i < 5; ++i) {
        std::string item = "Item " + std::to_string(i);
        std::cout << "[Producer] Pushing " << item << std::endl;
        bq.push(item);
        std::this_thread::sleep_for(std::chrono::milliseconds{200});
    }
}

int main() {
    BlockingQueue<std::string> bq(2); // 最大容量2のキュー

    // 生産者スレッドを起動
    std::thread producer_thread(producer, std::ref(bq));

    // メインスレッドが消費者として5つのアイテムを取り出す
    for (int i = 0; i < 5; ++i) {
        std::cout << "[Consumer] Popping item..." << std::endl;
        std::string item = bq.pop();
        std::cout << "[Consumer] Popped: " << item << std::endl;
    }

    producer_thread.join();
    return 0;
}

コードのポイント解説

std::unique_lockの必要性

condition_variablewait関数は、引数としてstd::unique_lockを要求します。これは、wait関数が内部でロックを一時的に解除し、通知を受けて起床した際に再び再取得するという複雑な動作を行うためです。単純なstd::lock_guardにはこの機能がありません。

述語付きのwaitとスプリアスウェイクアップ

waitを呼び出す際は、必ず**述語(待機条件をチェックするラムダ式や関数)**を第二引数に渡すべきです。

// キューが満杯でない、という条件が満たされるまで待機
cv_can_push_.wait(lock, [this] { return queue_.size() < capacity_; });

これは、スプリアスウェイクアップ(通知がないのにスレッドが稀に起床してしまう現象)対策として不可欠です。述語付きのwaitは、起床した際に必ず述語を再評価し、条件が偽であれば(スプリアスウェイクアップであれば)再びスリープ状態に戻ります。

notify_one()による通知

notify_one()は、この条件変数で待機しているスレッドのうち、最大で1つを起床させます。今回の例では、一つのアイテムが追加されても、それを取り出せる消費者は一人で十分なため、notify_one()が効率的です。もし複数のスレッドを一度に起床させたい場合はnotify_all()を使用します。


まとめ

condition_variableは、スレッド間で効率的な待機と通知を実現するための標準的な仕組みです。これを利用することで、CPUリソースを無駄に消費することなく、スレッド間の協調動作を必要とする複雑なロジックを安全に構築できます。

利用する際は、以下の2つの原則を必ず守ってください。

  1. 待機条件となる共有データは、必ずMutexで保護する。
  2. wait関数は、スプリアスウェイクアップに備えて必ず述語と共に使用する。
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次