【Python】asyncio.gatherで複数の非同期処理をまとめて並行実行する

目次

概要

Pythonの asyncio を使ってパフォーマンスを向上させる最大のメリットは、I/O待ち時間の並列化です。 asyncio.gather() を使用すると、複数のコルーチン(非同期タスク)を一度にスケジュールし、すべての処理が完了するのを待ってから、結果をリストとしてまとめて受け取ることができます。 個別に await するのと異なり、最も遅いタスクの完了時間に合わせて全体の処理時間が決まるため、大幅な高速化が期待できます。

仕様(入出力)

  • 入力
    • 実行したい複数のコルーチン関数(引数付きも可)。
    • 例: 複数のURLリスト、複数のファイルパスなど。
  • 出力
    • 全タスクの戻り値が、リクエストした順序通りに格納されたリスト。
  • 動作仕様
    • すべてのタスクを同時に開始(並行実行)します。
    • 全員が終わるまで現在の処理をブロック(待機)します。
    • デフォルトでは、1つでも例外が発生すると即座に全体が失敗扱いとなります(オプションで変更可)。

基本の使い方

コルーチン関数を呼び出して(オブジェクト化して)、asyncio.gather にカンマ区切りで渡します。

# 複数のコルーチンを渡して、全結果をリストで受け取る
results = await asyncio.gather(task1(), task2(), task3())
# results は [result1, result2, result3] となります

コード全文

ここでは、処理時間の異なる「複数のサーバーへのヘルスチェック(状態確認)」を並行して行い、結果を一括集計するシナリオを提示します。

import asyncio
import time
import random

async def check_server_health(server_name: str, wait_time: float) -> dict:
    """
    サーバーのヘルスチェックをシミュレーションするコルーチン。
    
    Args:
        server_name (str): サーバー名
        wait_time (float): 応答にかかる時間(秒)
        
    Returns:
        dict: サーバーの状態レポート
    """
    print(f"[{server_name}] 接続確認を開始します... (想定時間: {wait_time}秒)")
    
    # 通信待ちのシミュレーション
    await asyncio.sleep(wait_time)
    
    print(f"[{server_name}] 応答あり。")
    
    # 擬似的なステータス判定(ランダム)
    status = "OK" if random.random() > 0.2 else "Warning"
    
    return {
        "server": server_name,
        "status": status,
        "latency": wait_time
    }

async def main():
    """
    複数のサーバーチェックをまとめて実行し、結果を集計する。
    """
    start_time = time.time()
    print("--- 一斉チェック開始 ---")

    # 1. 実行したいコルーチンのリストを準備
    # 注意: ここではまだ実行されず、コルーチンオブジェクトが生成されるだけです
    tasks = [
        check_server_health("Server_A", 2.0),
        check_server_health("Server_B", 3.0),
        check_server_health("Server_C", 1.0),
    ]

    # 2. gatherでまとめて実行&待機
    # *tasks でリストを展開して引数として渡しています
    results = await asyncio.gather(*tasks)

    print("--- 全チェック完了 ---")
    
    # 結果の表示(resultsの順序はtasksの順序と一致します)
    for res in results:
        print(f"結果: {res}")

    total_time = time.time() - start_time
    print(f"\n所要時間: {total_time:.2f}秒 (直列なら合計 6.0秒)")

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

最も遅い「Server_B(3.0秒)」に合わせて全体が約3秒で完了します。

--- 一斉チェック開始 ---
[Server_A] 接続確認を開始します... (想定時間: 2.0秒)
[Server_B] 接続確認を開始します... (想定時間: 3.0秒)
[Server_C] 接続確認を開始します... (想定時間: 1.0秒)
[Server_C] 応答あり。
[Server_A] 応答あり。
[Server_B] 応答あり。
--- 全チェック完了 ---
結果: {'server': 'Server_A', 'status': 'OK', 'latency': 2.0}
結果: {'server': 'Server_B', 'status': 'OK', 'latency': 3.0}
結果: {'server': 'Server_C', 'status': 'Warning', 'latency': 1.0}

所要時間: 3.01秒 (直列なら合計 6.0秒)

カスタムポイント

  • 例外処理の制御 (return_exceptions)
    • デフォルトでは、タスクの1つがエラーを起こすと gather 全体が例外を送出します。
    • await asyncio.gather(*tasks, return_exceptions=True) とすることで、エラーが起きても停止せず、結果リストの中に例外オブジェクト(Exception)を含めて返してくれます。全タスクの結果を確実に回収したい場合に必須の設定です。
  • タスクリストの動的生成
    • [func(i) for i in range(10)] のようにリスト内包表記を使うことで、可変長のタスクを簡単に扱えます。

注意点

  1. 結果の順序
    • 完了した順ではなく、引数に渡した順で結果リストが返ってきます。これにより、入力データと結果の紐付けが容易になります。
  2. キャンセルの連動
    • gather 自体をキャンセル(task.cancel())すると、管理下にある全ての子タスクに対してもキャンセルが伝播します。
  3. 引数の展開
    • リストでタスクを持っている場合は *tasks(アンパック)が必要です。gather(tasks) とリストをそのまま渡すとエラーになります(タスクオブジェクトそのものを引数として期待するため)。

応用

return_exceptions=True を使用して、一部のタスクが失敗しても全体の処理を続行させるパターンです。

import asyncio

async def risky_task(task_id: int):
    if task_id == 2:
        raise ValueError("予期せぬエラーが発生!")
    await asyncio.sleep(0.5)
    return f"Task {task_id} Success"

async def main_resilient():
    tasks = [risky_task(1), risky_task(2), risky_task(3)]
    
    # エラーがあっても中断せず、例外オブジェクトを結果として受け取る
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    print("--- 実行結果 ---")
    for i, res in enumerate(results):
        if isinstance(res, Exception):
            print(f"Task {i+1}: 失敗 ({res})")
        else:
            print(f"Task {i+1}: 成功 ({res})")

if __name__ == "__main__":
    asyncio.run(main_resilient())

まとめ

asyncio.gather は、複数の非同期処理を扱う上で最も頻繁に使用されるAPIの一つです。

  • 向く場面: 互いに依存しない複数のタスク(APIコール、DBクエリ等)をヨーイドンで実行し、全ての結果を使って次の処理を行いたい場合。
  • 変更ポイント: return_exceptions=True を使うかどうかで、エラー時の堅牢性が大きく変わります。
  • 注意点: 大量のタスク(数千件など)を一度に gather すると負荷が高まるため、その場合はセマフォ(asyncio.Semaphore)等で同時実行数を制限する必要があります。

このパターンを習得すれば、Pythonの非同期処理による高速化の恩恵を最大限に受けることができます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次