# サンプルユースケース - 変更データキャプチャ

毎日、ソースシステムで新しいデータが利用可能になり、これを取得してデスティネーションシステムにロードする必要があります。多くの場合、ソースアプリケーションは新規または更新されたデータだけではなく、データの完全な抽出しか提供できません。これにより、新規/更新されたデータの取得と取り込みが難しくなります。なぜなら、受信データと過去のデータを照合し、両者の差分のみを比較・抽出する必要があるからです。

SQL変換を使用すると、ユーザーは過去のデータと受信した抽出データを簡単に比較し、差分のみを1つのアクションで取得できます。過去のデータはWorkatoのファイルストレージ内に永続的に保存でき、外部データベースへの依存を軽減し、各抽出ごとに更新できます。受信データは、任意の業務アプリケーション、データベース、またはファイルシステムからのものが可能です。

SQL変換は抽出データを取得し、永続化された過去のデータと比較し、変更されたデータを出力として生成します。このユーティリティは、数百万行のデータ量を簡単に処理できるほど強力です。

# サンプルレシピ:オンプレミスシステムから抽出を取得し、変更されたデータを見つけてS3にロードする

次のシナリオを考えてみましょう。ある企業が次のようなビジネスプロセスを持っています。

毎日、企業はすべての連絡先を抽出し、その記録をオンプレミスシステム上のCSVデータファイルとして利用可能にします。企業はこの抽出から新たに追加された連絡先のみを取得し、S3のようなクラウドストレージ先に送信したいと考えています。その後、別のWorkatoレシピがここからこの情報を取得して、ターゲットを絞ったマーケティングを行います。

連絡先の件数が非常に多いため(約150万件)、企業の一般的なワークフローは多くの手順で構成されています。Workatoレシピがソースからデータを抽出し、企業はすべての連絡先を外部データベースに過去のデータとして保存し、企業は新しい抽出を別のテーブルにロードし、2つのテーブルの差分を見つけて変更されたデータを取得しなければなりません。その後、さらなる処理のためにデータをWorkatoにロードし直す必要があります。

このプロセスは非常に面倒で、外部データベースシステムへの依存を生み出します。

SQL変換を使用すると、3つの簡単な手順で同じワークフローを実現できます!

Recipe workflowレシピのワークフロー

1

レシピのトリガーで、データを抽出する予定のソース(オンプレミスシステム)を設定し、新しい受信抽出を探すように設定します。

2

SQL変換コネクタの Query CSV アクションを設定します。これにより、抽出データと過去のデータを比較し、変更されたデータを出力として生成できます。

3

データをS3などのクラウドストレージ先に送信します。

# 変更データキャプチャのためにSQL変換を活用する方法

このセクションでは、SQL変換を変更データキャプチャに活用できるように、 Query CSV アクションのさまざまなセクションを設定する方法を説明します。

Follow along

See this recipe link to follow along and modify a sample recipe to suit your own workflow.

# データソースの設定

SQL変換がクエリを実行するさまざまなデータソースを接続します。

1

Source #1 を接続するには、次のフィールドに入力します。私たちの例では、 Source #1 はオンプレミスシステムからの受信抽出データです。

  • データソース名

  • Data source name に意味のある名前を付けます。例えば contacts_extract のように。

  • データソースタイプ

  • Data source type を選択します。私たちの例では、アップストリームのオンプレミスシステムからCSVデータが入ってくるため、これは CSV content stream です。

  • CSVストリーム入力

  • データソースを CSV content stream に設定した後、CSVストリーム入力を設定できます。これは、オンプレミスファイルのトリガーから来るファイル内容を渡す場所です。

  • データスキーマ

  • Data schema を設定します。いくつかのサンプル連絡先データを含むCSVファイルをインポートすることで、これを簡単に行うことができます。

Data source #1ソース #1 の設定、受信データ抽出

2

CSV固有のオプションを設定します。これらには次のフィールドが含まれます。

  • CSVヘッダ行を無視

  • これにより、受信データに無視すべき見出し列が含まれており、データの一部として考慮しないように指定できます。

  • カラム区切り文字

  • CSVファイルで列を区切るために使用される区切り文字を選択します。利用可能なオプションには、 , (カンマ)、 ; (セミコロン)などがあります。

3

Source 2 を接続するには、次のフィールドに入力します。私たちの例では、このソースは過去のデータを指します。このデータは、Workatoの内部永続ファイルストレージシステムであるFileStorageを使用して簡単に保存および処理できます。

  • データソース名

  • Data source name に意味のある名前を付けます。例えば contacts_historical のように。

  • データソースタイプ

  • Data source type を選択します。私たちの例では、これは FileStorage file です。

  • FileStorageファイルパス

  • 過去のデータファイルが利用可能なFileStorage内のパスを指定します。

  • データスキーマ

  • Data schema を設定します。いくつかのサンプル連絡先データを含むCSVファイルをインポートすることで、これを簡単に行うことができます。スキーマは、ソースから来る列の左から右への順序に一致します。

Data source #2ソース #2 の設定、過去のデータ

4

CSV固有のオプションを設定します。これらには次のフィールドが含まれます。

  • CSVヘッダ行を無視

  • これにより、受信データに無視すべき見出し列が含まれており、データの一部として考慮しないように指定できます。

  • カラム区切り文字

  • CSVファイルで列を区切るために使用される区切り文字を選択します。利用可能なオプションには、 , (カンマ)、 ; (セミコロン)などがあります。

# クエリ設定

次に、データソース上で動作し、変換された出力を生成するクエリを設定します。

この例では、目的は連絡先の抽出から新しいレコードのみを取得することです。したがって、クエリは2つのデータソース( contacts_extractcontacts_historical)間で Contacts_Id を比較し、Contacts_Idcontacts_extract に存在し contacts_historical には存在しないレコードを返します。これは、Workatoが過去のデータにない新しく作成された連絡先を返すことを意味します。

Query setup

# 出力設定

最後に、次のフィールドを設定して出力の形式を定義します。

私たちの例では、新しいレコードデータをS3に送信してさらに処理します。

1

次のフィールドに入力します。

  • 出力タイプ

  • 出力のタイプを選択します。CSV content stream を使用して、コンテンツをストリーム可能なデータピルとしてダウンストリームのアクションと共有します。

  • ヘッダ行を含める

  • データの列名をファイルのヘッダ行として追加する必要がある場合は Yes に設定します。これは、レポートを生成するためにファイルを使用する予定の場合に便利です。デフォルト値は No です。

  • カラム区切り文字

  • CSVファイルで列を区切るために使用される区切り文字を選択します。利用可能なオプションには、 , (カンマ)、 ; (セミコロン)などがあります。

Output setup

2

Upload file in Amazon S3 アクションを選択します。

3

次のフィールドを設定します。

  • バケット名

  • バケットの正確な大文字小文字区別のある名前。

  • オブジェクト名

  • オブジェクト/ファイルの正確な大文字小文字区別のある名前。

  • リージョン

  • お使いのリージョンを見つけるには、S3で Bucket > Properties > Static website hosting をナビゲートして、Endpoint URL 内のリージョンを見つけます。例:us-west-2

  • コンテンツ

  • アップロードする予定のファイル内容。Step 2 Query CSV data アクションから CVS contents データピルを渡します。

  • 高速化エンドポイントを使用

  • デフォルトは false です。高速化エンドポイントを使用するには true に設定します。S3で Bucket > Properties > Transfer acceleration に移動して、高速化エンドポイントが有効になっているか確認します。

S3 upload action setup

使用例

さらに多くの使用例を見るには、以下のガイドをご覧ください:


Last updated: 2024/12/18 21:44:08