目次
概要
multiprocessing.Queue はプロセスセーフで便利ですが、デフォルトの put() や get() は、条件が満たされるまで(空きができるまで、またはデータが入るまで)無限に待機します。
システムがフリーズしたように見えるのを防ぐため、timeout 引数を指定して「一定時間待ってもダメなら諦める(例外を出す)」という処理を実装することが重要です。
本記事では、タイムアウトの設定方法と、発生する例外(queue.Full, queue.Empty)の正しい捕捉方法を解説します。
仕様(入出力)
- 入力:
maxsizeが設定されたキュー。- 待機時間(秒数)。
- 出力:
- 成功時: データの追加または取得。
- 失敗時(時間切れ):
queue.Fullまたはqueue.Empty例外の送出。
- 前提:
- 例外クラスを使用するために、標準ライブラリの
queueモジュールをインポートする必要があります。
- 例外クラスを使用するために、標準ライブラリの
メソッド仕様
| メソッド | 引数例 | 動作 | 発生する例外 |
| put | q.put(val, timeout=2) | 2秒待ってもキューに空きができなければ例外。 | queue.Full |
| get | q.get(timeout=2) | 2秒待ってもデータが来なければ例外。 | queue.Empty |
基本の使い方
必ず import queue を行い、try-except ブロックで囲んで使用します。
import multiprocessing
import queue # 例外クラス用
q = multiprocessing.Queue(maxsize=1)
# --- 書き込み(Put) ---
try:
# 満杯なら3秒待つ
q.put("Data", timeout=3)
except queue.Full:
print("書き込みタイムアウト")
# --- 読み込み(Get) ---
try:
# 空なら3秒待つ
data = q.get(timeout=3)
except queue.Empty:
print("読み込みタイムアウト")
コード全文
「満杯で書き込めないケース」と「空で読み込めないケース」の両方を検証するコードです。
Python
import multiprocessing
import queue
import time
def put_worker(q):
"""キューにデータを入れ続けるワーカー"""
try:
# 1つ目は入る
q.put("Item 1", timeout=1)
print("[PutWorker] Item 1 追加成功")
# 2つ目は満杯なので待機が発生し、タイムアウトするはず
print("[PutWorker] Item 2 追加待機中...")
q.put("Item 2", timeout=2)
print("[PutWorker] Item 2 追加成功")
except queue.Full:
print(">> [PutWorker] エラー: タイムアウト(キューが満杯です)")
def get_worker(q):
"""キューからデータを取り出し続けるワーカー"""
try:
# 何も入っていない状態で取得を試みる
print("[GetWorker] データ取得待機中...")
data = q.get(timeout=2)
print(f"[GetWorker] 取得成功: {data}")
except queue.Empty:
print(">> [GetWorker] エラー: タイムアウト(データが来ませんでした)")
def main():
print("--- 1. Put Timeout Test ---")
# サイズ1のキューを作成
q_full = multiprocessing.Queue(maxsize=1)
# ワーカープロセスで実行
p1 = multiprocessing.Process(target=put_worker, args=(q_full,))
p1.start()
p1.join()
print("\n--- 2. Get Timeout Test ---")
# 空のキューを作成
q_empty = multiprocessing.Queue()
p2 = multiprocessing.Process(target=get_worker, args=(q_empty,))
p2.start()
p2.join()
if __name__ == "__main__":
main()
実行結果例
--- 1. Put Timeout Test ---
[PutWorker] Item 1 追加成功
[PutWorker] Item 2 追加待機中...
>> [PutWorker] エラー: タイムアウト(キューが満杯です)
--- 2. Get Timeout Test ---
[GetWorker] データ取得待機中...
>> [GetWorker] エラー: タイムアウト(データが来ませんでした)
カスタムポイント
- ノンブロッキング (
block=False)timeout=0と同じ意味ですが、明示的に「待たない」ことを示したい場合はq.put_nowait(obj)やq.get_nowait()メソッドを使用します。これらも同様にqueue.Full/queue.Emptyを送出します。
- リトライ処理
- タイムアウトが発生した場合に、ログを出力してから再試行するループ構造にすることも可能です。
注意点
- 例外クラスの場所
- よくある間違いとして、
multiprocessing.Fullやmultiprocessing.Queue.Fullと書いてしまうことがあります。 - 正しくは標準モジュール
queueのqueue.Fullとqueue.Emptyです。
- よくある間違いとして、
- タイムアウトの精度
- タイムアウト時間は概算です。システム負荷が高い場合、指定した時間を多少超過してから例外が発生することがあります。
- デッドロック回避
- タイムアウトを設定することで、相互待ちによる完全なデッドロック(永久停止)は回避できますが、処理自体は失敗するため、その後のリカバリ処理(再送やエラー終了)を記述する必要があります。
応用
「データが取れたらラッキー、取れなければデフォルト値を使う」という安全な取得関数の例です。
import multiprocessing
import queue
def safe_get(q, timeout=0.5, default=None):
try:
return q.get(timeout=timeout)
except queue.Empty:
return default
if __name__ == "__main__":
q = multiprocessing.Queue()
# データがないのでデフォルト値が返る
val = safe_get(q, default="No Data")
print(f"結果: {val}")
まとめ
timeout を適切に設定することで、堅牢なマルチプロセスアプリケーションを作成できます。
- 向く場面: 相手プロセスの応答保証がない場合、フリーズを絶対に避けたい場合。
- 変更ポイント: 必ず
import queueを行い、適切な例外をキャッチしてください。 - 注意点: タイムアウトは「異常系」として扱うか、「フローの一部」として扱うか(ポーリングなど)を設計時に決めておきましょう。
無限待機はバグの温床になりやすいため、本番運用するコードでは原則としてタイムアウトを設定することを推奨します。
