目次
概要
Pythonの multiprocessing.Queue は、複数のプロセス間でデータを安全にやり取りするためのFIFO(先入れ先出し)キューです。
threading モジュールのキューと同様のインターフェースを持ちますが、こちらは内部でパイプとロックを使用しており、プロセスセーフ(複数のプロセスから同時にアクセスしてもデータが壊れない)に設計されています。
本記事では、基本的な使い方と、キューを介してデータの競合を防ぐ実装パターンを解説します。
仕様(入出力)
multiprocessing.Queue の基本仕様
- 初期化:
Queue(maxsize=0)maxsize: キューに入れられるアイテム数の上限。0以下の場合は無制限(メモリの許す限り)。
主要メソッド一覧
| メソッド | 構文 | 説明 | ブロック(待機) |
| 追加 | q.put(obj, block=True, timeout=None) | オブジェクトをキューに追加します。満杯時は空くまで待機します。 | あり |
| 取得 | q.get(block=True, timeout=None) | オブジェクトをキューから取り出します。空の場合は入るまで待機します。 | あり |
| サイズ | q.qsize() | 現在のアイテム数を返します。※macOSなど一部OSでは使用不可(NotImplementedError)。 | なし |
| 空判定 | q.empty() | キューが空なら True を返します。信頼性は低めです。 | なし |
| 満杯判定 | q.full() | キューが満杯なら True を返します。 | なし |
基本の使い方
プロセスを生成する前にキューを作成し、それを引数として各プロセスに渡します。
from multiprocessing import Queue
# キューの作成
q = Queue()
# データの追加
q.put("Data A")
q.put("Data B")
# データの取得(入れた順に出てくる)
print(q.get()) # -> Data A
print(q.get()) # -> Data B
コード全文
ここでは、キューを「処理権(トークン)」の受け渡し場所として使い、ロック(Lock)を使わずにカウントアップの競合を防ぐコードを紹介します。
キューの中に常に「1つのボール(現在の値)」だけを入れておき、それを各プロセスが奪い合う(取り出したプロセスだけが計算できる)仕組みです。
import multiprocessing
import time
import os
def increment_worker(queue: multiprocessing.Queue, loops: int):
"""
キューから値を取り出し、加算して戻すワーカープロセス。
キューの get() がブロッキングするため、自動的に排他制御される。
"""
pid = os.getpid()
for _ in range(loops):
# 1. 値を取得(空なら待機=他のプロセスが処理中)
value = queue.get()
# 2. 計算処理(擬似的な重い処理)
time.sleep(0.01)
value += 1
# 3. 更新した値をキューに戻す(次のプロセスが取得可能になる)
queue.put(value)
# ログ出力(頻繁すぎると見づらいため、適度な間隔で)
# print(f"[{pid}] updated to {value}")
print(f"[{pid}] 処理完了")
def main():
# プロセスセーフなキューを作成
counter_queue = multiprocessing.Queue()
# 初期値 0 を投入
counter_queue.put(0)
print("--- 処理開始 ---")
processes = []
num_processes = 2
loops = 50
# プロセスを生成して開始
for _ in range(num_processes):
p = multiprocessing.Process(
target=increment_worker,
args=(counter_queue, loops)
)
processes.append(p)
p.start()
# 全プロセスの終了待機
for p in processes:
p.join()
# 最終結果を取り出す
final_result = counter_queue.get()
print("--- 全処理完了 ---")
print(f"最終カウンタ値: {final_result}")
expected = num_processes * loops
if final_result == expected:
print(f">> 成功: {expected} になりました。")
else:
print(f">> 失敗: {expected} ではありません。")
if __name__ == "__main__":
main()
実行結果例
--- 処理開始 ---
[12345] 処理完了
[12346] 処理完了
--- 全処理完了 ---
最終カウンタ値: 100
>> 成功: 100 になりました。
カスタムポイント
- タイムアウトの設定
q.get(timeout=3)のように秒数を指定することで、無限待機によるフリーズを防げます。タイムアウト時はqueue.Empty例外が発生します。
- ノンブロッキング操作
q.put_nowait(obj)やq.get_nowait()を使うと、待機せずに即座に実行を試みます。失敗時はqueue.Fullやqueue.Empty例外が出ます。
- JoinableQueue
- タスク管理を行いたい場合は
multiprocessing.JoinableQueueを使用します。q.task_done()とq.join()が使えるようになり、「キュー内の全タスクが完了するまで待つ」処理が可能になります。
- タスク管理を行いたい場合は
注意点
- qsize() のプラットフォーム依存
q.qsize()は macOS (Mac OS X) など一部のプラットフォームでは実装されておらず、呼び出すとNotImplementedErrorが発生します。移植性を重視するコードでは使用を避けてください。
- デッドロック
- キューが満杯の状態で
put()しようとしたプロセスと、空の状態でget()しようとしたプロセスが互いに待ち合うような構造を作らないよう注意が必要です。 - 特に、大量のデータを
putしたままプロセスをjoinすると、バッファが溢れてデッドロックすることがあります(コンシューマーがデータを吸い出す必要があります)。
- キューが満杯の状態で
- 転送コスト
- プロセス間のデータ転送は
pickle(直列化)を使って行われます。巨大なデータ構造を頻繁にキューに入れると、シリアライズ/デシリアライズのオーバーヘッドで速度が低下します。
- プロセス間のデータ転送は
応用
Producer-Consumer(生産者・消費者)パターンの基本形です。
生産者がデータを生成してキューに入れ、消費者がそれを取り出して処理します。終了シグナルとして None を送るテクニックを使用しています。
import multiprocessing
import time
def producer(q):
for i in range(5):
data = f"Job-{i}"
print(f"[Prod] 生成: {data}")
q.put(data)
time.sleep(0.5)
# 終了シグナル
q.put(None)
def consumer(q):
while True:
data = q.get()
if data is None:
break
print(f"[Cons] 処理: {data}")
if __name__ == "__main__":
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start(); p2.start()
p1.join(); p2.join()
まとめ
multiprocessing.Queue は、複雑になりがちなプロセス間通信をシンプルに解決します。
- 向く場面: データのバッファリング、生産者・消費者モデルの実装、処理結果の集約。
- 変更ポイント: タイムアウトや
maxsizeを適切に設定し、システムの安定性を高めてください。 - 注意点: macOSでの
qsize()非対応や、ピクル化可能なオブジェクトしか送れない点に留意しましょう。
共有メモリ (Value, Array) よりも直感的で安全に扱えるため、まずは Queue での設計を検討することをお勧めします。
