[Python] Transferring Data Between Processes Using multiprocessing.Queue

Overview

Python’s multiprocessing.Queue is a FIFO (First-In, First-Out) queue designed for safe data exchange between multiple processes.

While it shares a similar interface with queue.Queue (the thread-oriented queue from the standard queue module), this version uses pipes and locks internally. It is designed to be process-safe, ensuring that data is not corrupted even when multiple processes access it simultaneously.

This article explains the basic usage and implementation patterns for preventing data race conditions using a queue.

Specifications (Input/Output)

Basic Specifications of multiprocessing.Queue

  • Initialization: Queue(maxsize=0)
  • maxsize: The limit on the number of items in the queue. If 0 or negative, the queue is effectively unbounded (limited only by available memory).
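
For example, a bounded queue rejects a non-blocking put() once maxsize items are waiting (a minimal sketch; note that the Full exception comes from the standard queue module):

```python
import queue  # the queue.Full / queue.Empty exception classes live here
from multiprocessing import Queue

q = Queue(maxsize=2)
q.put("A")
q.put("B")

try:
    q.put("C", block=False)  # queue already holds maxsize items
except queue.Full:
    print("queue is full")  # -> queue is full
```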

Main Methods

  • Add: q.put(obj, block=True, timeout=None) — adds an object to the queue. Blocks (waits) if the queue is full.
  • Get: q.get(block=True, timeout=None) — retrieves an object from the queue. Blocks (waits) if the queue is empty.
  • Size: q.qsize() — returns the current number of items. Non-blocking. Not available on some platforms such as macOS (raises NotImplementedError).
  • Empty check: q.empty() — returns True if the queue is empty. Non-blocking, but unreliable: another process may change the state immediately after the call.
  • Full check: q.full() — returns True if the queue is full. Non-blocking, and unreliable for the same reason.

Basic Usage

Create the queue before spawning processes and pass it as an argument to each process.

from multiprocessing import Queue

# Create a queue
q = Queue()

# Add data
q.put("Data A")
q.put("Data B")

# Get data (retrieved in the order added)
print(q.get())  # -> Data A
print(q.get())  # -> Data B

Full Code Example

The following code uses the queue as a “processing right (token)” to prevent race conditions during counting without an explicit Lock.

A single “ball (current value)” is kept in the queue. Processes compete for it; only the process that retrieves the value can perform the calculation and update the counter.

import multiprocessing
import time
import os

def increment_worker(queue: multiprocessing.Queue, loops: int):
    """
    Worker process that gets a value from the queue, increments it, and puts it back.
    Because queue.get() blocks until a value is available, mutual exclusion is achieved without an explicit Lock.
    """
    pid = os.getpid()
    
    for _ in range(loops):
        # 1. Get the value (waits if empty = another process is processing)
        value = queue.get()
        
        # 2. Calculation (simulated heavy task)
        time.sleep(0.01)
        value += 1
        
        # 3. Put the updated value back (allows the next process to get it)
        queue.put(value)
        
        # Log output (optional)
        # print(f"[{pid}] updated to {value}")

    print(f"[{pid}] Processing complete")

def main():
    # Create a process-safe queue
    counter_queue = multiprocessing.Queue()
    
    # Put the initial value 0
    counter_queue.put(0)
    
    print("--- Starting process ---")
    
    processes = []
    num_processes = 2
    loops = 50
    
    # Create and start processes
    for _ in range(num_processes):
        p = multiprocessing.Process(
            target=increment_worker,
            args=(counter_queue, loops)
        )
        processes.append(p)
        p.start()
    
    # Wait for all processes to finish
    for p in processes:
        p.join()
    
    # Retrieve the final result
    final_result = counter_queue.get()
    
    print("--- All processes complete ---")
    print(f"Final counter value: {final_result}")
    
    expected = num_processes * loops
    if final_result == expected:
        print(f">> Success: Value reached {expected}.")
    else:
        print(f">> Failure: Value reached {final_result} instead of {expected}.")

if __name__ == "__main__":
    main()

Example Output

--- Starting process ---
[12345] Processing complete
[12346] Processing complete
--- All processes complete ---
Final counter value: 100
>> Success: Value reached 100.

Customization Points

  • Timeout Settings: Use q.get(timeout=3) to specify a wait time in seconds. This prevents the program from freezing indefinitely. A queue.Empty exception is raised if the time limit is reached.
  • Non-blocking Operations: q.put_nowait(obj) or q.get_nowait() attempt execution immediately without waiting. These raise queue.Full or queue.Empty exceptions on failure.
  • JoinableQueue: If you need task management, use multiprocessing.JoinableQueue. It provides q.task_done() and q.join(), allowing you to wait until all items in the queue have been processed.
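
The timeout and non-blocking variants can be combined with exception handling like this (a minimal sketch; the exceptions are defined in the standard queue module, not in multiprocessing):

```python
import queue  # queue.Empty / queue.Full exception classes
from multiprocessing import Queue

q = Queue()

# Non-blocking get: raises immediately instead of waiting
try:
    q.get_nowait()
except queue.Empty:
    print("get_nowait: queue is empty")

# Timed get: waits up to 0.1 seconds, then raises queue.Empty
try:
    q.get(timeout=0.1)
except queue.Empty:
    print("get(timeout=0.1): no data arrived in time")
```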

Important Notes

  • qsize() Platform Dependence: q.qsize() is not supported on certain platforms like macOS. Calling it will raise a NotImplementedError. Avoid relying on it if your code needs to be portable.
  • Deadlocks: A process that has put items on a queue will not exit until its buffered items have been flushed to the underlying pipe. Therefore, drain the queue with get() before calling join() on the producing process; joining while large amounts of data remain in the put buffer can deadlock.
  • Transfer Cost: Data is moved between processes using pickle (serialization). Sending large data structures frequently will degrade performance due to the overhead of serialization and deserialization.
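
To avoid the join-before-drain deadlock mentioned above, call get() on pending results before join() (a minimal sketch with a deliberately large payload):

```python
import multiprocessing

def worker(q):
    # Put a large payload; it sits in the feeder buffer until consumed
    q.put("x" * 1_000_000)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    result = q.get()  # drain the queue FIRST...
    p.join()          # ...then join; reversing these two lines can deadlock
    print(len(result))  # -> 1000000
```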

Advanced Application

The following is a standard implementation of the producer-consumer pattern.

The producer creates data and puts it into the queue, while the consumer retrieves and processes it. A None value is used as a signal to stop the consumer.

import multiprocessing
import time

def producer(q):
    for i in range(5):
        data = f"Job-{i}"
        print(f"[Prod] Generating: {data}")
        q.put(data)
        time.sleep(0.5)
    # Send termination signal
    q.put(None)

def consumer(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f"[Cons] Processing: {data}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    
    p1.start(); p2.start()
    p1.join(); p2.join()

Conclusion

multiprocessing.Queue provides a simple way to handle process communication, which is often complex.

  • Best Uses: Data buffering, producer-consumer models, and aggregating results from different tasks.
  • Key Point: Configure timeouts and maxsize to ensure system stability.
  • Reminder: Note the lack of qsize() support on macOS and ensure that objects sent through the queue are pickleable.

Using a Queue is generally more intuitive and safer than using shared memory (Value, Array). Consider this design pattern first for your parallel processing needs.