【Python】スレッドセーフなキューでデータを安全に受け渡す

目次

概要

Pythonの queue モジュールは、マルチスレッドプログラミングにおいて、スレッド間でデータを安全にやり取り(通信)するための仕組みを提供します。

list などの通常のデータ構造を複数のスレッドで同時に操作するとデータが破損する恐れがありますが、queue.Queue は内部で排他制御が行われているため、ロック(Lock)を明示的に書かなくても安全にデータの追加・取り出しが可能です。

仕様(入出力)

  • 入力: 任意のPythonオブジェクト(データ)
  • 出力: 格納された順序に従ったデータ取り出し
  • 機能:
    • FIFO (First-In, First-Out): 先に入れたデータが先に出てくる(行列の待ち順)。
    • スレッドセーフ: 複数のスレッドが同時に putget を行っても整合性が保たれる。
    • ブロッキング: キューが空の状態で get() すると、データが入るまで待機させることができる。

メソッドと処理一覧

メソッド・構文引数例処理・意味
queue.Queue(maxsize)maxsize=0キューオブジェクトを生成します。maxsize で容量制限を設定可能(0は無制限)。
q.put(item)item, timeout=Noneキューにデータを入れます。キューが満杯の場合、空きができるまで待機(ブロック)します。
q.get()timeout=Noneキューからデータを取り出します。キューが空の場合、データが入るまで待機(ブロック)します。
q.qsize()なし現在キューに入っているデータの概算数を返します。
q.empty()なしキューが空なら True を返します。
q.full()なしキューが満杯なら True を返します。

基本の使い方

まずはシングルスレッドでの基本的な動作(FIFO)を確認するコードです。

import queue

# キューの作成
q = queue.Queue()

# データの追加 (Put)
q.put("Data-A")
q.put("Data-B")
q.put("Data-C")

print(f"現在のサイズ: {q.qsize()}")

# データの取り出し (Get)
# 追加した順序(A -> B -> C)で取り出されます
while not q.empty():
    data = q.get()
    print(f"取得: {data}")

コード全文

マルチスレッド環境での使用例です。

「生産者(Producer)」スレッドがデータを作り、「消費者(Consumer)」スレッドがそれを処理する、という一般的な並行処理パターンを実装しています。

import threading
import queue
import time
import random

def producer(q, count):
    """データを生成してキューに追加するスレッド処理"""
    print("[Producer] データ生成を開始します")
    for i in range(count):
        item = f"Order-{i}"
        
        # 擬似的な生成時間
        time.sleep(random.uniform(0.1, 0.5))
        
        # キューに追加
        q.put(item)
        print(f"  [Producer] Put -> {item} (QSize: {q.qsize()})")
    
    print("[Producer] 全データの生成完了")

def consumer(q, thread_name):
    """キューからデータを取り出して処理するスレッド処理"""
    print(f"[{thread_name}] 待機開始")
    while True:
        try:
            # キューからデータを取得
            # timeout=3: 3秒待っても来なければ終了とみなす
            item = q.get(timeout=3)
            
            print(f"    [{thread_name}] Get <- {item}")
            
            # 擬似的な処理時間
            time.sleep(random.uniform(0.2, 0.8))
            
            # タスク完了を通知(今回は使用しないが定石として重要)
            q.task_done()
            
        except queue.Empty:
            # タイムアウトしたらループ終了
            print(f"[{thread_name}] データが来ないため終了します")
            break

if __name__ == "__main__":
    # スレッドセーフなキューを作成
    my_queue = queue.Queue()
    
    # 生産者スレッドを作成
    p_thread = threading.Thread(target=producer, args=(my_queue, 5))
    
    # 消費者スレッドを2つ作成(並列処理)
    c_threads = []
    for i in range(2):
        t = threading.Thread(target=consumer, args=(my_queue, f"Consumer-{i}"))
        c_threads.append(t)
    
    # スレッド開始
    p_thread.start()
    for t in c_threads:
        t.start()
    
    # 全スレッドの終了待機
    p_thread.join()
    for t in c_threads:
        t.join()
        
    print("全ての処理が完了しました")

カスタムポイント

ブロッキングの制御

putget はデフォルトで「完了できるまで待つ(ブロックする)」挙動をしますが、引数で制御可能です。

  • q.get(block=False) または q.get_nowait():
    • 待機しません。キューが空の場合、即座に queue.Empty 例外が発生します。
  • q.get(timeout=5):
    • 最大5秒待ちます。それでも空なら queue.Empty 例外が発生します。

種類の異なるキュー

queue モジュールには、通常のFIFO以外にも便利なキューがあります。

  • queue.LifoQueue:
    • LIFO (Last-In, First-Out)。スタック構造。後に入れたものが先に出ます。
  • queue.PriorityQueue:
    • 優先順位付きキュー。データを put((優先度, データ)) のタプルで入れると、優先度(数値が小さい方)が高い順に取り出されます。

注意点

  1. qsize() の信頼性
    • マルチスレッド環境では、q.qsize() > 0 を確認した直後に、別のスレッドが get() して空になる可能性があります。
    • そのため、if not q.empty(): data = q.get() という書き方は安全ではありません(競合状態でエラーになる)。
    • 必ず q.get() をそのまま呼び出して待機させるか、try-except queue.Empty で例外処理を行ってください。
  2. Noneの格納
    • キューには None を含めた任意のオブジェクトを格納できますが、慣例として「終了シグナル(番兵)」として Noneput し、受け取った側が None ならループを抜ける、という実装によく使われます。

応用

task_done()join() を使用して、キュー内の全タスクが「処理完了」するまでメインスレッドを待機させるパターンです。

import threading
import queue

def worker(q):
    while True:
        item = q.get()
        if item is None: # 終了シグナル
            q.task_done()
            break
        print(f"処理中: {item}")
        # 処理が終わったことをキューに通知
        q.task_done()

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()

# タスク投入
for i in range(5):
    q.put(i)

# 全タスクの処理完了(task_doneが呼ばれる)までブロック
q.join()

# スレッドを停止させる
q.put(None)
t.join()

まとめ

マルチスレッドでデータをやり取りするなら、複雑な Lock 制御を自前で書くよりも、まず queue.Queue の利用を検討してください。

「入れる側」と「取り出す側」を分離(疎結合)でき、安全かつ簡潔に非同期処理を実装できます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次