[Python] Implementing Class-Based Parallel Processing by Inheriting multiprocessing.Process

目次

Overview

When using multiprocessing.Process, you usually pass a function to the target argument. However, you can also implement parallel processing by inheriting (subclassing) the Process class. This approach makes it easier to give the process its own state (attributes) and encapsulate complex initialization logic within the __init__ method. This article explains how to implement parallel processing in an object-oriented way by inheriting the Process class and overriding the run method.

Specifications (Input/Output)

Input

  • A custom class that inherits multiprocessing.Process.
  • Passing initial values through the constructor (__init__).

Requirements

  • Overriding run() method: Write the main logic to be executed when the process starts here.
  • Calling super().__init__(): When defining a constructor, you must call the parent class’s initialization first.

Output

  • When the start() method is called, the code inside run() executes as a separate process.

Basic Usage

Instead of passing a function, write the logic inside a class. Arguments are received directly in the __init__ method of the class rather than through the args parameter.

class MyProcess(multiprocessing.Process):
    def __init__(self, data):
        super().__init__()  # Mandatory
        self.data = data

    def run(self):
        # Logic executed when start() is called
        print(self.data)

p = MyProcess("Hello")
p.start()
p.join()

Full Code Example

In this example, we define two different worker classes, a “Number Counter” and a “Character Stepper,” and execute them in parallel.

import multiprocessing
import time
import os

class CountWorker(multiprocessing.Process):
    """
    A process that counts up and prints numbers in a specified range.
    """
    def __init__(self, name: str, limit: int):
        # Initialize the parent class (mandatory, or it will cause an error)
        super().__init__()
        
        # Store parameters for use within the process as instance variables
        self.worker_name = name
        self.limit = limit

    def run(self):
        """
        Method called in a separate process when start() is executed.
        """
        print(f"[{self.worker_name}] Started (PID: {os.getpid()})")
        
        for i in range(self.limit):
            print(f"  [{self.worker_name}] Count: {i}")
            time.sleep(1.0)
            
        print(f"[{self.worker_name}] Finished")

class CharacterWorker(multiprocessing.Process):
    """
    A process that prints characters from a list sequentially.
    """
    def __init__(self, name: str, chars: list):
        super().__init__()
        self.worker_name = name
        self.chars = chars

    def run(self):
        print(f"[{self.worker_name}] Started (PID: {os.getpid()})")
        
        for char in self.chars:
            print(f"  [{self.worker_name}] Char: {char}")
            time.sleep(1.5)
            
        print(f"[{self.worker_name}] Finished")

def main():
    print("--- Main Process Started ---")

    # Instantiate classes (pass arguments like a normal class)
    worker_num = CountWorker(name="NumberProc", limit=5)
    worker_char = CharacterWorker(name="CharProc", chars=["A", "B", "C", "D"])

    # Start the processes (the run method executes in a separate process)
    worker_num.start()
    worker_char.start()

    # Wait for processes to finish
    worker_num.join()
    worker_char.join()

    print("--- All processes have finished ---")

if __name__ == "__main__":
    main()

Example Output

--- Main Process Started ---
[NumberProc] Started (PID: 12345)
[CharProc] Started (PID: 12346)
  [NumberProc] Count: 0
  [CharProc] Char: A
  [NumberProc] Count: 1
  [NumberProc] Count: 2
  [CharProc] Char: B
  [NumberProc] Count: 3
  [NumberProc] Count: 4
[NumberProc] Finished
  [CharProc] Char: C
  [CharProc] Char: D
[CharProc] Finished
--- All processes have finished ---

Customization Points

  • Using Constructor Arguments: Passing arguments as a normal class instance like MyProcess(param1, param2) improves code readability compared to using Process(target=func, args=(...)). You can also apply standard Type Hinting.
  • Adding Methods: You can add helper methods (e.g., _connect_db(), _process_data()) used inside the process to organize the logic better.

Important Notes

  • Calling super().__init__(): If you define __init__ in a subclass, always call super().__init__(). Without this, internal settings for the Process will not be configured, leading to errors like AttributeError when calling start().
  • Independent Memory Space: Even if you change an instance variable (e.g., self.count) inside run(), it will not be reflected in the original instance in the main process because the memory is copied. If you need to return data, you must use Queue or Pipe passed through the constructor.
  • Initializing Inter-process Communication: If you store synchronization primitives like Queue as member variables, receive or create them in __init__ and use them inside the run method.

Advanced Application

This is a practical pattern where a multiprocessing.Queue is passed to the constructor to share data.

import multiprocessing

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(3):
            self.queue.put(f"Data-{i}")
            print(f"[Prod] Put Data-{i}")

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:  # End signal
                break
            print(f"[Cons] Got {item}")

def main_app():
    q = multiprocessing.Queue()
    
    # Initialize by passing the queue
    p = Producer(q)
    c = Consumer(q)
    
    p.start()
    c.start()
    
    p.join()
    q.put(None)  # Send end signal
    c.join()

if __name__ == "__main__":
    main_app()

Conclusion

Subclassing multiprocessing.Process is helpful for structuring your code.

Caution: Even with object-oriented code, the principle of separate memory spaces between processes still applies. By using this inheritance pattern when building large-scale parallel processing applications, you can improve maintainability.

Best for: Implementing workers with complex logic, keeping state (settings, etc.) clean, and when passing arguments to a target function becomes messy.

Key points: Write your logic in the run() method and do not forget to call super().__init__() in __init__.

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次