# How-to guides - Upload file via file streaming (Chunk ID)

In this segment, we walk through creating an action that uploads files to a target application through file streaming, assigning an ID to each chunk. We use Azure Blob's API as the example, but the same approach can be extended to many other cloud storage solutions.

# Sample connector

{
  title: 'Upload to Azure Blob Friend URL',

  # More connector code here
  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file", type: "stream" }, # field type must be stream
          { name: "url", label: "Any friendly URL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        block_list = []
        # Calling workato.stream.in runs in a loop where the input should be file. 
        # It can accept both entire files or the output of a streaming-enabled download file action
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
          block_id = workato.uuid.encode_base64
          block_list << block_id
          put(input['url']).
            params("comp": "block", "blockid": block_id).
            request_body(chunk).
            presence # presence is required as a way to force the HTTP request to be sent. 
        end

        payload = {
          "Latest": block_list
        }
        
        { 
          "Etag" => put(input['url']).
                      params("comp": "blocklist").
                      payload(payload).
                      request_format_xml("BlockList").
                      response_format_raw.
                      after_response do |code, body, header|
                        header['Etag']
                      end
        }

      end,
      
      output_fields: lambda do |object_definitions|
        [
          { name: "Etag", type: "string" }
        ]
      end
    }
  }
  # More connector code here
}

# Step 1 - Action title, subtitles, description, and help

The first step to making a good action is to communicate clearly what the action does and how it does it, and to give users additional help. To do so, Workato allows you to define the title, subtitle, description, and help for an action. The title names the action, and the subtitle provides further detail. The description explains what the action accomplishes in the context of the application it connects to. Finally, the help segment gives users any additional information required to make the action work.

To learn more about this step, take a look at our SDK reference.

# Step 2 - Define input fields

  input_fields: lambda do |object_definitions|
    [
      { name: "file", type: "stream" }, # field type must be stream
      { name: "url", label: "Any friendly URL" }
    ]
  end,

This component tells Workato what fields to show to a user trying to upload an object. In this connector, we collect two inputs: the file, which must be defined with type stream, and the url, a friendly URL that the file is uploaded to.

# Step 3 - Defining the execute key

In the execute key, we call workato.stream.in, which takes the file stream input.

After calling workato.stream.in you're required to define a block that signifies how to upload this particular chunk of data received. In this block, we create a unique UUID that we base64 encode - as per Azure's requirements. We save this block_id in an array of blocks which we will send at the end to commit this entire file.

We then send a PUT request to the friendly Azure URL alongside this block_id. workato.stream.in continues to loop over this block until the stream consumer dictates that the file has ended.

After the stream is consumed, we send a final PUT request with the entire blocklist. This is in XML format as dictated by Azure Blob's API.

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    block_list = []
    # Calling workato.stream.in runs in a loop where the input should be file. 
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|  
      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).
        params("comp": "block", "blockid": block_id).
        request_body(chunk).
        presence
    end

    payload = {
      "Latest": block_list
    }
    
    { 
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).
                  request_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    }

  end,
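
The two pieces of data this action builds, the Base64-encoded block IDs and the final block list body, can be sketched outside the SDK in plain Ruby. This is a minimal illustration, not the SDK's implementation: `new_block_id` and `block_list_xml` are hypothetical helper names, and we assume that Base64-encoding a UUID string approximates what `workato.uuid.encode_base64` produces and that the XML below approximates what `request_format_xml("BlockList")` generates from the payload.

```ruby
require "securerandom"

# Hypothetical helper: a UUID string, Base64-encoded without line breaks.
# pack("m0") is core Ruby's strict Base64 encoding. Every UUID string is
# 36 characters, so every encoded ID has the same length (48 characters).
def new_block_id
  [SecureRandom.uuid].pack("m0")
end

# Hypothetical helper: build the commit body, one <Latest> element per
# block ID, in the order the blocks were uploaded.
def block_list_xml(block_ids)
  entries = block_ids.map { |id| "  <Latest>#{id}</Latest>" }.join("\n")
  %(<?xml version="1.0" encoding="utf-8"?>\n<BlockList>\n#{entries}\n</BlockList>)
end

block_ids = Array.new(3) { new_block_id }
puts block_list_xml(block_ids)
```

The order of the IDs in the list matters, because the committed file is assembled from the blocks in the order they are listed.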

# Step 4 - Defining output fields

This section tells us what datapills to show as the output of the action. The name attribute of each datapill should match a key in the hash returned by the execute key.

  output_fields: lambda do |object_definitions|
    [
      { name: "Etag", type: "string" }
    ]
  end
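
As a quick way to see the name-matching rule, the plain Ruby sketch below compares declared field names against the keys of a result hash. The `result` value is made up for illustration, and this check is not part of the SDK; it only shows what "matching" means here.

```ruby
# Field definitions in the same shape as the output_fields block above.
output_fields = [{ name: "Etag", type: "string" }]

# Made-up result hash standing in for what the execute key returns.
result = { "Etag" => "example-etag-value" }

declared = output_fields.map { |field| field[:name] }
missing = declared - result.keys   # declared datapills that would be empty
extra   = result.keys - declared   # returned keys with no datapill

puts "missing=#{missing.inspect} extra=#{extra.inspect}"
```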

# Variations

# Using our multistep framework to extend upload times

When defining the workato.stream.in method, you are able to define an additional named parameter for from, which can be used in conjunction with the checkpoint! method to extend the timeout of your upload action beyond Workato's limit of 180 seconds.

When checkpoint! is called, it checks whether the action's current execution time exceeds 120 seconds and, if so, refreshes the action timeout after a short waiting period. The state passed to checkpoint! is available in closure on the next run, so it can be combined with the from argument to tell Workato's streaming library to continue from the last byte offset.

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    block_list = closure["block_list"].presence || []
    next_from = closure["next_from"].presence || 0

    # Calling workato.stream.in runs in a loop where the input should be file. 
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|

      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).
        params("comp": "block", "blockid": block_id).
        request_body(chunk).
        presence # presence is required as a way to force the HTTP request to be sent.

      # Call checkpoint unless it is the end of file.
      checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
    end

    payload = {
      "Latest": block_list
    }
    
    { 
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).
                  request_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    } 
  end
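
The resume pattern above (carry the next byte offset and the work done so far in a closure, then pick up from that offset on the next run) can be simulated in plain Ruby. This is a sketch under stated assumptions: `run_once`, `chunk_size`, and `max_chunks_per_run` are hypothetical names, and stopping after a fixed number of chunks stands in for checkpoint!, which in the SDK is driven by elapsed time rather than a chunk count.

```ruby
require "stringio"

# Hypothetical stand-in for one invocation of the action. It reads
# `chunk_size` bytes at a time starting at the saved byte offset and
# stops after `max_chunks_per_run` chunks to imitate a checkpoint.
def run_once(data, closure, chunk_size: 4, max_chunks_per_run: 2)
  uploaded = closure["uploaded"] || []
  from = closure["next_from"] || 0
  io = StringIO.new(data)
  io.seek(from) # resume where the previous run stopped

  max_chunks_per_run.times do
    chunk = io.read(chunk_size)
    break if chunk.nil? # nothing left to read
    uploaded << chunk   # stands in for uploading one block
    from += chunk.bytesize
  end

  done = from >= data.bytesize
  [done, { "uploaded" => uploaded, "next_from" => from }]
end

data = "abcdefghijklmnopqrstuvwxyz"
closure = {}
runs = 0
loop do
  done, closure = run_once(data, closure)
  runs += 1
  break if done
end

puts "runs=#{runs} reassembled=#{closure['uploaded'].join == data}"
```

Because each run starts from the saved offset, no chunk is uploaded twice and the collected chunks still reassemble into the original data.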

# Adjusting the default 10MB chunk size

When Workato attempts to retrieve a file chunk from an API, it defaults to requesting a 10MB chunk. In some cases, your API may require a larger minimum chunk size and you can override this default by declaring your own chunk size using the frame_size argument.

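Take note that setting frame_size does not guarantee that you will receive chunks of that size (20MB in the example below) from all producers. As a precaution, you can accumulate chunks in a temporary buffer and upload only once the buffer reaches the required size or the file has ended.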

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # 20MB in bytes
    frame_size = 20971520 
    block_list = closure["block_list"].presence || []
    next_from = closure["next_from"].presence || 0
    buffer = ""

    # Calling workato.stream.in runs in a loop where the input should be file. 
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
      # save chunk to buffer
      buffer << chunk
      if !eof && buffer.size < frame_size
        next
      end
      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).
        params("comp": "block", "blockid": block_id).
        request_body(buffer).
        presence # presence is required as a way to force the HTTP request to be sent.

      # reset buffer
      buffer = ""

      # Call checkpoint unless it is the end of file.
      checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
    end

    payload = {
      "Latest": block_list
    }
    
    { 
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).
                  request_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    } 
  end
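
The buffering logic can be isolated from the SDK and tried in plain Ruby. In this sketch, `regroup` and `min_size` are hypothetical names, and an array of small strings stands in for the chunks a producer might send; it only illustrates the "accumulate until large enough, flush at end of file" rule used above.

```ruby
# Hypothetical helper: regroup variable-size chunks into blocks of at
# least `min_size` bytes. Only the final block may be smaller.
def regroup(chunks, min_size)
  blocks = []
  buffer = String.new # new mutable empty string
  chunks.each_with_index do |chunk, index|
    eof = (index == chunks.length - 1)
    buffer << chunk
    # Keep accumulating unless the buffer is large enough or the input ended.
    next if !eof && buffer.bytesize < min_size
    blocks << buffer    # stands in for uploading one block
    buffer = String.new # reset the buffer
  end
  blocks
end

p regroup(%w[aaa bb cccc d ee], 5)
```

Flushing on end of file even when the buffer is below the minimum ensures the last bytes of the file are not dropped.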


Last updated: 4/5/2023, 11:28:53 AM