概要
asyncio.Queue は、非同期処理(コルーチン)間でデータをやり取りするためのFIFO(先入れ先出し)キューです。
リストなどの通常のデータ構造とは異なり、操作がコルーチンセーフ(非同期処理において安全)に設計されています。これにより、ロック(Lock)を明示的に使わなくても、複数のタスクから同時にデータの追加や取り出しを行ってもデータの整合性が保たれます。
本記事では、基本的なメソッドの使い方と、キューを利用して変数の競合を防ぐパターンを紹介します。
仕様(入出力)
- 入力:
- キューの最大サイズ(
maxsize)。0以下なら無限。 - キューに入れる任意のオブジェクト。
- キューの最大サイズ(
- 出力:
- キューから取り出したオブジェクト。
- 現在のキューの格納数などの状態。
主要メソッド一覧
| メソッド | 構文 | 説明 | ブロック(待機) |
| 追加 | await queue.put(item) | アイテムをキューの最後尾に追加します。キューが満杯の場合は空きができるまで待機します。 | あり |
| 取得 | await queue.get() | アイテムをキューの先頭から取得して削除します。キューが空の場合はアイテムが入るまで待機します。 | あり |
| サイズ | queue.qsize() | 現在キューに入っているアイテム数を返します。 | なし |
| 空判定 | queue.empty() | キューが空であれば True を返します。 | なし |
| 満杯判定 | queue.full() | キューが満杯であれば True を返します。 | なし |
基本の使い方
まずは単純にデータを入れ、順番に取り出す基本的な動作です。
put と get は非同期メソッドなので await が必要です。
import asyncio
async def main():
# キューの作成
q = asyncio.Queue()
print("--- データの追加 ---")
await q.put("Data A")
await q.put("Data B")
print(f"現在のサイズ: {q.qsize()}")
print("--- データの取得 ---")
# 入れた順に出てきます(FIFO)
item1 = await q.get()
print(f"取得: {item1}")
item2 = await q.get()
print(f"取得: {item2}")
# empty判定
if q.empty():
print("キューは空です")
if __name__ == "__main__":
asyncio.run(main())
コード全文
ここでは、asyncio.Lock を使わずに、キュー自体を状態の受け渡し場所として使うことで、計算の競合(レースコンディション)を防ぐ例を紹介します。
キューの中に常に「最新のカウンタ値」というボールが1つだけ入っており、それを2つのタスクが奪い合って更新して戻すイメージです。
import asyncio
async def increment_worker(queue: asyncio.Queue, worker_name: str, count: int):
"""
キューから現在の値を取り出し、加算して戻す処理を繰り返すワーカー。
Args:
queue (asyncio.Queue): 共有のキュー
worker_name (str): ログ用識別子
count (int): ループ回数
"""
for _ in range(count):
# 1. キューから値を取り出す(空なら待機)
# ここで「処理権」を獲得したことになる
value = await queue.get()
# 2. 何らかの処理(コンテキストスイッチを挟む)
await asyncio.sleep(0.01)
# 3. 加算して新しい値をキューに戻す
new_value = value + 1
await queue.put(new_value)
# デバッグ出力(量が多いため省略可)
# print(f"[{worker_name}] updated to {new_value}")
async def main():
# 共有キューの作成
# このパターンでは常に1つの値しか入らないため maxsize は意識しなくて良い
queue = asyncio.Queue()
# 初期値 0 を投入
await queue.put(0)
print("--- インクリメント競走開始 ---")
# 2つのタスクで合計100回 (50 + 50) インクリメントを実行
# ロックを使わずとも、queue.get() で直列化されるため競合しない
await asyncio.gather(
increment_worker(queue, "Worker-A", 50),
increment_worker(queue, "Worker-B", 50)
)
# 最終結果を取り出す
final_result = await queue.get()
print("--- 処理完了 ---")
print(f"最終カウンタ値: {final_result} (期待値: 100)")
if __name__ == "__main__":
asyncio.run(main())
実行結果例
Plaintext
--- インクリメント競走開始 ---
--- 処理完了 ---
最終カウンタ値: 100 (期待値: 100)
カスタムポイント
- maxsize の設定
q = asyncio.Queue(maxsize=10)のようにサイズ制限を設けることで、メモリの使いすぎを防いだり、生産者(Producer)と消費者(Consumer)の速度差を制御(バックプレッシャー)したりできます。
- タスク完了の管理 (
task_done)- コンシューマーパターンでは、処理が終わったら
queue.task_done()を呼び、メイン側でawait queue.join()を使うことで、「キューに入れた全タスクが処理されるまで待つ」という制御が可能です。
- コンシューマーパターンでは、処理が終わったら
- タイムアウト付き取得
await asyncio.wait_for(q.get(), timeout=1.0)を使うことで、一定時間データが来なければ諦める処理を実装できます。
注意点
- スレッドセーフではない
asyncio.Queueはあくまで asyncioのイベントループ内でのみ 安全です。threadingモジュールを使った別スレッドからput/getする場合はjanusライブラリなどのスレッドセーフな非同期キューを使用するか、call_soon_threadsafeを介する必要があります。
- デッドロック
maxsizeが設定されている状態で、キューが満杯の時にputするとブロックされます。取り出す側のタスクが動いていないと、永久に待ち続けることになります。
- get_nowait / put_nowait
- 待機せずに即座に処理する
q.get_nowait()などもありますが、キューが空/満杯の場合に即座に例外(asyncio.QueueEmpty/asyncio.QueueFull)を送出します。例外処理が必要です。
- 待機せずに即座に処理する
応用
典型的な「生産者・消費者(Producer-Consumer)パターン」の実装例です。
生産者がジョブを作り、複数の消費者が並行して処理します。task_done と join を使用しています。
import asyncio
import random
async def producer(queue: asyncio.Queue, n: int):
"""ジョブを生成してキューに入れる"""
for i in range(n):
item = f"Job-{i}"
await queue.put(item)
print(f"[Producer] Added {item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
async def consumer(queue: asyncio.Queue, worker_id: int):
"""キューからジョブを取り出して処理する"""
while True:
# キューからアイテムを取得
item = await queue.get()
# 処理実行
print(f" [Worker-{worker_id}] Processing {item}...")
await asyncio.sleep(random.uniform(0.2, 0.5)) # 処理時間のシミュレーション
print(f" [Worker-{worker_id}] Finished {item}")
# 処理完了をキューに通知
queue.task_done()
async def main_worker_pattern():
queue = asyncio.Queue()
# コンシューマー(ワーカー)を3つ起動
# デーモンタスクとしてバックグラウンドで走らせる
workers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# プロデューサーが10個のジョブを投入
await producer(queue, 10)
print("--- 全ジョブ投入完了、処理待ち ---")
# キュー内のアイテムが全て task_done() されるまで待機
await queue.join()
print("--- 全ジョブ完了 ---")
# ワーカータスクをキャンセルして終了
for w in workers:
w.cancel()
if __name__ == "__main__":
asyncio.run(main_worker_pattern())
まとめ
asyncio.Queue は、非同期プログラムにおけるデータパイプラインの要です。
- 向く場面: 生産者と消費者の速度が違う場合のバッファリング、ロックを使わないデータの排他制御、タスク間のメッセージパッシング。
- 変更ポイント: 状況に応じて
maxsizeを設定し、フロー制御を行ってください。 - 注意点: マルチスレッド環境での利用と混同しないようにしましょう。
複雑なロック制御を避け、キューを通じてデータを渡す設計にすることで、バグの少ないクリーンな非同期コードが書けます。
