Overview
By default, the put() and get() methods of asyncio.Queue block indefinitely if the queue is full or empty. However, in real-world applications, you often need to stop waiting after a certain period, such as “giving up if data doesn’t arrive” or “returning an error if a write isn’t possible.” This article explains how to use the standard asyncio.wait_for() function to set timeout limits on queue operations.
Specifications (Input/Output)
- Input:
- An
asyncio.Queue(usually with a size limit). - Timeout duration in seconds.
- An
- Output:
- If the operation completes within the limit: Returns the value (for
get) or completes the storage (forput). - If the limit is exceeded: Catches
asyncio.TimeoutErrorand handles the error.
- If the operation completes within the limit: Returns the value (for
- Behavior: Wrapping the operation in
wait_for(coroutine, timeout)forces the coroutine to cancel and raises an exception after the specified time.
Basic Usage
Pass the queue.get() or queue.put() call as the first argument to asyncio.wait_for.
try:
# If no item is retrieved within 3 seconds, a TimeoutError occurs
item = await asyncio.wait_for(queue.get(), timeout=3.0)
except asyncio.TimeoutError:
print("Operation timed out")
Full Code Example
The following code demonstrates two scenarios: “Put Timeout” (when the queue is full) and “Get Timeout” (when the queue is empty).
import asyncio
async def test_put_timeout():
"""
Example of a timeout while trying to put an item into a full queue.
"""
print("\n--- Put Timeout Test ---")
# Create a queue with a maximum size of 1
q = asyncio.Queue(maxsize=1)
# Add the first item (this succeeds immediately)
await q.put("Item 1")
print("Added the first item. The queue is now full.")
try:
print("Attempting to add a second item (timeout: 1.5s)...")
# This will wait because the queue is full
await asyncio.wait_for(q.put("Item 2"), timeout=1.5)
except asyncio.TimeoutError:
print(">> Timeout: No space became available within the time limit.")
else:
print(">> Success: Item added.")
async def test_get_timeout():
"""
Example of a timeout while trying to get an item from an empty queue.
"""
print("\n--- Get Timeout Test ---")
q = asyncio.Queue()
# Try to retrieve an item when the queue is empty
try:
print("Attempting to get an item (timeout: 1.5s)...")
item = await asyncio.wait_for(q.get(), timeout=1.5)
print(f">> Success: Received {item}.")
except asyncio.TimeoutError:
print(">> Timeout: No item arrived within the time limit.")
async def main():
# Verify write timeout
await test_put_timeout()
# Verify read timeout
await test_get_timeout()
if __name__ == "__main__":
asyncio.run(main())
Example Output
--- Put Timeout Test ---
Added the first item. The queue is now full.
Attempting to add a second item (timeout: 1.5s)...
>> Timeout: No space became available within the time limit.
--- Get Timeout Test ---
Attempting to get an item (timeout: 1.5s)...
>> Timeout: No item arrived within the time limit.
Customization Points
- Post-Timeout Behavior: Depending on your requirements, you can choose to retry the operation, use a default value, or stop the entire process after catching
TimeoutError. - Constants for Timeouts: Managing timeouts with constants like
TIMEOUT_SECONDS = 5.0makes it easier to adjust system-wide response settings.
Important Notes
- Difference from get_nowait / put_nowait:
q.get_nowait()raises an exception (QueueEmpty) immediately without waiting even for a second.wait_forwaits for a specific duration. This is better if there is a chance data will arrive shortly.
- Task Cancellation: When
asyncio.TimeoutErroroccurs, the pendingq.put()orq.get()operation is canceled. This ensures data integrity because the operation is treated as if it never happened, preventing “partial” states. - Negative Timeout Values: If you set the
timeoutto 0 or a negative number, it behaves similarly toget_nowait().
Advanced Application
The following helper function retrieves data if available within the time limit or returns a default value (like None) to continue processing.
import asyncio
from typing import Any, Optional
async def get_or_default(queue: asyncio.Queue, timeout: float, default: Any = None) -> Any:
"""
Returns the item from the queue if retrieved within the timeout,
otherwise returns the default value.
"""
try:
return await asyncio.wait_for(queue.get(), timeout)
except asyncio.TimeoutError:
return default
async def main_application():
q = asyncio.Queue()
# Returns "No Data" because the queue is empty
result = await get_or_default(q, timeout=1.0, default="No Data")
print(f"Result: {result}")
# Add data first
await q.put("Valid Data")
# Returns the data immediately
result = await get_or_default(q, timeout=1.0)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main_application())
Conclusion
Combining asyncio.wait_for with asyncio.Queue allows you to build robust asynchronous workflows.
- Ideal for: Waiting for responses from external systems or preventing deadlocks while waiting for buffer space.
- Adjustment: Tune the timeout duration based on the importance of the task and acceptable latency.
- Best Practice: Always import
asyncioand useasyncio.TimeoutErrorto ensure compatibility across different Python versions.
By setting appropriate timeouts, you can prevent your application from freezing due to external factors.
