ハウツーガイド - マルチスレッドアクション
このセグメントでは、複数のスレッドにわたってリクエストを並列に送信できるアクションの作成について説明します。 データスループットはユーザーにとって重要な懸念事項になることがあります。また、場合によってはAPI自体が単一レコードの取り込みエンドポイントしかサポートしていない、またはリクエストのバッチサイズが単に小さすぎることがあります。
たとえば、Intercomのようなアプリに同期したい連絡先が数千件あるものの、APIが一度に1件の連絡先の挿入しかサポートしていない場合を考えてみます。 レシピビルダーにこれらの連絡先をループ処理して1件ずつ取り込むように指定できますが、レコードの取り込み時間が長くなる可能性があります。
数千件の連絡先をループ処理する代わりに、マルチスレッドアクションを使用してそれらをバッチ(たとえば、バッチサイズ1000)で処理し、APIリクエストを並列に送信することで、全体的なスループットを向上させ、実行時間を短縮できます。
注
SDK CLIツールでマルチスレッドアクションを構築する場合、リクエストは並列スレッドではなく順次送信されることに注意してください。
これにより、デバッグを容易にするためにリクエストを個別に確認できます。
アクションのタイムアウト
SDKアクションには180秒のタイムアウト制限があります。
サンプルコネクター - Intercom
{
title: 'My Intercom connector',
# More connector code here
actions: {
create_contact: {
title: "Create contacts",
subtitle: "Creates multiple contacts in Intercom",
description: "Create contacts in Intercom",
input_fields: lambda do
[
{
name: "contacts",
type: "array",
of: "object",
properties: [
{
control_type: "text",
label: "Role",
type: "string",
name: "role"
},
{
control_type: "text",
label: "External ID",
type: "string",
name: "external_id"
},
{
control_type: "text",
label: "Email",
type: "string",
name: "email"
},
{
control_type: "text",
label: "Phone",
type: "string",
name: "phone"
},
{
control_type: "text",
label: "Name",
type: "string",
name: "name"
}
]
}
]
end,
execute: lambda do |connection, input, eis, eos|
# Pre-processing of the data.
# For multithreading, we need to create an array of requests which we do over here.
number_of_batches = input['contacts'].size
batches = input['contacts'].map do |contact|
post("contacts", contact)
end
# Sending of the requests in simultaneously using the parallel method
# The output of a method is an array with 3 indexes.
# The first index is a boolean to indicate that all requests were successful.
# The second index is an array of the successful responses. Failed requests are indicated
results = parallel(
batches, # Each index in the batch array represents a single request
threads: 20, # The max number of threads. Defaults to 1 and max is 20
rpm: 100, # How many requests to send per minute
)
# Post-processing
# Boolean to tell the user that all records were successful
success = results[0]
# An array of all the responses for successful records
records_ingested = results[1].compact
# Collecting all the failed records into an array
records_failed = []
results[2].each_with_index do |item, index|
next unless item
failed_record = {
code: item,
record: input['contacts'][index]
}
records_failed << failed_record
end
{
success: success,
records_ingested: records_ingested,
records_failed: records_failed
}
end,
output_fields: lambda do |object_definitions, config_fields|
object_definitions['insert_contacts_output']
end
},
}ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ
優れたアクションを作成するための最初のステップは、アクションが何を行い、どのように実行するかを適切に伝え、ユーザーに追加のヘルプを提供することです。 そのため、Workatoではアクションのタイトルと説明を定義し、ヒントを提供できます。 簡単に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を示します。 アクションの説明には、そのアクションが何を実現するかについての仕様と説明、および接続先アプリケーションのコンテキストが含まれます。 最後に、ヘルプセグメントでは、アクションを機能させるために必要な追加情報をユーザーに提供します。
このステップの詳細については、SDKリファレンスを参照してください
ステップ2 - 入力フィールドの定義
このコンポーネントは、挿入バッチアクションを実行しようとしているユーザーに表示するフィールドをWorkatoに指示します。 たとえばIntercomに連絡先のバッチを挿入する場合、ユーザーは連絡先の配列(リスト)を提供する必要があります。
input_fields: lambda do
[
{
name: "contacts",
type: "array",
of: "object",
properties: [
{
control_type: "text",
label: "Role",
type: "string",
name: "role"
},
{
control_type: "text",
label: "External ID",
type: "string",
name: "external_id"
},
{
control_type: "text",
label: "Email",
type: "string",
name: "email"
},
{
control_type: "text",
label: "Phone",
type: "string",
name: "phone"
},
{
control_type: "text",
label: "Name",
type: "string",
name: "name"
}
]
}
]
end,ステップ3 - execute lambdaの定義
execute lambdaは次を担います
- APIに並列送信する一連のリクエストの準備
- リクエストの実際の送信
- データの後処理。
1) APIに並列送信する一連のリクエストの準備
execute lambdaの最初の部分では、まず連絡先ごとに1つのリクエストを含むリクエストの配列を作成します。 この時点ではリクエストは実際には送信されず、リクエストの配列がparallelメソッドに渡されたときにのみ送信されることに注意してください。
# Pre-processing of the data.
# For multithreading, we need to create an array of requests which we do over here.
number_of_batches = input['contacts'].size
batches = input['contacts'].map do |contact|
post("contacts", contact)
end2. リクエストの送信
次のステップではparallelメソッドを呼び出します。このメソッドは、リクエストの配列に加えて、スレッドの総数や必要なリクエストのスロットリングなどの実行用パラメーターを受け取ります。 rpmは任意であり、省略するとリクエストのスロットリングは行われないことに注意してください。
results = parallel(
batches, # Each index in the batch array represents a single request
threads: 20, # The max number of threads. Defaults to 1 and max is 20
rpm: 100, # How many requests to send per minute
)3. データの後処理
parallelメソッドの出力は、バッチ内の成功したリクエストと失敗したリクエストを示す配列です。 これは配列の2番目と3番目のインデックスで行われ、それぞれリクエストの成功レスポンスと失敗レスポンスに対応します。 いずれかの配列内のnull値は、対応する配列の同じ位置に値があることを示します。
parallelメソッドのサンプル出力
[
false, # Boolean that indicates all requests were successful
[
null, # null indicates that this request was unsuccesful
{ ... }, # The response from a successful API call
# ...
],
[
"409 Conflict", # the error message from a failed request
null, # null indicates the request was successful
# ...
],
]最後に、このアクションの出力に、正常に取り込まれたレコードと失敗したレコードの両方が含まれるように変換を行う必要があります。これにより、ユーザーは失敗したレコードを再試行したり、どこかに保存したりできます。
# Post-processing
# Boolean to tell the user that all records were successful
success = results[0]
# An array of all the responses for successful records
records_ingested = results[1].compact
# Collecting all the failed records into an array
records_failed = []
results[2].each_with_index do |item, index|
next unless item
failed_record = {
code: item,
record: input['contacts'][index]
}
records_failed << failed_record
end
{
success: success,
records_ingested: records_ingested,
records_failed: records_failed
}ステップ4 - 出力フィールドの定義
このセクションでは、トリガーの出力として表示するデータピルを指定します。 各データピルのname属性は、executeキーの出力に含まれるキーと一致している必要があります。
output_fields: lambda do |object_definitions, config_fields|
object_definitions['insert_contacts_output']
endLast updated: