ハウツーガイド - ハイブリッドポーリング+Webhookトリガー

このページは機械翻訳により提供されています。翻訳内容と英語版に相違がある場合は、英語版が優先されます。

Webhookは、顧客が購入したときにイベントを受信するなど、本質的に時間依存性の高いさまざまなユースケースを可能にします。 ただし、Webhookイベントが失われるシナリオもあります。 これは、HTTPリクエストで発生し得る一時的なネットワークエラーが原因である場合や、単にレシピが短時間停止され、その間にWebhookが送信された場合に発生する可能性があります。

ハイブリッドポーリング+Webhookトリガーは、両方の利点を提供します。トリガーはリアルタイムで機能してジョブを生成できるだけでなく、Webhookが失われた場合でもイベントを取得できるという追加の保証も提供します。

続行する前に

以下のガイドは、Webhookトリガーとポーリングトリガーの両方に関する知識を前提としています。 最初に、それぞれのトリガーに関するガイドを読むことをお勧めします。

ハイブリッドトリガーを実装する理由

ハイブリッドトリガーには、従来のWebhookトリガーと比べて、3つの主な改善点による多数の利点があります。

  1. Webhookイベントは、ターゲットシステムから新しいレコードを取得するようWorkatoに通知するシグナルをトリガーします。 これにより、なりすましWebhookによって不正なデータが作成される可能性が低くなり、失われた可能性のあるWebhookも、最終的に新しいWebhookが受信されたときに取得されることが保証されます。
  2. 各ジョブのトリガー出力に、Webhookで送信されることが多い簡素なペイロードではなく、必要なすべてのデータを含めることができます。
  3. レシピが一定期間停止されている場合のジョブ損失を防ぎます。 レシピが再度開始されると、Workatoはすべてのレコードを取得するためにすぐにポーリングを実行します。

ハイブリッドトリガーのメカニズム

ハイブリッドトリガーは、Webhookトリガーとポーリングトリガーの重要なlambdaをそれぞれ組み合わせたものです。 静的Webhookハイブリッドトリガーでは、poll lambdaに加えて、webhook_keysおよびwebhook_key lambdaが必要です。 動的Webhookハイブリッドトリガーでは、poll lambdaに加えて、webhook_subscribeおよびwebhook_unsubscribe lambdaが必要です。

ハイブリッドトリガーを使用する場合、イベントを取得するメカニズムは2つの重要な点で変わります。

  1. Webhookイベントを受信すると、前回のポーリングのclosureに基づいて新しいイベントを取得するために、poll lambdaが呼び出されます。
  2. レシピが開始または再起動されると、前回のポーリングのclosureに基づいて新しいイベントを取得するために、poll lambdaが呼び出されます。

TIP

トリガーに対してWebhookイベントがバースト的に発生した場合、Workatoのエンジンはポーリングをインテリジェントにスロットリングし、WebhookイベントごとにAPIをポーリングすることなくすべてのレコードを取得します。 これにより、大量のWebhookによってレート制限に達することを防ぎます。

たとえば、1分間に100件のwebhookイベントが集中して受信された場合、4~5回のpollを受信することが予想されます。最後のpollは、最後のwebhookイベントが受信された時刻より後になります。

Workatoはまた、Webhookの停止が発生した場合でも多少の遅延でレコードが確実に取得されるように、12時間ごとにpoll lambdaを実行します。

サンプルコネクタ - Chargebee

ruby
{
  title: 'My Chargebee connector',

  webhook_keys: lambda do |params, headers, payload|
    # Chargebee events come in the form "subscription_changed", "subscription_renewed", "customer_changed" etc
    # Use .split to get the main object
    "#{payload['event_type']}".split("_").first
  end,

  # More connector code here
  triggers: {
    new_updated_subscription: {
      title: 'New/updated subscription',

      subtitle: "Triggers when a subscription is " \
      "created/updated in Chargebee",

      description: lambda do |input, picklist_label|
        "New/updated <span class='provider'>subscription</span> in " \
        "<span class='provider'>Chargebee</span>"
      end,

      help: lambda do |input, picklist_label, connection, webhook_base_url|
        next unless webhook_base_url.present?
        <<~HTML
        Creates a job immediately when a subscription is created/updated from Chargebee. To set this webhook up,
        you will need to register the webhook below in Chargebee under "settings" => "webhooks" => "new" and provide the events for subscription_changed and subscription_created. <br>
        <b>Webhook endpoint URL</b>
        <b class="tips__highlight">#{webhook_base_url}</b>
        HTML
      end,

      input_fields: lambda do
         [
           {
             name: 'since',
             type: :timestamp,
             optional: true
           }
         ]
      end,

      webhook_key: lambda do |connection, input|
        "subscription"
      end,

      poll: lambda do |connection, input, closure|
        page_size = 100
         closure = {} unless closure.present?
         closure['updated_since'] = (closure['updated_since'] || input['since'] || 1.hours.ago).to_time.utc.to_i

         params = {
            "sort_by[asc]": 'updated_at',
            limit: page_size,
            "updated_at[after]": closure['updated_since']
         }

         params['offset'] = closure['offset']

         response = get("/api/v2/subscriptions", params)

         if response['next_offset'].present?
           closure['offset'] = response['next_offset']
         else
           closure['offset'] = nil
           closure['updated_since'] = response['list'].last[input['object']]['updated_at'] unless response['list'].size == 0
         end

         {
           events: response['list'],
           next_poll: closure,
           can_poll_more: response['next_offset'].present?
         }
      end,

      dedup: lambda do |record|
        "#{record['subscription']['id']}@#{record['subscription']['updated_at']}"
      end,

      output_fields: lambda do |object_definitions|
        object_definitions['subscription']
      end
    }
  },
  # More connector code here
}

ステップ1 - 選択したWebhookトリガーの実装

コネクターを構築する対象のアプリに応じて、適切なWebhookトリガータイプを構築します。 動的Webhookは、エンドユーザーに必要なセットアップ量を最小限に抑えるため、常に推奨されます。 この例では、ChargebeeではWebhookサブスクリプションを手動でセットアップする必要があるため、静的Webhookトリガーを使用しています。 詳細については、各Webhookタイプに関するガイドを参照してください。

動的Webhookトリガーでは、webhook_subscribeおよびwebhook_unsubscribe lambdaを定義する必要があります。 静的Webhookトリガーでは、webhook_keysおよびwebhook_key lambdaを定義する必要があります。

ステップ2 - pollブロックの定義

ハイブリッドトリガーを構築するには、webhook_notification lambdaを定義する代わりに、poll lambdaを定義する必要があります。 poll lambdaは、他のポーリングトリガーと同様に機能する必要があり、新しいレコードを取得するためのAPIエンドポイントが必要です。 詳細については、ポーリングトリガーガイドを参照してください。

ruby
      poll: lambda do
        page_size = 100
         closure = {} unless closure.present?
         closure['updated_since'] = (closure['updated_since'] || input['since'] || 1.hours.ago).to_time.utc.to_i

         params = {
            "sort_by[asc]": 'updated_at',
            limit: page_size,
            "updated_at[after]": closure['updated_since']
         }

         params['offset'] = closure['offset']

         response = get("/api/v2/subscriptions", params)

         if response['next_offset'].present?
           closure['offset'] = response['next_offset']
         else
           closure['offset'] = nil
           closure['updated_since'] = response['list'].last[input['object']]['updated_at'] unless response['list'].size == 0
         end

         {
           events: response['list'],
           next_poll: closure,
           can_poll_more: response['next_offset'].present?
         }
      end,

この例では、単にChargebeeのサブスクリプションAPIをクエリして、前回ポーリングした後に作成または更新されたサブスクリプションを取得しています。

ステップ4 - 出力フィールドとdedupの定義

出力フィールドとdedupを定義する場合、実際のWebhookペイロードではなく、poll lambdaの出力に基づく必要があることに注意してください。 基本的に、Webhookペイロードは破棄され、poll lambdaで受信したレコードが使用されます。

ruby
      dedup: lambda do |record|
        "#{record['subscription']['id']}@#{record['subscription']['updated_at']}"
      end,

      output_fields: lambda do |object_definitions|
        object_definitions['subscription']
      end

レート制限

このトリガーには、webhookゲートウェイの制限が適用されます。

Last updated: