Overview
multiprocessing.Queue is a convenient, process-safe way to pass data between processes. By default, however, put() and get() block indefinitely until they can proceed: put() waits for free space in a bounded queue, and get() waits for data to arrive. To keep your application from appearing frozen, pass the timeout argument so that the call gives up and raises an exception after a set period. This article explains how to set timeouts and how to correctly catch the resulting exceptions (queue.Full and queue.Empty).
Specifications (Input/Output)
- Input:
  - A queue with a defined maxsize.
  - Timeout duration (in seconds).
- Output:
  - On Success: Data is added to or retrieved from the queue.
  - On Failure (Timeout): A queue.Full or queue.Empty exception is raised.
- Prerequisite: You must import the standard queue module to use the necessary exception classes.
Method Details
| Method | Argument Example | Behavior | Exception Raised |
| put | q.put(val, timeout=2) | Raises an error if the queue remains full for 2 seconds. | queue.Full |
| get | q.get(timeout=2) | Raises an error if no data arrives within 2 seconds. | queue.Empty |
Basic Usage
Always import the queue module and wrap your operations in a try-except block.
import multiprocessing
import queue  # Required for exception classes

q = multiprocessing.Queue(maxsize=1)

# --- Writing (Put) ---
try:
    # Wait up to 3 seconds if the queue is full
    q.put("Data", timeout=3)
except queue.Full:
    print("Write operation timed out")

# --- Reading (Get) ---
try:
    # Wait up to 3 seconds if the queue is empty
    data = q.get(timeout=3)
except queue.Empty:
    print("Read operation timed out")
Full Code Example
The following code tests both the “Full Queue” and “Empty Queue” scenarios.
import multiprocessing
import queue
import time

def put_worker(q):
    """Worker that tries to add two items to a queue of size 1"""
    try:
        # The first item will be added successfully
        q.put("Item 1", timeout=1)
        print("[PutWorker] Successfully added Item 1")
        # The second item will wait because the queue is full, then time out
        print("[PutWorker] Waiting to add Item 2...")
        q.put("Item 2", timeout=2)
        print("[PutWorker] Successfully added Item 2")
    except queue.Full:
        print(">> [PutWorker] Error: Timeout (The queue is full)")

def get_worker(q):
    """Worker that tries to retrieve one item from the queue"""
    try:
        # Attempt to retrieve data from an empty queue
        print("[GetWorker] Waiting to retrieve data...")
        data = q.get(timeout=2)
        print(f"[GetWorker] Successfully retrieved: {data}")
    except queue.Empty:
        print(">> [GetWorker] Error: Timeout (No data received)")

def main():
    print("--- 1. Put Timeout Test ---")
    # Create a queue with a maximum size of 1
    q_full = multiprocessing.Queue(maxsize=1)
    # Run in a worker process
    p1 = multiprocessing.Process(target=put_worker, args=(q_full,))
    p1.start()
    p1.join()

    print("\n--- 2. Get Timeout Test ---")
    # Create an empty queue
    q_empty = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=get_worker, args=(q_empty,))
    p2.start()
    p2.join()

if __name__ == "__main__":
    main()
Example Output
--- 1. Put Timeout Test ---
[PutWorker] Successfully added Item 1
[PutWorker] Waiting to add Item 2...
>> [PutWorker] Error: Timeout (The queue is full)
--- 2. Get Timeout Test ---
[GetWorker] Waiting to retrieve data...
>> [GetWorker] Error: Timeout (No data received)
Customization Points
- Non-blocking Mode (block=False): The call does not wait at all and any timeout is ignored, which behaves like timeout=0. To state this explicitly, use q.put_nowait(obj) or q.get_nowait(). These also raise queue.Full or queue.Empty.
- Retry Logic: You can create a loop that catches the timeout exception, logs the event, and then retries the operation.
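The retry idea can be sketched as a small helper. This is a minimal illustration, not a recommended implementation: the function name put_with_retry and its max_attempts parameter are hypothetical, and a real application would probably log through the logging module and add a delay or backoff between attempts.

```python
import multiprocessing
import queue


def put_with_retry(q, item, timeout=0.5, max_attempts=3):
    """Try to put `item`, retrying after each timeout.

    Returns True on success, False if every attempt timed out.
    (Hypothetical helper; the name and defaults are illustrative only.)
    """
    for attempt in range(1, max_attempts + 1):
        try:
            q.put(item, timeout=timeout)
            return True
        except queue.Full:
            # Stand-in for real logging
            print(f"Attempt {attempt}/{max_attempts} timed out (queue is full)")
    return False


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=1)
    # The queue has room, so the first call succeeds immediately
    print(put_with_retry(q, "first", timeout=0.1))
    # Nothing consumes the queue here, so every attempt times out
    print(put_with_retry(q, "second", timeout=0.1))
```

Returning a boolean instead of re-raising keeps the decision about what to do after the final failure with the caller.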
Important Notes
- Exception Module Location: A common mistake is trying to access multiprocessing.Full or multiprocessing.Queue.Full. You must use queue.Full and queue.Empty from the standard queue module.
- Timeout Precision: Timeout durations are approximate. If the system load is very high, the exception may occur slightly after the specified time.
- Avoiding Deadlocks: Setting a timeout helps avoid total deadlocks (where the program hangs forever), but the operation itself still fails. You must write recovery logic, such as retrying the task or terminating with an error.
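The note on timeout precision can be checked with a quick measurement. The sketch below times a single get() on an empty queue; the helper name timed_get is hypothetical, and the measured wait will vary with the platform and system load.

```python
import multiprocessing
import queue
import time


def timed_get(q, timeout):
    """Return (item_or_None, elapsed_seconds) for a single get() call.

    Hypothetical helper used only to observe how long the call waited.
    """
    start = time.monotonic()
    try:
        item = q.get(timeout=timeout)
    except queue.Empty:
        item = None
    return item, time.monotonic() - start


if __name__ == "__main__":
    q = multiprocessing.Queue()
    item, elapsed = timed_get(q, timeout=0.5)
    print(f"item={item!r}, waited {elapsed:.3f}s (requested 0.5s)")
```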
Advanced Application
Here is an example of a “safe get” function that returns a default value if the retrieval times out.
import multiprocessing
import queue

def safe_get(q, timeout=0.5, default=None):
    """
    Returns the item from the queue if retrieved within the timeout;
    otherwise, returns the default value.
    """
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default

if __name__ == "__main__":
    q = multiprocessing.Queue()
    # Returns the default value because the queue is empty
    val = safe_get(q, default="No Data")
    print(f"Result: {val}")
Conclusion
By setting appropriate timeout values, you can build more resilient multi-process applications.
Design Tip: Decide during the design phase whether a timeout should be treated as a critical error or as part of the normal flow (such as in polling). Since infinite waiting often leads to bugs, it is recommended to set timeouts for any production-level code involving queue operations.
Best for: Scenarios where you cannot guarantee a response from another process or when you must prevent the UI/system from freezing.
Key Action: Always import the queue module and catch the correct exceptions.
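The polling case mentioned in the design tip, where a timeout is part of the normal flow, might look like the sketch below. It assumes a shutdown signal passed as a multiprocessing.Event; the function name drain_until_stopped and the poll_interval parameter are hypothetical.

```python
import multiprocessing
import queue


def drain_until_stopped(q, stop_event, poll_interval=0.2):
    """Collect items until the queue is idle AND stop_event is set.

    A timeout here is not an error: it is simply the moment to check
    whether shutdown was requested. (Hypothetical helper.)
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=poll_interval))
        except queue.Empty:
            if stop_event.is_set():
                break
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    stop_event = multiprocessing.Event()
    for n in range(3):
        q.put(n)
    # Request shutdown; items already queued are still drained first
    stop_event.set()
    print(drain_until_stopped(q, stop_event))
```

Checking the event only after a timeout means items already in the queue are not dropped at shutdown, at the cost of waiting up to one poll_interval before the loop exits.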
