概要
マルチスレッド処理において、複数のスレッドが同じ変数やリソース(ファイル、データベース、グローバル変数など)を同時に書き換えようとすると、データが整合性を失う「競合状態(Race Condition)」が発生します。
これを防ぐために threading.Lock を使用し、あるスレッドが処理中は他のスレッドを待機させる「排他制御」の実装方法を解説します。
仕様(入出力)
- 入力: 共有リソース(変数など)を操作する関数
- 出力: 競合せず正しく計算された結果
- 機能:
- クリティカルセクション(同時に実行されては困る部分)の保護
- スレッド間の同期
構文と意味
| 構文 | 意味・役割 |
lock = threading.Lock() | 新しいロックオブジェクトを生成します。初期状態は「ロック解除」です。 |
with lock: | コンテキストマネージャを使用してロックを取得(Acquire)します。ブロックを抜けると自動的に解放(Release)されます。推奨される書き方です。 |
lock.acquire() | 手動でロックを取得します。他のスレッドがロック中の場合、解放されるまで待機します。 |
lock.release() | 手動でロックを解放します。取得したスレッドが必ず呼ぶ必要があります。 |
基本の使い方
ロックオブジェクトを作成し、with 文で保護したい処理を囲むのが基本パターンです。
import threading
# 1. ロックオブジェクトの生成
lock = threading.Lock()
shared_data = 0
def safe_update():
global shared_data
# 2. 排他制御の開始
# このブロック内は常に1つのスレッドしか実行できません
with lock:
current = shared_data
current += 1
shared_data = current
# 以降でスレッドを作成・開始...
コード全文
「ロックなし(危険)」な状態と「ロックあり(安全)」な状態の違いをシミュレーションするコードです。
銀行口座の残高更新を例に、排他制御がないと計算が合わなくなる現象と、それを Lock で解決する方法を示します。
import threading
import time
# 共有リソース(銀行口座の残高と仮定)
account_balance = 0
# 排他制御用のロックオブジェクトを作成
balance_lock = threading.Lock()
def update_balance_unsafe(amount, loops):
"""
【危険】ロックなしで残高を更新する関数
読み込みと書き込みの間に待機時間があるため、競合が発生しやすい
"""
global account_balance
for _ in range(loops):
# 現在の値を読み込む
current = account_balance
# IO待ちなどを想定したタイムラグ
time.sleep(0.001)
# 計算して書き戻す(ここで他のスレッドが値を上書きしている可能性がある)
account_balance = current + amount
def update_balance_safe(amount, loops):
"""
【安全】ロックを使用して残高を更新する関数
"""
global account_balance
for _ in range(loops):
# --- クリティカルセクション開始 ---
with balance_lock:
current = account_balance
time.sleep(0.001)
account_balance = current + amount
# --- クリティカルセクション終了 ---
def run_simulation(safe_mode=True):
global account_balance
account_balance = 0 # リセット
threads = []
loops = 50
thread_count = 5
print(f"--- シミュレーション開始 (SafeMode: {safe_mode}) ---")
target_func = update_balance_safe if safe_mode else update_balance_unsafe
# 複数のスレッドで同時に更新を行う
for _ in range(thread_count):
t = threading.Thread(target=target_func, args=(1, loops))
threads.append(t)
t.start()
for t in threads:
t.join()
# 期待値: スレッド数 × ループ回数
expected = thread_count * loops
print(f"最終残高: {account_balance} (期待値: {expected})")
if account_balance == expected:
print(">> 結果: 正常")
else:
print(">> 結果: データ不整合が発生しました!")
print("-" * 30)
if __name__ == "__main__":
# 1. ロックなしで実行(失敗する可能性が高い)
run_simulation(safe_mode=False)
# 2. ロックありで実行(必ず成功する)
run_simulation(safe_mode=True)
カスタムポイント
with 文を使うメリット
lock.acquire() と lock.release() を手動で書くこともできますが、推奨されません。
with lock: を使うことで、ブロック内でエラー(例外)が発生しても、自動的に release() が呼ばれ、ロックが解除されます。
これにより、「エラーで処理が止まったのにロックが掛かったままで、他のスレッドが永遠に待ち続ける(デッドロック)」という事態を防げます。
粒度(範囲)の調整
ロックを掛ける範囲(with の中身)は、必要最小限に留めるのが鉄則です。
不要な計算や重い通信までロックの中に含めると、並列処理のメリットがなくなり、プログラム全体の動作が遅くなります(実質的なシングルスレッドになってしまいます)。
注意点
- デッドロック (Deadlock)
- 複数のロック(Lock A, Lock B)がある場合、スレッド1がAを持ってBを待ち、スレッド2がBを持ってAを待つ状態になると、互いに動けなくなります。
- ロックの取得順序を統一するなどの設計が必要です。
- 再帰的なロック
- 通常の
Lockは、同じスレッドであっても2回acquireすると自分自身で待機状態になり停止します。 - 同じスレッドが何度もロックを取得する必要がある場合(再帰呼び出しなど)は、
threading.RLockを使用してください。
- 通常の
- パフォーマンス
- ロックの取得・解放にはオーバーヘッドがあります。あまりに頻繁にロックを行うと、逆に処理速度が低下することがあります。
応用
RLock(再入可能ロック)を使用した例です。同じスレッドであれば、ネストしてロックを取得できます。
import threading
# RLockを使用
rlock = threading.RLock()
def recursive_worker(count):
with rlock:
print(f"ロック取得: 深さ {count}")
if count > 0:
# 通常のLockならここでフリーズするが、RLockなら通過できる
recursive_worker(count - 1)
if __name__ == "__main__":
recursive_worker(3)
まとめ
マルチスレッドで共有データを扱う際は、必ず threading.Lock で排他制御を行ってください。
書き方は with lock: が基本です。「読み込んで、計算して、書き込む」という一連の流れをアトミック(分割不可能)な操作にすることで、データの整合性を守ることができます。
