目次
概要
Pythonのマルチプロセス(multiprocessing)環境では、各プロセスが独立したメモリ空間を持つため、通常変数を書き換えても他のプロセスには反映されません。
プロセス間で数値や文字などの単純なデータを共有し、かつリアルタイムに更新を反映させたい場合は、共有メモリ割り当てオブジェクトである multiprocessing.Value を使用します。
本記事では、共有メモリの作成方法と、データ競合を防ぐためのロック機構(排他制御)の正しい使い方を解説します。
仕様(入出力)
multiprocessing.Value の引数
| 引数名 | 説明 |
| typecode_or_type | 共有するデータの型を指定するコード(後述の型コード表参照)または ctypes の型。 |
| args | データの初期値(型コードの次に指定します)。 |
| lock | 排他制御用のロックオブジェクトを使用するかどうか。True(デフォルト)なら新しいロックが生成され、Lockオブジェクトを渡すことも可能です。 |
主な型コード(ctypes)
| 型コード | C言語の型 | Pythonの型 | 説明 |
| ‘i’ | signed int | int | 符号付き整数(サイズはプラットフォーム依存)。カウンタなど。 |
| ‘d’ | double | float | 倍精度浮動小数点数。計算結果など。 |
| ‘c’ | char | bytes | 1バイトの文字。 |
| ‘u’ | wchar_t | str | Unicode文字(1文字)。 |
基本の使い方
Value オブジェクトを作成し、.value 属性を通じて読み書きを行います。
同時に書き込みが発生する場合は、.get_lock() を使って排他制御を行う必要があります。
# 共有メモリの作成(整数型 'i', 初期値 0)
counter = multiprocessing.Value('i', 0)
# 書き込み(ロックを使用)
with counter.get_lock():
counter.value += 1
# 読み出し
print(counter.value)
コード全文
2つのプロセスから同じカウンタ変数(共有メモリ)に対して並行して加算処理を行い、ロックを使ってデータの整合性を保つ例です。
import multiprocessing
import time
import os
def increment_worker(shared_counter: multiprocessing.Value, count: int):
"""
共有カウンタをインクリメントするワーカー関数。
Args:
shared_counter (multiprocessing.Value): プロセス共有のカウンタ
count (int): ループ回数
"""
pid = os.getpid()
print(f"[Process-{pid}] 開始")
for _ in range(count):
# 排他制御を開始(クリティカルセクション)
# これがないと、複数のプロセスが同時に値を読み書きして競合が発生します
with shared_counter.get_lock():
shared_counter.value += 1
# 処理の重さをシミュレーション(競合を起きやすくする)
time.sleep(0.001)
print(f"[Process-{pid}] 終了")
def main():
# 共有メモリを作成: 型は整数('i')、初期値は 0
# デフォルトで lock=True なので内部にロックを持っています
counter = multiprocessing.Value('i', 0)
# ワーカープロセス数とループ回数
num_processes = 2
loops_per_process = 50
print(f"--- 処理開始 (初期値: {counter.value}) ---")
processes = []
# プロセスの生成
for _ in range(num_processes):
p = multiprocessing.Process(
target=increment_worker,
args=(counter, loops_per_process)
)
processes.append(p)
p.start()
# 全プロセスの終了待機
for p in processes:
p.join()
print("--- 全処理完了 ---")
# 結果の検証
expected_value = num_processes * loops_per_process
print(f"最終カウンタ値: {counter.value}")
print(f"期待値: {expected_value}")
if counter.value == expected_value:
print(">> 成功: データ競合なく更新されました。")
else:
print(">> 失敗: データ競合が発生しました。")
if __name__ == "__main__":
main()
実行結果例
--- 処理開始 (初期値: 0) ---
[Process-12345] 開始
[Process-12346] 開始
[Process-12345] 終了
[Process-12346] 終了
--- 全処理完了 ---
最終カウンタ値: 100
期待値: 100
>> 成功: データ競合なく更新されました。
カスタムポイント
- ロックの無効化
- 読み取り専用で使う場合や、アトミック性が保証されている操作のみの場合は
lock=Falseを指定することで、ロック取得のオーバーヘッドを削減できます。
- 読み取り専用で使う場合や、アトミック性が保証されている操作のみの場合は
- 共有ロックの外部注入
- 複数の
Valueを同時に更新する必要がある場合、個々のget_lock()ではなく、別途作成したmultiprocessing.Lockを各Valueのlock引数に渡すことで、一括して制御できます。
- 複数の
注意点
- 文字列の共有
Valueは基本的に数値や1文字の共有向けです。長い文字列や複雑なオブジェクト(リストや辞書)を共有したい場合は、multiprocessing.Managerを使用してください。
- パフォーマンス
- プロセス間のロック制御は高コストです。頻繁に共有メモリを書き換える設計は、並列処理の速度低下を招く可能性があります。可能な限り、ローカル変数で計算してから最後に共有メモリに書き込む等の工夫が必要です。
- アトミック性
counter.value += 1はPythonレベルではアトミック(分割不可能な操作)ではないため、with get_lock():が必須です。これを忘れるとカウントがずれます。
応用
配列データを共有したい場合は multiprocessing.Array を使用します。使い方は Value とほぼ同じです。
import multiprocessing
def update_array(shared_arr):
# 配列全体をロックして操作
with shared_arr.get_lock():
for i in range(len(shared_arr)):
shared_arr[i] *= 2
if __name__ == "__main__":
# 整数配列('i')、サイズ3、初期値はリストで指定可能
arr = multiprocessing.Array('i', [1, 2, 3])
p = multiprocessing.Process(target=update_array, args=(arr,))
p.start()
p.join()
# Arrayはスライスで値を取得するとリストになります
print(f"更新後の配列: {arr[:]}") # 出力例: [2, 4, 6]
まとめ
multiprocessing.Value は、マルチプロセス環境下での「小さな窓」のようなものです。
- 向く場面: 進行状況カウンタの共有、フラグ(停止信号など)の伝達、単純な数値パラメータの共有。
- 変更ポイント: 扱うデータ型に合わせて適切な型コード(
i,d等)を選択してください。 - 注意点: 必ず
get_lock()を使用して競合を防いでください。
シンプルですが強力な機能です。適切に排他制御を行い、安全な並列処理を実現しましょう。
