【Python】multiprocessing.Queueでプロセス間のデータ受け渡しを行う

目次

概要

Pythonの multiprocessing.Queue は、複数のプロセス間でデータを安全にやり取りするためのFIFO(先入れ先出し)キューです。

threading モジュールのキューと同様のインターフェースを持ちますが、こちらは内部でパイプとロックを使用しており、プロセスセーフ(複数のプロセスから同時にアクセスしてもデータが壊れない)に設計されています。

本記事では、基本的な使い方と、キューを介してデータの競合を防ぐ実装パターンを解説します。

仕様(入出力)

multiprocessing.Queue の基本仕様

  • 初期化: Queue(maxsize=0)
    • maxsize: キューに入れられるアイテム数の上限。0以下の場合は無制限(メモリの許す限り)。

主要メソッド一覧

メソッド構文説明ブロック(待機)
追加q.put(obj, block=True, timeout=None)オブジェクトをキューに追加します。満杯時は空くまで待機します。あり
取得q.get(block=True, timeout=None)オブジェクトをキューから取り出します。空の場合は入るまで待機します。あり
サイズq.qsize()現在のアイテム数を返します。※macOSなど一部OSでは使用不可(NotImplementedError)。なし
空判定q.empty()キューが空なら True を返します。信頼性は低めです。なし
満杯判定q.full()キューが満杯なら True を返します。なし

基本の使い方

プロセスを生成する前にキューを作成し、それを引数として各プロセスに渡します。

from multiprocessing import Queue

# キューの作成
q = Queue()

# データの追加
q.put("Data A")
q.put("Data B")

# データの取得(入れた順に出てくる)
print(q.get())  # -> Data A
print(q.get())  # -> Data B

コード全文

ここでは、キューを「処理権(トークン)」の受け渡し場所として使い、ロック(Lock)を使わずにカウントアップの競合を防ぐコードを紹介します。

キューの中に常に「1つのボール(現在の値)」だけを入れておき、それを各プロセスが奪い合う(取り出したプロセスだけが計算できる)仕組みです。

import multiprocessing
import time
import os

def increment_worker(queue: multiprocessing.Queue, loops: int):
    """
    キューから値を取り出し、加算して戻すワーカープロセス。
    キューの get() がブロッキングするため、自動的に排他制御される。
    """
    pid = os.getpid()
    
    for _ in range(loops):
        # 1. 値を取得(空なら待機=他のプロセスが処理中)
        value = queue.get()
        
        # 2. 計算処理(擬似的な重い処理)
        time.sleep(0.01)
        value += 1
        
        # 3. 更新した値をキューに戻す(次のプロセスが取得可能になる)
        queue.put(value)
        
        # ログ出力(頻繁すぎると見づらいため、適度な間隔で)
        # print(f"[{pid}] updated to {value}")

    print(f"[{pid}] 処理完了")

def main():
    # プロセスセーフなキューを作成
    counter_queue = multiprocessing.Queue()
    
    # 初期値 0 を投入
    counter_queue.put(0)
    
    print("--- 処理開始 ---")
    
    processes = []
    num_processes = 2
    loops = 50
    
    # プロセスを生成して開始
    for _ in range(num_processes):
        p = multiprocessing.Process(
            target=increment_worker,
            args=(counter_queue, loops)
        )
        processes.append(p)
        p.start()
    
    # 全プロセスの終了待機
    for p in processes:
        p.join()
    
    # 最終結果を取り出す
    final_result = counter_queue.get()
    
    print("--- 全処理完了 ---")
    print(f"最終カウンタ値: {final_result}")
    
    expected = num_processes * loops
    if final_result == expected:
        print(f">> 成功: {expected} になりました。")
    else:
        print(f">> 失敗: {expected} ではありません。")

if __name__ == "__main__":
    main()

実行結果例

--- 処理開始 ---
[12345] 処理完了
[12346] 処理完了
--- 全処理完了 ---
最終カウンタ値: 100
>> 成功: 100 になりました。

カスタムポイント

  • タイムアウトの設定
    • q.get(timeout=3) のように秒数を指定することで、無限待機によるフリーズを防げます。タイムアウト時は queue.Empty 例外が発生します。
  • ノンブロッキング操作
    • q.put_nowait(obj)q.get_nowait() を使うと、待機せずに即座に実行を試みます。失敗時は queue.Fullqueue.Empty 例外が出ます。
  • JoinableQueue
    • タスク管理を行いたい場合は multiprocessing.JoinableQueue を使用します。q.task_done()q.join() が使えるようになり、「キュー内の全タスクが完了するまで待つ」処理が可能になります。

注意点

  1. qsize() のプラットフォーム依存
    • q.qsize() は macOS (Mac OS X) など一部のプラットフォームでは実装されておらず、呼び出すと NotImplementedError が発生します。移植性を重視するコードでは使用を避けてください。
  2. デッドロック
    • キューが満杯の状態で put() しようとしたプロセスと、空の状態で get() しようとしたプロセスが互いに待ち合うような構造を作らないよう注意が必要です。
    • 特に、大量のデータを put したままプロセスを join すると、バッファが溢れてデッドロックすることがあります(コンシューマーがデータを吸い出す必要があります)。
  3. 転送コスト
    • プロセス間のデータ転送は pickle(直列化)を使って行われます。巨大なデータ構造を頻繁にキューに入れると、シリアライズ/デシリアライズのオーバーヘッドで速度が低下します。

応用

Producer-Consumer(生産者・消費者)パターンの基本形です。

生産者がデータを生成してキューに入れ、消費者がそれを取り出して処理します。終了シグナルとして None を送るテクニックを使用しています。

import multiprocessing
import time

def producer(q):
    for i in range(5):
        data = f"Job-{i}"
        print(f"[Prod] 生成: {data}")
        q.put(data)
        time.sleep(0.5)
    # 終了シグナル
    q.put(None)

def consumer(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f"[Cons] 処理: {data}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    
    p1.start(); p2.start()
    p1.join(); p2.join()

まとめ

multiprocessing.Queue は、複雑になりがちなプロセス間通信をシンプルに解決します。

  • 向く場面: データのバッファリング、生産者・消費者モデルの実装、処理結果の集約。
  • 変更ポイント: タイムアウトや maxsize を適切に設定し、システムの安定性を高めてください。
  • 注意点: macOSでの qsize() 非対応や、ピクル化可能なオブジェクトしか送れない点に留意しましょう。

共有メモリ (Value, Array) よりも直感的で安全に扱えるため、まずは Queue での設計を検討することをお勧めします。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次