【Python】Executing Multiple Asynchronous Tasks in a Specific Order with asyncio


Overview

This article explains how to use asyncio, part of Python's standard library, to run multiple asynchronous operations (coroutines) in a specific order. Asynchronous programming is often used to save time by running tasks concurrently (for example, with asyncio.gather), but there are many cases where the order of execution must be guaranteed, such as when the result of one task is the input to the next. This article introduces implementation patterns that use the await keyword to process dependent tasks sequentially.

Specifications (Input/Output)

  • Input:
    • Identifiers for processing (e.g., data IDs or filenames).
    • Simulation wait times (seconds) for each step.
  • Output:
    • Start and end logs for each step displayed in the standard output.
    • Final processing results returned as a dictionary or similar structure.
  • Process Flow:
    1. Execute data retrieval (download).
    2. Execute data processing (analysis) using the retrieved data.
    3. Output the combined results after all steps are complete.
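
The flow above can be sketched as a single coroutine that takes an identifier and per-step wait times and returns a dictionary. This is only an illustration: the names run_workflow, download_wait, and analyze_wait are hypothetical, and the sleeps stand in for real I/O.

```python
import asyncio
from typing import Any, Dict


async def run_workflow(
    data_id: str, download_wait: float = 1.0, analyze_wait: float = 1.0
) -> Dict[str, Any]:
    """Hypothetical sketch: download, then analyze, then combine the results."""
    # Step 1: data retrieval (simulated with a sleep)
    print(f"[Download] start: {data_id}")
    await asyncio.sleep(download_wait)
    raw = {"id": data_id, "content": "raw_binary_content"}
    print(f"[Download] end: {data_id}")

    # Step 2: analysis, which needs the data retrieved in step 1
    print(f"[Analyze] start: {data_id}")
    await asyncio.sleep(analyze_wait)
    report = f"Content length is {len(raw['content'])}"
    print(f"[Analyze] end: {data_id}")

    # Step 3: combined result, built only after both steps have finished
    return {"id": data_id, "raw": raw, "report": report}


if __name__ == "__main__":
    print(asyncio.run(run_workflow("user_12345", 0.2, 0.2)))
```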

Basic Usage

The basic way to ensure sequential execution is to place await before each call to a coroutine function. The surrounding coroutine is suspended at that line until the awaited coroutine finishes and returns its value; only then does execution move on to the next line.

# Execution concept
# step2 will not start until step1 is complete and result1 is returned
result1 = await step1()
result2 = await step2(result1)
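
A runnable version of this idea might look like the sketch below. The bodies of step1 and step2 are placeholders invented for illustration. It also shows that calling a coroutine function without await only creates a coroutine object; nothing runs until it is awaited.

```python
import asyncio


async def step1() -> int:
    await asyncio.sleep(0.1)  # simulated I/O wait
    return 21


async def step2(value: int) -> int:
    await asyncio.sleep(0.1)  # simulated I/O wait
    return value * 2


async def main() -> int:
    pending = step1()  # no await: this only creates a coroutine object
    print(asyncio.iscoroutine(pending))  # True; step1 has not started yet
    result1 = await pending  # step1 runs to completion here
    result2 = await step2(result1)  # starts only after result1 is available
    return result2


if __name__ == "__main__":
    print(asyncio.run(main()))  # 42
```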

Full Code

Below is a code example based on a scenario where “raw data is downloaded from a server, analyzed, and then a report is created.”

import asyncio
import time
from typing import Dict, Any

async def download_raw_data(data_id: str) -> Dict[str, Any]:
    """
    Simulates retrieving data for a specific ID from an external source.
    
    Args:
        data_id (str): Data identifier
        
    Returns:
        Dict[str, Any]: The retrieved raw data
    """
    print(f"[Download] Starting download for ID: {data_id}...")
    # Simulate IO wait such as network communication
    await asyncio.sleep(1.0)
    
    print(f"[Download] Completed download for ID: {data_id}.")
    # Return dummy data
    return {
        "id": data_id,
        "content": "raw_binary_content",
        "timestamp": time.time()
    }

async def analyze_data(raw_data: Dict[str, Any]) -> str:
    """
    Simulates analyzing raw data to generate a report.
    
    Args:
        raw_data (Dict[str, Any]): Data retrieved via download_raw_data
        
    Returns:
        str: Analyzed report string
    """
    data_id = raw_data.get("id")
    print(f"[Analyze] Starting analysis for ID: {data_id}...")
    
    # Simulate computational processing or IO wait
    await asyncio.sleep(1.0)
    
    print(f"[Analyze] Completed analysis for ID: {data_id}.")
    return f"Report for {data_id}: Content length is {len(raw_data['content'])}"

async def main():
    """
    Main workflow coroutine.
    """
    target_id = "user_12345"
    
    print("--- Workflow Started ---")
    
    # 1. Download data (Wait until completion)
    raw_data = await download_raw_data(target_id)
    
    # 2. Analyze data (Execute using the download result)
    report = await analyze_data(raw_data)
    
    print("--- Workflow Finished ---")
    print(f"Final Result: {report}")

if __name__ == "__main__":
    # Start the event loop
    asyncio.run(main())

Sample Execution Result

--- Workflow Started ---
[Download] Starting download for ID: user_12345...
[Download] Completed download for ID: user_12345.
[Analyze] Starting analysis for ID: user_12345...
[Analyze] Completed analysis for ID: user_12345.
--- Workflow Finished ---
Final Result: Report for user_12345: Content length is 18

Customization Points

Adjusting Wait Times

asyncio.sleep(1.0) only simulates a delay for demonstration purposes. In a real application, replace it with actual asynchronous I/O, such as HTTP requests with aiohttp or database operations with aiomysql.

Error Handling

To prepare for communication errors, you can wrap await calls in try-except blocks. This allows you to add recovery logic if a task fails midway.
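
As a rough sketch of that idea, the example below wraps an awaited download in try-except and retries a few times before giving up. DownloadError, make_flaky_download, and download_with_retry are all hypothetical names made up for this illustration; a real application would catch the exceptions raised by its own I/O library.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Download = Callable[[str], Awaitable[Dict[str, Any]]]


class DownloadError(Exception):
    """Hypothetical error standing in for a communication failure."""


def make_flaky_download(failures: int) -> Download:
    """Build a fake download coroutine function that fails `failures` times first."""
    remaining = failures

    async def download(data_id: str) -> Dict[str, Any]:
        nonlocal remaining
        await asyncio.sleep(0.01)  # simulated I/O wait
        if remaining > 0:
            remaining -= 1
            raise DownloadError(f"temporary failure for {data_id}")
        return {"id": data_id, "content": "raw_binary_content"}

    return download


async def download_with_retry(
    download: Download, data_id: str, max_attempts: int = 3
) -> Dict[str, Any]:
    """Await the download; on DownloadError, retry up to max_attempts times."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await download(data_id)
        except DownloadError as exc:
            print(f"[Retry] attempt {attempt} failed: {exc}")
            if attempt == max_attempts:
                raise  # give up so later steps never run on missing data
            await asyncio.sleep(0.01)  # short pause before the next attempt
    raise ValueError("max_attempts must be at least 1")


async def main() -> str:
    download = make_flaky_download(failures=2)
    try:
        raw = await download_with_retry(download, "user_12345")
    except DownloadError:
        return "download failed; analysis skipped"
    return f"downloaded {raw['id']}"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the steps run sequentially, an exception that escapes the download step also prevents the analysis step from running on incomplete data.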

Data Structures

Instead of using Dict[str, Any] or str, you can use dataclasses or pydantic models to manage your data structures more robustly.
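
For instance, the same workflow could be typed with the standard library's dataclasses, as in the sketch below. The class names RawData and Report and their fields are assumptions chosen for illustration, and the functions are shortened versions of the ones in the full code.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RawData:
    id: str
    content: str
    timestamp: float = field(default_factory=time.time)


@dataclass(frozen=True)
class Report:
    data_id: str
    content_length: int


async def download_raw_data(data_id: str) -> RawData:
    await asyncio.sleep(0.1)  # simulated I/O wait
    return RawData(id=data_id, content="raw_binary_content")


async def analyze_data(raw: RawData) -> Report:
    await asyncio.sleep(0.1)  # simulated processing
    # Attribute access replaces string keys, so typos are caught earlier
    return Report(data_id=raw.id, content_length=len(raw.content))


async def main() -> Report:
    raw = await download_raw_data("user_12345")
    return await analyze_data(raw)


if __name__ == "__main__":
    print(asyncio.run(main()))
```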

Important Notes

Increase in Total Execution Time

Because tasks run one after another, the total processing time is the sum of the individual task durations. If the tasks are independent of one another, running them concurrently with asyncio.gather is usually much faster for I/O-bound work.
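
The difference can be measured with a small sketch like the one below. The fetch function and its 0.2-second delay are made up for illustration; actual timings depend on the machine and the workload.

```python
import asyncio
import time
from typing import List


async def fetch(n: int, delay: float = 0.2) -> int:
    await asyncio.sleep(delay)  # simulated I/O wait
    return n * 10


async def run_sequential(items: List[int]) -> List[int]:
    results = []
    for n in items:
        results.append(await fetch(n))  # each call waits for the previous one
    return results


async def run_concurrent(items: List[int]) -> List[int]:
    # gather schedules all coroutines at once and returns results in input order
    return list(await asyncio.gather(*(fetch(n) for n in items)))


async def timed(coro):
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def main():
    items = [1, 2, 3]
    seq_result, seq_time = await timed(run_sequential(items))
    con_result, con_time = await timed(run_concurrent(items))
    print(f"sequential: {seq_result} in {seq_time:.2f}s")
    print(f"concurrent: {con_result} in {con_time:.2f}s")
    return seq_result, seq_time, con_result, con_time


if __name__ == "__main__":
    asyncio.run(main())
```

With three items, the sequential version should take roughly three times the single-task delay, while the concurrent version should take roughly one.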

Avoid Blocking Calls

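If you call time.sleep() or run heavy computation (CPU-bound work) directly inside an async function, the event loop is blocked and no other task can make progress until the call returns. Use await asyncio.sleep() for delays, and move blocking calls off the event loop with loop.run_in_executor. Note that the default executor is a thread pool, so CPU-bound pure-Python code generally needs a process pool (concurrent.futures.ProcessPoolExecutor) to avoid being limited by the GIL.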
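
A minimal sketch of offloading a blocking call is shown below. Here blocking_work and heartbeat are hypothetical helpers: the first stands in for any blocking function, and the second only exists to show that the event loop keeps running while the blocking call executes in the default thread pool.

```python
import asyncio
import time
from typing import List, Tuple


def blocking_work(seconds: float) -> str:
    """A blocking call that would freeze the event loop if run inline."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: List[float], interval: float, count: int) -> None:
    """Record a timestamp every `interval` seconds, `count` times."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


async def main() -> Tuple[str, int]:
    loop = asyncio.get_running_loop()
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks, 0.05, 3))

    # None selects the default executor (a thread pool)
    result = await loop.run_in_executor(None, blocking_work, 0.3)

    ticks_during_work = len(ticks)  # heartbeats recorded while the work ran
    await beat
    return result, ticks_during_work


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_work were called directly inside main instead, the heartbeat could not record any ticks until the call returned.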

Mixing Old Syntax

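Older articles may use the generator-based style with @asyncio.coroutine and yield from, which predates the async/await syntax introduced in Python 3.5. The @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11, so use async/await in new code. Note that asyncio.run, used throughout this article, requires Python 3.7 or later.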

await at the Top Level

await can only be used inside functions defined with async def (or in environments that support top-level await, such as Jupyter Notebook cells). Using it inside a regular function, or at the top level of an ordinary script, results in a SyntaxError.
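
This can be checked without crashing a script by compiling small source strings, as in the sketch below. The compiles helper and the two source snippets are made up for illustration.

```python
import asyncio

# await inside a regular (non-async) function
BAD_SOURCE = """
def regular_function():
    result = await asyncio.sleep(0)
    return result
"""

# The same body inside an async def
GOOD_SOURCE = """
async def coroutine_function():
    await asyncio.sleep(0)
    return "ok"
"""


def compiles(source: str) -> bool:
    """Return True if the source compiles, False if it raises SyntaxError."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError as exc:
        print(f"SyntaxError: {exc.msg}")
        return False


if __name__ == "__main__":
    print(compiles(BAD_SOURCE))   # False
    print(compiles(GOOD_SOURCE))  # True
```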

Advanced Usage

This pattern shows how to execute multiple tasks stored in a list sequentially using a for loop. This is useful when you want to process items one by one to respect API rate limits rather than parallelizing them.

import asyncio

async def process_item(item_id: int):
    print(f"Item {item_id}: Starting process")
    await asyncio.sleep(0.5)
    print(f"Item {item_id}: Finished process")
    return item_id * 10

async def main_sequential_loop():
    items = [1, 2, 3, 4, 5]
    results = []
    
    print("--- Starting Sequential Loop ---")
    
    for item in items:
        # Using await inside the loop ensures the next task starts only after the previous one finishes
        res = await process_item(item)
        results.append(res)
        
    print("--- All Tasks Completed ---")
    print(f"Result List: {results}")

if __name__ == "__main__":
    asyncio.run(main_sequential_loop())
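
A variation on the same loop, sketched below, chains dependent steps so that each step receives the previous step's result. The run_pipeline helper and the add_one and double steps are hypothetical names used only for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

Step = Callable[[Any], Awaitable[Any]]


async def run_pipeline(steps: Sequence[Step], initial: Any) -> Any:
    """Run the steps in order, feeding each result into the next step."""
    value = initial
    for step in steps:
        value = await step(value)  # the next step starts only after this returns
    return value


async def add_one(x: int) -> int:
    await asyncio.sleep(0.01)  # simulated I/O wait
    return x + 1


async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # simulated I/O wait
    return x * 2


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([add_one, double], 3)))  # (3 + 1) * 2 = 8
```

Because the order of the list determines the order of execution, swapping the two steps changes the result to 3 * 2 + 1 = 7.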

Summary

Sequential execution in asyncio is intuitive and similar to procedural programming.

  • Best for: When you need to use a result from a previous step, when processing order is strict, or when limiting concurrency to 1 due to API restrictions.
  • Key point: Rather than starting tasks concurrently with asyncio.gather, await each coroutine in turn so that one step completes before the next begins.
  • Reminder: Unnecessary sequential execution can lower performance. Consider parallelization if there are no dependencies between tasks.
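
The two approaches can also be combined. In the sketch below, the steps for a single item stay in order while independent items run concurrently. The download, analyze, and process functions are simplified stand-ins, and the shared log list is an assumption added only to make the ordering visible.

```python
import asyncio
from typing import List, Tuple

Log = List[Tuple[str, str]]


async def download(item: str, log: Log) -> str:
    await asyncio.sleep(0.05)  # simulated I/O wait
    log.append(("downloaded", item))
    return f"raw:{item}"


async def analyze(raw: str, log: Log) -> str:
    log.append(("analyzing", raw))
    await asyncio.sleep(0.05)  # simulated processing
    return f"report({raw})"


async def process(item: str, log: Log) -> str:
    # Within one item, the steps are dependent and run strictly in order
    raw = await download(item, log)
    return await analyze(raw, log)


async def main() -> Tuple[List[str], Log]:
    log: Log = []
    # Across items there is no dependency, so the chains run concurrently
    reports = await asyncio.gather(*(process(i, log) for i in ["a", "b", "c"]))
    return list(reports), log


if __name__ == "__main__":
    reports, log = asyncio.run(main())
    print(reports)
```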

By organizing dependencies and using await in appropriate places, you can build asynchronous programs that are both readable and predictable.


