目次
概要
Pythonの標準ライブラリである asyncio を使用して、複数の非同期処理(コルーチン)を意図した順番通りに実行する方法を解説します。 非同期処理は並列実行(asyncio.gather 等)による高速化が注目されがちですが、前段の処理結果を後段で使用する場合など、完了順序を保証しなければならないケースも多々あります。 本記事では await キーワードを活用し、依存関係のあるタスクを直列に処理する実装パターンを紹介します。
仕様(入出力)
- 入力
- 処理対象の識別子(例: データIDやファイル名)。
- 各工程でシミュレーション用の待機時間(秒)を設定。
- 出力
- 各工程の開始・終了ログを標準出力に表示。
- 最終的な処理結果を辞書型などで返却。
- 処理の流れ
- データ取得処理(ダウンロード)を実行。
- 取得したデータを用いて加工処理(プロセス)を実行。
- 全ての工程が完了した後、結果をまとめて出力。
基本の使い方
基本的な使い方は、コルーチン関数の呼び出し前に await を記述することです。 これにより、その行の処理が完了し、戻り値が返されるまで次の行へ進まずに待機します。
# 呼び出しイメージ
# step1 が完了して result1 が返るまで、step2 は開始されません
result1 = await step1()
result2 = await step2(result1)
コード全文
ここでは「サーバーから生データをダウンロードし、そのデータを解析してレポートを作成する」というシナリオを想定したコードを提示します。
import asyncio
import time
from typing import Dict, Any
async def download_raw_data(data_id: str) -> Dict[str, Any]:
"""
指定されたIDのデータを外部ソースから取得するシミュレーション。
Args:
data_id (str): データの識別子
Returns:
Dict[str, Any]: 取得した生データ
"""
print(f"[Download] ID: {data_id} のダウンロードを開始します...")
# ネットワーク通信等のIO待ちをシミュレート
await asyncio.sleep(1.0)
print(f"[Download] ID: {data_id} のダウンロードが完了しました。")
# 仮のデータを返却
return {
"id": data_id,
"content": "raw_binary_content",
"timestamp": time.time()
}
async def analyze_data(raw_data: Dict[str, Any]) -> str:
"""
生データを解析してレポートを生成するシミュレーション。
Args:
raw_data (Dict[str, Any]): download_raw_data で取得したデータ
Returns:
str: 解析結果のレポート文字列
"""
data_id = raw_data.get("id")
print(f"[Analyze] ID: {data_id} の解析を開始します...")
# データ解析等の計算処理やIO待ちをシミュレート
await asyncio.sleep(1.0)
print(f"[Analyze] ID: {data_id} の解析が完了しました。")
return f"Report for {data_id}: Content length is {len(raw_data['content'])}"
async def main():
"""
メインのワークフローを実行するコルーチン。
"""
target_id = "user_12345"
print("--- ワークフロー開始 ---")
# 1. データのダウンロード(完了するまで待機)
raw_data = await download_raw_data(target_id)
# 2. データの解析(ダウンロード結果を使って実行)
report = await analyze_data(raw_data)
print("--- ワークフロー終了 ---")
print(f"最終結果: {report}")
if __name__ == "__main__":
# イベントループの開始
asyncio.run(main())
実行結果例
--- ワークフロー開始 ---
[Download] ID: user_12345 のダウンロードを開始します...
[Download] ID: user_12345 のダウンロードが完了しました。
[Analyze] ID: user_12345 の解析を開始します...
[Analyze] ID: user_12345 の解析が完了しました。
--- ワークフロー終了 ---
最終結果: Report for user_12345: Content length is 18
カスタムポイント
- 待機時間の調整
asyncio.sleep(1.0)はデモンストレーション用の遅延です。実際にはaiohttpなどを用いた HTTP リクエストや、aiomysqlなどを用いたデータベース操作に置き換わります。
- エラーハンドリング
- 通信エラー等に備え、各
await呼び出しをtry-exceptブロックで囲むことで、途中で失敗した場合のリカバリ処理を追加できます。
- 通信エラー等に備え、各
- 戻り値の構造
Dict[str, Any]やstrではなく、dataclassやpydanticモデルを使用することで、データ構造をより堅牢に管理できます。
注意点
- 合計実行時間の増加
- 逐次実行(直列実行)するため、全体の処理時間は「各タスクの処理時間の合計」になります。独立して動けるタスクであれば、
asyncio.gatherを使用して並列化する方が高速です。
- 逐次実行(直列実行)するため、全体の処理時間は「各タスクの処理時間の合計」になります。独立して動けるタスクであれば、
- ブロッキング処理の混入
async関数の中でtime.sleep()や重い計算処理(CPUバウンドな処理)をそのまま書くと、イベントループ全体が止まってしまいます。必ずawait asyncio.sleep()や、CPUバウンドな処理にはrun_in_executorを使用してください。
- 古い記法の混在
- Python 3.7 未満の記事では
@asyncio.coroutineやyield fromが使われていることがありますが、現在はasync/await構文が標準です。
- Python 3.7 未満の記事では
- トップレベルでの await
awaitはasync defで定義された関数内(または Jupyter Notebook のセル内)でのみ使用可能です。通常の関数内で使うとSyntaxErrorになります。
応用
リストに格納された複数のタスクを、forループを使って順番に実行するパターンです。 APIのレートリミット(アクセス頻度制限)を守るために、あえて並列化せず、一定間隔で順番に処理したい場合などに有効です。
import asyncio
async def process_item(item_id: int):
print(f"Item {item_id}: 処理開始")
await asyncio.sleep(0.5)
print(f"Item {item_id}: 処理完了")
return item_id * 10
async def main_sequential_loop():
items = [1, 2, 3, 4, 5]
results = []
print("--- ループによる逐次実行開始 ---")
for item in items:
# ループ内で await することで、前の処理が終わってから次へ進む
res = await process_item(item)
results.append(res)
print("--- 全処理完了 ---")
print(f"結果リスト: {results}")
if __name__ == "__main__":
asyncio.run(main_sequential_loop())
まとめ
asyncio における逐次実行は、手続き型プログラミングと同様に直感的な記述が可能です。
- 向く場面: 前の処理結果を次の処理で使う場合、処理順序を厳密に管理したい場合、API制限などで同時実行数を1に抑えたい場合。
- 変更ポイント:
asyncio.gatherを使わないことで並列性を排除し、確実にawaitで完了を待ちます。 - 注意点: 不要な逐次実行はパフォーマンス低下の要因になるため、依存関係がない部分は並列化を検討してください。
依存関係を整理し、適切な箇所で await を使用することで、読みやすく予測可能な非同期プログラムを構築しましょう。
