ELT Pipeline for Snowflakeアクセラレーターの設計

ELT Pipeline for Snowflakeアクセラレーターは、データの抽出、ロード、変換を行う堅牢なフレームワークを作成するために連携する複数のレシピとルックアップ テーブルで構成されます。 このフレームワークは、次のコンポーネントで構成されます。
変更データキャプチャ(CDC)
変更データキャプチャとは、ソースシステム内の変更されたデータを検出し、それらの変更をキャプチャする機能です。 Workato ELTアクセラレーターは、タイムスタンプベースの変更データキャプチャを使用します。 このアクセラレーターは、制御テーブルとジョブテーブルを使用して、前回のロード日時に基づいてデータを入力する増分ジョブを実行できます。
S3へのリアルタイムストリーミング
このアクセラレーターは、Workatoの既存のS3 Streaming uploadアクションコネクターを活用して、ソースファイルをステージングに移動します。 このタイプのマイクロバッチによるリアルタイムストリーミングでは、WorkatoのS3 Connectorを通じて大量のファイルをデータレイクへ効率的に移動できます。
パイプラインオーケストレーション
制御テーブルとジョブテーブルは、パイプライン内のさまざまなジョブのスケジュール設定と依存関係の維持のために監査情報を保持します。 これはパイプラインジョブの実行、ジョブの再実行、トラブルシューティング、データのバックフィルを制御するため、CDCに役立ちます。 ELTアクセラレーターのschedulerレシピとdispatcherレシピは、パイプラインオーケストレーションの調整に重要な役割を果たします。
ジョブタイプ
ユーザーは3種類のジョブタイプを実行できます。
- フルロード(FULL)
- Fullとは、ジョブが実行されるたびにソーステーブルを完全に抽出することを指します。 通常、この種類のジョブには開始日がありません。 パイプラインを作成するときに頻度を決定できます。 たとえば、通貨やコードなどの小規模から中規模のルックアップ テーブルをフルロードに割り当てることができます。
- 増分ロード(INCR)
- すべての増分ジョブは、完全なリフレッシュから開始されます。 更新ではソースをターゲットに同期し、その後パイプラインは日時ベースのCDCアプローチを使用して増分に切り替わります。 パイプラインを作成するときにINCRの頻度を決定できます。 ジョブが実行されるたびに抽出全体をリフレッシュするのではなく、アクセラレーターは前回パイプラインが実行された後に変更されたデータを取得します。 たとえば、Salesforceの商談テーブルを完全に移行し、ロード頻度を毎日に設定するシナリオを考えます。 この状況では、日次のデータ変更のみが抽出されてマージされます。
- 抽出ロード(EXTR)
- 抽出ロードは、通常2つのタイムスタンプ間の、指定された時間枠内のデータを取得します。 このロードタイプを使用して、監査を含むカスタムレポートを作成できます。
ステータス
ジョブには、次のいずれかのステータスが設定される場合があります。
- Pending
- パイプラインを作成すると、アクセラレーターはジョブをディスパッチします。 その後、制御テーブル内の現在のステータスがPendingに更新されます。 パイプラインのステータスがPendingになるもう1つの状況は、連続するジョブの間です。
- アクティブ
- Activeは、パイプラインが連続するジョブを実行するようにスケジュールされている状態です。
- 非アクティブ
- InActiveは、処理の完了後にパイプラインが一時停止されている状態です。 InActiveのパイプラインは再度実行されません。
- エラー
- Errorは、ジョブでエラーが発生した状態です。
- 処理中
- Processingは、パイプラインが現在ジョブを実行している状態です。
- Extract
- Extractは、新しい抽出タイプのパイプラインが作成されたときのステータスです。
スケジューラー
Schedulerは5分間隔のポーラーであり、Schedulerの現在時刻に基づいてパイプライン内のジョブの詳細を取得するために制御テーブルにクエリを実行します。 制御テーブルのnext_timeが一致すると、Schedulerはジョブを取得します。 目的の条件が満たされると、SchedulerはジョブDispatcherに非同期呼び出しを行います。 アクセラレーターは、Slack/Microsoft Teamsチャネルにも通知を送信します。 同時に、アクセラレーターはジョブステータスをProcessingに更新します。 ジョブタイプがextractの場合、任意の開始時刻と終了時刻を設定できます。 Extractジョブタイプは、Schedulerの次回実行時に処理されます。 IncrementalおよびFullジョブタイプは、前述のnext_timeロジックに従います。
ディスパッチャー
ジョブDispatcherは同時に実行される複数のジョブを処理でき、その同時実行制限は5です。 DispatcherはSchedulerからの入力を評価し、ワークロードを実行するために各レシピへジョブをディスパッチします。 Dispatcherが開始するとすぐに、Schedulerから受信したパイプラインのnext_timeであるバッチ開始日時を記録します。 Workatoは、受信したパイプラインのcontrol IDとともに、dispatcher jobおよびrecipe IDをELTジョブテーブルに記録します。
アクセラレーターは、ジョブのload typeとソースシステムに応じて、それぞれ異なるワークロードレシピを呼び出します。 ジョブが正常に完了すると、アクセラレーターは該当するcontrol IDを使用して、dispatcher job IDおよびbatch end date-timeでJob tableを更新します。 同時に、同じcontrol IDを使用して、アクセラレーターはlast_successful_job_dt、last_successful_job_id、status、およびnext_timeでcontrol tableを更新します。
アクセラレーターは、パイプラインの設定時にユーザーが構成したjob typeとSchedulerのfrequency(毎時、毎週、または毎月)に従って、next_time値を計算します。
Extractジョブタイプは、一連のスケジュールで実行されず、1回限りのジョブであるため特殊です。 このシナリオでは、アクセラレーターはnext_timeの計算をスキップし、control tableのステータスをただちにInactiveに更新します。
Dispatcherによって実行されたジョブでエラーが発生した場合、アクセラレーターはjobs table内のis_successフィールドをFalseに更新します。 また、control table内のStatusフィールドをErrorに更新します。 その後、アクセラレーターはSuccessfulとUnsuccessfulの両方のジョブ通知をMicrosoft TeamsまたはSlackのコミュニケーションチャネルに送信します。
エラーと通知
ELT Pipelineボットは、次のエラーメッセージと通知をSlackまたはMicrosoft Teamsの通知チャネルに送信します。
- パイプラインが作成されました
- パイプラインが開始されました
- パイプラインが完了しました
- パイプラインエラー
通知のカスタマイズ方法については、ヒントとトラブルシューティングの記事を参照してください。
Last updated: