Overview
Python’s multiprocessing.Queue is a FIFO (First-In, First-Out) queue designed for safe data exchange between multiple processes.
While it offers an interface similar to the standard library's queue.Queue (the thread-safe queue used with the threading module), this version is built on pipes and locks/semaphores internally. It is designed to be process-safe, ensuring that data is not corrupted even when multiple processes access it simultaneously.
This article explains the basic usage and shows implementation patterns for preventing race conditions with a queue.
Specifications (Input/Output)
Basic Specifications of multiprocessing.Queue
- Initialization: Queue(maxsize=0)
  - maxsize: The upper limit on the number of items the queue can hold. If it is 0 or less, the queue is unbounded (limited only by available memory).
Main Methods
| Operation | Syntax | Description | Blocking (waits)? |
| --- | --- | --- | --- |
| Add | q.put(obj, block=True, timeout=None) | Adds an object to the queue. Waits if the queue is full. | Yes |
| Get | q.get(block=True, timeout=None) | Retrieves an object from the queue. Waits if the queue is empty. | Yes |
| Size | q.qsize() | Returns the approximate number of items. Not available on some platforms such as macOS (raises NotImplementedError). | No |
| Empty Check | q.empty() | Returns True if the queue is empty. Not reliable because of multiprocessing semantics. | No |
| Full Check | q.full() | Returns True if the queue is full. Likewise not fully reliable. | No |
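As a quick illustration of these methods, here is a minimal sketch using a bounded queue (the maxsize of 2 and the string items are arbitrary choices for the example):

from multiprocessing import Queue
import queue  # multiprocessing reuses the queue.Full / queue.Empty exceptions

q = Queue(maxsize=2)   # bounded queue; a maxsize of 0 or less means "unbounded"
q.put("a")
q.put("b")

try:
    # The queue is full, so this waits up to 1 second and then gives up.
    q.put("c", block=True, timeout=1)
except queue.Full:
    print("queue is full")

print(q.get())    # -> a  (FIFO order)
print(q.get())    # -> b
print(q.empty())  # usually True here, but empty() is only a hint (see the table above)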
Basic Usage
Create the queue before spawning processes and pass it as an argument to each process.
from multiprocessing import Queue
# Create a queue
q = Queue()
# Add data
q.put("Data A")
q.put("Data B")
# Get data (retrieved in the order added)
print(q.get()) # -> Data A
print(q.get()) # -> Data B
Full Code Example
The following code uses the queue as a “processing right (token)” to prevent race conditions during counting without an explicit Lock.
A single “ball (current value)” is kept in the queue. Processes compete for it; only the process that retrieves the value can perform the calculation and update the counter.
import multiprocessing
import time
import os
def increment_worker(queue: multiprocessing.Queue, loops: int):
    """
    Worker process that gets a value from the queue, increments it, and puts it back.
    Because queue.get() blocks, exclusive control is handled automatically.
    """
    pid = os.getpid()
    for _ in range(loops):
        # 1. Get the value (waits if empty = another process is processing)
        value = queue.get()
        # 2. Calculation (simulated heavy task)
        time.sleep(0.01)
        value += 1
        # 3. Put the updated value back (allows the next process to get it)
        queue.put(value)
        # Log output (optional)
        # print(f"[{pid}] updated to {value}")
    print(f"[{pid}] Processing complete")
def main():
    # Create a process-safe queue
    counter_queue = multiprocessing.Queue()
    # Put the initial value 0
    counter_queue.put(0)

    print("--- Starting process ---")

    processes = []
    num_processes = 2
    loops = 50

    # Create and start processes
    for _ in range(num_processes):
        p = multiprocessing.Process(
            target=increment_worker,
            args=(counter_queue, loops)
        )
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Retrieve the final result
    final_result = counter_queue.get()

    print("--- All processes complete ---")
    print(f"Final counter value: {final_result}")

    expected = num_processes * loops
    if final_result == expected:
        print(f">> Success: Value reached {expected}.")
    else:
        print(f">> Failure: Value reached {final_result} instead of {expected}.")

if __name__ == "__main__":
    main()
Example Output
--- Starting process ---
[12345] Processing complete
[12346] Processing complete
--- All processes complete ---
Final counter value: 100
>> Success: Value reached 100.
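The actual PIDs will differ on each run, and because process scheduling is non-deterministic, the two "Processing complete" lines can appear in either order.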
Customization Points
- Timeout Settings: Use q.get(timeout=3) to specify a wait time in seconds. This prevents the program from freezing indefinitely. A queue.Empty exception is raised if the time limit is reached.
- Non-blocking Operations: q.put_nowait(obj) and q.get_nowait() attempt the operation immediately without waiting. They raise queue.Full or queue.Empty exceptions on failure.
- JoinableQueue: If you need task management, use multiprocessing.JoinableQueue. It provides q.task_done() and q.join(), allowing you to wait until all items in the queue have been processed (see the sketch after this list).
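The JoinableQueue flow in particular is easier to see in code. A minimal sketch, assuming a single worker process and illustrative task strings:

import multiprocessing

def worker(jq: multiprocessing.JoinableQueue):
    while True:
        item = jq.get()
        try:
            if item is None:      # sentinel value: stop this worker
                break
            print(f"processing {item}")
        finally:
            jq.task_done()        # must be called exactly once per get()

if __name__ == "__main__":
    jq = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=worker, args=(jq,))
    p.start()

    for i in range(3):
        jq.put(f"task-{i}")       # illustrative task names
    jq.join()                     # blocks until task_done() has been called for every item

    jq.put(None)                  # tell the worker to exit
    p.join()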
Important Notes
- qsize() Platform Dependence: q.qsize() is not supported on certain platforms like macOS; calling it raises a NotImplementedError. Avoid relying on it if your code needs to be portable.
- Deadlocks: Avoid structures where a process trying to put() into a full queue and another trying to get() from an empty queue wait for each other. In particular, joining a process while large amounts of data remain in its put buffer can cause a deadlock, so drain the queue before calling join() (a sketch follows this list).
- Transfer Cost: Data is moved between processes using pickle (serialization). Sending large data structures frequently will degrade performance due to the overhead of serialization and deserialization.
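The deadlock note is easiest to respect by always draining the queue before joining. A minimal sketch of that ordering (the item count N is a hypothetical value, chosen only to be large enough to fill the pipe buffer):

import multiprocessing

N = 10_000  # hypothetical item count, large enough that the OS pipe buffer fills up

def worker(q: multiprocessing.Queue):
    for i in range(N):
        q.put(i)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()

    # Drain the queue BEFORE join(). Reversing the order can deadlock, because the
    # child cannot exit while its feeder thread still holds buffered items.
    results = [q.get() for _ in range(N)]

    p.join()
    print(f"received {len(results)} items")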
Advanced Application
This is a standard implementation of the Producer-Consumer pattern.
The producer creates data and puts it into the queue, while the consumer retrieves and processes it. A None value is used as a signal to stop the consumer.
import multiprocessing
import time
def producer(q):
    for i in range(5):
        data = f"Job-{i}"
        print(f"[Prod] Generating: {data}")
        q.put(data)
        time.sleep(0.5)
    # Send termination signal
    q.put(None)

def consumer(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f"[Cons] Processing: {data}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()
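The code above uses one producer and one consumer. If several consumers share the same queue, a single None stops only the first consumer that happens to receive it, so put one sentinel per consumer. A minimal variation (the consumer count and job strings are illustrative):

import multiprocessing

def consumer(q):
    while True:
        data = q.get()
        if data is None:          # each consumer stops on its own sentinel
            break
        print(f"[Cons] Processing: {data}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    num_consumers = 3             # illustrative value

    consumers = [multiprocessing.Process(target=consumer, args=(q,))
                 for _ in range(num_consumers)]
    for c in consumers:
        c.start()

    for i in range(10):           # produce the jobs in the main process this time
        q.put(f"Job-{i}")

    for _ in range(num_consumers):
        q.put(None)               # one sentinel per consumer

    for c in consumers:
        c.join()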
Conclusion
multiprocessing.Queue provides a simple way to handle process communication, which is often complex.
- Best Uses: Data buffering, producer-consumer models, and aggregating results from different tasks.
- Key Point: Configure timeouts and maxsize to ensure system stability.
- Reminder: Note the lack of qsize() support on macOS and ensure that objects sent through the queue are picklable.

Using a Queue is generally more intuitive and safer than using shared memory (Value, Array). Consider this design pattern first for your parallel processing needs.
