【Python】asyncio.Lockで非同期処理のデータ競合を防ぐ(排他制御)

目次

概要

Pythonの asyncio はシングルスレッドで動作しますが、await(I/O待ち)のタイミングでタスクが切り替わるため、複数のタスクが同じ変数やリソースを同時に書き換えると「競合状態(レースコンディション)」が発生します。

これを防ぐために asyncio.Lock() を使用します。これにより、あるタスクが処理中の間、他のタスクがそのリソースに触れないよう待機させ、データの整合性を保つことができます。

仕様(入出力)

  • 入力
    • 共有リソース(変数、ファイル、DBなど)に対して更新を行う複数のコルーチン。
    • 各更新処理には、意図的に競合を誘発させるための待機時間(await asyncio.sleep)が含まれる。
  • 出力
    • すべてのタスク完了後の、整合性が保たれた共有リソースの値。
  • 動作要件
    • ロックを使用しない場合、計算結果がズレる(例: 100回足したのに95にしかならない)。
    • ロックを使用した場合、期待通りにカウントアップされる。

基本の使い方

排他制御の基本は、クリティカルセクション(同時に実行されては困る部分)を async with lock: で囲むことです。

構文と意味

構文意味
lock = asyncio.Lock()ロックオブジェクト(ミューテックス)を作成します。初期状態は「解除」です。
async with lock:ロックを獲得してブロック内を実行し、終了時に自動解放します。他のタスクがロック中の場合、解放されるまで待機します。
await lock.acquire()手動でロックを獲得します(通常は async with 推奨)。
lock.release()手動でロックを解放します。
# 基本パターン
lock = asyncio.Lock()

async def safe_operation():
    async with lock:
        # ここは同時に1つのタスクしか実行できません
        print("Locked processing...")

コード全文

ここでは「銀行口座クラス」を作成し、複数のATM(タスク)から同時に「残高確認→入金」という操作が行われた場合をシミュレーションします。ロックにより、残高の読み出しと書き込みの間で他のタスクが割り込まないようにします。

import asyncio

class BankAccount:
    def __init__(self, balance: int = 0):
        self.balance = balance
        # 排他制御用のロックオブジェクトを生成
        self.lock = asyncio.Lock()

    async def deposit(self, amount: int, task_name: str):
        """
        口座に入金を行うメソッド。
        ロックを使用することで、読み出しから書き込みまでの整合性を保つ。
        """
        print(f"[{task_name}] ロック獲得待ち...")
        
        # --- クリティカルセクション開始 ---
        async with self.lock:
            print(f"[{task_name}] ロック獲得。処理開始。")
            
            # 1. 現在の残高を取得
            current_value = self.balance
            print(f"  [{task_name}] 読み出し残高: {current_value}")
            
            # 2. 処理中のコンテキストスイッチをシミュレート(ここで他のタスクが動く)
            await asyncio.sleep(0.1)
            
            # 3. 計算して更新
            new_value = current_value + amount
            self.balance = new_value
            
            print(f"  [{task_name}] 更新後残高: {new_value}")
        # --- クリティカルセクション終了(自動的にロック解除) ---
        
        print(f"[{task_name}] 処理完了。ロック解放済み。")

async def main():
    # 口座を作成(初期残高 0)
    account = BankAccount()
    
    print("--- 並行入金処理を開始 ---")
    
    # 3つのタスクが同時に「100円入金」を試みる
    # ロックがない場合、全てのタスクが「残高0」を読み取ってしまい、最終結果が100になる恐れがある
    await asyncio.gather(
        account.deposit(100, "ATM-A"),
        account.deposit(100, "ATM-B"),
        account.deposit(100, "ATM-C")
    )
    
    print("--- 全処理完了 ---")
    print(f"最終残高: {account.balance} (期待値: 300)")

if __name__ == "__main__":
    asyncio.run(main())

実行結果例

タスクが順番にロックを獲得し、処理している様子がわかります。

--- 並行入金処理を開始 ---
[ATM-A] ロック獲得待ち...
[ATM-B] ロック獲得待ち...
[ATM-C] ロック獲得待ち...
[ATM-A] ロック獲得。処理開始。
  [ATM-A] 読み出し残高: 0
  [ATM-A] 更新後残高: 100
[ATM-A] 処理完了。ロック解放済み。
[ATM-B] ロック獲得。処理開始。
  [ATM-B] 読み出し残高: 100
  [ATM-B] 更新後残高: 200
[ATM-B] 処理完了。ロック解放済み。
[ATM-C] ロック獲得。処理開始。
  [ATM-C] 読み出し残高: 200
  [ATM-C] 更新後残高: 300
[ATM-C] 処理完了。ロック解放済み。
--- 全処理完了 ---
最終残高: 300 (期待値: 300)

カスタムポイント

  • ロックの粒度(範囲)
    • async with lock: の範囲は必要最小限に留めてください。
    • 関係のない重い処理(通信や計算)までロックの中に含めると、並列性のメリットが失われ、実質的な直列実行(パフォーマンス低下)になってしまいます。
  • グローバル変数 vs クラスメンバ
    • 単純なスクリプトではグローバル変数の lock でも動きますが、上記コードのようにクラスのメンバ変数(self.lock)として持たせると、複数の口座オブジェクトを作った際にそれぞれ独立して排他制御ができるため推奨されます。

注意点

  1. デッドロック
    • ロックの中でさらに別のロックを獲得しようとすると、設計によっては永遠に待ち続ける「デッドロック」が発生します。複数のロックを使う場合は、獲得順序を統一するなどの注意が必要です。
  2. awaitの位置
    • ロックブロック内で await を行うと、その待機中もロックは保持されたままです。他のタスクはその間ずっと待たされます。
    • 逆に、ロックブロック内に await(コンテキストスイッチする箇所)が一切ない場合、そもそも競合は発生しにくいため、ロック自体が不要な場合もあります。
  3. スレッドセーフとの違い
    • asyncio.Lock はあくまで asyncio のイベントループ内での競合を防ぐものです。threading モジュールで動作する別スレッドからのアクセスを防ぐことはできません(その場合は threading.Lock が必要)。

応用

lock.acquire() と lock.release() を手動で使うパターンです。

通常は async with が推奨されますが、関数をまたいでロックを維持したい場合などの特殊なケースで使用します。

import asyncio

lock = asyncio.Lock()

async def manual_lock_example():
    print("ロックを獲得します...")
    await lock.acquire()
    
    try:
        print("クリティカルセクション実行中")
        await asyncio.sleep(0.1)
    finally:
        # エラーが起きても確実に解放するために try-finally を使う
        print("ロックを解放します")
        lock.release()

if __name__ == "__main__":
    asyncio.run(manual_lock_example())

まとめ

asyncio での状態管理において、データの不整合を防ぐためにはロックが不可欠です。

  • 向く場面: データベースの更新、ファイルへの書き込み、グローバル変数の集計など、複数のタスクが読み書きを行う場合。
  • 変更ポイント: async with lock: で囲むだけですが、囲む範囲を間違えないようにしましょう。
  • 注意点: 不要なロックは処理速度を落とします。本当に競合する部分だけを保護してください。

直感的に並行処理を書けるのが asyncio の強みですが、副作用としてのデータ競合には常に注意を払いましょう。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次