目次
概要
multiprocessing.Value が「単一の値」を共有するのに対し、multiprocessing.Array は「固定長の配列」をプロセス間で共有するために使用します。
リストのようにインデックスでアクセスでき、複数のプロセスから同時にデータの読み書きが可能です。
本記事では、共有配列の初期化方法と、配列操作における排他制御(ロック)の基本、および注意点を解説します。
仕様(入出力)
multiprocessing.Array の引数
| 引数名 | 説明 |
| typecode_or_type | 要素の型を指定するコード('i'=整数, 'd'=小数など)。全要素が同じ型である必要があります。 |
| size_or_initializer | 整数を指定するとそのサイズの配列(初期値0)を作成し、リストを指定するとその内容で初期化されます。 |
| lock | True(デフォルト)ならロックあり。Falseならロックなし(高速だが競合対策が必要)。 |
主な操作
- 読み書き:
arr[index]で通常のリストと同様にアクセス可能。 - 全取得:
arr[:]やlist(arr)で通常のPythonリストに変換可能。 - ロック:
with arr.get_lock():で配列全体へのアクセスを保護。
基本の使い方
サイズを指定して空の配列を作るか、初期値リストを渡して作成します。
# サイズ5の整数配列(初期値は全て0)
arr1 = multiprocessing.Array('i', 5)
# 初期値を指定して作成
arr2 = multiprocessing.Array('d', [1.5, 2.5, 3.5])
# 書き込み
arr2[0] = 9.9
コード全文
提示されたコードを修正・最適化したものです。
2つのプロセスが共有配列の「全ての要素」に対してインクリメントを行います。競合を防ぐため、書き込み時にロックを使用します。
import multiprocessing
import time
import os
def increment_array_elements(shared_array: multiprocessing.Array, loop_count: int):
"""
共有配列の全要素を指定回数だけインクリメントする。
"""
pid = os.getpid()
print(f"[Process-{pid}] 開始")
for _ in range(loop_count):
# 配列の全要素を走査
for i in range(len(shared_array)):
# 1要素ごとの書き込み時にロックを取得(排他制御)
with shared_array.get_lock():
shared_array[i] += 1
# 処理の重さをシミュレーション
time.sleep(0.001)
print(f"[Process-{pid}] 終了")
def main():
# 共有配列を作成: 型は整数('i')、サイズは 5(初期値は自動で [0, 0, 0, 0, 0])
shared_arr = multiprocessing.Array('i', 5)
print(f"初期状態: {list(shared_arr)}")
# 2つのプロセスで並列実行
process_count = 2
loops = 50
processes = []
for _ in range(process_count):
p = multiprocessing.Process(
target=increment_array_elements,
args=(shared_arr, loops)
)
processes.append(p)
p.start()
# 全プロセスの終了待機
for p in processes:
p.join()
print("--- 全処理完了 ---")
# 結果の表示
# 期待値: 2プロセス * 50回 = 100
print(f"最終結果: {list(shared_arr)}")
expected = [process_count * loops] * 5
if list(shared_arr) == expected:
print(">> 成功: 全ての要素が正しく更新されました。")
else:
print(">> 失敗: 値の不整合が発生しました。")
if __name__ == "__main__":
main()
実行結果例
初期状態: [0, 0, 0, 0, 0]
[Process-12345] 開始
[Process-12346] 開始
[Process-12345] 終了
[Process-12346] 終了
--- 全処理完了 ---
最終結果: [100, 100, 100, 100, 100]
>> 成功: 全ての要素が正しく更新されました。
カスタムポイント
- ロックの粒度(範囲)
- 上記の例では
shared_array[i] += 1のたびにロックしていますが、これは安全ですが低速です。 - 「配列全体を一気に更新する」ような処理であれば、
forループ全体をwith shared_array.get_lock():で囲むことで、ロック取得回数を減らし高速化できます(ただし、その間他のプロセスは待たされます)。
- 上記の例では
- 型コードの選択
- 画像データなどを扱う場合は、符号なし整数
'B'(unsigned char) などを使用します。
- 画像データなどを扱う場合は、符号なし整数
注意点
- 固定長である
- 通常のリスト (
list) と異なり、append()やpop()で要素数を増減させることはできません。初期化時に必要な最大サイズを確保する必要があります。
- 通常のリスト (
- 要素ごとのロックではない
get_lock()は「配列オブジェクト全体」に対するロックです。「インデックス0への書き込み」と「インデックス1への書き込み」を別々のプロセスで行っても、ロックを取得すると互いに待ちが発生します。
- 速度
- 共有メモリへのアクセスは、通常のローカル変数へのアクセスよりもオーバーヘッドが大きいです。計算頻度が高い変数はローカルで持ち、最終結果だけを共有配列に書き込む設計が推奨されます。
応用
ロックを使わない(lock=False)パターンです。
各プロセスが書き込む「場所(インデックス)」が明確に分かれている場合、競合は発生しないため、ロックを無効化して高速に処理できます。
import multiprocessing
def compute_part(shared_arr, start_index, end_index, value):
"""
担当するインデックス範囲だけを書き換える(競合しないのでロック不要)
"""
for i in range(start_index, end_index):
shared_arr[i] = value
if __name__ == "__main__":
# lock=False で高速化(ただし使い方は慎重に)
size = 10
arr = multiprocessing.Array('i', size, lock=False)
# 前半5つをプロセス1、後半5つをプロセス2が担当
p1 = multiprocessing.Process(target=compute_part, args=(arr, 0, 5, 100))
p2 = multiprocessing.Process(target=compute_part, args=(arr, 5, 10, 200))
p1.start()
p2.start()
p1.join()
p2.join()
print(list(arr))
# 結果: [100, 100, 100, 100, 100, 200, 200, 200, 200, 200]
まとめ
multiprocessing.Array は、数値データの配列を共有する際の標準的な手段です。
- 向く場面: 複数のワーカーで計算結果を集約する場合、固定長のステータスフラグ管理。
- 変更ポイント: 必要なサイズと型を事前に見積もり、初期化してください。
- 注意点:
appendできない制約と、ロックによるパフォーマンスへの影響を考慮して設計しましょう。
書き込み場所を分担できるなら lock=False、そうでないなら get_lock() を使う、という使い分けが重要です。
