# ELTパイプラインの設計概要
ELTパイプラインの設計は、複数のレシピとルックアップテーブルから構成され、データの抽出、ロード、変換のための堅牢なフレームワークを作成します。このフレームワークには、以下のコンポーネントが含まれます:
# 変更データキャプチャ(CDC)
変更データキャプチャは、ソースシステムの変更データを検出し、これらの変更をキャプチャする機能です。WorkatoのELTアクセラレータは、タイムスタンプベースの変更データキャプチャを使用しています。このアクセラレータは、制御テーブルとジョブテーブルを使用して、最後のロード日時に基づいてデータをポピュレートするインクリメンタルジョブを実行できます。
# リアルタイムストリーミングto S3
このアクセラレータは、Workatoの既存のS3ストリーミングアップロードアクションコネクタを活用して、ソースファイルをステージングに移動します。このタイプのリアルタイムストリーミングは、WorkatoのS3コネクタを介してデータレイクに大量のファイルを効率的に移動するためのものです。
# パイプラインオーケストレーション
制御テーブルとジョブテーブルは、パイプライン内のさまざまなジョブのスケジュールと依存関係を維持するための監査を行います。これにより、CDCが制御され、パイプラインジョブの実行、ジョブの再実行、トラブルシューティング、データのバックフィリングが行われます。ELTアクセラレータのスケジューラとディスパッチャレシピは、パイプラインオーケストレーションの調整に重要な役割を果たします。
# ジョブの種類
ユーザーは次の3つの異なるジョブタイプを実行できます:
- フルロード(FULL)
- フルロードは、ジョブが実行されるたびにソーステーブルのフルエクストラクトを指します。このタイプのジョブは通常、開始日を持たない場合があります。パイプラインを作成する際に頻度を決定できます。たとえば、通貨やコードなどの小〜中規模のルックアップテーブルをフルロードに割り当てることができます。
- インクリメンタルロード(INCR)
- すべてのインクリメンタルジョブはフルリフレッシュから始まります。リフレッシュはソースをターゲットに同期させ、その後、タイムベースのCDCアプローチを使用してインクリメンタルに切り替えます。パイプラインを作成する際にINCRの頻度を決定できます。ジョブが実行されるたびにエクストラクト全体をリフレッシュするのではなく、アクセラレータは前回パイプラインが実行された後に変更されたデータを取得します。たとえば、SalesforceからのOpportunitiesテーブルを完全に移行し、ロード頻度を毎日に設定するシナリオを考えてみましょう。この場合、毎日のデータ変更のみが抽出され、マージされます。
- エクストラクトロード(EXTR)
- エクストラクトロードは、通常、2つのタイムスタンプ間で指定された時間枠内からデータを抽出します。このロードタイプは、監査を含むカスタムレポートを作成するために使用できます。
# ステータス
ジョブは次のいずれかのステータスを持つ場合があります:
- 保留中
- パイプラインを作成した後、アクセラレータはジョブをディスパッチします。その後、現在のステータスが制御テーブルに保留中として更新されます。パイプラインのステータスが保留中になる別の状況は、連続するジョブの間です。
- アクティブ
- アクティブは、パイプラインが連続するジョブを実行するようにスケジュールされている状態です。
- 非アクティブ
- 非アクティブは、処理が完了した後にパイプラインが一時停止されている状態です。非アクティブなパイプラインは再実行されません。
- エラー
- エラーは、ジョブがエラーで終了した場合です。
- 処理中
- 処理中は、パイプラインが現在ジョブを実行している状態です。
- エクストラクト
- エクストラクトは、新しいエクストラクトタイプのパイプラインが作成されたときのステータスです。
# スケジューラ
スケジューラは、5分ごとに制御テーブルをクエリして、スケジューラの現在の時刻に基づいてパイプライン内のジョブの詳細を取得します。制御テーブルのnext_timeが一致する場合、スケジューラはジョブを取り上げます。条件が満たされる場合、スケジューラはジョブディスパッチャに非同期呼び出しを行います。アクセラレータはまた、Slack/Microsoft Teamsチャンネルに通知を送信します。同時に、アクセラレータはジョブのステータスを処理中に更新します。ジョブタイプがエクストラクトの場合、任意の開始時間と終了時間があります。エクストラクトジョブタイプは、スケジューラの次回実行で処理されます。インクリメンタルおよびフルジョブタイプは、前述のnext_timeロジックに従います。
# ディスパッチャ
ジョブディスパッチャは、同時に複数のジョブを処理でき、同時実行制限は5です。ディスパッチャはスケジューラからの入力を評価します。スケジューラはジョブをスケジュールし、それぞれのレシピに実行ワークロードをディスパッチします。ディスパッチャーが開始すると、スケジューラから受け取ったパイプラインのnext_timeをバッチの開始日時として記録します。Workatoは、ELTジョブテーブルに、ディスパッチャージョブとレシピIDを、受け取ったパイプラインのcontrol IDと一緒に記録します。
アクセラレータは、ジョブのロードタイプとソースシステムに応じて、異なるワークロードレシピを呼び出します。ジョブが正常に完了した後、アクセラレータは、対応するcontrol IDを使用してジョブテーブルを更新し、ディスパッチャージョブIDとバッチ終了日時を記録します。同時に、同じcontrol IDを使用して、アクセラレータはコントロールテーブルを更新し、last_successful_job_dt、last_successful_job_id、status、およびnext_timeを記録します。
アクセラレータは、パイプラインの設定時にユーザーが構成したジョブタイプとスケジューラの頻度(毎時、毎週、または毎月)に応じて、next_timeの値を計算します。
抽出ジョブタイプは一意であり、設定されたスケジュールで実行されず、ワンタイムのジョブです。この場合、アクセラレータはnext_timeの計算をスキップし、control tableのステータスをInactiveにすぐに更新します。
ディスパッチャーによって実行されたジョブがエラーになった場合、アクセラレータはjobs tableのis_successフィールドをFalseに更新します。また、control tableのStatusフィールドをErrorに更新します。その後、アクセラレータはMicrosoft Teams/SlackのコミュニケーションチャンネルにSuccessfulおよびUnsuccessfulのジョブ通知を送信します。
# エラーと通知
ELTパイプラインボットは、以下のエラーメッセージと通知をSlack/Microsoft Teamsの通知チャンネルに送信します。
- パイプラインが作成されました
- パイプラインが開始されました
- パイプラインが完了しました
- パイプラインエラー
通知のカスタマイズ方法については、ヒントとトラブルシューティング記事を参照してください。
Last updated: 2024/2/22 23:17:11