# How-to guides - Multi-threaded actions
In this segment, we will be going through the creation of actions that allow you to send requests in parallel across multiple threads. Data throughput can be a key concern for users and in some cases, APIs themselves may only support singleton ingestion endpoints or the batch sizes for requests are simply too low!
For example, imagine that you have thousands of contacts you want to sync into an app like Intercom (opens new window) but the API only supports inserting a single contact at a time. You could ask the recipe builder to loop over these contacts and ingest them 1 by 1 but that may lead to slow record ingestion times.
Rather than looping over thousands of contacts, you can use Multi-threaded actions to process them in batches (for example, of batch size 1000) and send API requests in parallel to increase overall throughput and reduce execution time.
NOTE
When you build multi-threaded actions on the SDK CLI tool, take note that requests will be sent sequentially instead of across parallel threads.
This allows you to inspect requests individually for easier debugging.
# Sample connector - Intercom
{
title: 'My Intercom connector',
# More connector code here
actions: {
create_contact: {
title: "Create contacts",
subtitle: "Creates multiple contacts in Intercom",
description: "Create contacts in Intercom",
input_fields: lambda do
[
{
name: "contacts",
type: "array",
of: "object",
properties: [
{
control_type: "text",
label: "Role",
type: "string",
name: "role"
},
{
control_type: "text",
label: "External ID",
type: "string",
name: "external_id"
},
{
control_type: "text",
label: "Email",
type: "string",
name: "email"
},
{
control_type: "text",
label: "Phone",
type: "string",
name: "phone"
},
{
control_type: "text",
label: "Name",
type: "string",
name: "name"
}
]
}
]
end,
execute: lambda do |connection, input, eis, eos|
# Pre-processing of the data.
# For multithreading, we need to create an array of requests which we do over here.
number_of_batches = input['contacts'].size
batches = input['contacts'].map do |contact|
post("contacts", contact)
end
# Sending of the requests in simultaneously using the parallel method
# The output of a method is an array with 3 indexes.
# The first index is a boolean to indicate that all requests were successful.
# The second index is an array of the successful responses. Failed requests are indicated
results = parallel(
batches, # Each index in the batch array represents a single request
threads: 20, # The max number of threads. Defaults to 1 and max is 20
rpm: 100, # How many requests to send per minute
)
# Post-processing
# Boolean to tell the user that all records were successful
success = results[0]
# An array of all the responses for successful records
records_ingested = results[1].compact
# Collecting all the failed records into an array
records_failed = []
results[2].each_with_index do |item, index|
next unless item
failed_record = {
code: item,
record: input['contacts'][index]
}
records_failed << failed_record
end
{
success: success,
records_ingested: records_ingested,
records_failed: records_failed
}
end,
output_fields: lambda do |object_definitions, config_fields|
object_definitions['insert_contacts_output']
end
},
}
# Step 1 - Action title, subtitle, description, and help
The first step to making a good action is to properly communicate what the actions does, how it does it and to provide additional help to users. To do so, Workato allows you to define the title, description, and provide hints for an action. Quite simply, the title is the title of an action and the subtitle provides further details of the action. The description of the action then contains specifications and explanation on what the action accomplishes and in the context of the application it connects to. Finally, the help segment provides users any additional information required to make the action work.
To know more about this step, take a look at our SDK reference
# Step 2 - Define input fields
This component tells Workato what fields to show to a user trying to execute the insert batch action. In the case of inserting a batch of contacts in Intercom for example, the user has to provide us with an array (list) of contacts.
input_fields: lambda do
[
{
name: "contacts",
type: "array",
of: "object",
properties: [
{
control_type: "text",
label: "Role",
type: "string",
name: "role"
},
{
control_type: "text",
label: "External ID",
type: "string",
name: "external_id"
},
{
control_type: "text",
label: "Email",
type: "string",
name: "email"
},
{
control_type: "text",
label: "Phone",
type: "string",
name: "phone"
},
{
control_type: "text",
label: "Name",
type: "string",
name: "name"
}
]
}
]
end,
# Step 3 - Defining the execute lambda
The execute lambda is responsible for
- Preparing the series of requests to send in parallel to the API
- The actual sending of the request
- Any post-processing of the data.
# 1. Preparing the series of requests to send in parallel to the API
In the first part of the execute lambda, we first create an array of requests with a single request for each contact. Take note that the requests are not actually sent out at this point but only when the array of requests is passed to the parallel
method.
# Pre-processing of the data.
# For multithreading, we need to create an array of requests which we do over here.
number_of_batches = input['contacts'].size
batches = input['contacts'].map do |contact|
post("contacts", contact)
end
# 2. Sending of the request
In the next step we call the parallel method which takes in the array of requests as well as parameters for the execution like the total number of threads and any throttling of requests required. Take note that rpm
is optional and excluding it will result in no throttling of requests.
results = parallel(
batches, # Each index in the batch array represents a single request
threads: 20, # The max number of threads. Defaults to 1 and max is 20
rpm: 100, # How many requests to send per minute
)
# 3. Post-processing of the data
The output of the parallel method is an array which describes the successful and failed requests in the batch. This is done in the second and third index of the array which correspond to successful responses and failed responses for requests respectively. Take note that null
values in either array indicate a value in the same position in its counterpart.
Sample output of the parallel method
[
false, # Boolean that indicates all requests were successful
[
null, # null indicates that this request was unsuccesful
{ ... }, # The response from a successful API call
# ...
],
[
"409 Conflict", # the error message from a failed request
null, # null indicates the request was successful
# ...
],
]
Lastly, we need to do some transformations to ensure that the output of this action contains both the successfully ingested records and the failed records so the user can retry these failed records or store this somewhere.
# Post-processing
# Boolean to tell the user that all records were successful
success = results[0]
# An array of all the responses for successful records
records_ingested = results[1].compact
# Collecting all the failed records into an array
records_failed = []
results[2].each_with_index do |item, index|
next unless item
failed_record = {
code: item,
record: input['contacts'][index]
}
records_failed << failed_record
end
{
success: success,
records_ingested: records_ingested,
records_failed: records_failed
}
# Step 4 - Defining output fields
This section tells us what datapills to show as the output of the trigger. The name
attributes of each datapill should match the keys in the output of the execute
key.
output_fields: lambda do |object_definitions, config_fields|
object_definitions['insert_contacts_output']
end
Last updated: 1/19/2022, 9:54:58 AM