【Python】multiprocessing.Arrayでプロセス間の配列共有を行う

目次

概要

multiprocessing.Value が「単一の値」を共有するのに対し、multiprocessing.Array は「固定長の配列」をプロセス間で共有するために使用します。

リストのようにインデックスでアクセスでき、複数のプロセスから同時にデータの読み書きが可能です。

本記事では、共有配列の初期化方法と、配列操作における排他制御(ロック)の基本、および注意点を解説します。

仕様(入出力)

multiprocessing.Array の引数

引数名説明
typecode_or_type要素の型を指定するコード('i'=整数, 'd'=小数など)。全要素が同じ型である必要があります。
size_or_initializer整数を指定するとそのサイズの配列(初期値0)を作成し、リストを指定するとその内容で初期化されます。
lockTrue(デフォルト)ならロックあり。Falseならロックなし(高速だが競合対策が必要)。

主な操作

  • 読み書き: arr[index] で通常のリストと同様にアクセス可能。
  • 全取得: arr[:]list(arr) で通常のPythonリストに変換可能。
  • ロック: with arr.get_lock(): で配列全体へのアクセスを保護。

基本の使い方

サイズを指定して空の配列を作るか、初期値リストを渡して作成します。

# サイズ5の整数配列(初期値は全て0)
arr1 = multiprocessing.Array('i', 5)

# 初期値を指定して作成
arr2 = multiprocessing.Array('d', [1.5, 2.5, 3.5])

# 書き込み
arr2[0] = 9.9

コード全文

提示されたコードを修正・最適化したものです。

2つのプロセスが共有配列の「全ての要素」に対してインクリメントを行います。競合を防ぐため、書き込み時にロックを使用します。

import multiprocessing
import time
import os

def increment_array_elements(shared_array: multiprocessing.Array, loop_count: int):
    """
    共有配列の全要素を指定回数だけインクリメントする。
    """
    pid = os.getpid()
    print(f"[Process-{pid}] 開始")
    
    for _ in range(loop_count):
        # 配列の全要素を走査
        for i in range(len(shared_array)):
            # 1要素ごとの書き込み時にロックを取得(排他制御)
            with shared_array.get_lock():
                shared_array[i] += 1
            
        # 処理の重さをシミュレーション
        time.sleep(0.001)
        
    print(f"[Process-{pid}] 終了")

def main():
    # 共有配列を作成: 型は整数('i')、サイズは 5(初期値は自動で [0, 0, 0, 0, 0])
    shared_arr = multiprocessing.Array('i', 5)
    
    print(f"初期状態: {list(shared_arr)}")
    
    # 2つのプロセスで並列実行
    process_count = 2
    loops = 50
    processes = []

    for _ in range(process_count):
        p = multiprocessing.Process(
            target=increment_array_elements,
            args=(shared_arr, loops)
        )
        processes.append(p)
        p.start()

    # 全プロセスの終了待機
    for p in processes:
        p.join()

    print("--- 全処理完了 ---")
    
    # 結果の表示
    # 期待値: 2プロセス * 50回 = 100
    print(f"最終結果: {list(shared_arr)}")
    
    expected = [process_count * loops] * 5
    if list(shared_arr) == expected:
        print(">> 成功: 全ての要素が正しく更新されました。")
    else:
        print(">> 失敗: 値の不整合が発生しました。")

if __name__ == "__main__":
    main()

実行結果例

初期状態: [0, 0, 0, 0, 0]
[Process-12345] 開始
[Process-12346] 開始
[Process-12345] 終了
[Process-12346] 終了
--- 全処理完了 ---
最終結果: [100, 100, 100, 100, 100]
>> 成功: 全ての要素が正しく更新されました。

カスタムポイント

  • ロックの粒度(範囲)
    • 上記の例では shared_array[i] += 1 のたびにロックしていますが、これは安全ですが低速です。
    • 「配列全体を一気に更新する」ような処理であれば、for ループ全体を with shared_array.get_lock(): で囲むことで、ロック取得回数を減らし高速化できます(ただし、その間他のプロセスは待たされます)。
  • 型コードの選択
    • 画像データなどを扱う場合は、符号なし整数 'B' (unsigned char) などを使用します。

注意点

  1. 固定長である
    • 通常のリスト (list) と異なり、append()pop() で要素数を増減させることはできません。初期化時に必要な最大サイズを確保する必要があります。
  2. 要素ごとのロックではない
    • get_lock() は「配列オブジェクト全体」に対するロックです。「インデックス0への書き込み」と「インデックス1への書き込み」を別々のプロセスで行っても、ロックを取得すると互いに待ちが発生します。
  3. 速度
    • 共有メモリへのアクセスは、通常のローカル変数へのアクセスよりもオーバーヘッドが大きいです。計算頻度が高い変数はローカルで持ち、最終結果だけを共有配列に書き込む設計が推奨されます。

応用

ロックを使わない(lock=False)パターンです。

各プロセスが書き込む「場所(インデックス)」が明確に分かれている場合、競合は発生しないため、ロックを無効化して高速に処理できます。

import multiprocessing

def compute_part(shared_arr, start_index, end_index, value):
    """
    担当するインデックス範囲だけを書き換える(競合しないのでロック不要)
    """
    for i in range(start_index, end_index):
        shared_arr[i] = value

if __name__ == "__main__":
    # lock=False で高速化(ただし使い方は慎重に)
    size = 10
    arr = multiprocessing.Array('i', size, lock=False)
    
    # 前半5つをプロセス1、後半5つをプロセス2が担当
    p1 = multiprocessing.Process(target=compute_part, args=(arr, 0, 5, 100))
    p2 = multiprocessing.Process(target=compute_part, args=(arr, 5, 10, 200))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(list(arr))
    # 結果: [100, 100, 100, 100, 100, 200, 200, 200, 200, 200]

まとめ

multiprocessing.Array は、数値データの配列を共有する際の標準的な手段です。

  • 向く場面: 複数のワーカーで計算結果を集約する場合、固定長のステータスフラグ管理。
  • 変更ポイント: 必要なサイズと型を事前に見積もり、初期化してください。
  • 注意点: append できない制約と、ロックによるパフォーマンスへの影響を考慮して設計しましょう。

書き込み場所を分担できるなら lock=False、そうでないなら get_lock() を使う、という使い分けが重要です。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次