【Python】asyncio.Queueでコルーチン間のデータ受け渡しを安全に行う

目次

概要

asyncio.Queue は、非同期処理(コルーチン)間でデータをやり取りするためのFIFO(先入れ先出し)キューです。

リストなどの通常のデータ構造とは異なり、操作がコルーチンセーフ(非同期処理において安全)に設計されています。これにより、ロック(Lock)を明示的に使わなくても、複数のタスクから同時にデータの追加や取り出しを行ってもデータの整合性が保たれます。

本記事では、基本的なメソッドの使い方と、キューを利用して変数の競合を防ぐパターンを紹介します。

仕様(入出力)

  • 入力:
    • キューの最大サイズ(maxsize)。0以下なら無限。
    • キューに入れる任意のオブジェクト。
  • 出力:
    • キューから取り出したオブジェクト。
    • 現在のキューの格納数などの状態。

主要メソッド一覧

メソッド構文説明ブロック(待機)
追加await queue.put(item)アイテムをキューの最後尾に追加します。キューが満杯の場合は空きができるまで待機します。あり
取得await queue.get()アイテムをキューの先頭から取得して削除します。キューが空の場合はアイテムが入るまで待機します。あり
サイズqueue.qsize()現在キューに入っているアイテム数を返します。なし
空判定queue.empty()キューが空であれば True を返します。なし
満杯判定queue.full()キューが満杯であれば True を返します。なし

基本の使い方

まずは単純にデータを入れ、順番に取り出す基本的な動作です。

put と get は非同期メソッドなので await が必要です。

import asyncio

async def main():
    # キューの作成
    q = asyncio.Queue()

    print("--- データの追加 ---")
    await q.put("Data A")
    await q.put("Data B")
    
    print(f"現在のサイズ: {q.qsize()}")

    print("--- データの取得 ---")
    # 入れた順に出てきます(FIFO)
    item1 = await q.get()
    print(f"取得: {item1}")
    
    item2 = await q.get()
    print(f"取得: {item2}")

    # empty判定
    if q.empty():
        print("キューは空です")

if __name__ == "__main__":
    asyncio.run(main())

コード全文

ここでは、asyncio.Lock を使わずに、キュー自体を状態の受け渡し場所として使うことで、計算の競合(レースコンディション)を防ぐ例を紹介します。

キューの中に常に「最新のカウンタ値」というボールが1つだけ入っており、それを2つのタスクが奪い合って更新して戻すイメージです。

import asyncio

async def increment_worker(queue: asyncio.Queue, worker_name: str, count: int):
    """
    キューから現在の値を取り出し、加算して戻す処理を繰り返すワーカー。
    
    Args:
        queue (asyncio.Queue): 共有のキュー
        worker_name (str): ログ用識別子
        count (int): ループ回数
    """
    for _ in range(count):
        # 1. キューから値を取り出す(空なら待機)
        #    ここで「処理権」を獲得したことになる
        value = await queue.get()
        
        # 2. 何らかの処理(コンテキストスイッチを挟む)
        await asyncio.sleep(0.01)
        
        # 3. 加算して新しい値をキューに戻す
        new_value = value + 1
        await queue.put(new_value)
        
        # デバッグ出力(量が多いため省略可)
        # print(f"[{worker_name}] updated to {new_value}")

async def main():
    # 共有キューの作成
    # このパターンでは常に1つの値しか入らないため maxsize は意識しなくて良い
    queue = asyncio.Queue()
    
    # 初期値 0 を投入
    await queue.put(0)
    
    print("--- インクリメント競走開始 ---")
    
    # 2つのタスクで合計100回 (50 + 50) インクリメントを実行
    # ロックを使わずとも、queue.get() で直列化されるため競合しない
    await asyncio.gather(
        increment_worker(queue, "Worker-A", 50),
        increment_worker(queue, "Worker-B", 50)
    )
    
    # 最終結果を取り出す
    final_result = await queue.get()
    
    print("--- 処理完了 ---")
    print(f"最終カウンタ値: {final_result} (期待値: 100)")

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

Plaintext

--- インクリメント競走開始 ---
--- 処理完了 ---
最終カウンタ値: 100 (期待値: 100)

カスタムポイント

  • maxsize の設定
    • q = asyncio.Queue(maxsize=10) のようにサイズ制限を設けることで、メモリの使いすぎを防いだり、生産者(Producer)と消費者(Consumer)の速度差を制御(バックプレッシャー)したりできます。
  • タスク完了の管理 (task_done)
    • コンシューマーパターンでは、処理が終わったら queue.task_done() を呼び、メイン側で await queue.join() を使うことで、「キューに入れた全タスクが処理されるまで待つ」という制御が可能です。
  • タイムアウト付き取得
    • await asyncio.wait_for(q.get(), timeout=1.0) を使うことで、一定時間データが来なければ諦める処理を実装できます。

注意点

  1. スレッドセーフではない
    • asyncio.Queue はあくまで asyncioのイベントループ内でのみ 安全です。threading モジュールを使った別スレッドから put/get する場合は janus ライブラリなどのスレッドセーフな非同期キューを使用するか、call_soon_threadsafe を介する必要があります。
  2. デッドロック
    • maxsize が設定されている状態で、キューが満杯の時に put するとブロックされます。取り出す側のタスクが動いていないと、永久に待ち続けることになります。
  3. get_nowait / put_nowait
    • 待機せずに即座に処理する q.get_nowait() などもありますが、キューが空/満杯の場合に即座に例外(asyncio.QueueEmpty / asyncio.QueueFull)を送出します。例外処理が必要です。

応用

典型的な「生産者・消費者(Producer-Consumer)パターン」の実装例です。

生産者がジョブを作り、複数の消費者が並行して処理します。task_done と join を使用しています。

import asyncio
import random

async def producer(queue: asyncio.Queue, n: int):
    """ジョブを生成してキューに入れる"""
    for i in range(n):
        item = f"Job-{i}"
        await queue.put(item)
        print(f"[Producer] Added {item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))

async def consumer(queue: asyncio.Queue, worker_id: int):
    """キューからジョブを取り出して処理する"""
    while True:
        # キューからアイテムを取得
        item = await queue.get()
        
        # 処理実行
        print(f"  [Worker-{worker_id}] Processing {item}...")
        await asyncio.sleep(random.uniform(0.2, 0.5))  # 処理時間のシミュレーション
        print(f"  [Worker-{worker_id}] Finished {item}")
        
        # 処理完了をキューに通知
        queue.task_done()

async def main_worker_pattern():
    queue = asyncio.Queue()
    
    # コンシューマー(ワーカー)を3つ起動
    # デーモンタスクとしてバックグラウンドで走らせる
    workers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    
    # プロデューサーが10個のジョブを投入
    await producer(queue, 10)
    
    print("--- 全ジョブ投入完了、処理待ち ---")
    
    # キュー内のアイテムが全て task_done() されるまで待機
    await queue.join()
    
    print("--- 全ジョブ完了 ---")
    
    # ワーカータスクをキャンセルして終了
    for w in workers:
        w.cancel()

if __name__ == "__main__":
    asyncio.run(main_worker_pattern())

まとめ

asyncio.Queue は、非同期プログラムにおけるデータパイプラインの要です。

  • 向く場面: 生産者と消費者の速度が違う場合のバッファリング、ロックを使わないデータの排他制御、タスク間のメッセージパッシング。
  • 変更ポイント: 状況に応じて maxsize を設定し、フロー制御を行ってください。
  • 注意点: マルチスレッド環境での利用と混同しないようにしましょう。

複雑なロック制御を避け、キューを通じてデータを渡す設計にすることで、バグの少ないクリーンな非同期コードが書けます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次