Overview
In Python’s asyncio, awaiting coroutines one after another runs them sequentially: each one must finish before the next starts. By wrapping a coroutine in a “Task” with asyncio.create_task(), you schedule it on the event loop so that it runs concurrently with the code that created it.
This article explains how to create tasks, when they actually start executing, and how to use them for background processing. The technique is useful for shortening the total wait time of independent I/O operations or for keeping a background job running while the main flow continues.
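To make the difference concrete, the following minimal sketch times the same two waits run sequentially and then as tasks. The fetch() helper and its 0.2-second delays are assumptions made for illustration; they stand in for any I/O-bound call.

```python
import asyncio
import time


async def fetch(label: str, delay: float) -> str:
    # Hypothetical stand-in for an I/O-bound call such as an API request.
    await asyncio.sleep(delay)
    return label


async def run_sequentially() -> float:
    start = time.perf_counter()
    await fetch("a", 0.2)  # must finish before the next call starts
    await fetch("b", 0.2)
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    task_a = asyncio.create_task(fetch("a", 0.2))
    task_b = asyncio.create_task(fetch("b", 0.2))
    await task_a  # both tasks make progress while we wait here
    await task_b
    return time.perf_counter() - start


async def main() -> None:
    sequential = await run_sequentially()
    concurrent = await run_concurrently()
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The sequential version should take roughly the sum of the delays, while the task-based version should take roughly the longest single delay.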
Specifications (Input/Output)
- Input: Multiple coroutine functions to be run concurrently (e.g., requests to different APIs, periodic log output).
- Output: Standard output logs showing the start and end timing of each task, and the final results after all tasks complete.
- Operational Logic:
  - Calling create_task() schedules the task on the event loop. The task starts running the next time the current coroutine yields control to the loop (for example, at its next await).
  - You use await later to wait for the results.
Basic Usage
When you pass a coroutine to asyncio.create_task(), a Task object is returned, and the execution is scheduled.
# Create a task (schedule for execution)
task1 = asyncio.create_task(some_coroutine())
# You can perform other processing here while task1 runs in the background
# Wait for the task when the result is needed
result = await task1
Full Code
This scenario simulates querying two independent services concurrently, an “Inventory Check System” and a “Shipping Estimate System”, and then combining the results.
import asyncio
import time


async def check_inventory(product_id: str) -> dict:
    """
    Coroutine to simulate checking inventory.
    """
    print(f"[Inventory] Starting inventory check for {product_id}...")
    # Simulate communication with an external API (2 seconds)
    await asyncio.sleep(2.0)
    print(f"[Inventory] Inventory check for {product_id} complete.")
    return {"id": product_id, "stock": 15, "status": "Available"}


async def estimate_shipping(product_id: str) -> dict:
    """
    Coroutine to simulate estimating shipping costs.
    """
    print(f"[Shipping] Starting shipping estimate for {product_id}...")
    # Simulate communication with an external API (1 second)
    await asyncio.sleep(1.0)
    print(f"[Shipping] Shipping estimate for {product_id} complete.")
    return {"id": product_id, "cost": 500, "days": 2}


async def main():
    """
    Main flow: Run two tasks in parallel and combine results.
    """
    target_item = "item_9876"
    print(f"--- Process Started: {time.strftime('%X')} ---")

    # 1. Create tasks (Execution is scheduled here)
    # Unlike sequential execution, we move to task2 without waiting for task1 to finish
    task_inventory = asyncio.create_task(check_inventory(target_item))
    task_shipping = asyncio.create_task(estimate_shipping(target_item))
    print(">> Tasks created. Other processing can be done here.")

    # 2. Wait for results (await)
    # Both tasks are already running in the background
    inventory_result = await task_inventory
    shipping_result = await task_shipping
    print(f"--- Process Finished: {time.strftime('%X')} ---")

    # Display results
    print("\n[Final Results]")
    print(f"Inventory: {inventory_result}")
    print(f"Shipping: {shipping_result}")


if __name__ == "__main__":
    asyncio.run(main())
Sample Execution Result
Because the inventory check (2 s) and the shipping estimate (1 s) run concurrently, the total time is approximately 2 seconds, the duration of the longer task. Sequential execution would take about 3 seconds.
--- Process Started: 10:00:00 ---
>> Tasks created. Other processing can be done here.
[Inventory] Starting inventory check for item_9876...
[Shipping] Starting shipping estimate for item_9876...
[Shipping] Shipping estimate for item_9876 complete.
[Inventory] Inventory check for item_9876 complete.
--- Process Finished: 10:00:02 ---
[Final Results]
Inventory: {'id': 'item_9876', 'stock': 15, 'status': 'Available'}
Shipping: {'id': 'item_9876', 'cost': 500, 'days': 2}
Customization Points
Task Management
By keeping the task object in a variable (like task_inventory), you can later request cancellation with task.cancel() or check whether the task has finished with task.done().
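As a minimal sketch of this kind of task management, the example below starts a long-running task, inspects it, and cancels it. The long_job() coroutine and its 10-second delay are assumptions chosen only for illustration.

```python
import asyncio


async def long_job() -> str:
    # Hypothetical long-running job that we decide to abandon.
    await asyncio.sleep(10)
    return "finished"


async def cancel_demo() -> dict:
    task = asyncio.create_task(long_job())
    await asyncio.sleep(0.1)       # give the task time to start
    done_before = task.done()      # False: the job is still sleeping

    task.cancel()                  # request cancellation
    try:
        await task                 # the cancellation surfaces here
    except asyncio.CancelledError:
        pass                       # expected, because we cancelled it ourselves

    return {
        "done_before": done_before,
        "done_after": task.done(),
        "cancelled": task.cancelled(),
    }


if __name__ == "__main__":
    print(asyncio.run(cancel_demo()))
```

Note that cancel() only requests cancellation; awaiting the task afterwards is what lets the cancellation actually complete.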
Error Handling
Exceptions raised within a task are re-raised at the point where you await the task. Wrapping individual await calls in try-except blocks lets you handle one task's failure without aborting the rest of the application.
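The sketch below illustrates per-task error handling under assumed conditions: flaky_service() and healthy_service() are hypothetical coroutines, and ConnectionError is just one example of an exception a real service call might raise.

```python
import asyncio


async def flaky_service() -> str:
    # Hypothetical service call that always fails.
    await asyncio.sleep(0.05)
    raise ConnectionError("service unavailable")


async def healthy_service() -> str:
    # Hypothetical service call that succeeds.
    await asyncio.sleep(0.05)
    return "ok"


async def collect() -> dict:
    tasks = {
        "flaky": asyncio.create_task(flaky_service()),
        "healthy": asyncio.create_task(healthy_service()),
    }
    results = {}
    for name, task in tasks.items():
        try:
            # The task's exception, if any, is re-raised by this await.
            results[name] = await task
        except ConnectionError as exc:
            results[name] = f"failed: {exc}"
    return results


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Because each await has its own try-except, the failure of one task is recorded without preventing the other result from being collected.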
Task Naming
In Python 3.8 and later, you can name tasks using asyncio.create_task(coro, name="MyTask"), which is very helpful for debugging.
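A short sketch of naming, assuming Python 3.8 or later: the name can be read back with Task.get_name(), both from the code holding the task and from inside the task itself via asyncio.current_task(). The worker() coroutine is a hypothetical example.

```python
import asyncio


async def worker() -> str:
    # Inside the task, asyncio.current_task() returns the Task running this code.
    return asyncio.current_task().get_name()


async def naming_demo() -> tuple:
    task = asyncio.create_task(worker(), name="MyTask")
    name_seen_outside = task.get_name()
    name_seen_inside = await task
    return name_seen_outside, name_seen_inside


if __name__ == "__main__":
    print(asyncio.run(naming_demo()))
```

The name also appears in the task's repr, which is what usually shows up in log messages and debugger output.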
Important Notes
Keep a Reference
The event loop keeps only weak references to tasks. If you do not hold a strong reference (such as a variable) to a created task, the task may be garbage-collected before it finishes; this is a particular risk for “fire-and-forget” tasks. Always assign tasks to a variable or add them to a collection such as a set.
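One common way to keep references to fire-and-forget tasks is a module-level set combined with a done callback that removes each task when it finishes. The sketch below follows that pattern; the names background_tasks, fire_and_forget(), and send_log() are assumptions introduced for illustration.

```python
import asyncio

# Strong references to in-flight background tasks.
background_tasks = set()


async def send_log(message: str, sink: list) -> None:
    # Hypothetical background job: pretend to transmit a log line.
    await asyncio.sleep(0.05)
    sink.append(message)


def fire_and_forget(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # keep the task alive
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task


async def keep_reference_demo() -> list:
    sink = []
    fire_and_forget(send_log("first", sink))
    fire_and_forget(send_log("second", sink))
    await asyncio.sleep(0.2)  # the main flow continues with other work
    return sink


if __name__ == "__main__":
    print(asyncio.run(keep_reference_demo()))
```

The done callback keeps the set from growing without bound, since each task removes itself as soon as it completes.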
Execution Timing
Calling create_task does not immediately hand over control to the task. The task starts the moment control returns to the event loop, such as when an await (e.g., await asyncio.sleep(0)) occurs in the current scope.
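The ordering described above can be observed with a small sketch. It assumes the default (non-eager) task behavior; the child() coroutine and the events list are hypothetical names used only to record the order of execution.

```python
import asyncio


async def child(events: list) -> None:
    events.append("child started")


async def timing_demo() -> list:
    events = []
    task = asyncio.create_task(child(events))
    # The child has been scheduled but has not run yet.
    events.append("after create_task")
    # Yield to the event loop once; the scheduled child runs now.
    await asyncio.sleep(0)
    events.append("after sleep(0)")
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(timing_demo()))
    # ['after create_task', 'child started', 'after sleep(0)']
```

The entry recorded right after create_task() comes first, which shows that the child did not start until the creating coroutine reached an await.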
Difference from Threads
Tasks are not “OS Threads”; they rely on “Cooperative Multitasking” within a single thread. A heavy CPU-bound calculation that never reaches an await blocks the event loop, so all other tasks stall until it finishes.
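The sketch below illustrates the stall and one possible workaround. It is written under assumptions: blocking_work() uses time.sleep() as a stand-in for a synchronous computation, and ticker() is a hypothetical task that records a timestamp every 0.05 seconds. Offloading uses loop.run_in_executor() with the default thread pool; for genuinely CPU-bound pure-Python work, a process pool is often the better choice because of the GIL.

```python
import asyncio
import time


def blocking_work() -> None:
    # Stand-in for heavy synchronous work: it never yields to the event loop.
    time.sleep(0.3)


async def ticker(ticks: list) -> None:
    # Hypothetical background task that ticks every 0.05 seconds.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def count_ticks(offload: bool) -> int:
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker record its first tick
    before = len(ticks)

    if offload:
        # Run the blocking call in a worker thread; the loop stays responsive.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_work)
    else:
        # Run it directly; the event loop cannot run anything else meanwhile.
        blocking_work()

    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


if __name__ == "__main__":
    print("ticks while blocked:  ", asyncio.run(count_ticks(offload=False)))
    print("ticks while offloaded:", asyncio.run(count_ticks(offload=True)))
```

When the work runs directly in the coroutine, the ticker records nothing during it; when the work is offloaded, the ticker keeps running.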
Advanced Usage
This is a pattern for a background task that continues to display progress logs periodically while the main process is running.
import asyncio


async def background_monitor():
    """Monitoring task that runs periodically in the background"""
    for i in range(1, 6):
        print(f" [Monitor] System running... ({i}/5)")
        # Output log every 0.5 seconds
        await asyncio.sleep(0.5)
    print(" [Monitor] Monitoring finished")
    return "OK"


async def main_heavy_process():
    """Main heavy process"""
    print(">>> Main: Starting data processing")
    # Create background task
    monitor_task = asyncio.create_task(background_monitor())
    # Simulate main process (Monitor runs during this await)
    await asyncio.sleep(3.0)
    print(">>> Main: Data processing complete")
    # Ensure background task completion
    await monitor_task


if __name__ == "__main__":
    asyncio.run(main_heavy_process())
Summary
asyncio.create_task is a fundamental and vital tool for controlling concurrency in asynchronous processing.
- Best for: Multiple independent API requests, log transmission, or cache updates occurring behind the main process.
- Key Differences: While asyncio.gather(*tasks) is good for grouping, create_task is better when you need individual task control (cancellation or specific await timing).
- Reminder: Do not forget to keep a reference to your tasks to prevent unexpected garbage collection.
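The contrast between the two approaches can be sketched as follows. The job() coroutine and its delays are assumptions for illustration: with_gather() collects a group of results in argument order, while with_tasks() awaits one task early and cancels another individually.

```python
import asyncio


async def job(name: str, delay: float) -> str:
    # Hypothetical unit of work.
    await asyncio.sleep(delay)
    return name


async def with_gather() -> list:
    # Grouping: results come back in the order the awaitables were passed.
    return await asyncio.gather(job("a", 0.1), job("b", 0.05))


async def with_tasks() -> list:
    # Individual control: await one task early, cancel the other.
    slow = asyncio.create_task(job("slow", 5))
    fast = asyncio.create_task(job("fast", 0.05))
    first = await fast
    slow.cancel()
    try:
        await slow
    except asyncio.CancelledError:
        pass
    return [first, slow.cancelled()]


if __name__ == "__main__":
    print(asyncio.run(with_gather()))  # ['a', 'b']
    print(asyncio.run(with_tasks()))   # ['fast', True]
```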
By scheduling tasks efficiently, you can significantly improve the responsiveness of your applications.
