スレッド間の通信:promiseとfutureによる安全な値の受け渡し

複数のスレッドを用いて処理を並行して実行する際、あるスレッド(ワーカースレッド)が計算した結果や処理の成否を、別のスレッド(メインスレッド)が受け取りたいという状況は頻繁に発生します。

この記事では、標準ライブラリが提供するpromisefutureという仕組みを使い、スレッド間で安全に一度だけ値や例外を渡すための基本的な方法を解説します。


目次

基本的な役割

promisefutureは、常にペアで機能する、片道通行の通信チャネルを構築します。

promise — 値を約束する側

promiseは、将来的に値を設定することを「約束」するオブジェクトです。通常、値を生成する側のスレッド(ワーカースレッド)がこのオブジェクトの所有権を持ちます。処理が完了したら、promiseオブジェクトを介して結果の値、あるいは発生した例外を設定します。

  • set_value(): 正常に処理が完了した場合に、結果の値を設定します。
  • set_exception(): 処理中にエラーが発生した場合に、例外を設定します。

future — 値を受け取る側

futureは、promiseによって設定される「未来」の値を取得するためのオブジェクトです。通常、結果を待つ側のスレッド(メインスレッド)がこのオブジェクトを保持します。

  • get(): promiseによって値が設定されるまで待機し、設定された値または例外を取得します。

実践的なコード例

以下のコードは、ワーカースレッドがファイルの行数を数え、その結果をメインスレッドに返す処理をシミュレートしたものです。

#include <iostream>
#include <thread>
#include <future>
#include <string>
#include <chrono>
#include <stdexcept>

// ファイル処理をシミュレートし、結果をpromiseを通じて返す関数
void count_file_lines(std::promise<int> line_count_promise, const std::string& file_path) {
    try {
        // 時間のかかるファイルI/Oをシミュレート
        std::this_thread::sleep_for(std::chrono::seconds{2});

        if (file_path == "valid_data.csv") {
            // 1. 処理が成功した場合、結果の値をpromiseに設定
            line_count_promise.set_value(1024);
        } else {
            throw std::runtime_error("ファイルが見つかりません。");
        }
    } catch(...) {
        // 2. エラーが発生した場合、promiseに例外を設定
        line_count_promise.set_exception(std::current_exception());
    }
}

int main() {
    // 3. promiseオブジェクトを作成
    std::promise<int> file_promise;

    // 4. promiseから対となるfutureを取得
    std::future<int> result_future = file_promise.get_future();

    // 5. ワーカースレッドを起動し、promiseの所有権を渡す
    std::thread worker(count_file_lines, std::move(file_promise), "valid_data.csv");

    std::cout << "メインスレッド: ファイル処理をバックグラウンドで開始しました。\n";

    // 6. futureを介してワーカースレッドからの結果を待機し、取得
    try {
        int lines = result_future.get(); // set_valueが呼ばれるまでここで待機
        std::cout << "メインスレッド: 処理完了。行数: " << lines << std::endl;
    } catch (const std::exception& e) {
        std::cout << "メインスレッド: エラーが発生しました: " << e.what() << std::endl;
    }

    // 7. ワーカースレッドの処理が完全に終了するのを待つ
    worker.join();
    return 0;
}

コードの流れの解説

  1. 処理の成功を通知: ワーカースレッドは、処理が成功すると受け取ったpromiseオブジェクトのset_value()を呼び出し、結果の値(1024)を通信チャネルに設定します。
  2. 処理の失敗を通知: ワーカースレッドで例外が発生すると、catchブロックで捕捉され、set_exception()を通じてその例外が通信チャネルに設定されます。
  3. promiseの作成: メインスレッドは、まずpromise<int>型のオブジェクトを作成します。これは「将来的にint型の値を渡す」という約束の起点となります。
  4. futureの取得: 次に、get_future()メソッドを呼び出して、作成したpromiseに紐付けられたfutureオブジェクトを取得します。
  5. promiseの所有権移動: promiseはコピーできないため、std::moveを用いてワーカースレッドに所有権を完全に渡します。これにより、ワーカースレッドだけが値を設定できるようになります。
  6. 結果の待機と取得: メインスレッドはfutureオブジェクトのget()メソッドを呼び出します。この呼び出しは、ワーカースレッドがset_value()またはset_exception()を呼び出すまで処理をブロック(待機)します。結果が設定されると、get()はその値または例外を返します。
  7. スレッドの合流: メインスレッドは、ワーカースレッドがその処理をすべて完了し、安全に終了できるようにjoin()で待機します。

まとめ

promisefutureは、スレッドを直接管理する際に、スレッド間で結果を安全に受け渡すための基本的な部品です。この仕組みを利用することで、非同期処理の結果を同期的、かつ安全に待機するコードを明確に記述することができます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次