Overview
The biggest performance advantage of Python's asyncio is that it lets I/O wait times overlap. With asyncio.gather(), you can schedule multiple coroutines (asynchronous tasks) at once; the program waits for all of them to complete and receives the results together as a list. Unlike awaiting each task one by one, the total processing time is determined by the slowest task rather than the sum of all tasks, which can yield significant speedups.
Specifications (Input/Output)
- Input: Multiple coroutine objects you want to execute (arguments included when you call them).
  - Example: A list of multiple URLs, multiple file paths, etc.
- Output: A list containing the return values of all tasks, in the same order they were passed in.
- Operational Logic:
  - Starts all tasks at once (concurrent execution).
  - Suspends the awaiting coroutine (not the whole process) until all tasks are finished.
  - By default, the first exception raised by any task is propagated immediately (this behavior can be changed with return_exceptions=True).
Basic Usage
Call the coroutine functions to create coroutine objects, then pass them to asyncio.gather as comma-separated arguments.
# Pass multiple coroutines and receive all results in a list
results = await asyncio.gather(task1(), task2(), task3())
# results will be [result1, result2, result3]
Full Code
This scenario performs health checks on multiple servers with different response times and aggregates the results.
import asyncio
import time
import random

async def check_server_health(server_name: str, wait_time: float) -> dict:
    """
    Coroutine to simulate a server health check.

    Args:
        server_name (str): Name of the server
        wait_time (float): Time taken for the response (seconds)

    Returns:
        dict: Server status report
    """
    print(f"[{server_name}] Starting connection check... (Estimated time: {wait_time}s)")

    # Simulate communication wait
    await asyncio.sleep(wait_time)

    print(f"[{server_name}] Response received.")

    # Pseudo status determination (random)
    status = "OK" if random.random() > 0.2 else "Warning"
    return {
        "server": server_name,
        "status": status,
        "latency": wait_time
    }

async def main():
    """
    Executes multiple server checks at once and aggregates the results.
    """
    start_time = time.time()
    print("--- Simultaneous Check Started ---")

    # 1. Prepare a list of coroutines to execute
    #    Note: Execution hasn't started yet; coroutine objects are just created
    tasks = [
        check_server_health("Server_A", 2.0),
        check_server_health("Server_B", 3.0),
        check_server_health("Server_C", 1.0),
    ]

    # 2. Execute and wait for all using gather
    #    *tasks unpacks the list to pass items as individual arguments
    results = await asyncio.gather(*tasks)

    print("--- All Checks Completed ---")

    # Display results (the order of results matches the order of tasks)
    for res in results:
        print(f"Result: {res}")

    total_time = time.time() - start_time
    print(f"\nTime taken: {total_time:.2f}s (Would be 6.0s if sequential)")

if __name__ == "__main__":
    asyncio.run(main())
Sample Execution Result
The entire process finishes in about 3 seconds, matching the slowest task (“Server_B”).
--- Simultaneous Check Started ---
[Server_A] Starting connection check... (Estimated time: 2.0s)
[Server_B] Starting connection check... (Estimated time: 3.0s)
[Server_C] Starting connection check... (Estimated time: 1.0s)
[Server_C] Response received.
[Server_A] Response received.
[Server_B] Response received.
--- All Checks Completed ---
Result: {'server': 'Server_A', 'status': 'OK', 'latency': 2.0}
Result: {'server': 'Server_B', 'status': 'OK', 'latency': 3.0}
Result: {'server': 'Server_C', 'status': 'Warning', 'latency': 1.0}
Time taken: 3.01s (Would be 6.0s if sequential)
Customization Points
Exception Handling Control (return_exceptions)
By default, if one task raises an exception, gather propagates that exception immediately (note that the other tasks are not cancelled and keep running in the background). By calling await asyncio.gather(*tasks, return_exceptions=True), the call does not fail when an error occurs; instead, the Exception object is returned in the result list alongside the successful results. This is essential when you want to collect results from all successful tasks.
Dynamic Task Generation
You can easily handle a variable number of tasks by using list comprehensions like [func(i) for i in range(10)].
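As a quick sketch of this, a hypothetical `fetch_item` coroutine (a stand-in for any I/O-bound call) can be fanned out with a comprehension and unpacked into gather:

```python
import asyncio

async def fetch_item(i: int) -> str:
    # Hypothetical I/O-bound task used only for illustration
    await asyncio.sleep(0.01)
    return f"item-{i}"

async def main() -> list[str]:
    # The list comprehension builds 10 coroutine objects; * unpacks them into gather
    return await asyncio.gather(*[fetch_item(i) for i in range(10)])

results = asyncio.run(main())
print(results)  # ['item-0', 'item-1', ..., 'item-9']
```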
Important Notes
Order of Results
Results are returned in the same order as the arguments passed, not the order in which they finished. This makes it easy to map results back to input data.
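A minimal illustration of this, using a hypothetical `delayed` helper: the first-listed task finishes last, yet still comes first in the results:

```python
import asyncio

async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value

async def main() -> list[str]:
    # "slow" is listed first but finishes last; gather preserves argument order
    return await asyncio.gather(delayed("slow", 0.2), delayed("fast", 0.05))

print(asyncio.run(main()))  # ['slow', 'fast']
```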
Cancellation Linkage
If you cancel the gather task itself (task.cancel()), the cancellation propagates to all child tasks managed under it.
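A small sketch of this behavior, with a hypothetical `worker` coroutine that records when cancellation reaches it:

```python
import asyncio

cancelled: list[str] = []

async def worker(name: str) -> str:
    try:
        await asyncio.sleep(10)  # long-running job that will be interrupted
        return name
    except asyncio.CancelledError:
        cancelled.append(name)   # cancellation propagated to this child task
        raise

async def main() -> bool:
    gather_future = asyncio.gather(worker("A"), worker("B"))
    await asyncio.sleep(0.1)   # give both workers time to start waiting
    gather_future.cancel()     # cancelling gather cancels both children
    try:
        await gather_future
    except asyncio.CancelledError:
        return True            # gather itself reports cancellation
    return False

print(asyncio.run(main()), cancelled)
```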
Unpacking Arguments
If you have tasks in a list, you must unpack it with *tasks. Passing gather(tasks) raises a TypeError, because gather expects each positional argument to be an individual awaitable, and a list is not one.
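A minimal demonstration of the wrong and the right call, using a hypothetical `ping` coroutine:

```python
import asyncio

async def ping(n: int) -> int:
    await asyncio.sleep(0.01)
    return n

async def main() -> list[int]:
    tasks = [ping(1), ping(2)]
    try:
        await asyncio.gather(tasks)  # wrong: the list itself is not awaitable
    except TypeError:
        print("gather(tasks) raised TypeError")
        for t in tasks:
            t.close()  # discard the never-scheduled coroutine objects
    return await asyncio.gather(*[ping(1), ping(2)])  # correct: unpack with *

print(asyncio.run(main()))  # [1, 2]
```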
Advanced Usage
This pattern uses return_exceptions=True to continue processing even if some tasks fail.
import asyncio

async def risky_task(task_id: int):
    if task_id == 2:
        raise ValueError("Unexpected error occurred!")
    await asyncio.sleep(0.5)
    return f"Task {task_id} Success"

async def main_resilient():
    tasks = [risky_task(1), risky_task(2), risky_task(3)]

    # Do not interrupt on error; receive Exception objects in the results
    results = await asyncio.gather(*tasks, return_exceptions=True)

    print("--- Execution Results ---")
    for i, res in enumerate(results):
        if isinstance(res, Exception):
            print(f"Task {i+1}: Failed ({res})")
        else:
            print(f"Task {i+1}: Success ({res})")

if __name__ == "__main__":
    asyncio.run(main_resilient())
Summary
asyncio.gather is one of the most frequently used APIs for handling multiple asynchronous processes.
- Best for: Executing multiple independent tasks (API calls, DB queries, etc.) at once and using all results for the next step.
- Key setting: Robustness against errors changes significantly depending on whether you use return_exceptions=True.
- Caution: Gathering a massive number of tasks (e.g., thousands) at once can increase system load. In such cases, limit the number of concurrent executions with tools like asyncio.Semaphore.
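A minimal sketch of that throttling pattern; `fetch` and the URLs are hypothetical placeholders for real network calls:

```python
import asyncio

async def fetch(url: str, sem: asyncio.Semaphore) -> str:
    # The semaphore caps how many fetches run at the same time
    async with sem:
        await asyncio.sleep(0.05)  # simulated network request
        return f"done: {url}"

async def main() -> list[str]:
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight at once
    urls = [f"https://example.com/{i}" for i in range(20)]  # placeholder URLs
    return await asyncio.gather(*(fetch(u, sem) for u in urls))

results = asyncio.run(main())
print(len(results))  # 20
```

Even though all 20 coroutines are handed to gather at once, only five ever hold the semaphore simultaneously, so the load on the remote service stays bounded.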
By mastering this pattern, you can take full advantage of performance gains through asynchronous processing in Python.
