# How-toガイド - ハイブリッドポーリング+Webhookトリガー

Webhookは、顧客が購入を行ったときなど、時間的に敏感なさまざまなユースケースを提供します。ただし、Webhookイベントが失われる場合もあります。これは、任意のHTTPリクエストが影響を受ける一時的なネットワークエラーや、レシピが一時的に停止され、その期間中にWebhookが送信された場合などが原因です。

ハイブリッドポーリング+Webhookトリガーでは、トリガーがリアルタイムでジョブを生成するだけでなく、Webhookが失われた場合でもイベントを取得する追加の保証を提供します。

続行する前に

以下のガイドは、Webhookトリガーとポーリングトリガーの両方の知識を基に構築されています。それぞれのトリガーに関するガイドを最初に読んでください。

# ハイブリッドトリガーの実装の理由

ハイブリッドトリガーは、従来のWebhookトリガーよりも多くの利点を提供します。以下の3つの主要な改善点があります。

  1. Webhookイベントは、新しいレコードを取得するためにWorkatoにシグナルを送信します。これにより、スプーフィングされたWebhookが不正なデータを作成する可能性が低くなり、また、Webhookが失われた場合でも、新しいWebhookが最終的に受信されるまでに取得されます。
  2. 各ジョブのトリガー出力には、Webhookで送信されることが多いスキニーペイロードではなく、必要なすべてのデータが含まれます。
  3. レシピが一時的に停止された場合にジョブが失われるのを防ぎます。レシピが再開されると、Workatoはすぐにポーリングを実行してすべてのレコードを取得します。

# ハイブリッドトリガーの仕組み

ハイブリッドトリガーは、Webhookトリガーとポーリングトリガーの重要なラムダを組み合わせます。静的Webhookハイブリッドトリガーの場合、webhook_keyswebhook_keyのラムダと、pollのラムダが必要です。動的Webhookハイブリッドトリガーの場合、webhook_subscribewebhook_unsubscribeのラムダと、pollのラムダが必要です。

ハイブリッドトリガーを使用する場合、イベントを取得するための仕組みは2つの重要な方法で変更されます。

  1. Webhookイベントが受信されると、pollのラムダが呼び出され、前回のポーリングからのクロージャに基づいて新しいイベントを取得します。
  2. レシピが開始/再開されると、pollのラムダが呼び出され、前回のポーリングからのクロージャに基づいて新しいイベントを取得します。

TIP

トリガーにWebhookイベントのバーストが発生した場合、Workatoのエンジンはポーリングを適切にスロットルして、各WebhookイベントごとにAPIをポーリングすることなくすべてのレコードを取得します。これにより、ビジーウェブフックからのレート制限を回避できます。

たとえば、1分間に100個のWebhookイベントのバーストが受信された場合、最後のWebhookイベントが受信された時刻よりも後の時刻になるまで、4〜5回のポーリングが行われることが予想されます。

Workatoはまた、Webhookの停止時にレコードが遅延して取得されるようにするために、12時間ごとにpollのラムダを実行します。

# サンプルコネクタ - Chargebee

{
  title: 'My Chargebee connector',

  webhook_keys: lambda do |params, headers, payload|
    # Chargebeeのイベントは「subscription_changed」、「subscription_renewed」、「customer_changed」などの形式で提供されます
    # メインオブジェクトを取得するために.splitを使用します
    "#{payload['event_type']}".split("_").first
  end,

  # ここにさらなるコネクタのコードがあります
  triggers: {
    new_updated_subscription: {
      title: 'New/updated subscription',

      subtitle: "Chargebeeでサブスクリプションが作成/更新されたときにトリガーされます",

      description: lambda do |input, picklist_label|
        "Chargebeeでの新しい/更新された<span class='provider'>サブスクリプション</span>"
      end,

      help: lambda do |input, picklist_label, connection, webhook_base_url|
        webhook_base_url.present?の場合は次のようになります。
        <<~HTML
        Chargebeeからサブスクリプションが作成/更新されたときにジョブをすぐに作成します。このWebhookを設定するには、Chargebeeの「設定」=>「Webhook」=>「新規」の下にWebhookを登録し、subscription_changedおよびsubscription_createdのイベントを提供する必要があります。<br>
        <b>WebhookエンドポイントURL</b>
        <b class="tips__highlight">#{webhook_base_url}</b>
        HTML
      end,

      input_fields: lambda do
         [
           {
             name: 'since',
             type: :timestamp,
             optional: true
           }
         ]
      end,

      webhook_key: lambda do |connection, input|
        "subscription"
      end,

      poll: lambda do |connection, input, closure|
        page_size = 100
         closure = {} unless closure.present?
         closure['updated_since'] = (closure['updated_since'] || input['since'] || 1.hours.ago).to_time.utc.to_i 
         
         params = {
            "sort_by[asc]": 'updated_at',
            limit: page_size,
            "updated_at[after]": closure['updated_since']
         }
         
         params['offset'] = c```
closure['offset'] 

response = get("/api/v2/subscriptions", params)

if response['next_offset'].present?
   closure['offset'] = response['next_offset']
else
   closure['offset'] = nil
   closure['updated_since'] = response['list'].last[input['object']]['updated_at'] unless response['list'].size == 0 
end

{
   events: response['list'],
   next_poll: closure,
   can_poll_more: response['next_offset'].present?
} 
end,

dedup: lambda do |record|
   "#{record['subscription']['id']}@#{record['subscription']['updated_at']}"
end,

output_fields: lambda do |object_definitions|
   object_definitions['subscription']
end
}
},
# More connector code here
}

# ステップ1 - 選択したウェブフックトリガーを実装する

ビルドするアプリに応じて、適切なウェブフックトリガータイプを構築します。動的ウェブフックは、エンドユーザーが必要とするセットアップの量を最小限に抑えるため、常に優先されます。この例では、Chargebeeでは手動でウェブフックサブスクリプションを設定する必要があるため、静的ウェブフックトリガーを使用しています。詳細については、各ウェブフックのタイプに関するガイドを参照してください。

動的ウェブフックトリガーの場合、webhook_subscribewebhook_unsubscribeのラムダを定義する必要があります。 静的ウェブフックトリガーの場合、webhook_keyswebhook_keyのラムダを定義する必要があります。

# ステップ2 - ポーリングブロックを定義する

ハイブリッドトリガーを構築するためには、webhook_notificationのラムダを定義する代わりに、pollのラムダを定義する必要があります。pollのラムダは、他のポーリングトリガーと同様に機能する必要があります。つまり、新しいレコードを取得するためのAPIエンドポイントが必要です。詳細については、ポーリングトリガーガイドを参照してください。

poll: lambda do 
   page_size = 100
   closure = {} unless closure.present?
   closure['updated_since'] = (closure['updated_since'] || input['since'] || 1.hours.ago).to_time.utc.to_i 

   params = {
      "sort_by[asc]": 'updated_at',
      limit: page_size,
      "updated_at[after]": closure['updated_since']
   }

   params['offset'] = closure['offset'] 

   response = get("/api/v2/subscriptions", params)

   if response['next_offset'].present?
      closure['offset'] = response['next_offset']
   else
      closure['offset'] = nil
      closure['updated_since'] = response['list'].last[input['object']]['updated_at'] unless response['list'].size == 0 
   end

   {
      events: response['list'],
      next_poll: closure,
      can_poll_more: response['next_offset'].present?
   } 
end,

この例では、最後にポーリングした時間以降に作成/更新されたサブスクリプションを取得するために、ChargebeeのサブスクリプションAPIを単純にクエリします。

# ステップ4 - 出力フィールドと重複排除の定義

出力フィールドと重複排除を定義する際には、実際のウェブフックペイロードではなく、pollのラムダの出力に基づいて定義する必要があります。基本的には、ウェブフックペイロードはpollのラムダで受け取ったレコードに置き換えられます。

dedup: lambda do |record|
   "#{record['subscription']['id']}@#{record['subscription']['updated_at']}"
end,

output_fields: lambda do |object_definitions|
   object_definitions['subscription']
end

# レート制限

このトリガーは、ウェブフックゲートウェイの制限の対象です。


Last updated: 2024/2/13 16:59:53