概要
Pythonで処理を高速化・効率化するための3つのアプローチ(マルチスレッド、マルチプロセス、非同期処理)について解説します。
これらは「複数のタスクを同時にこなす」ための技術ですが、得意な場面が全く異なります。それぞれの仕組みを理解し、適切な場面で使い分けるための基礎コードを紹介します。
仕様(入出力)
- 入力: なし(各手法によるタスク実行)
- 出力: タスクの実行ログ、処理時間
- 前提:
- 標準ライブラリ
threading,multiprocessing,asyncioを使用。 - Python 3.7以上推奨。
- 標準ライブラリ
基本の使い方
3つの手法の「コードの書き方」と「イメージ」の違いです。
1. マルチスレッド (threading)
「1人の作業者が、待ち時間(レンジで加熱中など)に別の作業をする」 イメージです。
通信待ちやファイル読み込みなど、I/O待ち が多い処理に向いています。
import threading
import time
def task(name):
print(f"{name} 開始")
time.sleep(2) # 待機(通信などを模倣)
print(f"{name} 完了")
# スレッドを2つ作成
t1 = threading.Thread(target=task, args=("Thread-A",))
t2 = threading.Thread(target=task, args=("Thread-B",))
t1.start()
t2.start()
# 全員の終了を待つ
t1.join()
t2.join()
2. 非同期処理 (asyncio)
「1人の作業者が、司令塔(イベントループ)の指示に従って高速にタスクを切り替える」 イメージです。
マルチスレッドよりもメモリ効率が良く、大量の接続をさばくWebサーバーやクローラーに向いています。
import asyncio
async def task(name):
print(f"{name} 開始")
await asyncio.sleep(2) # ノンブロッキングな待機
print(f"{name} 完了")
async def main():
# 2つのタスクを同時並行で予約
await asyncio.gather(
task("Async-A"),
task("Async-B")
)
if __name__ == "__main__":
asyncio.run(main())
3. マルチプロセス (multiprocessing)
「作業者を2人に増やす」 イメージです。
CPUのコアをフル活用できるため、数値計算や画像処理などのCPU負荷が高い処理に向いています。
import multiprocessing
import time
def task(name):
print(f"{name} 開始")
# CPUを使う重い処理のシミュレーション
_ = [i**2 for i in range(10000000)]
print(f"{name} 完了")
if __name__ == "__main__":
# Windowsでは if __name__ == "__main__" が必須
p1 = multiprocessing.Process(target=task, args=("Process-A",))
p2 = multiprocessing.Process(target=task, args=("Process-B",))
p1.start()
p2.start()
p1.join()
p2.join()
コード全文
これら3つの違いを体感するために、それぞれの特性に合ったタスク(I/Oバウンド vs CPUバウンド)を実行し、比較するコードです。
import time
import threading
import multiprocessing
import asyncio
# --- タスク定義 ---
def heavy_calculation():
"""CPUバウンドな処理: 計算"""
# 無駄に重い計算を行う
sum([i**2 for i in range(10**6)])
def io_waiting():
"""I/Oバウンドな処理: 待機"""
# 通信待ちなどを想定してスリープ
time.sleep(1)
async def async_io_waiting():
"""非同期I/O処理"""
await asyncio.sleep(1)
# --- 実行関数の定義 ---
def run_multithreading():
"""マルチスレッドでI/O処理を実行"""
start = time.time()
threads = []
for _ in range(4):
t = threading.Thread(target=io_waiting)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"[Multi-Thread] I/O処理 4回: {time.time() - start:.4f}秒")
def run_multiprocessing():
"""マルチプロセスで計算処理を実行"""
start = time.time()
processes = []
for _ in range(4):
p = multiprocessing.Process(target=heavy_calculation)
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"[Multi-Process] 計算処理 4回: {time.time() - start:.4f}秒")
async def run_asyncio():
"""AsyncioでI/O処理を実行"""
start = time.time()
tasks = [async_io_waiting() for _ in range(4)]
await asyncio.gather(*tasks)
print(f"[Asyncio ] I/O処理 4回: {time.time() - start:.4f}秒")
def run_normal_calculation():
"""比較用: 通常のループで計算処理"""
start = time.time()
for _ in range(4):
heavy_calculation()
print(f"[Normal Loop ] 計算処理 4回: {time.time() - start:.4f}秒")
if __name__ == "__main__":
print("--- 並行処理の比較ベンチマーク ---")
# 1. マルチスレッド (I/O待ちに強い)
# 4回sleep(1)しても、並列なので約1秒で終わるはず
run_multithreading()
# 2. Asyncio (I/O待ちに強い)
# こちらも約1秒で終わるはず。スレッドより低コスト
asyncio.run(run_asyncio())
# 3. マルチプロセス (CPU処理に強い)
# 4つのコアがあれば、直列実行より速くなるはず
run_multiprocessing()
# 4. 通常実行 (比較用)
# 直列なので時間がかかる
run_normal_calculation()
カスタムポイント
それぞれの特性と比較表です。
| 特徴 | マルチスレッド | async/await | マルチプロセス |
| モジュール | threading | asyncio | multiprocessing |
| メモリ空間 | 共有する | 共有する | 独立している |
| 得意分野 | I/O待ち(通信・DB) | I/O待ち(大量同時接続) | CPU計算(解析・加工) |
| CPU並列性 | × (GILにより1コア) | × (1コア) | ○ (複数コア利用) |
| 実装難易度 | 低〜中 | 中〜高 | 中 |
- GIL (Global Interpreter Lock):
- Python(CPython)の仕様上、マルチスレッドであっても同時に動くCPU命令は1つだけに制限されています。そのため、スレッドを増やしても計算速度は上がりません。計算を速くしたいなら「マルチプロセス」一択です。
注意点
- マルチプロセスの
if __name__ == "__main__":- Windows環境でマルチプロセスを行う場合、このブロックで囲まないとプロセスが無限に生成されてクラッシュします。必須です。
- 競合状態 (Race Condition)
- マルチスレッドは変数を共有するため、同時に書き込みに行くとデータが壊れることがあります。「ロック (
Lock)」などの排他制御が必要です。
- マルチスレッドは変数を共有するため、同時に書き込みに行くとデータが壊れることがあります。「ロック (
- 非同期の「ブロッキング」
async関数の中でtime.sleep()や重い計算を行うと、イベントループ全体が止まってしまいます。非同期のメリットが消えるため、必ずawait asyncio.sleep()を使い、重い処理は避けてください。
応用
より現代的で扱いやすい concurrent.futures モジュールを使った実装例です。スレッドとプロセスの使い分けを引数一つで切り替えられます。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def task(n):
time.sleep(1)
return n * n
if __name__ == "__main__":
# ThreadPoolExecutor または ProcessPoolExecutor に変えるだけで切り替え可能
# I/Oバウンドなら ThreadPoolExecutor
# CPUバウンドなら ProcessPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(task, [1, 2, 3, 4, 5]))
print(f"結果: {results}")
まとめ
- 通信・待機が多い →
asyncioまたはthreading - 計算・集計が重い →
multiprocessing - 手軽に並列化したい →
concurrent.futures
Pythonの並行処理は、この「I/Oバウンド」か「CPUバウンド」かの見極めが全てです。目的に合わせて使い分けてください。
