ハウツーガイド - マルチステップアクション
このセグメントでは、非同期APIと連携するアクションの作成について説明します。 通常、ターゲットアプリケーションで長時間実行されるジョブまたはプロセスを開始するために非同期APIを使用する場合、多くの場合、リクエストを送信し、そのジョブまたはプロセスに対応するIDを受け取ることを想定します。 その後、結果を取得する前、またはレシピの次のステップに進む前に、アクションはAPIに継続的に確認してジョブが完了したかどうかを確認します。
たとえば、クエリを開始するためにGoogle BigQueryにリクエストを送信すると、Google BigQueryからジョブIDが返される場合があります。 この時点でのタスクは、行を取得する前に、Google BigQueryに定期的に確認してクエリが完了したかどうかを確認することです。
ユーザーにこのロジックをレシピで設定させるのではなく、カスタムコネクターで"multi-step"アクションを使用して、このロジック全体を単一のアクションに埋め込むことができます。 "multi-step"アクションを使用するには、continue引数をreinvoke_afterという専用メソッドと組み合わせて使用します。 設定方法については、以下をお読みください。
テストコードタブでのデバッグ制限
WorkatoのSDK テストコードタブでは、マルチステップアクションをデバッグできません。 これにより、uncaught throw :return_from_actionエラーがトリガーされる場合があります。 代わりに、SDK Gemを使用してマルチステップアクションをデバッグしてください。
レシピエディターでテストする前に、必ずカスタムコネクターの最新バージョンをリリースしてください。 これにより、デバッグ中に行った変更が適用されます。
アクションのタイムアウト
SDKアクションには180秒のタイムアウト制限があります。
サンプルコネクター - Google BigQuery
{
title: '自分のGoogle BigQueryコネクター',
# その他のコネクターコードをここに記述
actions: {
query: {
title: "クエリを実行",
subtitle: "BigQueryでクエリを実行",
description: "BigQueryでクエリを実行",
help: "このクエリは25秒間同期的に実行されます。クエリにそれより長い時間がかかる場合、非同期アクションになります。クエリの完了には約38分の制限があります。 ",
input_fields: lambda do
[
{
name: "project_id",
control_type: 'select',
pick_list: 'projects',
optional: false
},
{
name: "query",
optional: false
},
{
name: 'wait_for_query',
control_type: 'checkbox',
sticky: true,
},
{
name: "output_fields",
extends_schema: true,
schema_neutral: false,
sticky: true,
control_type: 'schema-designer',
label: '出力列',
hint: 'クエリでデータピルを提供する場合は、クエリの出力フィールドを指定してください',
item_label: '出力列を設計',
sample_data_type: 'csv' # json_input / xml
},
]
end,
execute: lambda do |connection, input, eis, eos, continue|
continue = {} unless continue.present?
current_step = continue['current_step'] || 1
max_steps = 30
step_time = current_step * 60 # ステップが増えるにつれて待機時間を長くできます
# 最小ステップ時間は60秒です
if current_step == 1 # 初回呼び出し
payload = {
query: input['query'],
timeoutMs: '25000',
useLegacySql: false
}
url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries"
response = post(url, payload)
# ユーザーがクエリの完了を待機することを希望し、
# 25秒後もジョブが完了していない場合
if response['jobComplete'] == false && input['wait_for_query'].is_true?
reinvoke_after(
seconds: step_time,
continue: {
current_step: current_step + 1,
jobid: response['jobReference']['jobId']
}
)
# ユーザーがクエリの完了を待機することを希望せず、
# 25秒後もジョブが完了していない場合
elsif response['jobComplete'] == false
{ jobId: response['jobReference']['jobId'] }
# ジョブは25秒後に完了
else
call('format_rows', response)
end
# 後続の呼び出し
elsif current_step <= max_steps
url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}"
response = get(url)
# ジョブがまだ実行中の場合
if response['status']['state'] == "RUNNING"
reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
# ステータスは完了だがエラーがある場合
elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
error(response.dig('status', 'errorResult'))
# ステータスが完了の場合
else
results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
call('format_rows', results)
end
else
error("ジョブに時間がかかりすぎました。")
end
end,
output_fields: lambda do |object_definitions, config_fields|
schema = [
{
name: "jobId"
},
{
name: "totalRows"
},
{
name: "pageToken"
},
{
name: "rows",
type: "array",
of: "object",
properties: object_definitions['query_output']
}
]
end,
summarize_output: ['rows']
},
}ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ
優れたアクションを作成するための最初のステップは、アクションが何を行い、どのように実行するかを適切に伝え、ユーザーに追加のヘルプを提供することです。 そのため、Workatoではアクションのタイトルと説明を定義し、ヒントを提供できます。 簡単に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を示します。 アクションの説明には、そのアクションが何を実現するかについての仕様と説明、および接続先アプリケーションのコンテキストが含まれます。 最後に、ヘルプセグメントでは、アクションを機能させるために必要な追加情報をユーザーに提供します。
このステップの詳細については、SDKリファレンスを参照してください
ステップ2 - 入力フィールドの定義
input_fields: lambda do
[
{
name: "project_id",
control_type: 'select',
pick_list: 'projects',
optional: false
},
{
name: "query",
optional: false
},
{
name: 'wait_for_query',
control_type: 'checkbox',
sticky: true,
},
{
name: "output_fields",
extends_schema: true,
schema_neutral: false,
sticky: true,
control_type: 'schema-designer',
label: '出力列',
hint: 'クエリでデータピルを提供する場合は、クエリの出力フィールドを指定してください',
item_label: '出力列を設計',
sample_data_type: 'csv' # json_input / xml
},
]
end,このコンポーネントは、マルチステップアクションを実行しようとしているユーザーに表示するフィールドをWorkatoに指示します。 たとえばBigQueryでクエリを実行する場合、ユーザーは次の情報を提供する必要があります。
- Google BigQuery GCPプロジェクトID
- Google BigQueryで実行するクエリ
- アクションがクエリの完了を待機するかどうか
- およびクエリから想定される出力列
ステップ3 - executeキーの定義
executeキーは、リクエストの送信先エンドポイントと使用するHTTPリクエストメソッドをWorkatoに指示し、このアクションがこの非同期APIとどのようにやり取りするかに関するロジック全体も制御します。 マルチステップアクションを設定する場合は、continue引数をreinvoke_afterメソッドと組み合わせて使用する必要があります。 これにより、まずexecuteラムダ関数を呼び出してGoogle BigQueryにクエリを挿入し、必要に応じてジョブをスリープ状態にして、後で起動してクエリが完了したかどうかを確認できます。
ジョブが起動されると、前回のreinvoke_after呼び出しから渡されたcontinueとともに、executeラムダ関数が再度呼び出されます。 このcontinue引数には、作成されたGoogle BigQueryジョブのジョブIDが含まれている必要があります。 次に、ジョブが完了したかどうかを確認します。 まだ実行中の場合は、ジョブを再度スリープ状態にします。 ジョブが完了している場合は、結果を取得してアクションの出力として送信できます。
TIP
ステップ時間は最低60秒に設定する必要があります。 それより低い値が指定された場合、Workatoはデフォルトで60秒になります。
execute: lambda do |connection, input, eis, eos, continue|
continue = {} unless continue.present? #初回呼び出しでは、continueはnilです
current_step = continue['current_step'] || 1 #current_stepをインスタンス化して、現在のステップを把握できるようにします
max_steps = 30 #重要。アクションが永続的に継続しないように、ステップの最大数を設定する必要があります。設定しないとレシピのパフォーマンス低下を引き起こします
step_time = current_step * 10 # ステップが増えるにつれて待機時間を長くできます
# 最小ステップ時間は60秒です
if current_step == 1 # 初回呼び出し
payload = {
query: input['query'],
timeoutMs: '25000',
useLegacySql: false
}
#以下のリクエストでクエリをBigQueryに送信します
response = post("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries", payload)
#クエリの待機がfalseの場合、ユーザーはjobIDを取得して結果を手動で取得できます。
if response['jobComplete'] == false && input['wait_for_query'].is_true?
# reinvoke_afterは2つの引数を受け入れます。
# secondsは、ジョブをどのくらいスリープ状態にするかを示す整数です。最小5秒
# continueは、ジョブが起動されたときにexecuteブロックの次の呼び出しに渡されるハッシュです
reinvoke_after(seconds: step_time, continue: { current_step: current_step + 1, jobid: response['jobReference']['jobId'] })
elsif response['jobComplete'] == false
{ jobId: response['jobReference']['jobId'] }
else
call('format_rows', response)
end
elsif current_step <= max_steps # 後続の呼び出し
response = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}")
# ジョブがまだ実行中の場合、再度スリープ状態にします
if response['status']['state'] == "RUNNING"
reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
# ジョブが完了しているがエラーがあった場合、エラーを発生させます
elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
error(response.dig('status', 'errorResult'))
# ここに到達した場合、ジョブが完了し、結果が存在することを意味します。
else
results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
call('format_rows', results)
end
# ステップ31に到達した場合、エラーを発生させてジョブをキャンセルする必要があります。
else
error("ジョブに時間がかかりすぎました。")
end
end,ステップ4 - 出力フィールドの定義
このセクションでは、トリガーの出力として表示するデータピルを指定します。 各データピルのname属性は、executeキーの出力に含まれるキーと一致している必要があります。
output_fields: lambda do |object_definitions, config_fields|
schema = [
{
name: "jobId"
},
{
name: "totalRows"
},
{
name: "pageToken"
},
{
name: "rows",
type: "array",
of: "object",
properties: object_definitions['query_output']
}
]
end,オブジェクト定義
この例では、ユーザーが入力フィールドで指定したoutput_fieldsを使用します。 以下はquery_outputのオブジェクト定義です。
query_output: {
fields: lambda do |connection, config_fields, object_definitions|
next if config_fields['output_fields'].blank?
parse_json(config_fields['output_fields'])
end
}Last updated: