目次
概要
asyncio を使った非同期処理において、ユーザーの中断操作やタイムアウト、あるいは不要になった処理を停止させるために、実行中のタスクをキャンセルする方法を解説します。 task.cancel() メソッドを使用することで、対象のタスク内部に asyncio.CancelledError 例外を送出し、安全に処理を中断・終了させるフローを実装できます。
仕様(入出力)
- 入力
- キャンセル可能な長時間実行タスク(コルーチン)。
- キャンセルを実行するタイミング(メイン処理側での制御)。
- 出力
- キャンセル発生時の標準出力ログ。
- タスクが中断された後の戻り値(または例外の送出)。
- 動作仕様
asyncio.create_task()でバックグラウンドタスクを開始。- メイン処理で一定時間待機後、
task.cancel()を呼び出す。 - タスク内部で
asyncio.CancelledErrorが発生するので、これを捕捉して終了処理を行う。
基本の使い方
タスクオブジェクトに対して .cancel() を呼び出します。 タスク側では try-except ブロックでキャンセル通知を受け取ることができます。
# キャンセルを要求
task.cancel()
try:
# タスクの終了を待つ(ここで例外が再送出される場合がある)
await task
except asyncio.CancelledError:
print("タスクのキャンセルを確認しました")
コード全文
ここでは、「重い処理を実行しているタスク」をメイン処理から強制的にキャンセルし、タスク側でそれを検知してクリーンアップ(終了ログ出力)を行うコードを提示します。
import asyncio
async def heavy_worker():
"""
長時間実行される処理のシミュレーション。
キャンセルに対応できるよう実装します。
"""
print("[Worker] タスクを開始しました。重い処理を実行中...")
try:
# 長時間の処理をシミュレート(5秒待機)
# この待機中にキャンセルされると、ここで CancelledError が発生します
await asyncio.sleep(5)
print("[Worker] 処理が正常に完了しました。")
return "Done"
except asyncio.CancelledError:
# キャンセルされた場合の処理
print("[Worker] キャンセル通知を受信しました。処理を中断します。")
# 必要に応じてリソースの開放などを行う
# 値を返して終了する場合は return する
# ※ ここで raise を呼ぶと、呼び出し元へ例外が伝播します
return "Canceled"
async def main():
"""
タスクを生成し、途中でキャンセルするメイン処理。
"""
print("--- メイン処理開始 ---")
# タスクを生成(実行開始)
task = asyncio.create_task(heavy_worker())
# タスクが少し進むのを待つ(1秒間)
await asyncio.sleep(1)
print(">>> メイン: タスクのキャンセルを要求します...")
task.cancel()
# タスクの終了を待機し、結果を受け取る
try:
status = await task
print(f"メイン: タスクの結果 -> {status}")
except asyncio.CancelledError:
# タスク内で例外を再送出した場合はここでキャッチする
print("メイン: タスクはキャンセル例外を送出しました")
print("--- メイン処理終了 ---")
if __name__ == "__main__":
asyncio.run(main())
実行結果例
--- メイン処理開始 ---
[Worker] タスクを開始しました。重い処理を実行中...
>>> メイン: タスクのキャンセルを要求します...
[Worker] キャンセル通知を受信しました。処理を中断します。
メイン: タスクの結果 -> Canceled
--- メイン処理終了 ---
カスタムポイント
- クリーンアップ処理の保証 (
try-finally)- キャンセル時だけでなく、正常終了やエラー時も含めて必ず実行したい終了処理(ファイルのクローズ、DB切断など)がある場合は、
finallyブロックを使用するのが安全です。
- キャンセル時だけでなく、正常終了やエラー時も含めて必ず実行したい終了処理(ファイルのクローズ、DB切断など)がある場合は、
- キャンセルの拒否
- 原則として推奨されませんが、
CancelledErrorをキャッチしてそのまま処理を続行することで、キャンセルを無視することも技術的には可能です(シグナルを無視する挙動に近い)。
- 原則として推奨されませんが、
- Python 3.9以降のメッセージ機能
task.cancel(msg="タイムアウトしました")のように理由を渡すことができ、デバッグ時に役立ちます。
注意点
- 即時停止ではない
task.cancel()を呼んでも、タスクがawaitポイント(asyncio.sleepや I/O待ち)に到達するまでは処理が止まりません。CPUバウンドな計算ループ内ではキャンセルが効かないため、適宜await asyncio.sleep(0)を挟む必要があります。
- 例外の伝播
- タスク内部で
CancelledErrorをexceptしてreturnしない(あるいはraiseする)場合、await taskした箇所で再びCancelledErrorが発生します。呼び出し元でのハンドリング方針を決めておく必要があります。
- タスク内部で
- シールド (
asyncio.shield)- 「重要な処理なのでキャンセルされたくない」場合は、
await asyncio.shield(task)で包むことで、外からのキャンセル要求を防ぐことができます。
- 「重要な処理なのでキャンセルされたくない」場合は、
応用
「一定時間内に終わらなければキャンセルする(タイムアウト)」というパターンです。 手動で sleep と cancel を組み合わせるよりも、標準機能の asyncio.wait_for を使うのが一般的です。
import asyncio
async def slow_process():
print("[Process] 開始")
try:
await asyncio.sleep(3) # 3秒かかる処理
return "完了"
except asyncio.CancelledError:
print("[Process] 時間切れで中断されました")
raise # 例外を再送出して wait_for 側に伝える
async def main_timeout():
try:
# 1秒でタイムアウトさせる
result = await asyncio.wait_for(slow_process(), timeout=1.0)
print(f"結果: {result}")
except asyncio.TimeoutError:
print("メイン: タイムアウトしました(タスクはキャンセル済み)")
if __name__ == "__main__":
asyncio.run(main_timeout())
まとめ
タスクのキャンセルは、応答性の高い非同期アプリケーションを作る上で必須の知識です。
- 向く場面: ユーザーによる中断ボタン、リクエストのタイムアウト、アプリ終了時の全タスク停止処理。
- 変更ポイント: タスク内で
CancelledErrorを捕捉し、適切な後処理(ロールバックやリソース開放)を行ってください。 - 注意点: 計算集中型の処理には
awaitがないため、キャンセルが即座に反応しないことに注意してください。
create_task したものは適切に管理し、不要になったら cancel する癖をつけると、リソースリークのない堅牢なコードになります。
