目次
概要
非同期関数(コルーチン)の中で、処理を一定時間停止させたい場合は await asyncio.sleep(秒数) を使用します。
通常の time.sleep() と異なり、待機している間、イベントループに制御を返すため、その間に他の並行タスクを実行できる(ノンブロッキング)のが最大の特徴です。
仕様(入出力)
- 入力: 待機したい秒数(float または int)
- 出力: 指定時間の待機完了
- 機能:
- 現在のタスクを指定時間サスペンド(一時停止)する。
- 待機中にイベントループが他のタスクを実行できるようにする。
構文と意味
| 構文 | 引数例 | 処理内容 |
await asyncio.sleep(delay) | 1, 0.5 | 指定した秒数だけ処理を停止します。必ず await を付けて呼び出します。 |
time.sleep(delay) | 1 | (非推奨) プログラム全体(イベントループ)を停止させます。非同期処理の中で使うと、並列性が損なわれます。 |
基本の使い方
asyncio.sleep を使用して1秒待機する基本的なコードです。
import asyncio
async def main():
print("スリープ開始")
# 1秒間待機(この間、他のタスクがあれば動ける)
await asyncio.sleep(1)
print("スリープ終了")
return "result value"
if __name__ == "__main__":
coroutine_obj = main()
result = asyncio.run(coroutine_obj)
print(result)
コード全文
asyncio.sleep のノンブロッキングな性質を確認するために、2つのタスクを同時に実行するデモです。
一方がスリープしている間に、もう一方が動いていることが確認できます。
import asyncio
import time
async def task_a():
print("[Task A] 開始: 2秒待ちます...")
# ここで2秒待つが、その間 Task B が動ける
await asyncio.sleep(2)
print("[Task A] 完了")
return "A"
async def task_b():
print(" [Task B] 開始: 処理中...")
# 短い待機を繰り返す
for i in range(3):
print(f" [Task B] 処理 {i+1}/3")
await asyncio.sleep(0.5)
print(" [Task B] 完了")
return "B"
async def main():
print("--- 並行実行開始 ---")
start = time.time()
# gatherを使って2つのタスクを同時に実行
await asyncio.gather(task_a(), task_b())
elapsed = time.time() - start
print(f"--- 全処理終了: {elapsed:.2f}秒 ---")
# Task A(2秒) と Task B(1.5秒) が同時に動くため、
# 合計は 3.5秒 ではなく、長い方の 約2秒 で終わる
if __name__ == "__main__":
asyncio.run(main())
注意点
非同期処理の意味がなくなるため、原則として time.sleep は使用しないでください。
awaitの付け忘れ
asyncio.sleep(1) とだけ書いて await を忘れると、スリープは実行されず、即座に次の行に進んでしまいます(コルーチンオブジェクトが作られて捨てられるだけになります)。
必ず await asyncio.sleep(1) と記述してください。
time.sleepの使用
async def の中で time.sleep(1) を使うと、その1秒間はイベントループ自体が停止するため、全ての非同期タスクが止まります。
