ETL(Extract, Transform, Load)は、システム間のデータ連携やDWH構築に不可欠なプロセスです。しかし、**ETLパイプラインが複雑・非効率**になると、バッチ遅延・障害対応・保守負担が増大します。本記事では、SQLだけで完結するETL設計から、スクリプトやAirflowとの連携、そして差分更新を実現するCDC戦略まで、実務で使える最適化手法を解説します。
SQLオンリーで完結するETL処理設計
軽量・高速なETLを実現するには、可能な限り処理をSQLだけで完結させるのが理想です。PostgreSQLやMySQLなど多くのRDBMSは、変換・加工処理を行うための機能が豊富に備わっています。
1. 一時テーブル+MERGE戦略
大量データを一時テーブルにインポートし、本番テーブルと比較してINSERT/UPDATE/DELETEを最適化。
-- PostgreSQL 15+ の例(MERGE構文) MERGE INTO customers AS target USING staging_customers AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET name = source.name, email = source.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (source.id, source.name, source.email);
2. WITH句(CTE)による処理分割
WITH cleaned AS ( SELECT id, TRIM(name) AS name_cleaned FROM staging_users ) INSERT INTO users (id, name) SELECT id, name_cleaned FROM cleaned;
SQLでETLを構築する最大の利点は、「パフォーマンスと可搬性の高さ」にあります。処理ロジックがDB上に集約されていれば、システム変更や移行もスムーズです。
SQL+外部スクリプト(Python, Shell, Airflow)の連携
SQLだけでは対応できない、外部APIの呼び出し・複雑な分岐処理・ファイル操作などには、PythonやShell、ジョブ管理ツールとの連携が有効です。
1. Python+SQLAlchemyでETL実行
from sqlalchemy import create_engine import pandas as pd engine = create_engine("postgresql+psycopg2://user:pass@localhost/db") df = pd.read_csv("import.csv") df.to_sql("staging_table", con=engine, if_exists="replace", index=False) # SQLで変換 with engine.begin() as conn: conn.execute(""" INSERT INTO main_table SELECT * FROM staging_table WHERE NOT EXISTS (SELECT 1 FROM main_table WHERE id = staging_table.id) """)
2. Shell+psql でSQL実行
#!/bin/bash psql -U myuser -d mydb -f ./etl_step1.sql psql -U myuser -d mydb -f ./etl_step2.sql
3. Airflow DAGでETLジョブを管理
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime with DAG("daily_etl", start_date=datetime(2025, 1, 1), schedule_interval="@daily") as dag: extract = BashOperator( task_id="extract_data", bash_command="python scripts/extract.py" ) transform = BashOperator( task_id="transform_data", bash_command="psql -U user -d db -f sql/transform.sql" ) load = BashOperator( task_id="load_data", bash_command="python scripts/load.py" ) extract >> transform >> load
ポイント: Airflowを使えば依存関係・失敗リカバリ・リトライ設定もコードで管理可能です。
CDC(Change Data Capture)戦略と差分更新
フルロード(全件再投入)は非効率です。変更分だけを抽出・反映するCDC(Change Data Capture)により、ETLの負荷を劇的に下げることができます。
1. 更新日時ベースの差分抽出
SELECT * FROM orders WHERE updated_at > (SELECT last_sync_time FROM etl_status WHERE table_name = 'orders');
ETL終了後に etl_status を更新して、次回実行時に差分を抽出。
2. レプリケーションログベース(Debezium + Kafka)
よりリアルタイムなCDCを実現するには、DBのWAL(Write Ahead Log)やバイナリログを読み取り、変更データのみをストリーム処理します。
- Debezium:PostgreSQL / MySQL の変更をKafkaに送信
- Apache Kafka:変更イベントの中継と保存
- Snowpipe(Snowflake)や Kinesis Firehose:DWHへの差分連携
注意点: 初回はフルロード + その後CDCに切り替える「ハイブリッド戦略」が実務で多く採用されています。
まとめ
ETLパイプラインを最適化するためには、目的と環境に応じて次のような戦略を使い分ける必要があります:
- SQLオンリー:パフォーマンス重視の小規模ETLやDWH前処理
- SQL+スクリプト:複雑なフローやAPI連携を含むETL
- CDC戦略:リアルタイム性と効率性を両立
最終的には「保守性・拡張性・可視化」を考慮したパイプライン設計が、スケーラブルなデータ活用基盤のカギになります。