【Python】asyncioで複数の非同期処理を決められた順序で実行する

目次

概要

Pythonの標準ライブラリである asyncio を使用して、複数の非同期処理(コルーチン)を意図した順番通りに実行する方法を解説します。 非同期処理は並列実行(asyncio.gather 等)による高速化が注目されがちですが、前段の処理結果を後段で使用する場合など、完了順序を保証しなければならないケースも多々あります。 本記事では await キーワードを活用し、依存関係のあるタスクを直列に処理する実装パターンを紹介します。

仕様(入出力)

  • 入力
    • 処理対象の識別子(例: データIDやファイル名)。
    • 各工程でシミュレーション用の待機時間(秒)を設定。
  • 出力
    • 各工程の開始・終了ログを標準出力に表示。
    • 最終的な処理結果を辞書型などで返却。
  • 処理の流れ
    1. データ取得処理(ダウンロード)を実行。
    2. 取得したデータを用いて加工処理(プロセス)を実行。
    3. 全ての工程が完了した後、結果をまとめて出力。

基本の使い方

基本的な使い方は、コルーチン関数の呼び出し前に await を記述することです。 これにより、その行の処理が完了し、戻り値が返されるまで次の行へ進まずに待機します。

# 呼び出しイメージ
# step1 が完了して result1 が返るまで、step2 は開始されません
result1 = await step1()
result2 = await step2(result1)

コード全文

ここでは「サーバーから生データをダウンロードし、そのデータを解析してレポートを作成する」というシナリオを想定したコードを提示します。

import asyncio
import time
from typing import Dict, Any

async def download_raw_data(data_id: str) -> Dict[str, Any]:
    """
    指定されたIDのデータを外部ソースから取得するシミュレーション。
    
    Args:
        data_id (str): データの識別子
        
    Returns:
        Dict[str, Any]: 取得した生データ
    """
    print(f"[Download] ID: {data_id} のダウンロードを開始します...")
    # ネットワーク通信等のIO待ちをシミュレート
    await asyncio.sleep(1.0)
    
    print(f"[Download] ID: {data_id} のダウンロードが完了しました。")
    # 仮のデータを返却
    return {
        "id": data_id,
        "content": "raw_binary_content",
        "timestamp": time.time()
    }

async def analyze_data(raw_data: Dict[str, Any]) -> str:
    """
    生データを解析してレポートを生成するシミュレーション。
    
    Args:
        raw_data (Dict[str, Any]): download_raw_data で取得したデータ
        
    Returns:
        str: 解析結果のレポート文字列
    """
    data_id = raw_data.get("id")
    print(f"[Analyze] ID: {data_id} の解析を開始します...")
    
    # データ解析等の計算処理やIO待ちをシミュレート
    await asyncio.sleep(1.0)
    
    print(f"[Analyze] ID: {data_id} の解析が完了しました。")
    return f"Report for {data_id}: Content length is {len(raw_data['content'])}"

async def main():
    """
    メインのワークフローを実行するコルーチン。
    """
    target_id = "user_12345"
    
    print("--- ワークフロー開始 ---")
    
    # 1. データのダウンロード(完了するまで待機)
    raw_data = await download_raw_data(target_id)
    
    # 2. データの解析(ダウンロード結果を使って実行)
    report = await analyze_data(raw_data)
    
    print("--- ワークフロー終了 ---")
    print(f"最終結果: {report}")

if __name__ == "__main__":
    # イベントループの開始
    asyncio.run(main())

実行結果例

--- ワークフロー開始 ---
[Download] ID: user_12345 のダウンロードを開始します...
[Download] ID: user_12345 のダウンロードが完了しました。
[Analyze] ID: user_12345 の解析を開始します...
[Analyze] ID: user_12345 の解析が完了しました。
--- ワークフロー終了 ---
最終結果: Report for user_12345: Content length is 18

カスタムポイント

  • 待機時間の調整
    • asyncio.sleep(1.0) はデモンストレーション用の遅延です。実際には aiohttp などを用いた HTTP リクエストや、aiomysql などを用いたデータベース操作に置き換わります。
  • エラーハンドリング
    • 通信エラー等に備え、各 await 呼び出しを try-except ブロックで囲むことで、途中で失敗した場合のリカバリ処理を追加できます。
  • 戻り値の構造
    • Dict[str, Any]str ではなく、dataclasspydantic モデルを使用することで、データ構造をより堅牢に管理できます。

注意点

  1. 合計実行時間の増加
    • 逐次実行(直列実行)するため、全体の処理時間は「各タスクの処理時間の合計」になります。独立して動けるタスクであれば、asyncio.gather を使用して並列化する方が高速です。
  2. ブロッキング処理の混入
    • async 関数の中で time.sleep() や重い計算処理(CPUバウンドな処理)をそのまま書くと、イベントループ全体が止まってしまいます。必ず await asyncio.sleep() や、CPUバウンドな処理には run_in_executor を使用してください。
  3. 古い記法の混在
    • Python 3.7 未満の記事では @asyncio.coroutineyield from が使われていることがありますが、現在は async/await 構文が標準です。
  4. トップレベルでの await
    • awaitasync def で定義された関数内(または Jupyter Notebook のセル内)でのみ使用可能です。通常の関数内で使うと SyntaxError になります。

応用

リストに格納された複数のタスクを、forループを使って順番に実行するパターンです。 APIのレートリミット(アクセス頻度制限)を守るために、あえて並列化せず、一定間隔で順番に処理したい場合などに有効です。

import asyncio

async def process_item(item_id: int):
    print(f"Item {item_id}: 処理開始")
    await asyncio.sleep(0.5)
    print(f"Item {item_id}: 処理完了")
    return item_id * 10

async def main_sequential_loop():
    items = [1, 2, 3, 4, 5]
    results = []
    
    print("--- ループによる逐次実行開始 ---")
    
    for item in items:
        # ループ内で await することで、前の処理が終わってから次へ進む
        res = await process_item(item)
        results.append(res)
        
    print("--- 全処理完了 ---")
    print(f"結果リスト: {results}")

if __name__ == "__main__":
    asyncio.run(main_sequential_loop())

まとめ

asyncio における逐次実行は、手続き型プログラミングと同様に直感的な記述が可能です。

  • 向く場面: 前の処理結果を次の処理で使う場合、処理順序を厳密に管理したい場合、API制限などで同時実行数を1に抑えたい場合。
  • 変更ポイント: asyncio.gather を使わないことで並列性を排除し、確実に await で完了を待ちます。
  • 注意点: 不要な逐次実行はパフォーマンス低下の要因になるため、依存関係がない部分は並列化を検討してください。

依存関係を整理し、適切な箇所で await を使用することで、読みやすく予測可能な非同期プログラムを構築しましょう。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次