目次
概要
multiprocessing.Process を使用する場合、通常は target 引数に関数を渡しますが、クラスを継承(サブクラス化)して実装する方法もあります。 この手法をとると、プロセス自体に状態(属性)を持たせたり、複雑な初期化処理を __init__ にカプセル化したりすることが容易になります。 本記事では、Process クラスを継承し、run メソッドをオーバーライドすることで、オブジェクト指向的に並列処理を実装する方法を解説します。
仕様(入出力)
- 入力
multiprocessing.Processを継承したカスタムクラス。- コンストラクタ(
__init__)での初期値の受け渡し。
- 必須要件
run()メソッドのオーバーライド: プロセス開始時に実行されるメインロジックをここに記述します。super().__init__()の呼び出し: コンストラクタを定義する場合、親クラスの初期化を必ず最初に行う必要があります。
- 出力
start()メソッドを呼ぶと、別プロセスとしてrun()内のコードが実行されます。
基本の使い方
関数を渡す代わりに、クラス内に処理を記述します。引数は args ではなく、クラスの __init__ で直接受け取ります。
class MyProcess(multiprocessing.Process):
def __init__(self, data):
super().__init__() # 必須
self.data = data
def run(self):
# start() 時に実行される処理
print(self.data)
p = MyProcess("Hello")
p.start()
p.join()
コード全文
ここでは「数値カウンター」と「文字ステッパー」という2種類の異なる動作をするワーカークラスを定義し、並列実行する例を示します。
import multiprocessing
import time
import os
class CountWorker(multiprocessing.Process):
"""
指定された範囲の数値をカウントアップして出力するプロセス
"""
def __init__(self, name: str, limit: int):
# 親クラスの初期化(これを忘れるとエラーになります)
super().__init__()
# プロセス内で使用するパラメータをインスタンス変数として保持
self.worker_name = name
self.limit = limit
def run(self):
"""
start() 実行時に別プロセスで呼び出されるメソッド
"""
print(f"[{self.worker_name}] 開始 (PID: {os.getpid()})")
for i in range(self.limit):
print(f" [{self.worker_name}] Count: {i}")
time.sleep(1.0)
print(f"[{self.worker_name}] 終了")
class CharacterWorker(multiprocessing.Process):
"""
リスト内の文字を順に出力するプロセス
"""
def __init__(self, name: str, chars: list):
super().__init__()
self.worker_name = name
self.chars = chars
def run(self):
print(f"[{self.worker_name}] 開始 (PID: {os.getpid()})")
for char in self.chars:
print(f" [{self.worker_name}] Char: {char}")
time.sleep(1.5)
print(f"[{self.worker_name}] 終了")
def main():
print("--- メインプロセス開始 ---")
# インスタンス化(引数は通常のクラス同様に渡せます)
worker_num = CountWorker(name="NumberProc", limit=5)
worker_char = CharacterWorker(name="CharProc", chars=["A", "B", "C", "D"])
# プロセスの開始(runメソッドが別プロセスで実行されます)
worker_num.start()
worker_char.start()
# プロセスの終了待機
worker_num.join()
worker_char.join()
print("--- 全ての処理が終了しました ---")
if __name__ == "__main__":
main()
実行結果例
--- メインプロセス開始 ---
[NumberProc] 開始 (PID: 12345)
[CharProc] 開始 (PID: 12346)
[NumberProc] Count: 0
[CharProc] Char: A
[NumberProc] Count: 1
[NumberProc] Count: 2
[CharProc] Char: B
[NumberProc] Count: 3
[NumberProc] Count: 4
[NumberProc] 終了
[CharProc] Char: C
[CharProc] Char: D
[CharProc] 終了
--- 全ての処理が終了しました ---
カスタムポイント
- コンストラクタ引数の活用
Process(target=func, args=(...))と書くよりも、MyProcess(param1, param2)のように通常のクラスインスタンスとして引数を渡せるため、コードの可読性が向上します。- 型ヒント(Type Hinting)も標準的な方法で適用できます。
- メソッドの追加
run以外にも、プロセス内部で使用するヘルパーメソッド(例:_connect_db(),_process_data())をクラス内に追加することで、ロジックを整理できます。
注意点
- super().__init__() の呼び出し
- サブクラスで
__init__を定義する場合、必ずsuper().__init__()を呼び出してください。これを行わないと、Processとしての内部設定が行われず、start()時にAttributeErrorなどが発生します。
- サブクラスで
- メモリ空間の独立性
- クラスのインスタンス変数(
self.count等)をrun()内で書き換えても、メインプロセスにある元のインスタンスには反映されません(メモリがコピーされているため)。 - データを戻したい場合は、
QueueやPipeをコンストラクタで受け取って使用する必要があります。
- クラスのインスタンス変数(
- プロセス間通信の初期化
Queueなどの同期プリミティブをメンバ変数として持たせる場合、それらは__init__で受け取るか生成し、runメソッド内で使用してください。
応用
コンストラクタで multiprocessing.Queue を受け取り、データ共有を行う実用的なパターンです。
import multiprocessing
class Producer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(3):
self.queue.put(f"Data-{i}")
print(f"[Prod] Put Data-{i}")
class Consumer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
item = self.queue.get()
if item is None: # 終了シグナル
break
print(f"[Cons] Got {item}")
def main_app():
q = multiprocessing.Queue()
# キューを渡して初期化
p = Producer(q)
c = Consumer(q)
p.start()
c.start()
p.join()
q.put(None) # 終了シグナル送信
c.join()
if __name__ == "__main__":
main_app()
まとめ
multiprocessing.Process のサブクラス化は、コードの構造化に役立ちます。
- 向く場面: 複雑な処理を行うワーカーの実装、状態(設定値など)をきれいに保持したい場合、
target関数への引数渡しが煩雑になった場合。 - 変更ポイント:
run()メソッドに処理を記述し、__init__でsuper()を呼ぶことを忘れないでください。 - 注意点: オブジェクト指向で書いても、プロセス間のメモリは分離しているという原則は変わりません。
大規模な並列処理アプリケーションを構築する際は、この継承パターンを採用することで保守性を高めることができます。
