【Python】Parallel Execution with asyncio.create_task

目次

Overview

In Python’s asyncio, simply using await on a coroutine function causes the process to run sequentially (one after another). However, by using asyncio.create_task(), you can wrap a coroutine into a “Task” and schedule it to run concurrently on the event loop.

This article explains how to create tasks, their execution timing, and how to use them for background processing. This is useful for reducing processing time or running background jobs while the main process continues.


Specifications (Input/Output)

  • Input: Multiple coroutine functions to be run concurrently (e.g., requests to different APIs, periodic log output).
  • Output: Standard output logs showing the start and end timing of each task, and the final results after all tasks complete.
  • Operational Logic:
    • The moment create_task() is called, the task is scheduled on the event loop. If the loop is available, execution starts immediately.
    • You use await later to wait for the results.

Basic Usage

When you pass a coroutine to asyncio.create_task(), a Task object is returned, and the execution is scheduled.

# Create a task (schedule for execution)
task1 = asyncio.create_task(some_coroutine())

# You can perform other processing here while task1 runs in the background

# Wait for the task when the result is needed
result = await task1

Full Code

This scenario simulates a “Inventory Check System” and a “Shipping Estimate System” querying two independent services simultaneously and combining the results.

import asyncio
import time

async def check_inventory(product_id: str) -> dict:
    """
    Coroutine to simulate checking inventory.
    """
    print(f"[Inventory] Starting inventory check for {product_id}...")
    # Simulate communication with an external API (2 seconds)
    await asyncio.sleep(2.0)
    print(f"[Inventory] Inventory check for {product_id} complete.")
    return {"id": product_id, "stock": 15, "status": "Available"}

async def estimate_shipping(product_id: str) -> dict:
    """
    Coroutine to simulate estimating shipping costs.
    """
    print(f"[Shipping] Starting shipping estimate for {product_id}...")
    # Simulate communication with an external API (1 second)
    await asyncio.sleep(1.0)
    print(f"[Shipping] Shipping estimate for {product_id} complete.")
    return {"id": product_id, "cost": 500, "days": 2}

async def main():
    """
    Main flow: Run two tasks in parallel and combine results.
    """
    target_item = "item_9876"
    
    print(f"--- Process Started: {time.strftime('%X')} ---")
    
    # 1. Create tasks (Execution is scheduled here)
    # Unlike sequential execution, we move to task2 without waiting for task1 to finish
    task_inventory = asyncio.create_task(check_inventory(target_item))
    task_shipping = asyncio.create_task(estimate_shipping(target_item))
    
    print(">> Tasks created. Other processing can be done here.")
    
    # 2. Wait for results (await)
    # Both tasks are already running in the background
    inventory_result = await task_inventory
    shipping_result = await task_shipping
    
    print(f"--- Process Finished: {time.strftime('%X')} ---")
    
    # Display results
    print("\n[Final Results]")
    print(f"Inventory: {inventory_result}")
    print(f"Shipping: {shipping_result}")

if __name__ == "__main__":
    asyncio.run(main())

Sample Execution Result

Since the inventory check (2s) and shipping estimate (1s) run in parallel, the total time is approximately 2 seconds (sequential execution would take 3 seconds).

--- Process Started: 10:00:00 ---
>> Tasks created. Other processing can be done here.
[Inventory] Starting inventory check for item_9876...
[Shipping] Starting shipping estimate for item_9876...
[Shipping] Shipping estimate for item_9876 complete.
[Inventory] Inventory check for item_9876 complete.
--- Process Finished: 10:00:02 ---

[Final Results]
Inventory: {'id': 'item_9876', 'stock': 15, 'status': 'Available'}
Shipping: {'id': 'item_9876', 'cost': 500, 'days': 2}

Customization Points

Task Management

By keeping the task object in a variable (like task_inventory), you can later interrupt the process with task.cancel() or check the status with task.done().

Error Handling

Exceptions raised within a task are thrown when you await task. Wrapping individual await calls in try-except blocks allows you to prevent one failure from stopping the entire application.

Task Naming

In Python 3.8 and later, you can name tasks using asyncio.create_task(coro, name="MyTask"), which is very helpful for debugging.


Important Notes

Keep a Reference

If you do not hold a reference (variable) to a created task, there is a risk that the task will be deleted mid-execution by the garbage collector (especially for “fire-and-forget” tasks). Always assign them to a variable or add them to a set for management.

Execution Timing

Calling create_task does not immediately hand over control to the task. The task starts the moment control returns to the event loop, such as when an await (e.g., await asyncio.sleep(0)) occurs in the current scope.

Difference from Threads

These are not “OS Threads” but “Cooperative Multitasking” within a single thread. If you include heavy CPU-bound calculations, other tasks will also stop.


Advanced Usage

This is a pattern for a background task that continues to display progress logs periodically while the main process is running.

import asyncio

async def background_monitor():
    """Monitoring task that runs periodically in the background"""
    for i in range(1, 6):
        print(f"  [Monitor] System running... ({i}/5)")
        # Output log every 0.5 seconds
        await asyncio.sleep(0.5)
    print("  [Monitor] Monitoring finished")
    return "OK"

async def main_heavy_process():
    """Main heavy process"""
    print(">>> Main: Starting data processing")
    
    # Create background task
    monitor_task = asyncio.create_task(background_monitor())
    
    # Simulate main process (Monitor runs during this await)
    await asyncio.sleep(3.0)
    
    print(">>> Main: Data processing complete")
    
    # Ensure background task completion
    await monitor_task

if __name__ == "__main__":
    asyncio.run(main_heavy_process())

Summary

asyncio.create_task is a fundamental and vital tool for controlling concurrency in asynchronous processing.

  • Best for: Multiple independent API requests, log transmission, or cache updates occurring behind the main process.
  • Key Differences: While asyncio.gather(*tasks) is good for grouping, create_task is better when you need individual task control (cancellation or specific await timing).
  • Reminder: Do not forget to keep a reference to your task variables to prevent unexpected garbage collection.

By scheduling tasks efficiently, you can significantly improve the responsiveness of your applications.

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次