目次
概要
multiprocessing で生成したプロセスが「現在実行中なのか」「既に終了しているのか」を判定するには、is_alive() メソッドを使用します。 また、プロセスID (pid) や終了コード (exitcode) を確認することで、より詳細なプロセスの状態管理が可能になります。 本記事では、非同期に動作する複数のプロセスの生存確認を行いながら、メインプロセス側で監視ループを回す方法を解説します。
仕様(入出力)
- 入力: 生成された
multiprocessing.Processオブジェクト。 - 出力:
is_alive(): プロセスが実行中ならTrue、それ以外(未開始または終了済み)ならFalse。pid: プロセスID(実行中のみ有効)。exitcode: 終了コード(終了済みのみ有効。正常終了なら0)。
- 動作仕様:
- メインプロセスでループを回し、一定間隔で各サブプロセスの状態をポーリング(問い合わせ)して表示します。
基本の使い方
プロセスオブジェクトに対してメソッドを呼び出します。
if process.is_alive():
print(f"実行中 (PID: {process.pid})")
else:
print(f"終了済み (ExitCode: {process.exitcode})")
コード全文
処理時間の異なる2つのプロセスを起動し、両方が終了するまでメインプロセスで状況を実況し続けるコードです。
import multiprocessing
import time
import os
import random
def worker_task(name: str, duration: int):
"""
指定された時間だけ動作するタスク
"""
print(f" [{name}] 開始 (PID: {os.getpid()}) 所要時間: {duration}秒")
# 処理のシミュレーション
for i in range(duration):
time.sleep(1)
print(f" [{name}] 終了")
def main():
print("--- メインプロセス開始 ---")
# 処理時間の異なる2つのプロセスを作成
# process1: 3秒で終了
p1 = multiprocessing.Process(
target=worker_task,
kwargs={"name": "Process-1", "duration": 3}
)
# process2: 6秒で終了
p2 = multiprocessing.Process(
target=worker_task,
kwargs={"name": "Process-2", "duration": 6}
)
# プロセスの開始
p1.start()
p2.start()
# 監視ループ
# どちらか片方でも生きている間はループを続ける
while p1.is_alive() or p2.is_alive():
print(f">>> 監視中: P1={p1.is_alive()}, P2={p2.is_alive()}")
if not p1.is_alive() and p1.exitcode is not None:
print(f" (P1は終了済みです。ExitCode: {p1.exitcode})")
time.sleep(1.0)
print("--- 全プロセスの終了を検知 ---")
# リソースの回収(is_aliveがFalseでも、ゾンビプロセス化を防ぐためにjoinは呼ぶべき)
p1.join()
p2.join()
print("全ての処理が完了しました")
if __name__ == "__main__":
main()
実行結果例
Process-1が先に終了し、その後Process-2が終了する様子が確認できます。
--- メインプロセス開始 ---
[Process-1] 開始 (PID: 12345) 所要時間: 3秒
[Process-2] 開始 (PID: 12346) 所要時間: 6秒
>>> 監視中: P1=True, P2=True
>>> 監視中: P1=True, P2=True
>>> 監視中: P1=True, P2=True
[Process-1] 終了
>>> 監視中: P1=False, P2=True
(P1は終了済みです。ExitCode: 0)
>>> 監視中: P1=False, P2=True
(P1は終了済みです。ExitCode: 0)
[Process-2] 終了
--- 全プロセスの終了を検知 ---
全ての処理が完了しました
カスタムポイント
- ポーリング間隔の調整
time.sleep(1.0)の値を短くすれば検知の反応速度は上がりますが、CPU負荷が高まります。用途に応じて調整してください。
- 監視条件の変更
while p1.is_alive() or p2.is_alive():は「少なくとも1つが動いている間」待ちます。while p1.is_alive() and p2.is_alive():にすると「両方が動いている間」だけループし、片方が終わった時点でループを抜けます。
- 強制終了
- 長時間終わらないプロセスを検知した場合、
process.terminate()を呼んで強制終了させるロジックを追加できます。
- 長時間終わらないプロセスを検知した場合、
注意点
- join() の重要性
is_alive()がFalseになっても、join()を呼ぶまではプロセス情報がメモリに残ります(ゾンビプロセス)。監視ループを抜けた後には必ずjoin()を実行してください。
- start() 前の状態
- プロセスインスタンスを作成した直後(
start()前)もis_alive()はFalseを返します。「終了した」のか「始まっていない」のかの区別には注意が必要です。
- プロセスインスタンスを作成した直後(
- exitcode のタイミング
- プロセス実行中、
exitcodeはNoneです。終了して初めて整数値が入ります。
- プロセス実行中、
応用
join(timeout) を使ったノンブロッキングな待機パターンです。 while ループで自前で sleep する代わりに、join のタイムアウト機能を使って「終了するか、指定時間経つまで待つ」を繰り返します。
import multiprocessing
import time
def task():
time.sleep(2)
if __name__ == "__main__":
p = multiprocessing.Process(target=task)
p.start()
# プロセスが終了するまで繰り返す
while p.is_alive():
print("まだ実行中です...")
# 0.5秒だけ待ってみる。終了すれば即座に戻る。終了しなければ0.5秒後に戻る。
p.join(timeout=0.5)
print("終了しました")
まとめ
is_alive() を活用することで、ブラックボックスになりがちな並列処理の進行状況を可視化できます。
- 向く場面: 進捗バーの表示、死活監視、片方のプロセスが終わったら次の処理を始めたい場合。
- 変更ポイント: 監視ループの条件式(
orかandか)を要件に合わせて設定してください。 - 注意点: 監視が終わった後も、リソース解放のための
join()は必須です。
単純に待つだけの p.join() ではなく、アクティブに状態を確認しながら制御を行う高度な並列処理フローを実現しましょう。
