# How to ガイド - マルチスレッドアクション

ここでは、複数のスレッドで並行してリクエストを送信できるアクションの作成について説明します。データスループットがユーザーにとって重要な懸念事項になることがあります。場合によっては、API 自体がシングルトンの取り込みエンドポイントしかサポートしていなかったり、リクエストのバッチサイズが小さすぎたりすることもあります。

たとえば、Intercom (opens new window) などのアプリに数千件の連絡先を同期させたいが、API では一度に1件の連絡先しか挿入できないという場合を考えてみましょう。連絡先をループして1つずつ取り込んでいくようレシピビルダーで設定することもできますが、レコードの取り込みに時間がかかる可能性があります。

マルチスレッドアクションを使用すれば、数千件の連絡先をループする代わりにバッチ (1,000件ごとなど) でまとめて処理し、複数の API リクエストを並行して送信することで、全体のスループットを向上させ、実行時間を短縮することができます。

SDK の CLI ツールでマルチスレッドアクションを作成する場合、リクエストは並列のスレッドではなく、順に送信されることに注意してください。

これにより、リクエストを個別に検査し、デバッグを容易にすることができます。

# サンプルコネクター - Intercom

{
  title: 'My Intercom connector',

  # More connector code here
  actions: {
    create_contact: {
        title: "Create contacts",

        subtitle: "Creates multiple contacts in Intercom",

        description: "Create contacts in Intercom",

        input_fields: lambda do 
          [
            {
              name: "contacts",
              type: "array",
              of: "object",
              properties: [
                {
                  control_type: "text",
                  label: "Role",
                  type: "string",
                  name: "role"
                },
                {
                  control_type: "text",
                  label: "External ID",
                  type: "string",
                  name: "external_id"
                },
                {
                  control_type: "text",
                  label: "Email",
                  type: "string",
                  name: "email"
                },
                {
                  control_type: "text",
                  label: "Phone",
                  type: "string",
                  name: "phone"
                },
                {
                  control_type: "text",
                  label: "Name",
                  type: "string",
                  name: "name"
                }
              ]
            }
          ]
        end,

        execute: lambda do |connection, input, eis, eos|
          # Pre-processing of the data. 
          # For multithreading, we need to create an array of requests which we do over here.
          number_of_batches = input['contacts'].size
          batches = input['contacts'].map do |contact|
            post("contacts", contact)
          end
          
          # Sending of the requests in simultaneously using the parallel method
          # The output of a method is an array with 3 indexes.
          # The first index is a boolean to indicate that all requests were successful.
          # The second index is an array of the successful responses. Failed requests are indicated
          results = parallel(
              batches, # Each index in the batch array represents a single request
              threads: 20, # The max number of threads. Defaults to 1 and max is 20
              rpm: 100, # How many requests to send per minute
              )


          # Post-processing
          # Boolean to tell the user that all records were successful
          success = results[0] 
          # An array of all the responses for successful records
          records_ingested = results[1].compact
          # Collecting all the failed records into an array
          records_failed = []
          results[2].each_with_index do |item, index|
            next unless item 
            failed_record = {
              code: item,
              record: input['contacts'][index]
            }
            records_failed << failed_record
          end
          
          {
            success: success,
            records_ingested: records_ingested,
            records_failed: records_failed
          }
        end,

        output_fields: lambda do |object_definitions, config_fields|
          object_definitions['insert_contacts_output']
        end

      },
}

# ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ

優れたアクションの作成に向けた最初のステップは、そのアクションが何をどのように実行するかを適切に伝え、さらにユーザーに追加のヘルプを提供することです。そのために Workato では、アクションのタイトルと説明を定義したり、ヒントを提供したりできるようにしています。簡単に言えば、タイトルはアクションの名称であり、サブタイトルはそのアクションのより詳しい内容を表します。続いて、アクションの説明は、接続先となるアプリケーションにおいてそのアクションがどのようなことを達成するかについて、仕様や解説を提供します。最後に、ヘルプのセグメントは、アクションをうまく機能させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、Workato の SDK リファレンスを参照してください。

# ステップ2 - 入力項目の定義

この部分では、バッチ挿入アクションの実行を試みるユーザーに向けて表示すべき項目を Workato に指示します。たとえば Intercom に連絡先のバッチを挿入する場合、ユーザーは連絡先の配列 (リスト) を提供する必要があります。

  input_fields: lambda do 
    [
      {
        name: "contacts",
        type: "array",
        of: "object",
        properties: [
          {
            control_type: "text",
            label: "Role",
            type: "string",
            name: "role"
          },
          {
            control_type: "text",
            label: "External ID",
            type: "string",
            name: "external_id"
          },
          {
            control_type: "text",
            label: "Email",
            type: "string",
            name: "email"
          },
          {
            control_type: "text",
            label: "Phone",
            type: "string",
            name: "phone"
          },
          {
            control_type: "text",
            label: "Name",
            type: "string",
            name: "name"
          }
        ]
      }
    ]
  end,

# ステップ3 - execute lambda の定義

execute lambda は以下を実行します。

  1. 一連のリクエストを並行して API に送信するための準備
  2. 実際のリクエストの送信
  3. データの後処理

# 1. 一連のリクエストを並行して API に送信するための準備

execute lambda の最初の部分では、まずリクエストの配列を作成します。連絡先ごとに1つのリクエストになります。この時点ではリクエストは送信されません。実際に送信されるのは、配列を parallel メソッドに渡したときであることに注意してください。

  # Pre-processing of the data. 
  # For multithreading, we need to create an array of requests which we do over here.
  number_of_batches = input['contacts'].size
  batches = input['contacts'].map do |contact|
    post("contacts", contact)
  end

# 2. リクエストの送信

次に parallel メソッドを呼び出します。このメソッドは、リクエストの配列と、実行に必要なパラメータ (スレッドの合計数や必要なリクエストの調整など) を引数に取ります。rpm は任意であり、これを指定しない場合、リクエストの調整は行われません。

results = parallel(
    batches, # Each index in the batch array represents a single request
    threads: 20, # The max number of threads. Defaults to 1 and max is 20
    rpm: 100, # How many requests to send per minute
    )

# 3. データの後処理

parallel メソッドの出力の配列には、バッチ内の成功したリクエストと失敗したリクエストが記述されています。これは配列の2番目と3番目のインデックスで行われ、それぞれリクエストの成功レスポンスと失敗レスポンスに対応しています。配列内の null 値は、対応する配列の同じ位置の値を示しています。

parallel メソッドのサンプル出力

[
  false, # Boolean that indicates all requests were successful
  [
    null, # null indicates that this request was unsuccesful
    { ... }, # The response from a successful API call
    # ...
  ],
  [
    "409 Conflict", # the error message from a failed request
    null, # null indicates the request was successful
    # ...
  ],
]

最後にいくつかの変換を行い、このアクションの出力に、取り込みに成功したレコードと失敗したレコードの両方が含まれていることを確認する必要があります。これにより、失敗したレコードを再試行したり、別の場所に格納したりすることができます。

  # Post-processing
  # Boolean to tell the user that all records were successful
  success = results[0] 
  # An array of all the responses for successful records
  records_ingested = results[1].compact
  # Collecting all the failed records into an array
  records_failed = []
  results[2].each_with_index do |item, index|
    next unless item 
    failed_record = {
      code: item,
      record: input['contacts'][index]
    }
    records_failed << failed_record
  end
  
  {
    success: success,
    records_ingested: records_ingested,
    records_failed: records_failed
  }

# ステップ4 - 出力項目の定義

このセクションでは、トリガーの出力として表示するデータピルを指示します。各データピルの name 属性は、execute キーの出力に含まれるキーに一致している必要があります。

output_fields: lambda do |object_definitions, config_fields|
  object_definitions['insert_contacts_output']
end


Last updated: 2023/8/31 1:07:14