# How to ガイド - マルチステップアクション

ここでは、非同期 API を使用するアクションの作成について説明します。通常、非同期 API を使用してターゲットアプリケーションで長時間実行されるジョブやプロセスを開始するときは、多くの場合、リクエストを送信し、そのジョブやプロセスに対応する ID を受け取ります。アクションで API を定期的にチェックし、ジョブが完了していることを確認してから、結果を取得するかレシピ内の次のステップに進みます。

たとえば、クエリーを開始するように Google BigQuery にリクエストを送信すると、ジョブ ID が返されることがあります。この場合、Google BigQuery を定期的にチェックし、クエリーが完了していることを確認してから、結果の行を取得します。

このロジックをユーザーがレシピで設定する代わりに、カスタムコネクターの「マルチステップ」アクションを使用して、このロジック全体を単一のアクションに埋め込むことができるようになりました。「マルチステップ」アクションを使用するには、continue 引数を reinvoke_after という専用メソッドと組み合わせて使用します。この設定方法については、以下を参照してください。

Workato のクラウドデバッガーの制限事項

マルチステップアクションを Workato のクラウドデバッガーでデバッグすることはできません。マルチステップアクションをデバッグするには、SDK Gem を使用します。

# サンプルコネクター - Google BigQuery

{
  title: 'My Google BigQuery connector',

  # More connector code here
  actions: {
       query: {
        title: "Execute query",

        subtitle: "Execute query in BigQuery",

        description: "Run Query in BigQuery",

        help: "This query runs synchronously for 25 seconds. If the query takes longer than that, it turns into an asynchronous action. There is a limit of ~38 minutes for the query to complete. ",

        input_fields: lambda do 
          [
            { 
              name: "project_id", 
              control_type: 'select', 
              pick_list: 'projects', 
              optional: false 
            },
            { 
              name: "query", 
              optional: false 
            },
            { 
              name: 'wait_for_query', 
              control_type: 'checkbox',
              sticky: true,
            },
            {
              name: "output_fields",
              extends_schema: true,
              schema_neutral: false,
              sticky: true,
              control_type: 'schema-designer',
              label: 'Output columns',
              hint: 'Provide your output fields for your query if you are providing datapills in your query',
              item_label: 'Design your output columns',
              sample_data_type: 'csv' # json_input / xml
            },
          ]
        end,

        execute: lambda do |connection, input, eis, eos, continue|
          continue = {} unless continue.present?
          current_step = continue['current_step'] || 1
          max_steps = 30
          step_time = current_step * 60 # This helps us wait longer and longer as we increase in steps
          # Minimum step time is 60 seconds

          if current_step == 1 # First invocation    
            payload = {
              query: input['query'],
              timeoutMs: '25000',
              useLegacySql: false
            }
            url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries"
            response = post(url, payload)

            # If user wants to wait for query to complete and 
            # job isn't complete after 25s
            if response['jobComplete'] == false && input['wait_for_query'].is_true?
              reinvoke_after(
                seconds: step_time, 
                continue: { 
                  current_step: current_step + 1, 
                  jobid: response['jobReference']['jobId'] 
                }
              )
            # If user doesn't want to wait for query to complete and
            # job isn't complete after 25s
            elsif response['jobComplete'] == false
              { jobId: response['jobReference']['jobId'] }
            # Job is complete after 25s
            else
              call('format_rows', response)
            end
          # Subsequent invocations
          elsif current_step <= max_steps 
            url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}"
            response = get(url)
            # If job is still running
            if response['status']['state'] == "RUNNING"
              reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
            # If status is done but there is an error
            elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
              error(response.dig('status', 'errorResult'))
            # If status is done 
            else
              results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
              call('format_rows', results)
            end
          else
            error("Job took too long!")
          end
        end,

        output_fields: lambda do |object_definitions, config_fields|
          schema = [
            {
              name: "jobId"
            },
            {
              name: "totalRows"
            },
            {
              name: "pageToken"
            },
            {
              name: "rows",
              type: "array",
              of: "object",
              properties: object_definitions['query_output']
            }
          ]
        end,

        summarize_output: ['rows']
      },
}

# ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ

優れたアクションの作成に向けた最初のステップは、そのアクションが何をどのように実行するかを適切に伝え、さらにユーザーに追加のヘルプを提供することです。そのために Workato では、アクションのタイトルと説明を定義したり、ヒントを提供したりできるようにしています。簡単に言えば、タイトルはアクションの名称であり、サブタイトルはそのアクションのより詳しい内容を表します。続いて、アクションの説明は、接続先となるアプリケーションにおいてそのアクションがどのようなことを達成するかについて、仕様や解説を提供します。最後に、ヘルプのセグメントは、アクションをうまく機能させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、Workato の SDK リファレンスを参照してください。

# ステップ2 - 入力項目の定義

  input_fields: lambda do 
    [
      { 
        name: "project_id", 
        control_type: 'select', 
        pick_list: 'projects', 
        optional: false 
      },
      { 
        name: "query", 
        optional: false 
      },
      { 
        name: 'wait_for_query', 
        control_type: 'checkbox',
        sticky: true,
      },
      {
        name: "output_fields",
        extends_schema: true,
        schema_neutral: false,
        sticky: true,
        control_type: 'schema-designer',
        label: 'Output columns',
        hint: 'Provide your output fields for your query if you are providing datapills in your query',
        item_label: 'Design your output columns',
        sample_data_type: 'csv' # json_input / xml
      },
    ]
  end,

この部分では、マルチステップアクションの実行を試みるユーザーに向けて表示すべき項目を Workato に指示します。たとえば BigQuery でクエリーを実行する場合、ユーザーは以下を提供する必要があります。

  1. Google BigQuery GCP プロジェクト ID
  2. Google BigQuery で実行するクエリー
  3. アクションがクエリーの完了を待機するかどうか
  4. クエリーからの期待される出力列

# ステップ3 - execute キーの定義

execute キーでは、リクエスト送信先となるエンドポイントと、使用する HTTP リクエストメソッドを Workato に指示するとともに、アクションと非同期 API とのやり取りについてロジック全体を制御します。マルチステップアクションを設定する場合、continue 引数を reinvoke_after メソッドとともに使用する必要があります。これにより、まず execute lambda 関数を呼び出して Google BigQuery にクエリーを挿入し、オプションでジョブをスリープ状態にして後から起動し、クエリーが完了したかどうかを確認することができます。

ジョブを起動する際は、前の reinvoke_after 呼び出しから渡された continue とともに execute lambda 関数が再度呼び出されます。この continue 引数には、作成された Google BigQuery ジョブのジョブ ID が必要です。その後、ジョブが完了したかどうかを確認します。まだ実行中の場合はジョブを再度スリープさせ、ジョブが完了している場合は結果を取得し、アクションの出力として送信できます。

TIP

ステップ時間は60秒以上に設定する必要があります。これより短い時間を設定すると、Workato によりデフォルトで60秒に設定されます。

  execute: lambda do |connection, input, eis, eos, continue|
    continue = {} unless continue.present? #For the first invocation, continue is nil
    current_step = continue['current_step'] || 1 #Instantiate current_step so we know what step we are on
    max_steps = 30 #IMPORTANT. you should set the max number of steps so your action doesn't continue forever. This will cause performance degradation in your recipes
    step_time = current_step * 10 # This helps us wait longer and longer as we increase in steps
    # Minimum step time is 60 seconds

    if current_step == 1 # First invocation    
      payload = {
        query: input['query'],
        timeoutMs: '25000',
        useLegacySql: false
      }
      #Request below sends the query to BigQuery
      response = post("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries", payload)

      #if Wait for query is false, the user can get the jobID back and get the results manually. 
      if response['jobComplete'] == false && input['wait_for_query'].is_true?
        # reinvoke_after accepts 2 arguments. 
        # seconds is an integer that tells us how long to put the job to sleep for. MINIMUM 5 SECONDS
        # continue is a hash is passed to the next invocation of the execute block when the job is woken up
        reinvoke_after(seconds: step_time, continue: { current_step: current_step + 1, jobid: response['jobReference']['jobId'] })
      elsif response['jobComplete'] == false
        { jobId: response['jobReference']['jobId'] }
      else
        call('format_rows', response)
      end
    elsif current_step <= max_steps  # Subsequent invocations
      response = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}")

      # If job is still running, put to sleep again
      if response['status']['state'] == "RUNNING"
        reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
      # If job is done but there was an error, raise an error
      elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
        error(response.dig('status', 'errorResult'))
      # Reaching here means job is done and there are results. 
      else
        results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
        call('format_rows', results)
      end
    # if we reach step 31, we need to raise an error and cancel the job.
    else
      error("Job took too long!")
    end
  end,

# ステップ4 - 出力項目の定義

このセクションでは、トリガーの出力として表示するデータピルを指示します。各データピルの name 属性は、execute キーの出力に含まれるキーに一致している必要があります。

output_fields: lambda do |object_definitions, config_fields|
  schema = [
    {
      name: "jobId"
    },
    {
      name: "totalRows"
    },
    {
      name: "pageToken"
    },
    {
      name: "rows",
      type: "array",
      of: "object",
      properties: object_definitions['query_output']
    }
  ]
end,

オブジェクトの定義

この例では、ユーザーが入力項目で指定した output_fields を使用します。以下に query_output のオブジェクトの定義を示します。

query_output: {
  fields: lambda do |connection, config_fields, object_definitions|
    next if config_fields['output_fields'].blank?
    parse_json(config_fields['output_fields'])
  end
}


Last updated: 2023/8/31 1:07:14