目次
概要
マルチプロセス環境において、複数のプロセスが同時に同じファイルへ書き込みを行ったり、標準出力へログを出したりすると、データが壊れたり表示が混ざったりする「競合」が発生します。 これを防ぐために multiprocessing.Lock を使用します。 あるプロセスがロックを獲得している間、他のプロセスは待機状態となり、処理が「直列化」されることでデータの整合性が保たれます。
仕様(入出力)
- 入力:
- 共有のリソース(例: テキストファイル)。
multiprocessing.Lockオブジェクト。
- 出力:
- 複数のプロセスが並行して更新を行っても、矛盾なく整合性が取れた最終結果。
- 仕組み:
with lock:ブロックに入るとロックを獲得し、ブロックを抜けるときに自動的に解放します。
基本の使い方
メインプロセスで Lock を作成し、それを各サブプロセスの引数として渡します。
from multiprocessing import Process, Lock
lock = Lock()
def worker(l):
with l:
# このブロック内は同時に1つのプロセスしか実行できません
print("排他制御中")
# 引数でロックを渡す
p = Process(target=worker, args=(lock,))
p.start()
コード全文
提示された「ファイルを使ったカウンター」の例を修正し、排他制御を適用したコードです。 ロックがない場合、複数のプロセスが同時に「読み込み→書き込み」を行うため、値が上書きされてカウントが合わなくなりますが、ロックを使うことで期待通りに動作します。
import multiprocessing
import time
import os
# データを保存するファイル名
FILE_NAME = "counter_tmp.txt"
def init_data():
"""ファイルを初期化(0を書き込む)"""
with open(FILE_NAME, "w") as f:
f.write("0")
print("[Main] ファイルを初期化しました: 0")
def read_data():
"""ファイルから数値を読み込む"""
with open(FILE_NAME, "r") as f:
text = f.read()
return int(text) if text else 0
def write_data(n):
"""ファイルへ数値を書き込む"""
with open(FILE_NAME, "w") as f:
f.write(str(n))
def increment_process(lock, process_name, loops):
"""
ファイルを読み書きしてインクリメントするプロセス。
Lockを使って、読み込みから書き込みまでを一連の操作として保護する。
"""
for _ in range(loops):
# --- クリティカルセクション開始 ---
with lock:
# 1. 読み込み
current_val = read_data()
# 2. 計算(処理時間をシミュレート)
new_val = current_val + 1
time.sleep(0.01)
# 3. 書き込み
write_data(new_val)
# 途中経過の表示(これもロック内で行うと表示乱れを防げる)
# print(f"[{process_name}] updated to {new_val}")
# --- クリティカルセクション終了 ---
# ロックの外で少し待機(他のプロセスに譲るため)
time.sleep(0.001)
print(f"[{process_name}] 完了")
def main():
# 1. ロックオブジェクトの生成
lock = multiprocessing.Lock()
# 2. データの初期化
init_data()
loops = 10
process_count = 2
print("--- 並列処理開始 ---")
# 3. プロセスにロックを渡して生成
p1 = multiprocessing.Process(target=increment_process, args=(lock, "Process-1", loops))
p2 = multiprocessing.Process(target=increment_process, args=(lock, "Process-2", loops))
p1.start()
p2.start()
p1.join()
p2.join()
print("--- 全処理完了 ---")
# 結果確認
final_val = read_data()
expected_val = loops * process_count
print(f"最終結果: {final_val}")
print(f"期待値: {expected_val}")
if final_val == expected_val:
print(">> 成功: 排他制御により正しくカウントされました。")
else:
print(">> 失敗: 競合が発生しました。")
# 後始末
if os.path.exists(FILE_NAME):
os.remove(FILE_NAME)
if __name__ == "__main__":
main()
実行結果例
[Main] ファイルを初期化しました: 0
--- 並列処理開始 ---
[Process-1] 完了
[Process-2] 完了
--- 全処理完了 ---
最終結果: 20
期待値: 20
>> 成功: 排他制御により正しくカウントされました。
カスタムポイント
- ロックの範囲
read_dataからwrite_dataまでの「一連の流れ」全体をロックで囲むことが重要です。個々の関数の内部だけをロックしても、読み込んでから書き込むまでの間に割り込まれると意味がありません。
- 標準出力の保護
printも競合すると文字が混ざるため、ログ出力用として別途ロックを用意する場合もあります。
注意点
- デッドロック
- ロックの中で例外が発生して
withブロックを抜けられなかったり、ロックを持ったまま別のロックを取りに行くと、プログラムが永遠に停止する「デッドロック」が発生するリスクがあります。
- ロックの中で例外が発生して
- パフォーマンス
- ファイルアクセスのような遅い処理をロックすると、並列処理のメリット(高速化)が損なわれます。本当に必要な箇所だけを最小限の範囲でロックしてください。
- スレッド用Lockとの違い
threading.Lockとmultiprocessing.Lockは別物です。プロセス間で使う場合は必ずmultiprocessingモジュールのものを使用してください。
応用
再帰ロック (multiprocessing.RLock) 同じプロセス(スレッド)であれば、何度でも獲得できるロックです。再帰的な関数呼び出しの中でロックを使う場合に適しています。
import multiprocessing
def recursive_worker(rlock, count):
if count <= 0:
return
# RLockなら同じプロセス内で重ねて acquire できる
with rlock:
print(f"Layer {count}: ロック獲得中")
recursive_worker(rlock, count - 1)
if __name__ == "__main__":
rlock = multiprocessing.RLock()
p = multiprocessing.Process(target=recursive_worker, args=(rlock, 3))
p.start()
p.join()
まとめ
multiprocessing.Lock は、プロセス間のリソース競合を防ぐための最も基本的な手段です。
- 向く場面: ファイルへの書き込み、データベースへのアクセス、標準出力へのログ表示。
- 変更ポイント: 必ず「読み込み→計算→書き込み」のアトミック性を保証したい範囲全体を
with lock:で囲ってください。 - 注意点: ロックの範囲が広すぎると直列実行と変わらなくなり、狭すぎると競合を防げません。
ファイル操作など、外部リソースを共有する並列処理では必ず排他制御を導入しましょう。
