[Python] Setting Timeouts for asyncio.Queue Operations


Overview

By default, put() on a full asyncio.Queue and get() on an empty one wait indefinitely, suspending the calling coroutine until space or an item becomes available. In real-world applications, however, you often need to stop waiting after a certain period, such as “giving up if data doesn’t arrive” or “returning an error if a write isn’t possible.” This article explains how to use the standard asyncio.wait_for() function to set timeout limits on queue operations.


Specifications (Input/Output)

  • Input:
    • An asyncio.Queue (usually with a size limit).
    • Timeout duration in seconds.
  • Output:
    • If the operation completes within the limit: Returns the item (for get) or stores the item in the queue (for put).
    • If the limit is exceeded: Raises asyncio.TimeoutError, which the caller catches and handles.
  • Behavior: asyncio.wait_for(awaitable, timeout) cancels the wrapped operation once the timeout elapses and raises asyncio.TimeoutError to the caller.
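The two outcomes listed above can be seen side by side in a small sketch. The names delayed_producer and get_within_limit, and the "payload" and "timed out" strings, are made up for this illustration and are not part of asyncio.

```python
import asyncio


async def delayed_producer(queue: asyncio.Queue, delay: float) -> None:
    # Hypothetical producer: waits `delay` seconds, then adds one item.
    await asyncio.sleep(delay)
    await queue.put("payload")


async def get_within_limit(delay: float, timeout: float) -> str:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(delayed_producer(queue, delay))
    try:
        # Returns the item if the producer delivers before `timeout` elapses.
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # The limit was exceeded; report it instead of waiting forever.
        return "timed out"
    finally:
        # Clean up the producer whether or not it already finished.
        producer.cancel()
        await asyncio.gather(producer, return_exceptions=True)


if __name__ == "__main__":
    # Item arrives after 0.1s, limit is 1.0s: the value is returned.
    print(asyncio.run(get_within_limit(delay=0.1, timeout=1.0)))
    # Item would arrive after 1.0s, limit is 0.1s: the wait is abandoned.
    print(asyncio.run(get_within_limit(delay=1.0, timeout=0.1)))
```

The first call prints the item and the second prints "timed out", since only the relationship between the delay and the timeout changes.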

Basic Usage

Pass the queue.get() or queue.put() call as the first argument to asyncio.wait_for.

try:
    # If no item is retrieved within 3 seconds, a TimeoutError occurs
    item = await asyncio.wait_for(queue.get(), timeout=3.0)
except asyncio.TimeoutError:
    print("Operation timed out")

Full Code Example

The following code demonstrates two scenarios: “Put Timeout” (when the queue is full) and “Get Timeout” (when the queue is empty).

import asyncio

async def test_put_timeout():
    """
    Example of a timeout while trying to put an item into a full queue.
    """
    print("\n--- Put Timeout Test ---")
    # Create a queue with a maximum size of 1
    q = asyncio.Queue(maxsize=1)
    
    # Add the first item (this succeeds immediately)
    await q.put("Item 1")
    print("Added the first item. The queue is now full.")

    try:
        print("Attempting to add a second item (timeout: 1.5s)...")
        # This will wait because the queue is full
        await asyncio.wait_for(q.put("Item 2"), timeout=1.5)
        
    except asyncio.TimeoutError:
        print(">> Timeout: No space became available within the time limit.")
    else:
        print(">> Success: Item added.")

async def test_get_timeout():
    """
    Example of a timeout while trying to get an item from an empty queue.
    """
    print("\n--- Get Timeout Test ---")
    q = asyncio.Queue()
    
    # Try to retrieve an item when the queue is empty
    try:
        print("Attempting to get an item (timeout: 1.5s)...")
        item = await asyncio.wait_for(q.get(), timeout=1.5)
        print(f">> Success: Received {item}.")
        
    except asyncio.TimeoutError:
        print(">> Timeout: No item arrived within the time limit.")

async def main():
    # Verify write timeout
    await test_put_timeout()
    
    # Verify read timeout
    await test_get_timeout()

if __name__ == "__main__":
    asyncio.run(main())

Example Output

--- Put Timeout Test ---
Added the first item. The queue is now full.
Attempting to add a second item (timeout: 1.5s)...
>> Timeout: No space became available within the time limit.

--- Get Timeout Test ---
Attempting to get an item (timeout: 1.5s)...
>> Timeout: No item arrived within the time limit.

Customization Points

  • Post-Timeout Behavior: Depending on your requirements, you can choose to retry the operation, use a default value, or stop the entire process after catching TimeoutError.
  • Constants for Timeouts: Managing timeouts with constants like TIMEOUT_SECONDS = 5.0 makes it easier to adjust system-wide response settings.
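As one possible take on the points above, here is a minimal sketch of a retry policy driven by constants. TIMEOUT_SECONDS follows the naming suggested above; MAX_ATTEMPTS and get_with_retry are hypothetical names chosen for this example, and the values are deliberately small so the demo finishes quickly.

```python
import asyncio

TIMEOUT_SECONDS = 0.2  # per-attempt timeout (illustrative value)
MAX_ATTEMPTS = 3       # hypothetical retry limit


async def get_with_retry(queue: asyncio.Queue,
                         timeout: float = TIMEOUT_SECONDS,
                         attempts: int = MAX_ATTEMPTS):
    """Try queue.get() up to `attempts` times, each bounded by `timeout`."""
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Attempt {attempt}/{attempts} timed out")
    # Every attempt timed out: let the caller decide what to do next.
    raise asyncio.TimeoutError(f"no item after {attempts} attempts")


async def main():
    q = asyncio.Queue()
    # Simulate data that arrives late: 0.3s from now, during the second attempt.
    asyncio.get_running_loop().call_later(0.3, q.put_nowait, "late item")
    print("Got:", await get_with_retry(q))


if __name__ == "__main__":
    asyncio.run(main())
```

Re-raising after the last attempt leaves the final decision (fall back to a default, stop the process) to the caller rather than hiding it inside the helper.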

Important Notes

  • Difference from get_nowait / put_nowait:
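    • q.get_nowait() raises asyncio.QueueEmpty immediately if the queue is empty, and q.put_nowait() raises asyncio.QueueFull immediately if it is full. Neither waits at all.
    • asyncio.wait_for() waits up to the specified duration. This is the better choice when data (or free space) is likely to arrive shortly.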
  • Task Cancellation: When the timeout expires, the pending q.put() or q.get() is cancelled before asyncio.TimeoutError is raised. A timed-out put() therefore does not add its item, and a timed-out get() does not remove one, so the queue is not left in a “partial” state.
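  • Zero or Negative Timeout Values: If the timeout is 0 or negative, wait_for() does not wait at all. This is not a reliable substitute for get_nowait(), though: the wrapped q.get() may be cancelled before it has a chance to run, so asyncio.TimeoutError can be raised even when an item is available. For a non-blocking check, call get_nowait() or put_nowait() directly.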
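The difference between the *_nowait methods and wait_for, and the effect of cancellation on the queue contents, can be checked with a short sketch. The function name compare_behaviors and the keys of the returned dictionary are invented for this example.

```python
import asyncio


async def compare_behaviors() -> dict:
    results = {}

    # get_nowait() on an empty queue fails immediately.
    empty = asyncio.Queue()
    try:
        empty.get_nowait()
    except asyncio.QueueEmpty:
        results["get_nowait"] = "QueueEmpty"

    # put_nowait() on a full queue fails immediately as well.
    full = asyncio.Queue(maxsize=1)
    full.put_nowait("first")
    try:
        full.put_nowait("second")
    except asyncio.QueueFull:
        results["put_nowait"] = "QueueFull"

    # wait_for() waits up to the timeout before giving up.
    try:
        await asyncio.wait_for(full.put("second"), timeout=0.1)
    except asyncio.TimeoutError:
        results["wait_for_put"] = "TimeoutError"

    # The cancelled put() added nothing: only the first item is present.
    results["qsize_after_timeout"] = full.qsize()
    results["remaining_item"] = full.get_nowait()
    return results


if __name__ == "__main__":
    for name, outcome in asyncio.run(compare_behaviors()).items():
        print(f"{name}: {outcome}")
```

After the timed-out put(), the queue still holds exactly one item, "first", which is what the cancellation note above describes.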

Advanced Application

The following helper function retrieves data if available within the time limit or returns a default value (like None) to continue processing.

import asyncio
from typing import Any

async def get_or_default(queue: asyncio.Queue, timeout: float, default: Any = None) -> Any:
    """
    Returns the item from the queue if retrieved within the timeout,
    otherwise returns the default value.
    """
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return default

async def main_application():
    q = asyncio.Queue()
    
    # Returns "No Data" because the queue is empty
    result = await get_or_default(q, timeout=1.0, default="No Data")
    print(f"Result: {result}")
    
    # Add data first
    await q.put("Valid Data")
    # Returns the data immediately
    result = await get_or_default(q, timeout=1.0)
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main_application())
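The same pattern can be applied to the write side. The following is a sketch of a hypothetical counterpart, put_or_drop, which reports whether the item was stored instead of raising. The name and the "drop on timeout" policy are assumptions made for illustration; whether dropping data is acceptable depends on your application.

```python
import asyncio
from typing import Any


async def put_or_drop(queue: asyncio.Queue, item: Any, timeout: float) -> bool:
    """
    Tries to put the item into the queue within the timeout.
    Returns True if it was stored, False if the item was dropped.
    """
    try:
        await asyncio.wait_for(queue.put(item), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def main_producer():
    q = asyncio.Queue(maxsize=1)

    # The queue has room, so this is stored immediately
    stored = await put_or_drop(q, "Item 1", timeout=0.5)
    print(f"Item 1 stored: {stored}")

    # The queue is full and nobody consumes it, so this is dropped
    stored = await put_or_drop(q, "Item 2", timeout=0.5)
    print(f"Item 2 stored: {stored}")

    print(f"Items in queue: {q.qsize()}")


if __name__ == "__main__":
    asyncio.run(main_producer())
```

Returning a bool keeps the calling code free of try/except blocks, at the cost of hiding the exception, so it suits cases where a dropped item is an expected outcome rather than an error.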

Conclusion

Combining asyncio.wait_for with asyncio.Queue puts an upper bound on how long any queue operation may wait, which makes asynchronous workflows more robust.

  • Ideal for: Waiting for responses from external systems, or avoiding an indefinite wait for buffer space when a consumer has stalled.
  • Adjustment: Tune the timeout duration based on the importance of the task and acceptable latency.
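  • Best Practice: Catch asyncio.TimeoutError rather than the built-in TimeoutError. Since Python 3.11 the two are the same class, but on earlier versions they are distinct, so asyncio.TimeoutError is the name that works on every version.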

By setting appropriate timeouts, you can prevent your application from freezing due to external factors.

