【Python】Pandasでデータベースと連携する(read_sql / to_sql)

データ分析のワークフローにおいて、データベース(DB)に保存されたデータを直接PandasのDataFrameとして読み込んだり、逆に加工したDataFrameをデータベースへ保存したりする処理は非常に重要です。PandasはSQLとの親和性が高く、標準機能でこれらをシームレスに行うことができます。

本記事では、標準ライブラリの sqlite3 および SQLAlchemy を使用して、データベースに対する読み書き(SELECT / INSERT)を行う方法を解説します。

目次

SQLiteを用いた基本操作

まずはPythonに標準で組み込まれている sqlite3 モジュールを使用し、メモリ上のデータベースに対して読み書きを行う例を紹介します。ここでは、店舗の商品在庫データを管理するシナリオを想定します。

1. データベースからDataFrameへの読み込み (read_sql)

pd.read_sql 関数を使用すると、SQLクエリの結果を直接DataFrameとして取得できます。

  • sql: 実行したいSQL文(SELECT文など)。
  • con: データベース接続オブジェクト(コネクション)。
  • index_col: DataFrameのインデックスとして使用する列名。

2. DataFrameからデータベースへの書き込み (to_sql)

df.to_sql メソッドを使用すると、DataFrameの内容をテーブルにINSERT(またはテーブル作成)できます。

  • name: ターゲットとなるテーブル名。
  • con: データベース接続オブジェクト。
  • if_exists: テーブルが既に存在する場合の挙動。
    • 'fail': エラーを発生させる(デフォルト)。
    • 'replace': テーブルを削除して作り直す。
    • 'append': 既存のテーブルに行を追加する。
  • index: DataFrameのインデックスをカラムとして書き込むかどうか。

以下は、これらを組み合わせた完全な実装コードです。

import sqlite3
import pandas as pd

def manage_inventory_db():
    """
    SQLiteデータベースに対するPandasの読み書き操作を実演する関数
    """
    
    # メモリ上のSQLiteデータベースに接続
    # (実ファイルを使用する場合は ":memory:" の代わりに "file.db" などを指定)
    with sqlite3.connect(":memory:") as conn:
        
        # --- 準備: テスト用テーブルとデータの作成 ---
        cur = conn.cursor()
        cur.execute("CREATE TABLE inventory (item_id INTEGER PRIMARY KEY, price INTEGER, stock INTEGER)")
        
        # 初期データの投入
        initial_data = [
            (101, 1500, 50),
            (102, 2800, 30),
            (103, 900, 100)
        ]
        cur.executemany("INSERT INTO inventory VALUES (?, ?, ?)", initial_data)
        conn.commit()
        print("--- 初期データベースの状態 ---")
        # 確認のためSQLで直接出力
        for row in cur.execute("SELECT * FROM inventory"):
            print(row)
        print("\n")


        # --- 1. データベースからDataFrameへ読み込み (read_sql) ---
        print("=== read_sqlによる読み込み ===")
        
        # SQLクエリを実行し、結果をDataFrameに格納
        # item_id列をDataFrameのインデックスとして設定
        df_inventory = pd.read_sql(
            "SELECT item_id, price, stock FROM inventory",
            conn,
            index_col="item_id"
        )
        
        print(df_inventory)
        print("\n")


        # --- 2. DataFrameからデータベースへ書き込み (to_sql) ---
        print("=== to_sqlによる追加書き込み ===")
        
        # 新規商品データの作成
        new_items_data = {
            "price": [3200, 450],
            "stock": [15, 200]
        }
        # インデックス(item_id)を明示的に指定してDataFrame作成
        df_new_items = pd.DataFrame(new_items_data, index=[104, 105])
        df_new_items.index.name = "item_id"  # インデックス名をDBのカラム名と一致させる
        
        print("--- 追加するデータ ---")
        print(df_new_items)
        
        # データベースへ書き込み
        # if_exists='append' で既存テーブルに追加
        df_new_items.to_sql(
            "inventory",
            conn,
            if_exists="append",
            index=True,          # インデックス(item_id)も書き込む
            index_label="item_id"
        )
        
        # --- 結果の確認 ---
        print("\n--- 追加後のデータベースの状態 ---")
        df_updated = pd.read_sql("SELECT * FROM inventory", conn, index_col="item_id")
        print(df_updated)

if __name__ == "__main__":
    manage_inventory_db()

実行結果

--- 初期データベースの状態 ---
(101, 1500, 50)
(102, 2800, 30)
(103, 900, 100)

=== read_sqlによる読み込み ===
         price  stock
item_id              
101       1500     50
102       2800     30
103        900    100

=== to_sqlによる追加書き込み ===
--- 追加するデータ ---
         price  stock
item_id              
104       3200     15
105        450    200

--- 追加後のデータベースの状態 ---
         price  stock
item_id              
101       1500     50
102       2800     30
103        900    100
104       3200     15
105        450    200

SQLAlchemyを用いた汎用的な接続

実務でMySQL、PostgreSQL、OracleなどのRDBMSを使用する場合は、SQLAlchemy ライブラリの Engine を使用して接続を管理するのが一般的です。Pandasの read_sqlto_sql は、このEngineオブジェクトをコネクション引数として受け取ることができます。

import pandas as pd
from sqlalchemy import create_engine

def connect_external_database():
    """
    SQLAlchemyを用いた接続例(コード例のみ)
    ※実行には実際のDBサーバー環境が必要です
    """
    
    # 接続文字列の定義
    # 書式: dialect+driver://username:password@host:port/database
    db_connection_str = 'mysql+pymysql://myuser:mypassword@localhost:3306/mydatabase'
    
    # エンジンの作成
    db_engine = create_engine(db_connection_str)

    try:
        # コンテキストマネージャで接続を管理
        with db_engine.connect() as connection:
            
            # データの書き込み
            # index=Falseにすることで、DataFrameのインデックス列はDBに保存されません
            df_sample = pd.DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
            df_sample.to_sql(
                "target_table_name",
                con=connection,
                if_exists="append",
                index=False
            )
            
            print("データの書き込みに成功しました。")

    except Exception as e:
        print(f"データベース操作中にエラーが発生しました: {e}")

# 注意: このコードは実行環境がないと動作しません

ポイント

  • 接続文字列: create_engine に渡すURL形式の文字列で、データベースの種類やドライバを指定します。
  • 汎用性: コード内のSQL記述部分以外は、接続先がSQLiteでもMySQLでもPandas側のコード(to_sqlなど)を変更する必要はほとんどありません。

これにより、分析環境と本番環境でデータベースが異なる場合でも、柔軟に対応することが可能です。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次