[Python] Sharing Arrays Between Processes Using multiprocessing.Array

Overview

While multiprocessing.Value is used to share a single value, multiprocessing.Array is designed to share a fixed-length array between processes. You can access it using indices just like a standard Python list, and it allows multiple processes to read from or write to the data simultaneously.

In this article, we will explain how to initialize a shared array, the basics of exclusive control (locking) during array operations, and important points to keep in mind.


Specifications (Input/Output)

Arguments for multiprocessing.Array

  • typecode_or_type: A type code for the elements (e.g., 'i' for a signed integer, 'd' for a double-precision float). All elements must be the same type.
  • size_or_initializer: An integer specifies the array length (every element starts at 0). A sequence such as a list initializes the array with those values.
  • lock: True (the default) wraps the array with a synchronization lock. False disables it for higher speed, but you must handle race conditions yourself.

Main Operations

  • Read/Write: Access data using arr[index] just like a normal list.
  • Full Retrieval: Convert to a standard Python list using arr[:] or list(arr).
  • Locking: Use with arr.get_lock(): to protect access to the entire array.
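
As a minimal sketch of these operations (the names arr and snapshot are just illustrative placeholders):

import multiprocessing

# Small shared integer array for illustration
arr = multiprocessing.Array('i', [10, 20, 30])

# Read/Write: index access works like a normal list
arr[0] = 99

# Full Retrieval: copy the contents into a regular Python list
snapshot = arr[:]          # equivalent to list(arr)
print(snapshot)            # [99, 20, 30]

# Locking: protect a read-modify-write sequence on the array
with arr.get_lock():
    arr[1] += 1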

Basic Usage

You can create a shared array by either defining its size or passing a list of initial values.

# Create an integer array of size 5 (all initialized to 0)
arr1 = multiprocessing.Array('i', 5)

# Create an array with initial double values
arr2 = multiprocessing.Array('d', [1.5, 2.5, 3.5])

# Writing a value
arr2[0] = 9.9

Full Code Example

The following code demonstrates two processes each incrementing every element of a shared array. To prevent data corruption, a lock is acquired for each write.

import multiprocessing
import time
import os

def increment_array_elements(shared_array: multiprocessing.Array, loop_count: int):
    """
    Increments all elements of the shared array for a specified number of loops.
    """
    pid = os.getpid()
    print(f"[Process-{pid}] Started")
    
    for _ in range(loop_count):
        # Iterate through all elements of the array
        for i in range(len(shared_array)):
            # Acquire lock when writing to each element (Exclusive control)
            with shared_array.get_lock():
                shared_array[i] += 1
            
        # Simulate processing delay
        time.sleep(0.001)
        
    print(f"[Process-{pid}] Finished")

def main():
    # Create shared array: Type is integer ('i'), size is 5 (Initial values: [0, 0, 0, 0, 0])
    shared_arr = multiprocessing.Array('i', 5)
    
    print(f"Initial State: {list(shared_arr)}")
    
    # Run two processes in parallel
    process_count = 2
    loops = 50
    processes = []

    for _ in range(process_count):
        p = multiprocessing.Process(
            target=increment_array_elements,
            args=(shared_arr, loops)
        )
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    print("--- All processes completed ---")
    
    # Display results
    # Expected: 2 processes * 50 loops = 100 per element
    print(f"Final Result: {list(shared_arr)}")
    
    expected = [process_count * loops] * 5
    if list(shared_arr) == expected:
        print(">> Success: All elements were updated correctly.")
    else:
        print(">> Failure: Data inconsistency occurred.")

if __name__ == "__main__":
    main()

Example Output

Initial State: [0, 0, 0, 0, 0]
[Process-12345] Started
[Process-12346] Started
[Process-12345] Finished
[Process-12346] Finished
--- All processes completed ---
Final Result: [100, 100, 100, 100, 100]
>> Success: All elements were updated correctly.

Customization Points

Locking Granularity

In the example above, the lock is acquired for every single shared_array[i] += 1 operation. While safe, this is slow. If you need to update the entire array at once, wrapping the whole inner for loop in a single with shared_array.get_lock(): block is much faster because it reduces the number of lock acquisitions (though other processes will have to wait longer while the lock is held), as sketched below.
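
As a rough sketch, assuming the same shared_array, loop_count, and time import as in the full example above, the coarser-grained loop might look like this:

for _ in range(loop_count):
    # Acquire the lock once per pass and update every element while holding it
    with shared_array.get_lock():
        for i in range(len(shared_array)):
            shared_array[i] += 1

    # Simulate processing delay (outside the lock so other processes can run)
    time.sleep(0.001)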

Type Code Selection

If you are handling image data or raw buffers, you might use the type code 'B' (unsigned char) or another specialized code to reduce memory usage.
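
For instance, a minimal sketch that stores a small grayscale frame as unsigned bytes (the 4x4 size and the single pixel write are purely illustrative):

import multiprocessing

# Hypothetical 4x4 grayscale frame: each pixel is one unsigned byte (0-255)
width, height = 4, 4
frame = multiprocessing.Array('B', width * height)

# Write a single pixel at (row=1, col=2)
frame[1 * width + 2] = 255
print(list(frame))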


Important Notes

  • Fixed Length: Unlike a standard Python list, you cannot use append() or pop(). You must allocate the maximum size needed during initialization.
  • Global Lock: The get_lock() method locks the entire array object. Even if two processes try to write to different indices (e.g., index 0 and index 1), they will still block each other if they both acquire the lock.
  • Performance: Accessing shared memory is slower than accessing local variables. It is better to perform calculations using local variables and write the final result to the shared array at the end (see the sketch below).
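
As a hedged sketch, a worker equivalent to increment_array_elements from the full example, but doing its counting in a plain local list and touching the shared array (and its lock) only once at the end:

def accumulate_locally(shared_array, loop_count):
    # Do all the counting in a process-local list first
    local = [0] * len(shared_array)
    for _ in range(loop_count):
        for i in range(len(local)):
            local[i] += 1

    # Write back to shared memory once, under a single lock acquisition
    with shared_array.get_lock():
        for i in range(len(shared_array)):
            shared_array[i] += local[i]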

Advanced Application

This pattern shows how to use the array without locking (lock=False). If each process is assigned a specific, non-overlapping index range, race conditions will not occur, allowing for much faster processing.

import multiprocessing

def compute_part(shared_arr, start_index, end_index, value):
    """
    Modify only the assigned index range (No lock needed as indices don't overlap)
    """
    for i in range(start_index, end_index):
        shared_arr[i] = value

if __name__ == "__main__":
    # Disable locking for higher speed (Use carefully)
    size = 10
    arr = multiprocessing.Array('i', size, lock=False)
    
    # Process 1 handles the first 5 elements, Process 2 handles the last 5
    p1 = multiprocessing.Process(target=compute_part, args=(arr, 0, 5, 100))
    p2 = multiprocessing.Process(target=compute_part, args=(arr, 5, 10, 200))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(list(arr))
    # Result: [100, 100, 100, 100, 100, 200, 200, 200, 200, 200]

Conclusion

multiprocessing.Array is the standard way to share an array of numerical data in Python.

  • Best for: Aggregating calculation results from multiple workers or managing fixed-length status flags.
  • Key Action: Estimate the required size and type before initialization.
  • Design Tip: Consider the trade-off between the convenience of the fixed size and the performance impact of locking.

Using lock=False when tasks are clearly divided by index and get_lock() when multiple processes might touch the same elements is the key to efficient parallel processing.
