【Python】実行中のasyncioタスクを外部からキャンセルする方法

目次

概要

asyncio を使った非同期処理において、ユーザーの中断操作やタイムアウト、あるいは不要になった処理を停止させるために、実行中のタスクをキャンセルする方法を解説します。 task.cancel() メソッドを使用することで、対象のタスク内部に asyncio.CancelledError 例外を送出し、安全に処理を中断・終了させるフローを実装できます。

仕様(入出力)

  • 入力
    • キャンセル可能な長時間実行タスク(コルーチン)。
    • キャンセルを実行するタイミング(メイン処理側での制御)。
  • 出力
    • キャンセル発生時の標準出力ログ。
    • タスクが中断された後の戻り値(または例外の送出)。
  • 動作仕様
    1. asyncio.create_task() でバックグラウンドタスクを開始。
    2. メイン処理で一定時間待機後、task.cancel() を呼び出す。
    3. タスク内部で asyncio.CancelledError が発生するので、これを捕捉して終了処理を行う。

基本の使い方

タスクオブジェクトに対して .cancel() を呼び出します。 タスク側では try-except ブロックでキャンセル通知を受け取ることができます。

# キャンセルを要求
task.cancel()

try:
    # タスクの終了を待つ(ここで例外が再送出される場合がある)
    await task
except asyncio.CancelledError:
    print("タスクのキャンセルを確認しました")

コード全文

ここでは、「重い処理を実行しているタスク」をメイン処理から強制的にキャンセルし、タスク側でそれを検知してクリーンアップ(終了ログ出力)を行うコードを提示します。

import asyncio

async def heavy_worker():
    """
    長時間実行される処理のシミュレーション。
    キャンセルに対応できるよう実装します。
    """
    print("[Worker] タスクを開始しました。重い処理を実行中...")
    
    try:
        # 長時間の処理をシミュレート(5秒待機)
        # この待機中にキャンセルされると、ここで CancelledError が発生します
        await asyncio.sleep(5)
        
        print("[Worker] 処理が正常に完了しました。")
        return "Done"
        
    except asyncio.CancelledError:
        # キャンセルされた場合の処理
        print("[Worker] キャンセル通知を受信しました。処理を中断します。")
        # 必要に応じてリソースの開放などを行う
        
        # 値を返して終了する場合は return する
        # ※ ここで raise を呼ぶと、呼び出し元へ例外が伝播します
        return "Canceled"

async def main():
    """
    タスクを生成し、途中でキャンセルするメイン処理。
    """
    print("--- メイン処理開始 ---")
    
    # タスクを生成(実行開始)
    task = asyncio.create_task(heavy_worker())
    
    # タスクが少し進むのを待つ(1秒間)
    await asyncio.sleep(1)
    
    print(">>> メイン: タスクのキャンセルを要求します...")
    task.cancel()
    
    # タスクの終了を待機し、結果を受け取る
    try:
        status = await task
        print(f"メイン: タスクの結果 -> {status}")
    except asyncio.CancelledError:
        # タスク内で例外を再送出した場合はここでキャッチする
        print("メイン: タスクはキャンセル例外を送出しました")
        
    print("--- メイン処理終了 ---")

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

--- メイン処理開始 ---
[Worker] タスクを開始しました。重い処理を実行中...
>>> メイン: タスクのキャンセルを要求します...
[Worker] キャンセル通知を受信しました。処理を中断します。
メイン: タスクの結果 -> Canceled
--- メイン処理終了 ---

カスタムポイント

  • クリーンアップ処理の保証 (try-finally)
    • キャンセル時だけでなく、正常終了やエラー時も含めて必ず実行したい終了処理(ファイルのクローズ、DB切断など)がある場合は、finally ブロックを使用するのが安全です。
  • キャンセルの拒否
    • 原則として推奨されませんが、CancelledError をキャッチしてそのまま処理を続行することで、キャンセルを無視することも技術的には可能です(シグナルを無視する挙動に近い)。
  • Python 3.9以降のメッセージ機能
    • task.cancel(msg="タイムアウトしました") のように理由を渡すことができ、デバッグ時に役立ちます。

注意点

  1. 即時停止ではない
    • task.cancel() を呼んでも、タスクが await ポイントasyncio.sleep や I/O待ち)に到達するまでは処理が止まりません。CPUバウンドな計算ループ内ではキャンセルが効かないため、適宜 await asyncio.sleep(0) を挟む必要があります。
  2. 例外の伝播
    • タスク内部で CancelledErrorexcept して return しない(あるいは raise する)場合、await task した箇所で再び CancelledError が発生します。呼び出し元でのハンドリング方針を決めておく必要があります。
  3. シールド (asyncio.shield)
    • 「重要な処理なのでキャンセルされたくない」場合は、await asyncio.shield(task) で包むことで、外からのキャンセル要求を防ぐことができます。

応用

「一定時間内に終わらなければキャンセルする(タイムアウト)」というパターンです。 手動で sleepcancel を組み合わせるよりも、標準機能の asyncio.wait_for を使うのが一般的です。

import asyncio

async def slow_process():
    print("[Process] 開始")
    try:
        await asyncio.sleep(3)  # 3秒かかる処理
        return "完了"
    except asyncio.CancelledError:
        print("[Process] 時間切れで中断されました")
        raise  # 例外を再送出して wait_for 側に伝える

async def main_timeout():
    try:
        # 1秒でタイムアウトさせる
        result = await asyncio.wait_for(slow_process(), timeout=1.0)
        print(f"結果: {result}")
    except asyncio.TimeoutError:
        print("メイン: タイムアウトしました(タスクはキャンセル済み)")

if __name__ == "__main__":
    asyncio.run(main_timeout())

まとめ

タスクのキャンセルは、応答性の高い非同期アプリケーションを作る上で必須の知識です。

  • 向く場面: ユーザーによる中断ボタン、リクエストのタイムアウト、アプリ終了時の全タスク停止処理。
  • 変更ポイント: タスク内で CancelledError を捕捉し、適切な後処理(ロールバックやリソース開放)を行ってください。
  • 注意点: 計算集中型の処理には await がないため、キャンセルが即座に反応しないことに注意してください。

create_task したものは適切に管理し、不要になったら cancel する癖をつけると、リソースリークのない堅牢なコードになります。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次