【Python】asyncio.create_taskで非同期処理を並行実行する

目次

概要

Pythonの asyncio において、コルーチン関数を単に await するだけでは処理は逐次的(順番)に行われますが、asyncio.create_task() を使用することで、コルーチンを「タスク」としてラップし、イベントループ上で並行して実行させることができます。 本記事では、タスクの生成方法、実行タイミング、そしてバックグラウンド処理としての活用パターンを解説します。処理時間の短縮や、メイン処理の裏側でのジョブ実行に役立ちます。

仕様(入出力)

  • 入力
    • 並行実行したい複数のコルーチン関数(例: 異なるAPIへのリクエスト、定期ログ出力など)。
    • 各処理のシミュレーション待機時間。
  • 出力
    • 各タスクの開始・終了タイミングがわかる標準出力ログ。
    • 全タスク完了後の最終結果。
  • 動作仕様
    • create_task() を呼んだ時点でタスクはイベントループにスケジュール(予約)され、待機可能状態であれば即座に実行が始まります。
    • その後の await で結果が返るのを待ち合わせます。

基本の使い方

コルーチンを asyncio.create_task() に渡すと Task オブジェクトが返されます。この時点で実行予約が行われます。

# タスクを作成(実行予約)
task1 = asyncio.create_task(some_coroutine())

# ここで別の処理を行える(task1は裏で進む)

# 結果が必要になったタイミングで待機
result = await task1

コード全文

ここでは「在庫確認システム」と「配送見積もりシステム」という2つの独立したサービスへ同時に問い合わせを行い、両方の結果を合算するシナリオを提示します。

import asyncio
import time

async def check_inventory(product_id: str) -> dict:
    """
    在庫確認を行うシミュレーション用コルーチン。
    
    Args:
        product_id (str): 商品ID
        
    Returns:
        dict: 在庫状況の結果
    """
    print(f"[Inventory] 商品 {product_id} の在庫確認を開始...")
    # 外部APIへの通信を想定した待機(2秒)
    await asyncio.sleep(2.0)
    print(f"[Inventory] 商品 {product_id} の在庫確認完了。")
    return {"id": product_id, "stock": 15, "status": "Available"}

async def estimate_shipping(product_id: str) -> dict:
    """
    配送見積もりを行うシミュレーション用コルーチン。
    
    Args:
        product_id (str): 商品ID
        
    Returns:
        dict: 見積もり結果
    """
    print(f"[Shipping] 商品 {product_id} の配送見積もりを開始...")
    # 外部APIへの通信を想定した待機(1秒)
    await asyncio.sleep(1.0)
    print(f"[Shipping] 商品 {product_id} の配送見積もり完了。")
    return {"id": product_id, "cost": 500, "days": 2}

async def main():
    """
    メインフロー:2つのタスクを並行実行して結果をまとめる。
    """
    target_item = "item_9876"
    
    print(f"--- 処理開始: {time.strftime('%X')} ---")
    
    # 1. タスクの生成(ここで実行がスケジュールされます)
    # 直列実行と異なり、task1の完了を待たずにtask2の生成へ進みます
    task_inventory = asyncio.create_task(check_inventory(target_item))
    task_shipping = asyncio.create_task(estimate_shipping(target_item))
    
    print(">> タスク生成完了。他の処理を実行可能です。")
    
    # 2. 結果の待機(await)
    # 両方のタスクは既に裏で走っています
    inventory_result = await task_inventory
    shipping_result = await task_shipping
    
    print(f"--- 処理終了: {time.strftime('%X')} ---")
    
    # 結果の表示
    print("\n【最終結果】")
    print(f"在庫: {inventory_result}")
    print(f"配送: {shipping_result}")

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

在庫確認(2秒)と配送見積もり(1秒)が並行して動くため、全体の処理時間は約2秒で完了します(直列なら合計3秒かかります)。

--- 処理開始: 10:00:00 ---
>> タスク生成完了。他の処理を実行可能です。
[Inventory] 商品 item_9876 の在庫確認を開始...
[Shipping] 商品 item_9876 の配送見積もりを開始...
[Shipping] 商品 item_9876 の配送見積もり完了。
[Inventory] 商品 item_9876 の在庫確認完了。
--- 処理終了: 10:00:02 ---

【最終結果】
在庫: {'id': 'item_9876', 'stock': 15, 'status': 'Available'}
配送: {'id': 'item_9876', 'cost': 500, 'days': 2}

カスタムポイント

  • タスクの管理
    • 生成したタスクオブジェクトを変数(task_inventory 等)に保持しておくことで、後から task.cancel() で処理を中断させたり、task.done() で完了状態を確認したりできます。
  • 例外処理
    • await task を実行したタイミングで、タスク内で発生した例外が送出されます。個別に try-except で囲むことで、片方の失敗が全体を停止させるのを防げます。
  • タスク名
    • Python 3.8 以降では asyncio.create_task(coro, name="MyTask") のように名前を付けられ、デバッグ時に役立ちます。

注意点

  1. 参照の保持
    • 作成したタスクへの参照(変数)を保持しないと、ガベージコレクションによってタスクが途中で消滅するリスクがあります(特に「撃ちっ放し」にする場合)。必ず変数に代入するか、セットに追加して管理してください。
  2. 実行タイミング
    • create_task しただけでは、まだ制御がタスクに移っていません。現在のスコープで await(例えば await asyncio.sleep(0) など)が発生し、イベントループへ制御が戻った瞬間にタスクが動き出します。
  3. スレッドとの違い
    • これは「OSスレッド」ではなく、シングルスレッド内の「協調的マルチタスク」です。CPUバウンドな重い計算処理を入れると、他のタスクも止まります。

応用

「メイン処理を行っている間、裏側で定期的に進捗ログを表示し続ける」バックグラウンドタスクのパターンです。

import asyncio

async def background_monitor():
    """バックグラウンドで定期実行される監視タスク"""
    for i in range(1, 6):
        print(f"  [Monitor] システム稼働中... ({i}/5)")
        # 0.5秒ごとにログ出力
        await asyncio.sleep(0.5)
    print("  [Monitor] 監視終了")
    return "OK"

async def main_heavy_process():
    """メインの重い処理"""
    print(">>> Main: データ処理を開始します")
    
    # バックグラウンドタスクを生成(即座には動き出さない)
    monitor_task = asyncio.create_task(background_monitor())
    
    # メイン処理のシミュレーション(ここでawaitするので、その隙にMonitorが動く)
    await asyncio.sleep(3.0)
    
    print(">>> Main: データ処理が完了しました")
    
    # バックグラウンドタスクの完了を確実に待つ
    await monitor_task

if __name__ == "__main__":
    asyncio.run(main_heavy_process())

まとめ

asyncio.create_task は、非同期処理の並行性を制御する基本かつ重要なツールです。

  • 向く場面: 互いに依存しない複数のAPIリクエスト、メイン処理の裏で行うログ送信やキャッシュ更新。
  • 変更ポイント: 単純にリストでまとめて asyncio.gather(*tasks) する方法もありますが、create_task は個別のタスク制御(キャンセルや個別のawait)が必要な場合に適しています。
  • 注意点: タスク変数の参照保持を忘れずに行い、意図しないガベージコレクションを防いでください。

効率的にタスクをスケジュールし、アプリケーションの応答性を高めましょう。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次