【Python】asyncio.Queueの待機処理にタイムアウトを設定する

目次

概要

asyncio.Queueput()get() メソッドは、キューが満杯または空の場合、条件が満たされるまで無期限に待機(ブロック)し続けます。 しかし、実際のアプリケーションでは「一定時間データが来なければ諦める」「書き込めなければエラーにする」といった制御が必要です。 本記事では、標準の asyncio.wait_for() 関数を使用して、キュー操作にタイムアウト制限を設ける方法を解説します。

仕様(入出力)

  • 入力:
    • サイズ制限のある asyncio.Queue
    • タイムアウト時間(秒)。
  • 出力:
    • 制限時間内に処理が完了すれば、通常通り値を返却(または格納完了)。
    • 制限時間を超えた場合、asyncio.TimeoutError を捕捉してエラーメッセージを表示。
  • 動作仕様:
    • wait_for(coroutine, timeout) でラップすることで、指定秒数経過後にコルーチンを強制キャンセルし、例外を送出させる。

基本の使い方

await queue.get()await queue.put()asyncio.wait_for の第一引数に渡します。

try:
    # 3秒待っても取得できなければ TimeoutError が発生
    item = await asyncio.wait_for(queue.get(), timeout=3.0)
except asyncio.TimeoutError:
    print("時間切れです")

コード全文

「満杯で書き込めない場合(Put Timeout)」と「空で読み出せない場合(Get Timeout)」の2つのシナリオを検証するコードです。

import asyncio

async def test_put_timeout():
    """
    キューが満杯の状態で put しようとしてタイムアウトする例
    """
    print("\n--- Put Timeout Test ---")
    # サイズ1のキューを作成
    q = asyncio.Queue(maxsize=1)
    
    # 1つ目を入れる(これは即座に成功する)
    await q.put("Item 1")
    print("1つ目のアイテムを追加しました。キューは満杯です。")

    try:
        print("2つ目のアイテムを追加しようとしています(待機上限: 1.5秒)...")
        # 満杯なのでここで待機が発生する
        await asyncio.wait_for(q.put("Item 2"), timeout=1.5)
        
    except asyncio.TimeoutError:
        print(">> タイムアウト: 制限時間内にキューの空きが出ませんでした。")
    else:
        print(">> 成功: アイテムを追加できました。")

async def test_get_timeout():
    """
    キューが空の状態で get しようとしてタイムアウトする例
    """
    print("\n--- Get Timeout Test ---")
    q = asyncio.Queue()
    
    # 何も入っていない状態で取得を試みる
    try:
        print("アイテムを取得しようとしています(待機上限: 1.5秒)...")
        item = await asyncio.wait_for(q.get(), timeout=1.5)
        print(f">> 成功: {item} を取得しました。")
        
    except asyncio.TimeoutError:
        print(">> タイムアウト: 制限時間内にアイテムが届きませんでした。")

async def main():
    # 書き込みタイムアウトの検証
    await test_put_timeout()
    
    # 読み込みタイムアウトの検証
    await test_get_timeout()

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

--- Put Timeout Test ---
1つ目のアイテムを追加しました。キューは満杯です。
2つ目のアイテムを追加しようとしています(待機上限: 1.5秒)...
>> タイムアウト: 制限時間内にキューの空きが出ませんでした。

--- Get Timeout Test ---
アイテムを取得しようとしています(待機上限: 1.5秒)...
>> タイムアウト: 制限時間内にアイテムが届きませんでした。

カスタムポイント

  • 例外処理後の振る舞い
    • TimeoutError をキャッチした後、リトライするのか、デフォルト値を使うのか、処理全体を中断するのかを要件に合わせて記述します。
  • タイムアウト時間の定数化
    • TIMEOUT_SECONDS = 5.0 のように定数で管理すると、システム全体のレスポンス仕様を変更しやすくなります。

注意点

  1. get_nowait / put_nowait との違い
    • q.get_nowait() は「1秒も待たず、即座に例外(QueueEmpty)を出す」メソッドです。
    • wait_for は「指定時間は待つが、それでもダメなら例外を出す」メソッドです。少し待てばデータが来る可能性がある場合は wait_for が適しています。
  2. タスクのキャンセル
    • asyncio.TimeoutError が発生した時点で、待機していた q.put()q.get() の処理はキャンセルされます。
    • これは「途中まで入れた」のような中途半端な状態にはならず、操作自体がなかったことになるため、データ整合性は保たれます。
  3. 負のタイムアウト値
    • timeout0 や負の値を指定すると、get_nowait() とほぼ同じ挙動(即時タイムアウト)になります。

応用

「データが取得できればそれを使い、タイムアウトしたらデフォルト値(Noneなど)を採用して処理を継続する」ヘルパー関数の例です。

import asyncio
from typing import Any, Optional

async def get_or_default(queue: asyncio.Queue, timeout: float, default: Any = None) -> Any:
    """
    指定時間内にキューから取得できればその値を、
    できなければデフォルト値を返す関数。
    """
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return default

async def main_application():
    q = asyncio.Queue()
    
    # データが来ないので None が返る
    result = await get_or_default(q, timeout=1.0, default="No Data")
    print(f"結果: {result}")
    
    # 先にデータを入れておく
    await q.put("Valid Data")
    # データがあるので即座に返る
    result = await get_or_default(q, timeout=1.0)
    print(f"結果: {result}")

if __name__ == "__main__":
    asyncio.run(main_application())

まとめ

asyncio.wait_forasyncio.Queue を組み合わせることで、堅牢な非同期フローを構築できます。

  • 向く場面: 外部システムからの応答待ち、バッファが空くのを待つ際のデッドロック防止。
  • 変更ポイント: タイムアウト時間は、処理の重要度や許容できる遅延に合わせて調整してください。
  • 注意点: TimeoutErrorasyncio モジュールのものを使用してください(Python標準の TimeoutError とは異なる場合があるため import asyncio 経由が確実です)。

無限待機によるアプリケーションのフリーズを防ぐため、外部要因に依存するキュー操作には適切なタイムアウトを設定しましょう。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次