Overview
This article explains how to cancel running tasks in asyncio to stop unnecessary processes, handle user interruptions, or manage timeouts. By using the task.cancel() method, you can send an asyncio.CancelledError exception into the target task, allowing for a safe and clean interruption flow.
Specifications (Input/Output)
- Input:
- A long-running task (coroutine) that can be cancelled.
- Timing for triggering the cancellation (controlled by the main process).
- Output:
- Standard output logs when cancellation occurs.
- Return values or exceptions sent after the task is interrupted.
- Operational Logic:
- Start a background task using
asyncio.create_task(). - Wait for a specific duration in the main process, then call
task.cancel(). - The task catches the
asyncio.CancelledErrorinternally to perform cleanup and exit.
- Start a background task using
Basic Usage
Call .cancel() on the task object. The task can receive the cancellation notice within a try-except block.
# Request cancellation
task.cancel()
try:
# Wait for the task to finish (the exception may be re-raised here)
await task
except asyncio.CancelledError:
print("Task cancellation confirmed")
Full Code
This code demonstrates a scenario where a “heavy worker task” is forcibly cancelled by the main process, and the worker detects this to perform cleanup.
import asyncio
async def heavy_worker():
"""
Simulation of a long-running process.
Implemented to handle cancellation.
"""
print("[Worker] Task started. Executing heavy process...")
try:
# Simulate a long process (5-second wait)
# If cancelled during this wait, CancelledError occurs here
await asyncio.sleep(5)
print("[Worker] Process completed normally.")
return "Done"
except asyncio.CancelledError:
# Handling the cancellation
print("[Worker] Cancellation notice received. Interrupting process.")
# Perform resource cleanup if necessary
# Return a value to exit
# Note: raising the error again will propagate it to the caller
return "Canceled"
async def main():
"""
Main process that creates and cancels a task midway.
"""
print("--- Main Process Started ---")
# Create task (starts execution)
task = asyncio.create_task(heavy_worker())
# Wait for the task to progress slightly (1 second)
await asyncio.sleep(1)
print(">>> Main: Requesting task cancellation...")
task.cancel()
# Wait for the task to finish and receive the result
try:
status = await task
print(f"Main: Task result -> {status}")
except asyncio.CancelledError:
# Catch here if the task re-raises the cancellation exception
print("Main: Task raised a cancellation exception")
print("--- Main Process Finished ---")
if __name__ == "__main__":
asyncio.run(main())
Sample Execution Result
--- Main Process Started ---
[Worker] Task started. Executing heavy process...
>>> Main: Requesting task cancellation...
[Worker] Cancellation notice received. Interrupting process.
Main: Task result -> Canceled
--- Main Process Finished ---
Customization Points
Guaranteeing Cleanup (try-finally)
If you have cleanup processes (like closing files or disconnecting databases) that must run regardless of whether the task finished normally or was cancelled, using a finally block is the safest approach.
Refusing Cancellation
While not recommended, it is technically possible to ignore a cancellation request by catching CancelledError and continuing the process, similar to ignoring a system signal.
Message Feature in Python 3.9+
Since Python 3.9, you can pass a reason like task.cancel(msg="Timeout occurred"), which is helpful for debugging.
Important Notes
Not an Immediate Stop
Calling task.cancel() does not stop the task until it reaches an await point (such as asyncio.sleep or an I/O wait). Cancellation will not work inside a CPU-bound calculation loop unless you manually insert await asyncio.sleep(0).
Exception Propagation
If you catch CancelledError inside the task and do not return a value (or if you raise it), another CancelledError will occur at the point where you await task. You must decide how the caller should handle this.
Shielding (asyncio.shield)
If a process is critical and should not be cancelled externally, you can wrap it in await asyncio.shield(task) to prevent external cancellation requests from affecting it.
Advanced Usage
This pattern uses the standard asyncio.wait_for to cancel a task if it does not finish within a set time.
import asyncio
async def slow_process():
print("[Process] Started")
try:
await asyncio.sleep(3) # Process taking 3 seconds
return "Complete"
except asyncio.CancelledError:
print("[Process] Interrupted due to timeout")
raise # Re-raise to inform wait_for
async def main_timeout():
try:
# Timeout after 1 second
result = await asyncio.wait_for(slow_process(), timeout=1.0)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Main: Timed out (Task already cancelled)")
if __name__ == "__main__":
asyncio.run(main_timeout())
Summary
Task cancellation is essential knowledge for building responsive asynchronous applications.
- Best for: User “stop” buttons, request timeouts, and stopping all tasks when an application shuts down.
- Key change: Catch
CancelledErrorwithin the task to perform proper post-processing (rollbacks or resource releases). - Reminder: Be aware that cancellation does not respond immediately to CPU-intensive tasks without
awaitpoints.
Managing tasks by calling cancel() when they are no longer needed results in robust code free of resource leaks.
