目次
概要
asyncio.Queue の put() や get() メソッドは、キューが満杯または空の場合、条件が満たされるまで無期限に待機(ブロック)し続けます。 しかし、実際のアプリケーションでは「一定時間データが来なければ諦める」「書き込めなければエラーにする」といった制御が必要です。 本記事では、標準の asyncio.wait_for() 関数を使用して、キュー操作にタイムアウト制限を設ける方法を解説します。
仕様(入出力)
- 入力:
- サイズ制限のある
asyncio.Queue。 - タイムアウト時間(秒)。
- サイズ制限のある
- 出力:
- 制限時間内に処理が完了すれば、通常通り値を返却(または格納完了)。
- 制限時間を超えた場合、
asyncio.TimeoutErrorを捕捉してエラーメッセージを表示。
- 動作仕様:
wait_for(coroutine, timeout)でラップすることで、指定秒数経過後にコルーチンを強制キャンセルし、例外を送出させる。
基本の使い方
await queue.get() や await queue.put() を asyncio.wait_for の第一引数に渡します。
try:
# 3秒待っても取得できなければ TimeoutError が発生
item = await asyncio.wait_for(queue.get(), timeout=3.0)
except asyncio.TimeoutError:
print("時間切れです")
コード全文
「満杯で書き込めない場合(Put Timeout)」と「空で読み出せない場合(Get Timeout)」の2つのシナリオを検証するコードです。
import asyncio
async def test_put_timeout():
"""
キューが満杯の状態で put しようとしてタイムアウトする例
"""
print("\n--- Put Timeout Test ---")
# サイズ1のキューを作成
q = asyncio.Queue(maxsize=1)
# 1つ目を入れる(これは即座に成功する)
await q.put("Item 1")
print("1つ目のアイテムを追加しました。キューは満杯です。")
try:
print("2つ目のアイテムを追加しようとしています(待機上限: 1.5秒)...")
# 満杯なのでここで待機が発生する
await asyncio.wait_for(q.put("Item 2"), timeout=1.5)
except asyncio.TimeoutError:
print(">> タイムアウト: 制限時間内にキューの空きが出ませんでした。")
else:
print(">> 成功: アイテムを追加できました。")
async def test_get_timeout():
"""
キューが空の状態で get しようとしてタイムアウトする例
"""
print("\n--- Get Timeout Test ---")
q = asyncio.Queue()
# 何も入っていない状態で取得を試みる
try:
print("アイテムを取得しようとしています(待機上限: 1.5秒)...")
item = await asyncio.wait_for(q.get(), timeout=1.5)
print(f">> 成功: {item} を取得しました。")
except asyncio.TimeoutError:
print(">> タイムアウト: 制限時間内にアイテムが届きませんでした。")
async def main():
# 書き込みタイムアウトの検証
await test_put_timeout()
# 読み込みタイムアウトの検証
await test_get_timeout()
if __name__ == "__main__":
asyncio.run(main())
実行結果例
--- Put Timeout Test ---
1つ目のアイテムを追加しました。キューは満杯です。
2つ目のアイテムを追加しようとしています(待機上限: 1.5秒)...
>> タイムアウト: 制限時間内にキューの空きが出ませんでした。
--- Get Timeout Test ---
アイテムを取得しようとしています(待機上限: 1.5秒)...
>> タイムアウト: 制限時間内にアイテムが届きませんでした。
カスタムポイント
- 例外処理後の振る舞い
TimeoutErrorをキャッチした後、リトライするのか、デフォルト値を使うのか、処理全体を中断するのかを要件に合わせて記述します。
- タイムアウト時間の定数化
TIMEOUT_SECONDS = 5.0のように定数で管理すると、システム全体のレスポンス仕様を変更しやすくなります。
注意点
- get_nowait / put_nowait との違い
q.get_nowait()は「1秒も待たず、即座に例外(QueueEmpty)を出す」メソッドです。wait_forは「指定時間は待つが、それでもダメなら例外を出す」メソッドです。少し待てばデータが来る可能性がある場合はwait_forが適しています。
- タスクのキャンセル
asyncio.TimeoutErrorが発生した時点で、待機していたq.put()やq.get()の処理はキャンセルされます。- これは「途中まで入れた」のような中途半端な状態にはならず、操作自体がなかったことになるため、データ整合性は保たれます。
- 負のタイムアウト値
timeoutに0や負の値を指定すると、get_nowait()とほぼ同じ挙動(即時タイムアウト)になります。
応用
「データが取得できればそれを使い、タイムアウトしたらデフォルト値(Noneなど)を採用して処理を継続する」ヘルパー関数の例です。
import asyncio
from typing import Any, Optional
async def get_or_default(queue: asyncio.Queue, timeout: float, default: Any = None) -> Any:
"""
指定時間内にキューから取得できればその値を、
できなければデフォルト値を返す関数。
"""
try:
return await asyncio.wait_for(queue.get(), timeout)
except asyncio.TimeoutError:
return default
async def main_application():
q = asyncio.Queue()
# データが来ないので None が返る
result = await get_or_default(q, timeout=1.0, default="No Data")
print(f"結果: {result}")
# 先にデータを入れておく
await q.put("Valid Data")
# データがあるので即座に返る
result = await get_or_default(q, timeout=1.0)
print(f"結果: {result}")
if __name__ == "__main__":
asyncio.run(main_application())
まとめ
asyncio.wait_for と asyncio.Queue を組み合わせることで、堅牢な非同期フローを構築できます。
- 向く場面: 外部システムからの応答待ち、バッファが空くのを待つ際のデッドロック防止。
- 変更ポイント: タイムアウト時間は、処理の重要度や許容できる遅延に合わせて調整してください。
- 注意点:
TimeoutErrorはasyncioモジュールのものを使用してください(Python標準のTimeoutErrorとは異なる場合があるためimport asyncio経由が確実です)。
無限待機によるアプリケーションのフリーズを防ぐため、外部要因に依存するキュー操作には適切なタイムアウトを設定しましょう。
