【Python】multiprocessing.Processを継承してクラスベースで並列処理を実装する

目次

概要

multiprocessing.Process を使用する場合、通常は target 引数に関数を渡しますが、クラスを継承(サブクラス化)して実装する方法もあります。 この手法をとると、プロセス自体に状態(属性)を持たせたり、複雑な初期化処理を __init__ にカプセル化したりすることが容易になります。 本記事では、Process クラスを継承し、run メソッドをオーバーライドすることで、オブジェクト指向的に並列処理を実装する方法を解説します。

仕様(入出力)

  • 入力
    • multiprocessing.Process を継承したカスタムクラス。
    • コンストラクタ(__init__)での初期値の受け渡し。
  • 必須要件
    • run() メソッドのオーバーライド: プロセス開始時に実行されるメインロジックをここに記述します。
    • super().__init__() の呼び出し: コンストラクタを定義する場合、親クラスの初期化を必ず最初に行う必要があります。
  • 出力
    • start() メソッドを呼ぶと、別プロセスとして run() 内のコードが実行されます。

基本の使い方

関数を渡す代わりに、クラス内に処理を記述します。引数は args ではなく、クラスの __init__ で直接受け取ります。

class MyProcess(multiprocessing.Process):
    def __init__(self, data):
        super().__init__()  # 必須
        self.data = data

    def run(self):
        # start() 時に実行される処理
        print(self.data)

p = MyProcess("Hello")
p.start()
p.join()

コード全文

ここでは「数値カウンター」と「文字ステッパー」という2種類の異なる動作をするワーカークラスを定義し、並列実行する例を示します。

import multiprocessing
import time
import os

class CountWorker(multiprocessing.Process):
    """
    指定された範囲の数値をカウントアップして出力するプロセス
    """
    def __init__(self, name: str, limit: int):
        # 親クラスの初期化(これを忘れるとエラーになります)
        super().__init__()
        
        # プロセス内で使用するパラメータをインスタンス変数として保持
        self.worker_name = name
        self.limit = limit

    def run(self):
        """
        start() 実行時に別プロセスで呼び出されるメソッド
        """
        print(f"[{self.worker_name}] 開始 (PID: {os.getpid()})")
        
        for i in range(self.limit):
            print(f"  [{self.worker_name}] Count: {i}")
            time.sleep(1.0)
            
        print(f"[{self.worker_name}] 終了")

class CharacterWorker(multiprocessing.Process):
    """
    リスト内の文字を順に出力するプロセス
    """
    def __init__(self, name: str, chars: list):
        super().__init__()
        self.worker_name = name
        self.chars = chars

    def run(self):
        print(f"[{self.worker_name}] 開始 (PID: {os.getpid()})")
        
        for char in self.chars:
            print(f"  [{self.worker_name}] Char: {char}")
            time.sleep(1.5)
            
        print(f"[{self.worker_name}] 終了")

def main():
    print("--- メインプロセス開始 ---")

    # インスタンス化(引数は通常のクラス同様に渡せます)
    worker_num = CountWorker(name="NumberProc", limit=5)
    worker_char = CharacterWorker(name="CharProc", chars=["A", "B", "C", "D"])

    # プロセスの開始(runメソッドが別プロセスで実行されます)
    worker_num.start()
    worker_char.start()

    # プロセスの終了待機
    worker_num.join()
    worker_char.join()

    print("--- 全ての処理が終了しました ---")

if __name__ == "__main__":
    main()

実行結果例

--- メインプロセス開始 ---
[NumberProc] 開始 (PID: 12345)
[CharProc] 開始 (PID: 12346)
  [NumberProc] Count: 0
  [CharProc] Char: A
  [NumberProc] Count: 1
  [NumberProc] Count: 2
  [CharProc] Char: B
  [NumberProc] Count: 3
  [NumberProc] Count: 4
[NumberProc] 終了
  [CharProc] Char: C
  [CharProc] Char: D
[CharProc] 終了
--- 全ての処理が終了しました ---

カスタムポイント

  • コンストラクタ引数の活用
    • Process(target=func, args=(...)) と書くよりも、MyProcess(param1, param2) のように通常のクラスインスタンスとして引数を渡せるため、コードの可読性が向上します。
    • 型ヒント(Type Hinting)も標準的な方法で適用できます。
  • メソッドの追加
    • run 以外にも、プロセス内部で使用するヘルパーメソッド(例: _connect_db(), _process_data())をクラス内に追加することで、ロジックを整理できます。

注意点

  1. super().__init__() の呼び出し
    • サブクラスで __init__ を定義する場合、必ず super().__init__() を呼び出してください。これを行わないと、Process としての内部設定が行われず、start() 時に AttributeError などが発生します。
  2. メモリ空間の独立性
    • クラスのインスタンス変数(self.count 等)を run() 内で書き換えても、メインプロセスにある元のインスタンスには反映されません(メモリがコピーされているため)。
    • データを戻したい場合は、QueuePipe をコンストラクタで受け取って使用する必要があります。
  3. プロセス間通信の初期化
    • Queue などの同期プリミティブをメンバ変数として持たせる場合、それらは __init__ で受け取るか生成し、run メソッド内で使用してください。

応用

コンストラクタで multiprocessing.Queue を受け取り、データ共有を行う実用的なパターンです。

import multiprocessing

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(3):
            self.queue.put(f"Data-{i}")
            print(f"[Prod] Put Data-{i}")

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:  # 終了シグナル
                break
            print(f"[Cons] Got {item}")

def main_app():
    q = multiprocessing.Queue()
    
    # キューを渡して初期化
    p = Producer(q)
    c = Consumer(q)
    
    p.start()
    c.start()
    
    p.join()
    q.put(None)  # 終了シグナル送信
    c.join()

if __name__ == "__main__":
    main_app()

まとめ

multiprocessing.Process のサブクラス化は、コードの構造化に役立ちます。

  • 向く場面: 複雑な処理を行うワーカーの実装、状態(設定値など)をきれいに保持したい場合、target 関数への引数渡しが煩雑になった場合。
  • 変更ポイント: run() メソッドに処理を記述し、__init__super() を呼ぶことを忘れないでください。
  • 注意点: オブジェクト指向で書いても、プロセス間のメモリは分離しているという原則は変わりません。

大規模な並列処理アプリケーションを構築する際は、この継承パターンを採用することで保守性を高めることができます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次