テックカリキュラム

ETLパイプラインの最適化|SQL×スクリプト×差分処理の実践設計

ETLパイプラインの最適化|SQL×スクリプト×差分処理の実践設計

ETL(Extract, Transform, Load)は、システム間のデータ連携やDWH構築に不可欠なプロセスです。しかし、**ETLパイプラインが複雑・非効率**になると、バッチ遅延・障害対応・保守負担が増大します。本記事では、SQLだけで完結するETL設計から、スクリプトやAirflowとの連携、そして差分更新を実現するCDC戦略まで、実務で使える最適化手法を解説します。


SQLオンリーで完結するETL処理設計

軽量・高速なETLを実現するには、可能な限り処理をSQLだけで完結させるのが理想です。PostgreSQLやMySQLなど多くのRDBMSは、変換・加工処理を行うための機能が豊富に備わっています。

1. 一時テーブル+MERGE戦略

大量データを一時テーブルにインポートし、本番テーブルと比較してINSERT/UPDATE/DELETEを最適化。

-- PostgreSQL 15+ の例(MERGE構文) MERGE INTO customers AS target USING staging_customers AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET name = source.name, email = source.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (source.id, source.name, source.email); 

2. WITH句(CTE)による処理分割

WITH cleaned AS ( SELECT id, TRIM(name) AS name_cleaned FROM staging_users ) INSERT INTO users (id, name) SELECT id, name_cleaned FROM cleaned; 

SQLでETLを構築する最大の利点は、「パフォーマンスと可搬性の高さ」にあります。処理ロジックがDB上に集約されていれば、システム変更や移行もスムーズです。


SQL+外部スクリプト(Python, Shell, Airflow)の連携

SQLだけでは対応できない、外部APIの呼び出し・複雑な分岐処理・ファイル操作などには、PythonやShell、ジョブ管理ツールとの連携が有効です。

1. Python+SQLAlchemyでETL実行

from sqlalchemy import create_engine import pandas as pd engine = create_engine("postgresql+psycopg2://user:pass@localhost/db") df = pd.read_csv("import.csv") df.to_sql("staging_table", con=engine, if_exists="replace", index=False) # SQLで変換 with engine.begin() as conn: conn.execute(""" INSERT INTO main_table SELECT * FROM staging_table WHERE NOT EXISTS (SELECT 1 FROM main_table WHERE id = staging_table.id) """) 

2. Shell+psql でSQL実行

#!/bin/bash psql -U myuser -d mydb -f ./etl_step1.sql psql -U myuser -d mydb -f ./etl_step2.sql 

3. Airflow DAGでETLジョブを管理

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime with DAG("daily_etl", start_date=datetime(2025, 1, 1), schedule_interval="@daily") as dag: extract = BashOperator( task_id="extract_data", bash_command="python scripts/extract.py" ) transform = BashOperator( task_id="transform_data", bash_command="psql -U user -d db -f sql/transform.sql" ) load = BashOperator( task_id="load_data", bash_command="python scripts/load.py" ) extract >> transform >> load 

ポイント: Airflowを使えば依存関係・失敗リカバリ・リトライ設定もコードで管理可能です。


CDC(Change Data Capture)戦略と差分更新

フルロード(全件再投入)は非効率です。変更分だけを抽出・反映するCDC(Change Data Capture)により、ETLの負荷を劇的に下げることができます。

1. 更新日時ベースの差分抽出

SELECT * FROM orders WHERE updated_at > (SELECT last_sync_time FROM etl_status WHERE table_name = 'orders'); 

ETL終了後に etl_status を更新して、次回実行時に差分を抽出。

2. レプリケーションログベース(Debezium + Kafka)

よりリアルタイムなCDCを実現するには、DBのWAL(Write Ahead Log)やバイナリログを読み取り、変更データのみをストリーム処理します。

  • Debezium:PostgreSQL / MySQL の変更をKafkaに送信
  • Apache Kafka:変更イベントの中継と保存
  • Snowpipe(Snowflake)や Kinesis Firehose:DWHへの差分連携

注意点: 初回はフルロード + その後CDCに切り替える「ハイブリッド戦略」が実務で多く採用されています。


まとめ

ETLパイプラインを最適化するためには、目的と環境に応じて次のような戦略を使い分ける必要があります:

  • SQLオンリー:パフォーマンス重視の小規模ETLやDWH前処理
  • SQL+スクリプト:複雑なフローやAPI連携を含むETL
  • CDC戦略:リアルタイム性と効率性を両立

最終的には「保守性・拡張性・可視化」を考慮したパイプライン設計が、スケーラブルなデータ活用基盤のカギになります。