# How-to guides - Multistep actions

This guide walks through building actions that work with asynchronous APIs. When you use an asynchronous API to start a long-running job or process in a target application, you typically send a request and receive an ID that identifies that job or process. Your action then needs to check back with the API periodically to see whether the job has completed before it retrieves results or moves on to the next step in the recipe.

For example, when you send a request to Google BigQuery to start a query, Google BigQuery may respond with a job ID. The action must then check back with Google BigQuery at regular intervals to see whether the query has completed before retrieving the rows.

Rather than having the user configure this logic in the recipe, you can embed all of it in a single action by using "multistep" actions in your custom connector. A multistep action uses the continue argument together with a dedicated method called reinvoke_after. The rest of this guide shows how to configure both.

Limitation in Workato's Test code tab

Multistep actions cannot be debugged in Workato's SDK Test code tab. To debug multistep actions, use our SDK Gem.

# Sample connector - Google BigQuery

{
  title: 'My Google BigQuery connector',

  # More connector code here
  actions: {
      query: {
        title: "Execute query",

        subtitle: "Execute query in BigQuery",

        description: "Run Query in BigQuery",

        help: "This query runs synchronously for 25 seconds. If the query takes longer than that, it turns into an asynchronous action. With the step settings in this action, it gives up after about 7 hours 45 minutes if the query has not completed.",

        input_fields: lambda do 
          [
            { 
              name: "project_id", 
              control_type: 'select', 
              pick_list: 'projects', 
              optional: false 
            },
            { 
              name: "query", 
              optional: false 
            },
            { 
              name: 'wait_for_query', 
              control_type: 'checkbox',
              sticky: true,
            },
            {
              name: "output_fields",
              extends_schema: true,
              schema_neutral: false,
              sticky: true,
              control_type: 'schema-designer',
              label: 'Output columns',
              hint: 'Provide your output fields for your query if you are providing datapills in your query',
              item_label: 'Design your output columns',
              sample_data_type: 'csv' # json_input / xml
            },
          ]
        end,

        execute: lambda do |connection, input, eis, eos, continue|
          continue = {} unless continue.present?
          current_step = continue['current_step'] || 1
          max_steps = 30
          step_time = current_step * 60 # This helps us wait longer and longer as we increase in steps
          # Minimum step time is 60 seconds

          if current_step == 1 # First invocation    
            payload = {
              query: input['query'],
              timeoutMs: '25000',
              useLegacySql: false
            }
            url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries"
            response = post(url, payload)

            # If user wants to wait for query to complete and 
            # job isn't complete after 25s
            if response['jobComplete'] == false && input['wait_for_query'].is_true?
              reinvoke_after(
                seconds: step_time, 
                continue: { 
                  current_step: current_step + 1, 
                  jobid: response['jobReference']['jobId'] 
                }
              )
            # If user doesn't want to wait for query to complete and
            # job isn't complete after 25s
            elsif response['jobComplete'] == false
              { jobId: response['jobReference']['jobId'] }
            # Job is complete after 25s
            else
              call('format_rows', response)
            end
          # Subsequent invocations
          elsif current_step <= max_steps 
            url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}"
            response = get(url)
            # If job is still running
            if response['status']['state'] == "RUNNING"
              reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
            # If status is done but there is an error
            elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
              error(response.dig('status', 'errorResult'))
            # If status is done 
            else
              results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
              call('format_rows', results)
            end
          else
            error("Job took too long!")
          end
        end,

        output_fields: lambda do |object_definitions, config_fields|
          schema = [
            {
              name: "jobId"
            },
            {
              name: "totalRows"
            },
            {
              name: "pageToken"
            },
            {
              name: "rows",
              type: "array",
              of: "object",
              properties: object_definitions['query_output']
            }
          ]
        end,

        summarize_output: ['rows']
      },
  }
}

# Step 1 - Action title, subtitle, description, and help

The first step to making a good action is to communicate clearly what the action does and how it does it, and to give users any additional help they need. To do so, Workato allows you to define the title, subtitle, description, and help text for an action. The title names the action and the subtitle provides further detail about it. The description explains what the action accomplishes in the context of the application it connects to. Finally, the help text gives users any additional information required to make the action work.

To learn more about this step, take a look at our SDK reference.

# Step 2 - Define input fields

  input_fields: lambda do 
    [
      { 
        name: "project_id", 
        control_type: 'select', 
        pick_list: 'projects', 
        optional: false 
      },
      { 
        name: "query", 
        optional: false 
      },
      { 
        name: 'wait_for_query', 
        control_type: 'checkbox',
        sticky: true,
      },
      {
        name: "output_fields",
        extends_schema: true,
        schema_neutral: false,
        sticky: true,
        control_type: 'schema-designer',
        label: 'Output columns',
        hint: 'Provide your output fields for your query if you are providing datapills in your query',
        item_label: 'Design your output columns',
        sample_data_type: 'csv' # json_input / xml
      },
    ]
  end,

This component tells Workato which fields to show to a user configuring the multistep action. To execute a query in BigQuery, for example, the user has to provide the following:

  1. Google BigQuery GCP project ID
  2. The query to execute in Google BigQuery
  3. Whether the action should wait for the query to complete
  4. The output columns expected from the query

# Step 3 - Defining the execute key

The execute key tells Workato which endpoint to send the request to and which HTTP method to use. It also controls all of the logic for how the action interacts with the asynchronous API. When configuring multistep actions, use the continue argument together with the reinvoke_after method. On the first invocation, the execute lambda submits the query to Google BigQuery. If the query has not finished, the lambda can put the job to sleep so that it is woken up later to check whether the query is done.

When the job is woken up, the execute lambda is invoked again with the continue hash that was passed to the previous reinvoke_after call. This hash should contain the ID of the Google BigQuery job that was created. We then check whether the job is done. If it is still running, we put the job to sleep again. If the job is done, we retrieve the results and return them as the output of the action.
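The invoke, sleep, and re-invoke cycle can be tried out in plain Ruby, outside Workato. The following is a minimal sketch under stated assumptions: `Reinvoke`, `FakeJobApi`, and `run_action` are hypothetical stand-ins written for this illustration, and the `reinvoke_after` defined here only records the request instead of suspending a real job. It is not how the Workato runtime is implemented.

```ruby
# Hypothetical stand-in for Workato's reinvoke_after: it only records the request.
Reinvoke = Struct.new(:seconds, :continue)

def reinvoke_after(seconds:, continue:)
  Reinvoke.new(seconds, continue)
end

# Fake asynchronous API: reports RUNNING for the first two status checks, then DONE.
class FakeJobApi
  def initialize
    @checks = 0
  end

  def start
    'job_123'
  end

  def state(_job_id)
    @checks += 1
    @checks < 3 ? 'RUNNING' : 'DONE'
  end
end

api = FakeJobApi.new

# Simplified execute lambda: start the job on step 1, poll on later steps.
execute = lambda do |input, continue|
  continue ||= {}
  current_step = continue['current_step'] || 1

  if current_step == 1
    job_id = api.start
    reinvoke_after(seconds: 60, continue: { 'current_step' => 2, 'jobid' => job_id })
  elsif api.state(continue['jobid']) == 'RUNNING'
    reinvoke_after(seconds: current_step * 60,
                   continue: { 'current_step' => current_step + 1, 'jobid' => continue['jobid'] })
  else
    { 'jobId' => continue['jobid'], 'steps_taken' => current_step }
  end
end

# Driver loop playing the role of the platform: re-invoke until a real output appears.
def run_action(execute, input)
  waits = []
  result = execute.call(input, nil)
  while result.is_a?(Reinvoke)
    waits << result.seconds # a real runtime would put the job to sleep here
    result = execute.call(input, result.continue)
  end
  [result, waits]
end

output, waits = run_action(execute, { 'query' => 'SELECT 1' })
puts output.inspect
puts waits.inspect
```

In this run the lambda is invoked four times: once to start the job, twice while it is still running, and once more when it is done. The only state that survives between invocations is whatever was placed in the continue hash.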

TIP

Step time must be set to a minimum of 60 seconds. If a lower value is supplied, Workato defaults to 60 seconds.
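To see what the 60-second minimum means for a growing step time, it helps to add up the waits. The sketch below is plain Ruby and assumes the step time is `current_step` multiplied by a constant, that 30 steps are allowed, and that values under 60 are raised to 60 as the tip describes. The names `effective_step_time` and `worst_case_wait` are made up for this illustration.

```ruby
MIN_STEP_TIME = 60 # seconds; lower values are raised to this (assumption from the tip)
MAX_STEPS     = 30 # assumed cap on the number of steps

# Wait requested at a given step, with the 60-second floor applied.
def effective_step_time(current_step, multiplier)
  [current_step * multiplier, MIN_STEP_TIME].max
end

# Total seconds spent asleep if the job never finishes: each of steps
# 1..MAX_STEPS schedules one more wait before the next step gives up.
def worst_case_wait(multiplier)
  (1..MAX_STEPS).sum { |step| effective_step_time(step, multiplier) }
end

puts worst_case_wait(60) # 27900 seconds, or 7 hours 45 minutes
puts worst_case_wait(10) # 4800 seconds, or 80 minutes
```

With a multiplier below 60, the early steps all collapse to the 60-second floor, so the waits only start growing once `current_step * multiplier` exceeds 60.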

  execute: lambda do |connection, input, eis, eos, continue|
    continue = {} unless continue.present? # On the first invocation, continue is nil
    current_step = continue['current_step'] || 1 # Track which step we are on
    max_steps = 30 # IMPORTANT: cap the number of steps so the action cannot run forever, which would degrade recipe performance
    step_time = current_step * 60 # Wait longer and longer as the step count increases
    # Minimum step time is 60 seconds

    if current_step == 1 # First invocation    
      payload = {
        query: input['query'],
        timeoutMs: '25000',
        useLegacySql: false
      }
      #Request below sends the query to BigQuery
      response = post("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries", payload)

      # If the job is not complete after 25s and the user wants to wait, put the job to sleep.
      # If wait_for_query is false, the jobId is returned so the user can retrieve the results manually.
      if response['jobComplete'] == false && input['wait_for_query'].is_true?
        # reinvoke_after accepts 2 arguments.
        # seconds is an integer that tells Workato how long to put the job to sleep for (minimum 60 seconds).
        # continue is a hash that is passed to the next invocation of the execute block when the job is woken up.
        reinvoke_after(seconds: step_time, continue: { current_step: current_step + 1, jobid: response['jobReference']['jobId'] })
      elsif response['jobComplete'] == false
        { jobId: response['jobReference']['jobId'] }
      else
        call('format_rows', response)
      end
    elsif current_step <= max_steps  # Subsequent invocations
      response = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}")

      # If job is still running, put to sleep again
      if response['status']['state'] == "RUNNING"
        reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid']})
      # If job is done but there was an error, raise an error
      elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
        error(response.dig('status', 'errorResult'))
      # Reaching here means job is done and there are results. 
      else
        results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
        call('format_rows', results)
      end
    # If we reach step 31, raise an error so the action stops waiting.
    else
      error("Job took too long!")
    end
  end,
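The status checks in the subsequent-invocation branch can be isolated into a small decision function, which makes them easy to test on their own. The sketch below is plain Ruby. `next_move` is a hypothetical name, and it assumes the BigQuery job states PENDING, RUNNING, and DONE. Unlike the sample above, which only checks for RUNNING, this variation also keeps waiting while the job is PENDING.

```ruby
# Maps a BigQuery job status hash to the next move for the action.
def next_move(status)
  case status['state']
  when 'PENDING', 'RUNNING'
    :wait          # call reinvoke_after and check again later
  when 'DONE'
    # A finished job may still carry an errorResult
    status['errorResult'] ? :fail : :fetch_results
  else
    :fail          # unknown state: stop rather than loop
  end
end

puts next_move({ 'state' => 'RUNNING' })
puts next_move({ 'state' => 'DONE' })
puts next_move({ 'state' => 'DONE', 'errorResult' => { 'message' => 'Syntax error' } })
```

Inside the execute lambda, the three outcomes would correspond to calling reinvoke_after, fetching and formatting the query results, and calling error.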

# Step 4 - Defining output fields

This section tells Workato which datapills to show as the output of the action. The name attribute of each datapill should match a key in the output of the execute key.

output_fields: lambda do |object_definitions, config_fields|
  schema = [
    {
      name: "jobId"
    },
    {
      name: "totalRows"
    },
    {
      name: "pageToken"
    },
    {
      name: "rows",
      type: "array",
      of: "object",
      properties: object_definitions['query_output']
    }
  ]
end,
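The execute lambda calls a `format_rows` method that this guide does not show. The following is one possible sketch of such a helper in plain Ruby, not the connector's actual implementation. It assumes the BigQuery REST response shape, where column names live in `schema.fields[].name` and each row is a list of cells under `f`, each with a value under `v`. It returns keys that match the names in the output fields above.

```ruby
# Hypothetical helper: converts a BigQuery query response into the
# jobId / totalRows / pageToken / rows structure used by output_fields.
def format_rows(response)
  columns = response.dig('schema', 'fields').to_a.map { |field| field['name'] }

  rows = response['rows'].to_a.map do |row|
    values = row['f'].map { |cell| cell['v'] }
    columns.zip(values).to_h # pair each column name with its cell value
  end

  {
    'jobId'     => response.dig('jobReference', 'jobId'),
    'totalRows' => response['totalRows'],
    'pageToken' => response['pageToken'],
    'rows'      => rows
  }
end

# Made-up sample response for illustration only
sample = {
  'jobReference' => { 'projectId' => 'my-project', 'jobId' => 'job_123' },
  'totalRows' => '2',
  'schema' => {
    'fields' => [
      { 'name' => 'id', 'type' => 'INTEGER' },
      { 'name' => 'email', 'type' => 'STRING' }
    ]
  },
  'rows' => [
    { 'f' => [{ 'v' => '1' }, { 'v' => 'a@example.com' }] },
    { 'f' => [{ 'v' => '2' }, { 'v' => 'b@example.com' }] }
  ]
}

formatted = format_rows(sample)
puts formatted['rows'].inspect
```

In a connector, a helper like this would typically be declared as a lambda in the methods block so that `call('format_rows', response)` can reach it.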

Object definitions

In this example, we use the output_fields value that the user supplies in the input fields. Here is the object definition of query_output.

query_output: {
  fields: lambda do |connection, config_fields, object_definitions|
    next if config_fields['output_fields'].blank?
    parse_json(config_fields['output_fields'])
  end
}
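To make the parsing step concrete, the sketch below mimics it in plain Ruby. It assumes the schema-designer field stores its value as a JSON string describing an array of field definitions. The sample string is made up, `query_output_fields` is a hypothetical name, and `JSON.parse` from the standard library stands in for the SDK's parse_json. Unlike the object definition above, this variation returns an empty array when nothing is configured.

```ruby
require 'json'

# Assumed example of the JSON string stored by a schema-designer field
output_fields_value = '[{"name":"id","type":"integer","label":"ID"},' \
                      '{"name":"email","type":"string","label":"Email"}]'

# Returns the parsed field definitions, or an empty list when the user
# has not designed any output columns.
def query_output_fields(config_fields)
  raw = config_fields['output_fields']
  return [] if raw.nil? || raw.strip.empty?

  JSON.parse(raw)
end

fields = query_output_fields({ 'output_fields' => output_fields_value })
puts fields.map { |field| field['name'] }.inspect
```

Each parsed entry becomes one property of the objects in the rows array, so the datapills offered to the user follow the columns they designed.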


Last updated: 10/23/2024, 10:06:07 PM