[Python] Reading and Writing with Timeouts in multiprocessing.Queue

目次

Overview

multiprocessing.Queue is a process-safe and convenient tool. However, the default put() and get() methods block indefinitely until their conditions are met (i.e., until space becomes available or data arrives). To prevent your application from appearing frozen, it is crucial to use the timeout argument. This allows the program to “give up” and raise an exception after a specific period. This article explains how to set timeouts and how to correctly catch the resulting exceptions (queue.Full and queue.Empty).

Specifications (Input/Output)

  • Input:
    • A queue with a defined maxsize.
    • Timeout duration (in seconds).
  • Output:
    • On Success: Data is added to or retrieved from the queue.
    • On Failure (Timeout): A queue.Full or queue.Empty exception is raised.
  • Prerequisite: You must import the standard queue module to use the necessary exception classes.

Method Details

MethodArgument ExampleBehaviorException Raised
putq.put(val, timeout=2)Raises an error if the queue remains full for 2 seconds.queue.Full
getq.get(timeout=2)Raises an error if no data arrives within 2 seconds.queue.Empty

Basic Usage

Always import the queue module and wrap your operations in a try-except block.

import multiprocessing
import queue  # Required for exception classes

q = multiprocessing.Queue(maxsize=1)

# --- Writing (Put) ---
try:
    # Wait up to 3 seconds if the queue is full
    q.put("Data", timeout=3)
except queue.Full:
    print("Write operation timed out")

# --- Reading (Get) ---
try:
    # Wait up to 3 seconds if the queue is empty
    data = q.get(timeout=3)
except queue.Empty:
    print("Read operation timed out")

Full Code Example

The following code tests both the “Full Queue” and “Empty Queue” scenarios.

import multiprocessing
import queue
import time

def put_worker(q):
    """Worker that continuously adds data to the queue"""
    try:
        # The first item will be added successfully
        q.put("Item 1", timeout=1)
        print("[PutWorker] Successfully added Item 1")
        
        # The second item will wait because the queue is full, then time out
        print("[PutWorker] Waiting to add Item 2...")
        q.put("Item 2", timeout=2)
        print("[PutWorker] Successfully added Item 2")
        
    except queue.Full:
        print(">> [PutWorker] Error: Timeout (The queue is full)")

def get_worker(q):
    """Worker that continuously retrieves data from the queue"""
    try:
        # Attempt to retrieve data from an empty queue
        print("[GetWorker] Waiting to retrieve data...")
        data = q.get(timeout=2)
        print(f"[GetWorker] Successfully retrieved: {data}")
        
    except queue.Empty:
        print(">> [GetWorker] Error: Timeout (No data received)")

def main():
    print("--- 1. Put Timeout Test ---")
    # Create a queue with a maximum size of 1
    q_full = multiprocessing.Queue(maxsize=1)
    
    # Run in a worker process
    p1 = multiprocessing.Process(target=put_worker, args=(q_full,))
    p1.start()
    p1.join()

    print("\n--- 2. Get Timeout Test ---")
    # Create an empty queue
    q_empty = multiprocessing.Queue()
    
    p2 = multiprocessing.Process(target=get_worker, args=(q_empty,))
    p2.start()
    p2.join()

if __name__ == "__main__":
    main()

Example Output

--- 1. Put Timeout Test ---
[PutWorker] Successfully added Item 1
[PutWorker] Waiting to add Item 2...
>> [PutWorker] Error: Timeout (The queue is full)

--- 2. Get Timeout Test ---
[GetWorker] Waiting to retrieve data...
>> [GetWorker] Error: Timeout (No data received)

Customization Points

  • Non-blocking Mode (block=False): This is equivalent to timeout=0. If you want to explicitly state that the program should not wait at all, use q.put_nowait(obj) or q.get_nowait(). These also raise queue.Full or queue.Empty.
  • Retry Logic: You can create a loop that catches the timeout exception, logs the event, and then retries the operation.

Important Notes

  • Exception Module Location: A common mistake is trying to access multiprocessing.Full or multiprocessing.Queue.Full. You must use queue.Full and queue.Empty from the standard queue module.
  • Timeout Precision: Timeout durations are estimates. If the system load is very high, the exception may occur slightly after the specified time.
  • Avoiding Deadlocks: Setting a timeout helps avoid total deadlocks (where the program hangs forever), but the operation itself still fails. You must write recovery logic, such as retrying the task or terminating with an error.

Advanced Application

Here is an example of a “safe get” function that returns a default value if the retrieval times out.

import multiprocessing
import queue

def safe_get(q, timeout=0.5, default=None):
    """
    Returns the item from the queue if retrieved within the timeout;
    otherwise, returns the default value.
    """
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default

if __name__ == "__main__":
    q = multiprocessing.Queue()
    
    # Returns the default value because the queue is empty
    val = safe_get(q, default="No Data")
    print(f"Result: {val}")

Conclusion

By setting appropriate timeout values, you can build more resilient multi-process applications.

Design Tip: Decide during the design phase whether a timeout should be treated as a critical error or as part of the normal flow (such as in polling).Since infinite waiting often leads to bugs, it is recommended to set timeouts for any production-level code involving queue operations.

Best for: Scenarios where you cannot guarantee a response from another process or when you must prevent the UI/system from freezing.

Key Action: Always import the queue module and catch the correct exceptions.

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次