データ分析のワークフローにおいて、データベース(DB)に保存されたデータを直接PandasのDataFrameとして読み込んだり、逆に加工したDataFrameをデータベースへ保存したりする処理は非常に重要です。PandasはSQLとの親和性が高く、標準機能でこれらをシームレスに行うことができます。
本記事では、標準ライブラリの sqlite3 および SQLAlchemy を使用して、データベースに対する読み書き(SELECT / INSERT)を行う方法を解説します。
SQLiteを用いた基本操作
まずはPythonに標準で組み込まれている sqlite3 モジュールを使用し、メモリ上のデータベースに対して読み書きを行う例を紹介します。ここでは、店舗の商品在庫データを管理するシナリオを想定します。
1. データベースからDataFrameへの読み込み (read_sql)
pd.read_sql 関数を使用すると、SQLクエリの結果を直接DataFrameとして取得できます。
- sql: 実行したいSQL文(SELECT文など)。
- con: データベース接続オブジェクト(コネクション)。
- index_col: DataFrameのインデックスとして使用する列名。
2. DataFrameからデータベースへの書き込み (to_sql)
df.to_sql メソッドを使用すると、DataFrameの内容をテーブルにINSERT(またはテーブル作成)できます。
- name: ターゲットとなるテーブル名。
- con: データベース接続オブジェクト。
- if_exists: テーブルが既に存在する場合の挙動。
'fail': エラーを発生させる(デフォルト)。'replace': テーブルを削除して作り直す。'append': 既存のテーブルに行を追加する。
- index: DataFrameのインデックスをカラムとして書き込むかどうか。
以下は、これらを組み合わせた完全な実装コードです。
import sqlite3
import pandas as pd
def manage_inventory_db():
"""
SQLiteデータベースに対するPandasの読み書き操作を実演する関数
"""
# メモリ上のSQLiteデータベースに接続
# (実ファイルを使用する場合は ":memory:" の代わりに "file.db" などを指定)
with sqlite3.connect(":memory:") as conn:
# --- 準備: テスト用テーブルとデータの作成 ---
cur = conn.cursor()
cur.execute("CREATE TABLE inventory (item_id INTEGER PRIMARY KEY, price INTEGER, stock INTEGER)")
# 初期データの投入
initial_data = [
(101, 1500, 50),
(102, 2800, 30),
(103, 900, 100)
]
cur.executemany("INSERT INTO inventory VALUES (?, ?, ?)", initial_data)
conn.commit()
print("--- 初期データベースの状態 ---")
# 確認のためSQLで直接出力
for row in cur.execute("SELECT * FROM inventory"):
print(row)
print("\n")
# --- 1. データベースからDataFrameへ読み込み (read_sql) ---
print("=== read_sqlによる読み込み ===")
# SQLクエリを実行し、結果をDataFrameに格納
# item_id列をDataFrameのインデックスとして設定
df_inventory = pd.read_sql(
"SELECT item_id, price, stock FROM inventory",
conn,
index_col="item_id"
)
print(df_inventory)
print("\n")
# --- 2. DataFrameからデータベースへ書き込み (to_sql) ---
print("=== to_sqlによる追加書き込み ===")
# 新規商品データの作成
new_items_data = {
"price": [3200, 450],
"stock": [15, 200]
}
# インデックス(item_id)を明示的に指定してDataFrame作成
df_new_items = pd.DataFrame(new_items_data, index=[104, 105])
df_new_items.index.name = "item_id" # インデックス名をDBのカラム名と一致させる
print("--- 追加するデータ ---")
print(df_new_items)
# データベースへ書き込み
# if_exists='append' で既存テーブルに追加
df_new_items.to_sql(
"inventory",
conn,
if_exists="append",
index=True, # インデックス(item_id)も書き込む
index_label="item_id"
)
# --- 結果の確認 ---
print("\n--- 追加後のデータベースの状態 ---")
df_updated = pd.read_sql("SELECT * FROM inventory", conn, index_col="item_id")
print(df_updated)
if __name__ == "__main__":
manage_inventory_db()
実行結果
--- 初期データベースの状態 ---
(101, 1500, 50)
(102, 2800, 30)
(103, 900, 100)
=== read_sqlによる読み込み ===
price stock
item_id
101 1500 50
102 2800 30
103 900 100
=== to_sqlによる追加書き込み ===
--- 追加するデータ ---
price stock
item_id
104 3200 15
105 450 200
--- 追加後のデータベースの状態 ---
price stock
item_id
101 1500 50
102 2800 30
103 900 100
104 3200 15
105 450 200
SQLAlchemyを用いた汎用的な接続
実務でMySQL、PostgreSQL、OracleなどのRDBMSを使用する場合は、SQLAlchemy ライブラリの Engine を使用して接続を管理するのが一般的です。Pandasの read_sql や to_sql は、このEngineオブジェクトをコネクション引数として受け取ることができます。
import pandas as pd
from sqlalchemy import create_engine
def connect_external_database():
"""
SQLAlchemyを用いた接続例(コード例のみ)
※実行には実際のDBサーバー環境が必要です
"""
# 接続文字列の定義
# 書式: dialect+driver://username:password@host:port/database
db_connection_str = 'mysql+pymysql://myuser:mypassword@localhost:3306/mydatabase'
# エンジンの作成
db_engine = create_engine(db_connection_str)
try:
# コンテキストマネージャで接続を管理
with db_engine.connect() as connection:
# データの書き込み
# index=Falseにすることで、DataFrameのインデックス列はDBに保存されません
df_sample = pd.DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
df_sample.to_sql(
"target_table_name",
con=connection,
if_exists="append",
index=False
)
print("データの書き込みに成功しました。")
except Exception as e:
print(f"データベース操作中にエラーが発生しました: {e}")
# 注意: このコードは実行環境がないと動作しません
ポイント
- 接続文字列:
create_engineに渡すURL形式の文字列で、データベースの種類やドライバを指定します。 - 汎用性: コード内のSQL記述部分以外は、接続先がSQLiteでもMySQLでもPandas側のコード(
to_sqlなど)を変更する必要はほとんどありません。
これにより、分析環境と本番環境でデータベースが異なる場合でも、柔軟に対応することが可能です。
