目次
概要
Pythonの queue モジュールは、マルチスレッドプログラミングにおいて、スレッド間でデータを安全にやり取り(通信)するための仕組みを提供します。
list などの通常のデータ構造を複数のスレッドで同時に操作するとデータが破損する恐れがありますが、queue.Queue は内部で排他制御が行われているため、ロック(Lock)を明示的に書かなくても安全にデータの追加・取り出しが可能です。
仕様(入出力)
- 入力: 任意のPythonオブジェクト(データ)
- 出力: 格納された順序に従ったデータ取り出し
- 機能:
- FIFO (First-In, First-Out): 先に入れたデータが先に出てくる(行列の待ち順)。
- スレッドセーフ: 複数のスレッドが同時に
putやgetを行っても整合性が保たれる。 - ブロッキング: キューが空の状態で
get()すると、データが入るまで待機させることができる。
メソッドと処理一覧
| メソッド・構文 | 引数例 | 処理・意味 |
queue.Queue(maxsize) | maxsize=0 | キューオブジェクトを生成します。maxsize で容量制限を設定可能(0は無制限)。 |
q.put(item) | item, timeout=None | キューにデータを入れます。キューが満杯の場合、空きができるまで待機(ブロック)します。 |
q.get() | timeout=None | キューからデータを取り出します。キューが空の場合、データが入るまで待機(ブロック)します。 |
q.qsize() | なし | 現在キューに入っているデータの概算数を返します。 |
q.empty() | なし | キューが空なら True を返します。 |
q.full() | なし | キューが満杯なら True を返します。 |
基本の使い方
まずはシングルスレッドでの基本的な動作(FIFO)を確認するコードです。
import queue
# キューの作成
q = queue.Queue()
# データの追加 (Put)
q.put("Data-A")
q.put("Data-B")
q.put("Data-C")
print(f"現在のサイズ: {q.qsize()}")
# データの取り出し (Get)
# 追加した順序(A -> B -> C)で取り出されます
while not q.empty():
data = q.get()
print(f"取得: {data}")
コード全文
マルチスレッド環境での使用例です。
「生産者(Producer)」スレッドがデータを作り、「消費者(Consumer)」スレッドがそれを処理する、という一般的な並行処理パターンを実装しています。
import threading
import queue
import time
import random
def producer(q, count):
"""データを生成してキューに追加するスレッド処理"""
print("[Producer] データ生成を開始します")
for i in range(count):
item = f"Order-{i}"
# 擬似的な生成時間
time.sleep(random.uniform(0.1, 0.5))
# キューに追加
q.put(item)
print(f" [Producer] Put -> {item} (QSize: {q.qsize()})")
print("[Producer] 全データの生成完了")
def consumer(q, thread_name):
"""キューからデータを取り出して処理するスレッド処理"""
print(f"[{thread_name}] 待機開始")
while True:
try:
# キューからデータを取得
# timeout=3: 3秒待っても来なければ終了とみなす
item = q.get(timeout=3)
print(f" [{thread_name}] Get <- {item}")
# 擬似的な処理時間
time.sleep(random.uniform(0.2, 0.8))
# タスク完了を通知(今回は使用しないが定石として重要)
q.task_done()
except queue.Empty:
# タイムアウトしたらループ終了
print(f"[{thread_name}] データが来ないため終了します")
break
if __name__ == "__main__":
# スレッドセーフなキューを作成
my_queue = queue.Queue()
# 生産者スレッドを作成
p_thread = threading.Thread(target=producer, args=(my_queue, 5))
# 消費者スレッドを2つ作成(並列処理)
c_threads = []
for i in range(2):
t = threading.Thread(target=consumer, args=(my_queue, f"Consumer-{i}"))
c_threads.append(t)
# スレッド開始
p_thread.start()
for t in c_threads:
t.start()
# 全スレッドの終了待機
p_thread.join()
for t in c_threads:
t.join()
print("全ての処理が完了しました")
カスタムポイント
ブロッキングの制御
put と get はデフォルトで「完了できるまで待つ(ブロックする)」挙動をしますが、引数で制御可能です。
q.get(block=False)またはq.get_nowait():- 待機しません。キューが空の場合、即座に
queue.Empty例外が発生します。
- 待機しません。キューが空の場合、即座に
q.get(timeout=5):- 最大5秒待ちます。それでも空なら
queue.Empty例外が発生します。
- 最大5秒待ちます。それでも空なら
種類の異なるキュー
queue モジュールには、通常のFIFO以外にも便利なキューがあります。
queue.LifoQueue:- LIFO (Last-In, First-Out)。スタック構造。後に入れたものが先に出ます。
queue.PriorityQueue:- 優先順位付きキュー。データを
put((優先度, データ))のタプルで入れると、優先度(数値が小さい方)が高い順に取り出されます。
- 優先順位付きキュー。データを
注意点
- qsize() の信頼性
- マルチスレッド環境では、
q.qsize() > 0を確認した直後に、別のスレッドがget()して空になる可能性があります。 - そのため、
if not q.empty(): data = q.get()という書き方は安全ではありません(競合状態でエラーになる)。 - 必ず
q.get()をそのまま呼び出して待機させるか、try-except queue.Emptyで例外処理を行ってください。
- マルチスレッド環境では、
- Noneの格納
- キューには
Noneを含めた任意のオブジェクトを格納できますが、慣例として「終了シグナル(番兵)」としてNoneをputし、受け取った側がNoneならループを抜ける、という実装によく使われます。
- キューには
応用
task_done() と join() を使用して、キュー内の全タスクが「処理完了」するまでメインスレッドを待機させるパターンです。
import threading
import queue
def worker(q):
while True:
item = q.get()
if item is None: # 終了シグナル
q.task_done()
break
print(f"処理中: {item}")
# 処理が終わったことをキューに通知
q.task_done()
q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()
# タスク投入
for i in range(5):
q.put(i)
# 全タスクの処理完了(task_doneが呼ばれる)までブロック
q.join()
# スレッドを停止させる
q.put(None)
t.join()
まとめ
マルチスレッドでデータをやり取りするなら、複雑な Lock 制御を自前で書くよりも、まず queue.Queue の利用を検討してください。
「入れる側」と「取り出す側」を分離(疎結合)でき、安全かつ簡潔に非同期処理を実装できます。
