【Python】How to Cancel Running asyncio Tasks Externally

Overview

This article explains how to cancel running asyncio tasks from outside the task, for example to stop work that is no longer needed, respond to a user interruption, or enforce a timeout. Calling task.cancel() requests that an asyncio.CancelledError be raised inside the target task at its next await point, which gives the task a chance to clean up and exit in a controlled way.

Specifications (Input/Output)

  • Input:
    • A long-running task (coroutine) that can be cancelled.
    • The moment at which cancellation is triggered (controlled by the main coroutine).
  • Output:
    • Log lines printed to standard output when cancellation occurs.
    • The return value or exception that the caller observes after the task is interrupted.
  • Operational Logic:
    1. Start a background task using asyncio.create_task().
    2. Wait for a fixed duration in the main coroutine, then call task.cancel().
    3. The task catches asyncio.CancelledError internally, performs cleanup, and exits.

Basic Usage

Call .cancel() on the task object, then await the task. If the task lets the cancellation propagate, CancelledError surfaces at that await, where the caller can handle it in a try-except block.

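import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)  # Let the task start

    # Request cancellation
    task.cancel()

    try:
        # Wait for the task to finish (CancelledError is re-raised here)
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())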

Full Code

This code demonstrates a scenario where a “heavy worker task” is forcibly cancelled by the main process, and the worker detects this to perform cleanup.

import asyncio

async def heavy_worker():
    """
    Simulation of a long-running process.
    Implemented to handle cancellation.
    """
    print("[Worker] Task started. Executing heavy process...")
    
    try:
        # Simulate a long process (5-second wait)
        # If cancelled during this wait, CancelledError occurs here
        await asyncio.sleep(5)
        
        print("[Worker] Process completed normally.")
        return "Done"
        
    except asyncio.CancelledError:
        # Handling the cancellation
        print("[Worker] Cancellation notice received. Interrupting process.")
        # Perform resource cleanup if necessary
        
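        # Return a value to exit. This swallows the cancellation, so the
        # caller receives "Canceled" and task.cancelled() stays False.
        # Use a bare `raise` instead to propagate CancelledError to the caller.
        return "Canceled"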

async def main():
    """
    Main process that creates and cancels a task midway.
    """
    print("--- Main Process Started ---")
    
    # Create task (starts execution)
    task = asyncio.create_task(heavy_worker())
    
    # Wait for the task to progress slightly (1 second)
    await asyncio.sleep(1)
    
    print(">>> Main: Requesting task cancellation...")
    task.cancel()
    
    # Wait for the task to finish and receive the result
    try:
        status = await task
        print(f"Main: Task result -> {status}")
    except asyncio.CancelledError:
        # Catch here if the task re-raises the cancellation exception
        print("Main: Task raised a cancellation exception")
        
    print("--- Main Process Finished ---")

if __name__ == "__main__":
    asyncio.run(main())

Sample Execution Result

--- Main Process Started ---
[Worker] Task started. Executing heavy process...
>>> Main: Requesting task cancellation...
[Worker] Cancellation notice received. Interrupting process.
Main: Task result -> Canceled
--- Main Process Finished ---

Customization Points

Guaranteeing Cleanup (try-finally)

If you have cleanup processes (like closing files or disconnecting databases) that must run regardless of whether the task finished normally or was cancelled, using a finally block is the safest approach.
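
The idea can be illustrated with a minimal sketch. The `log` list is a hypothetical stand-in for a real resource such as a file handle or database connection; the names `worker_with_cleanup` and `log` are assumptions made for this example only.

```python
import asyncio

async def worker_with_cleanup(log):
    # `log` is a plain list standing in for a real resource (assumption)
    log.append("opened")
    try:
        await asyncio.sleep(5)
        log.append("finished")
    finally:
        # Runs on normal completion, on error, and on cancellation
        log.append("closed")

async def main():
    log = []
    task = asyncio.create_task(worker_with_cleanup(log))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the finally block does not catch the exception, CancelledError still propagates to the caller after the cleanup has run.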

Refusing Cancellation

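While not recommended, it is technically possible to ignore a cancellation request by catching CancelledError and continuing, much like ignoring a system signal. Note that structured features such as asyncio.TaskGroup and asyncio.timeout() (Python 3.11+) rely on cancellation internally and may misbehave if a coroutine swallows CancelledError.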
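
For illustration only, here is a small sketch of what ignoring a request looks like. The coroutine name `stubborn_worker` and the sleep durations are hypothetical, and this is shown to make the behavior visible, not as a pattern to adopt.

```python
import asyncio

async def stubborn_worker():
    try:
        await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        # Swallow the request and keep going (discouraged)
        print("[Worker] Ignoring cancellation request")
    # Work continues after the cancellation was ignored
    await asyncio.sleep(0.1)
    return "finished anyway"

async def main():
    task = asyncio.create_task(stubborn_worker())
    await asyncio.sleep(0.05)
    task.cancel()
    result = await task  # No CancelledError is raised here
    return result, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The task finishes with its normal return value, and task.cancelled() reports False because the cancellation never took effect.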

Message Feature in Python 3.9+

Since Python 3.9, task.cancel() accepts an optional msg argument, for example task.cancel(msg="Timeout occurred"). The message is attached to the resulting CancelledError, which is helpful for debugging.
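
A short sketch of reading the message inside the task follows. It assumes Python 3.9 or later; the `seen` list is a hypothetical helper used only to capture what the worker observed.

```python
import asyncio

async def worker(seen):
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError as exc:
        # The message passed to cancel() is available in exc.args
        seen.append(exc.args)
        raise

async def main():
    seen = []
    task = asyncio.create_task(worker(seen))
    await asyncio.sleep(0.05)
    task.cancel(msg="Timeout occurred")
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen

if __name__ == "__main__":
    print(asyncio.run(main()))
```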

Important Notes

Not an Immediate Stop

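Calling task.cancel() only requests cancellation; the CancelledError is delivered the next time the task is suspended at an await point (such as asyncio.sleep or an I/O wait). A CPU-bound loop with no await never yields to the event loop, so it cannot be interrupted, and other coroutines (including the one that would call cancel()) cannot run either, unless you insert a yield such as await asyncio.sleep(0).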
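
As a rough sketch of this workaround, the loop below yields to the event loop every 10,000 iterations. The chunk size, the `progress` dictionary, and the function names are assumptions chosen for the example, not recommended values.

```python
import asyncio

async def cpu_bound_worker(progress):
    total = 0
    for i in range(10_000_000):
        total += i * i
        if i % 10_000 == 0:
            progress["chunks"] += 1
            # Yield control so a pending cancellation can be delivered
            await asyncio.sleep(0)
    return total

async def main():
    progress = {"chunks": 0}
    task = asyncio.create_task(cpu_bound_worker(progress))
    await asyncio.sleep(0)  # Let the worker run until its first yield
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), progress["chunks"]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the await asyncio.sleep(0) line, main() would not get a chance to call cancel() until the whole loop had finished.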

Exception Propagation

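If the task catches CancelledError and re-raises it (or never catches it), the exception surfaces at the point where you await task, and task.cancelled() returns True. If the task catches it and returns instead, with or without a value, the await completes normally and the caller receives that return value (None if nothing is returned). You must decide which behavior the caller should rely on.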
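
The two outcomes can be compared side by side in a small sketch. The names `swallows`, `reraises`, and `cancel_and_report` are hypothetical helpers introduced only for this comparison.

```python
import asyncio

async def swallows():
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        pass  # Falls through and returns None

async def reraises():
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        raise  # Propagates to whoever awaits the task

async def cancel_and_report(coro):
    task = asyncio.create_task(coro)
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        result = await task
        return ("returned", result, task.cancelled())
    except asyncio.CancelledError:
        return ("raised", None, task.cancelled())

async def main():
    return [
        await cancel_and_report(swallows()),
        await cancel_and_report(reraises()),
    ]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only the re-raising worker leaves the task in the cancelled state; the swallowing worker looks like a normal completion to the caller.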

Shielding (asyncio.shield)

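If a process is critical and should survive cancellation of its caller, await it through asyncio.shield(task). When the awaiting coroutine is cancelled, the await on shield() raises CancelledError there, but the inner task keeps running. shield() does not block a direct task.cancel() call on the inner task itself, and you should keep a reference to the inner task so it does not disappear mid-execution.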
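
A minimal sketch of shielding, under the assumption that a hypothetical `critical_write` coroutine must finish even if the `handler` awaiting it is cancelled:

```python
import asyncio

async def critical_write():
    await asyncio.sleep(0.2)
    return "committed"

async def handler(inner):
    try:
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        print("[Handler] Cancelled, but the inner task keeps running")
        raise

async def main():
    # Keep a reference to the inner task so its result can be collected later
    inner = asyncio.create_task(critical_write())
    outer = asyncio.create_task(handler(inner))
    await asyncio.sleep(0.05)
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass
    result = await inner  # The shielded work still completes
    return outer.cancelled(), result

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling inner.cancel() directly would still cancel the inner task; shield() only intercepts cancellation that arrives through the awaiting coroutine.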

Advanced Usage

This pattern uses the standard library's asyncio.wait_for to enforce a timeout: if the awaitable does not finish within the given time, wait_for cancels it and raises TimeoutError to the caller.

import asyncio

async def slow_process():
    print("[Process] Started")
    try:
        await asyncio.sleep(3)  # Process taking 3 seconds
        return "Complete"
    except asyncio.CancelledError:
        print("[Process] Interrupted due to timeout")
        raise  # Re-raise to inform wait_for

async def main_timeout():
    try:
        # Timeout after 1 second
        result = await asyncio.wait_for(slow_process(), timeout=1.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Main: Timed out (Task already cancelled)")

if __name__ == "__main__":
    asyncio.run(main_timeout())

Summary

Task cancellation is essential knowledge for building responsive asynchronous applications.

  • Best for: User “stop” buttons, request timeouts, and stopping all tasks when an application shuts down.
  • Key point: Handle CancelledError within the task (or use try-finally) to perform proper post-processing such as rollbacks or resource releases.
  • Reminder: A CPU-intensive task without await points cannot respond to cancellation until it yields to the event loop.

Cancelling tasks that are no longer needed, and cleaning up when they are cancelled, helps keep code robust and reduces the risk of resource leaks.
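
For the shutdown case mentioned above, one possible sketch is to cancel every outstanding task and then wait for all of them with asyncio.gather(..., return_exceptions=True). The `background_job` coroutine and the job names are hypothetical.

```python
import asyncio

async def background_job(name):
    try:
        await asyncio.sleep(60)
    finally:
        print(f"[{name}] cleaned up")

async def main():
    tasks = [asyncio.create_task(background_job(f"job-{i}")) for i in range(3)]
    await asyncio.sleep(0.05)

    # Request cancellation of every task, then wait until all have exited
    for t in tasks:
        t.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # With return_exceptions=True, cancelled tasks show up as CancelledError objects
    return [isinstance(r, asyncio.CancelledError) for r in results]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Waiting with gather() after cancelling gives each task time to run its cleanup before the program exits.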
