【Python】multiprocessing.Queueでタイムアウト付きの読み書きを行う

目次

概要

multiprocessing.Queue はプロセスセーフで便利ですが、デフォルトの put() や get() は、条件が満たされるまで(空きができるまで、またはデータが入るまで)無限に待機します。

システムがフリーズしたように見えるのを防ぐため、timeout 引数を指定して「一定時間待ってもダメなら諦める(例外を出す)」という処理を実装することが重要です。

本記事では、タイムアウトの設定方法と、発生する例外(queue.Full, queue.Empty)の正しい捕捉方法を解説します。

仕様(入出力)

  • 入力:
    • maxsize が設定されたキュー。
    • 待機時間(秒数)。
  • 出力:
    • 成功時: データの追加または取得。
    • 失敗時(時間切れ): queue.Full または queue.Empty 例外の送出。
  • 前提:
    • 例外クラスを使用するために、標準ライブラリの queue モジュールをインポートする必要があります。

メソッド仕様

メソッド引数例動作発生する例外
putq.put(val, timeout=2)2秒待ってもキューに空きができなければ例外。queue.Full
getq.get(timeout=2)2秒待ってもデータが来なければ例外。queue.Empty

基本の使い方

必ず import queue を行い、try-except ブロックで囲んで使用します。

import multiprocessing
import queue  # 例外クラス用

q = multiprocessing.Queue(maxsize=1)

# --- 書き込み(Put) ---
try:
    # 満杯なら3秒待つ
    q.put("Data", timeout=3)
except queue.Full:
    print("書き込みタイムアウト")

# --- 読み込み(Get) ---
try:
    # 空なら3秒待つ
    data = q.get(timeout=3)
except queue.Empty:
    print("読み込みタイムアウト")

コード全文

「満杯で書き込めないケース」と「空で読み込めないケース」の両方を検証するコードです。

Python

import multiprocessing
import queue
import time

def put_worker(q):
    """キューにデータを入れ続けるワーカー"""
    try:
        # 1つ目は入る
        q.put("Item 1", timeout=1)
        print("[PutWorker] Item 1 追加成功")
        
        # 2つ目は満杯なので待機が発生し、タイムアウトするはず
        print("[PutWorker] Item 2 追加待機中...")
        q.put("Item 2", timeout=2)
        print("[PutWorker] Item 2 追加成功")
        
    except queue.Full:
        print(">> [PutWorker] エラー: タイムアウト(キューが満杯です)")

def get_worker(q):
    """キューからデータを取り出し続けるワーカー"""
    try:
        # 何も入っていない状態で取得を試みる
        print("[GetWorker] データ取得待機中...")
        data = q.get(timeout=2)
        print(f"[GetWorker] 取得成功: {data}")
        
    except queue.Empty:
        print(">> [GetWorker] エラー: タイムアウト(データが来ませんでした)")

def main():
    print("--- 1. Put Timeout Test ---")
    # サイズ1のキューを作成
    q_full = multiprocessing.Queue(maxsize=1)
    
    # ワーカープロセスで実行
    p1 = multiprocessing.Process(target=put_worker, args=(q_full,))
    p1.start()
    p1.join()

    print("\n--- 2. Get Timeout Test ---")
    # 空のキューを作成
    q_empty = multiprocessing.Queue()
    
    p2 = multiprocessing.Process(target=get_worker, args=(q_empty,))
    p2.start()
    p2.join()

if __name__ == "__main__":
    main()

実行結果例

--- 1. Put Timeout Test ---
[PutWorker] Item 1 追加成功
[PutWorker] Item 2 追加待機中...
>> [PutWorker] エラー: タイムアウト(キューが満杯です)

--- 2. Get Timeout Test ---
[GetWorker] データ取得待機中...
>> [GetWorker] エラー: タイムアウト(データが来ませんでした)

カスタムポイント

  • ノンブロッキング (block=False)
    • timeout=0 と同じ意味ですが、明示的に「待たない」ことを示したい場合は q.put_nowait(obj)q.get_nowait() メソッドを使用します。これらも同様に queue.Full / queue.Empty を送出します。
  • リトライ処理
    • タイムアウトが発生した場合に、ログを出力してから再試行するループ構造にすることも可能です。

注意点

  1. 例外クラスの場所
    • よくある間違いとして、multiprocessing.Fullmultiprocessing.Queue.Full と書いてしまうことがあります。
    • 正しくは標準モジュール queuequeue.Fullqueue.Empty です。
  2. タイムアウトの精度
    • タイムアウト時間は概算です。システム負荷が高い場合、指定した時間を多少超過してから例外が発生することがあります。
  3. デッドロック回避
    • タイムアウトを設定することで、相互待ちによる完全なデッドロック(永久停止)は回避できますが、処理自体は失敗するため、その後のリカバリ処理(再送やエラー終了)を記述する必要があります。

応用

「データが取れたらラッキー、取れなければデフォルト値を使う」という安全な取得関数の例です。

import multiprocessing
import queue

def safe_get(q, timeout=0.5, default=None):
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default

if __name__ == "__main__":
    q = multiprocessing.Queue()
    
    # データがないのでデフォルト値が返る
    val = safe_get(q, default="No Data")
    print(f"結果: {val}")

まとめ

timeout を適切に設定することで、堅牢なマルチプロセスアプリケーションを作成できます。

  • 向く場面: 相手プロセスの応答保証がない場合、フリーズを絶対に避けたい場合。
  • 変更ポイント: 必ず import queue を行い、適切な例外をキャッチしてください。
  • 注意点: タイムアウトは「異常系」として扱うか、「フローの一部」として扱うか(ポーリングなど)を設計時に決めておきましょう。

無限待機はバグの温床になりやすいため、本番運用するコードでは原則としてタイムアウトを設定することを推奨します。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次