[Python] Sharing Data Between Processes Using multiprocessing.Value

Overview

In Python’s multiprocessing environment, each process has its own independent memory space. This means that changing a variable in one process does not affect other processes.
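
To see why this matters, here is a minimal sketch (names are illustrative): a child process modifies an ordinary global variable, and the parent's copy is unaffected.

import multiprocessing

counter = 0  # an ordinary variable: each process gets its own copy

def worker():
    global counter
    counter += 1  # changes only the child's copy
    print(f"Child sees: {counter}")   # -> 1

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    print(f"Parent sees: {counter}")  # -> 0 (unchanged)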

To share simple data such as a number or a single character, and have updates visible to all processes immediately, use multiprocessing.Value. This object places the data in shared memory.

This article explains how to create shared memory and how to use locking (mutual exclusion) to prevent race conditions.


Specifications

Arguments for multiprocessing.Value

Argument           Description
typecode_or_type   A code representing the data type (see the table below) or a ctypes type.
args               The initial value of the data (passed to the type's constructor).
lock               Whether to use a lock for mutual exclusion. The default is True, which creates a new internal lock. You can also pass an existing Lock object.
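
To make the signature concrete, here is a minimal sketch exercising all three arguments (variable names are illustrative):

import multiprocessing

default_counter = multiprocessing.Value('i', 0)            # lock=True (default): a new internal lock
shared_lock = multiprocessing.Lock()
guarded = multiprocessing.Value('i', 0, lock=shared_lock)  # reuse an existing Lock object
unlocked = multiprocessing.Value('d', 0.0, lock=False)     # raw shared object with no lock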

Main Type Codes (ctypes)

Type Code   C Type       Python Type   Description
'i'         signed int   int           Signed integer. Often used for counters.
'd'         double       float         Double-precision floating point. Used for calculation results.
'c'         char         bytes         A 1-byte character.
'u'         wchar_t      str           A single Unicode character.
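
As a quick check of how each type code round-trips through .value (a minimal sketch):

import multiprocessing

num = multiprocessing.Value('i', 10)     # signed int
avg = multiprocessing.Value('d', 3.14)   # double
ch = multiprocessing.Value('c', b'A')    # 1-byte character (bytes)

print(num.value, type(num.value))  # 10 <class 'int'>
print(avg.value, type(avg.value))  # 3.14 <class 'float'>
print(ch.value, type(ch.value))    # b'A' <class 'bytes'>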

Basic Usage

Create a Value object and access it using the .value attribute. When multiple processes write to it at the same time, you must use .get_lock() for exclusive control.

import multiprocessing

# Create shared memory (integer 'i', initial value 0)
counter = multiprocessing.Value('i', 0)

# Writing (using a lock)
with counter.get_lock():
    counter.value += 1

# Reading
print(counter.value)

Full Code Example

The following code demonstrates two processes incrementing the same counter (shared memory) concurrently. It uses a lock to ensure data integrity.

import multiprocessing
import time
import os

def increment_worker(shared_counter, count: int):
    """
    Worker function that increments a shared counter.
    
    Args:
        shared_counter (multiprocessing.Value): The shared counter.
        count (int): Number of loops.
    """
    pid = os.getpid()
    print(f"[Process-{pid}] Started")
    
    for _ in range(count):
        # Start exclusive control (critical section)
        # Without this, multiple processes might read/write simultaneously, causing conflicts.
        with shared_counter.get_lock():
            shared_counter.value += 1
            
        # Brief pause so the two processes interleave
        # (without the lock above, this would expose race conditions)
        time.sleep(0.001)
        
    print(f"[Process-{pid}] Finished")

def main():
    # Create shared memory: Type is integer ('i'), initial value is 0.
    # It has an internal lock by default (lock=True).
    counter = multiprocessing.Value('i', 0)
    
    num_processes = 2
    loops_per_process = 50
    
    print(f"--- Processing Started (Initial Value: {counter.value}) ---")
    
    processes = []
    
    # Create processes
    for _ in range(num_processes):
        p = multiprocessing.Process(
            target=increment_worker,
            args=(counter, loops_per_process)
        )
        processes.append(p)
        p.start()
    
    # Wait for all processes to finish
    for p in processes:
        p.join()
        
    print("--- All Processes Finished ---")
    
    # Verify the results
    expected_value = num_processes * loops_per_process
    print(f"Final Counter Value: {counter.value}")
    print(f"Expected Value: {expected_value}")
    
    if counter.value == expected_value:
        print(">> Success: Data was updated without conflicts.")
    else:
        print(">> Failure: Data race condition occurred.")

if __name__ == "__main__":
    main()

Example Output

--- Processing Started (Initial Value: 0) ---
[Process-12345] Started
[Process-12346] Started
[Process-12345] Finished
[Process-12346] Finished
--- All Processes Finished ---
Final Counter Value: 100
Expected Value: 100
>> Success: Data was updated without conflicts.

Customization Points

  • Disabling Locks: If the value is only ever read (or written by just one process), you can set lock=False to reduce overhead.
  • Injecting Shared Locks: To update multiple Value objects as one atomic unit, create a single multiprocessing.Lock and pass it to the lock argument of each Value so that one lock guards them all, as sketched below.
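
A minimal sketch of the shared-lock pattern (names and values are illustrative):

import multiprocessing

def transfer(src, dst, lock):
    # One lock guards both Values, so no process can see a half-finished transfer
    with lock:
        src.value -= 10
        dst.value += 10

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    balance_a = multiprocessing.Value('i', 100, lock=lock)  # get_lock() now returns this lock
    balance_b = multiprocessing.Value('i', 0, lock=lock)

    p = multiprocessing.Process(target=transfer, args=(balance_a, balance_b, lock))
    p.start()
    p.join()
    print(balance_a.value, balance_b.value)  # 90 110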

Important Notes

  • Sharing Strings: Value is meant for numbers or single characters. To share long strings or complex objects (like lists or dictionaries), use multiprocessing.Manager; see the sketch after this list.
  • Performance: Lock control between processes is expensive. Frequently writing to shared memory can slow down your program. Calculate values locally and write to shared memory at the end when possible.
  • Atomicity: counter.value += 1 is not an atomic operation in Python. Using with get_lock(): is mandatory to prevent incorrect counts.
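
For reference, a minimal sketch of sharing richer data through multiprocessing.Manager (keys and values are illustrative):

import multiprocessing

def worker(shared):
    # A managed dict can hold strings, lists, and other picklable objects
    shared['message'] = 'hello from child'
    shared['items'] = [1, 2, 3]

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        p = multiprocessing.Process(target=worker, args=(shared,))
        p.start()
        p.join()
        print(dict(shared))  # {'message': 'hello from child', 'items': [1, 2, 3]}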

Advanced Applications

To share an array of data, use multiprocessing.Array. Its usage is very similar to Value.

import multiprocessing

def update_array(shared_arr):
    # Lock the entire array during modification
    with shared_arr.get_lock():
        for i in range(len(shared_arr)):
            shared_arr[i] *= 2

if __name__ == "__main__":
    # Integer array ('i'), size 3, initialized with a list
    arr = multiprocessing.Array('i', [1, 2, 3])
    
    p = multiprocessing.Process(target=update_array, args=(arr,))
    p.start()
    p.join()
    
    # Accessing an Array with a slice returns a list
    print(f"Updated Array: {arr[:]}")  # Output: [2, 4, 6]

Conclusion

multiprocessing.Value acts like a “small window” for communication in a multi-process environment.

Reminder: Always use get_lock() to prevent data corruption. With proper exclusive control, you can ensure safe and reliable parallel processing.

Best Uses: Sharing progress counters, passing flags (like stop signals), or sharing simple numerical parameters.
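
For instance, a stop flag can look like this minimal sketch (names are illustrative):

import multiprocessing
import time

def worker(stop_flag):
    # Loop until the parent raises the flag
    while not stop_flag.value:
        time.sleep(0.1)
    print("Worker: stop signal received")

if __name__ == "__main__":
    stop_flag = multiprocessing.Value('b', 0)  # 'b' = signed char, used here as a boolean
    p = multiprocessing.Process(target=worker, args=(stop_flag,))
    p.start()
    time.sleep(0.5)
    stop_flag.value = 1  # a single whole-value write; for read-modify-write, use get_lock()
    p.join()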

Key Point: Choose the correct type code (i, d, etc.) based on your data.
