# How to ガイド - マルチステップアクション
ここでは、非同期 API を使用するアクションの作成について説明します。通常、非同期 API を使用してターゲットアプリケーションで長時間実行されるジョブやプロセスを開始するときは、多くの場合、リクエストを送信し、そのジョブやプロセスに対応する ID を受け取ります。アクションで API を定期的にチェックし、ジョブが完了していることを確認してから、結果を取得するかレシピ内の次のステップに進みます。
たとえば、クエリーを開始するように Google BigQuery にリクエストを送信すると、ジョブ ID が返されることがあります。この場合、Google BigQuery を定期的にチェックし、クエリーが完了していることを確認してから、結果の行を取得します。
このロジックをユーザーがレシピで設定する代わりに、カスタムコネクターの「マルチステップ」アクションを使用して、このロジック全体を単一のアクションに埋め込むことができるようになりました。「マルチステップ」アクションを使用するには、continue
引数を reinvoke_after
という専用メソッドと組み合わせて使用します。この設定方法については、以下を参照してください。
Workato のクラウドデバッガーの制限事項
マルチステップアクションを Workato のクラウドデバッガーでデバッグすることはできません。マルチステップアクションをデバッグするには、SDK Gem を使用します。
# サンプルコネクター - Google BigQuery
{
title: 'My Google BigQuery connector',
# More connector code here
actions: {
query: {
title: "Execute query",
subtitle: "Execute query in BigQuery",
description: "Run Query in BigQuery",
help: "This query runs synchronously for 25 seconds. If the query takes longer than that, it turns into an asynchronous action. There is a limit of ~38 minutes for the query to complete. ",
input_fields: lambda do
[
{
name: "project_id",
control_type: 'select',
pick_list: 'projects',
optional: false
},
{
name: "query",
optional: false
},
{
name: 'wait_for_query',
control_type: 'checkbox',
sticky: true,
},
{
name: "output_fields",
extends_schema: true,
schema_neutral: false,
sticky: true,
control_type: 'schema-designer',
label: 'Output columns',
hint: 'Provide your output fields for your query if you are providing datapills in your query',
item_label: 'Design your output columns',
sample_data_type: 'csv' # json_input / xml
},
]
end,
execute: lambda do |connection, input, eis, eos, continue|
continue = {} unless continue.present?
current_step = continue['current_step'] || 1
max_steps = 30
step_time = current_step * 60 # This helps us wait longer and longer as we increase in steps
# Minimum step time is 60 seconds
if current_step == 1 # First invocation
payload = {
query: input['query'],
timeoutMs: '25000',
useLegacySql: false
}
url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries"
response = post(url, payload)
# If user wants to wait for query to complete and
# job isn't complete after 25s
if response['jobComplete'] == false && input['wait_for_query'].is_true?
reinvoke_after(
seconds: step_time,
continue: {
current_step: current_step + 1,
jobid: response['jobReference']['jobId']
}
)
# If user doesn't want to wait for query to complete and
# job isn't complete after 25s
elsif response['jobComplete'] == false
{ jobId: response['jobReference']['jobId'] }
# Job is complete after 25s
else
call('format_rows', response)
end
# Subsequent invocations
elsif current_step <= max_steps
url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}"
response = get(url)
# If job is still running
if response['status']['state'] == "RUNNING"
reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
# If status is done but there is an error
elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
error(response.dig('status', 'errorResult'))
# If status is done
else
results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
call('format_rows', results)
end
else
error("Job took too long!")
end
end,
output_fields: lambda do |object_definitions, config_fields|
schema = [
{
name: "jobId"
},
{
name: "totalRows"
},
{
name: "pageToken"
},
{
name: "rows",
type: "array",
of: "object",
properties: object_definitions['query_output']
}
]
end,
summarize_output: ['rows']
},
}
# ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ
優れたアクションの作成に向けた最初のステップは、そのアクションが何をどのように実行するかを適切に伝え、さらにユーザーに追加のヘルプを提供することです。そのために Workato では、アクションのタイトルと説明を定義したり、ヒントを提供したりできるようにしています。簡単に言えば、タイトルはアクションの名称であり、サブタイトルはそのアクションのより詳しい内容を表します。続いて、アクションの説明は、接続先となるアプリケーションにおいてそのアクションがどのようなことを達成するかについて、仕様や解説を提供します。最後に、ヘルプのセグメントは、アクションをうまく機能させるために必要な追加情報をユーザーに提供します。
このステップの詳細については、Workato の SDK リファレンスを参照してください。
# ステップ2 - 入力項目の定義
input_fields: lambda do
[
{
name: "project_id",
control_type: 'select',
pick_list: 'projects',
optional: false
},
{
name: "query",
optional: false
},
{
name: 'wait_for_query',
control_type: 'checkbox',
sticky: true,
},
{
name: "output_fields",
extends_schema: true,
schema_neutral: false,
sticky: true,
control_type: 'schema-designer',
label: 'Output columns',
hint: 'Provide your output fields for your query if you are providing datapills in your query',
item_label: 'Design your output columns',
sample_data_type: 'csv' # json_input / xml
},
]
end,
この部分では、マルチステップアクションの実行を試みるユーザーに向けて表示すべき項目を Workato に指示します。たとえば BigQuery でクエリーを実行する場合、ユーザーは以下を提供する必要があります。
- Google BigQuery GCP プロジェクト ID
- Google BigQuery で実行するクエリー
- アクションがクエリーの完了を待機するかどうか
- クエリーからの期待される出力列
# ステップ3 - execute キーの定義
execute キーでは、リクエスト送信先となるエンドポイントと、使用する HTTP リクエストメソッドを Workato に指示するとともに、アクションと非同期 API とのやり取りについてロジック全体を制御します。マルチステップアクションを設定する場合、continue
引数を reinvoke_after
メソッドとともに使用する必要があります。これにより、まず execute
lambda 関数を呼び出して Google BigQuery にクエリーを挿入し、オプションでジョブをスリープ状態にして後から起動し、クエリーが完了したかどうかを確認することができます。
ジョブを起動する際は、前の reinvoke_after
呼び出しから渡された continue
とともに execute
lambda 関数が再度呼び出されます。この continue
引数には、作成された Google BigQuery ジョブのジョブ ID が必要です。その後、ジョブが完了したかどうかを確認します。まだ実行中の場合はジョブを再度スリープさせ、ジョブが完了している場合は結果を取得し、アクションの出力として送信できます。
TIP
ステップ時間は60秒以上に設定する必要があります。これより短い時間を設定すると、Workato によりデフォルトで60秒に設定されます。
execute: lambda do |connection, input, eis, eos, continue|
continue = {} unless continue.present? #For the first invocation, continue is nil
current_step = continue['current_step'] || 1 #Instantiate current_step so we know what step we are on
max_steps = 30 #IMPORTANT. you should set the max number of steps so your action doesn't continue forever. This will cause performance degradation in your recipes
step_time = current_step * 10 # This helps us wait longer and longer as we increase in steps
# Minimum step time is 60 seconds
if current_step == 1 # First invocation
payload = {
query: input['query'],
timeoutMs: '25000',
useLegacySql: false
}
#Request below sends the query to BigQuery
response = post("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries", payload)
#if Wait for query is false, the user can get the jobID back and get the results manually.
if response['jobComplete'] == false && input['wait_for_query'].is_true?
# reinvoke_after accepts 2 arguments.
# seconds is an integer that tells us how long to put the job to sleep for. MINIMUM 5 SECONDS
# continue is a hash is passed to the next invocation of the execute block when the job is woken up
reinvoke_after(seconds: step_time, continue: { current_step: current_step + 1, jobid: response['jobReference']['jobId'] })
elsif response['jobComplete'] == false
{ jobId: response['jobReference']['jobId'] }
else
call('format_rows', response)
end
elsif current_step <= max_steps # Subsequent invocations
response = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}")
# If job is still running, put to sleep again
if response['status']['state'] == "RUNNING"
reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
# If job is done but there was an error, raise an error
elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
error(response.dig('status', 'errorResult'))
# Reaching here means job is done and there are results.
else
results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
call('format_rows', results)
end
# if we reach step 31, we need to raise an error and cancel the job.
else
error("Job took too long!")
end
end,
# ステップ4 - 出力項目の定義
このセクションでは、トリガーの出力として表示するデータピルを指示します。各データピルの name
属性は、execute
キーの出力に含まれるキーに一致している必要があります。
output_fields: lambda do |object_definitions, config_fields|
schema = [
{
name: "jobId"
},
{
name: "totalRows"
},
{
name: "pageToken"
},
{
name: "rows",
type: "array",
of: "object",
properties: object_definitions['query_output']
}
]
end,
オブジェクトの定義
この例では、ユーザーが入力項目で指定した output_fields
を使用します。以下に query_output
のオブジェクトの定義を示します。
query_output: {
fields: lambda do |connection, config_fields, object_definitions|
next if config_fields['output_fields'].blank?
parse_json(config_fields['output_fields'])
end
}
Last updated: 2023/8/31 1:07:14