# How-to guides - Upload file via file streaming (Chunk ID)
In this guide, we walk through building an action that uploads files to a target application through file streaming, assigning an ID to each chunk. The example uses Azure Blob's API, but the same approach extends to many other cloud storage services.
# Sample connector
{
  title: 'Upload to Azure Blob Friendly URL',

  # More connector code here

  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file", type: "stream" }, # field type must be stream
          { name: "url", label: "Any friendly URL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        block_list = []
        # Calling workato.stream.in runs in a loop where the input should be the file.
        # It accepts either an entire file or the output of a streaming-enabled download file action.
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
          block_id = workato.uuid.encode_base64
          block_list << block_id
          put(input['url']).
            params("comp": "block", "blockid": block_id).
            request_body(chunk).
            presence # presence is required as a way to force the HTTP request to be sent.
        end

        payload = {
          "Latest": block_list
        }

        {
          "Etag" => put(input['url']).
            params("comp": "blocklist").
            payload(payload).
            request_format_xml("BlockList").
            response_format_raw.
            after_response do |code, body, header|
              header['Etag']
            end
        }
      end,

      output_fields: lambda do |_object_definitions|
        [
          { name: "Etag", type: "string" }
        ]
      end
    }
  }

  # More connector code here
}
# Step 1 - Action title, subtitles, description, and help
The first step to making a good action is to communicate clearly what the action does, how it does it, and any additional context users may need. To do so, Workato allows you to define the title, subtitle, and description of an action, and to provide hints for it. Quite simply, the title is the title of an action and the subtitle provides further detail. The description then explains what the action accomplishes in the context of the application it connects to. Finally, the help segment provides users with any additional information required to make the action work.
To learn more about this step, take a look at our SDK reference.
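As a rough sketch, the metadata for this action could look like the following; the copy is illustrative, and the `description` lambda is just one of the forms the SDK accepts:

```ruby
upload_to_url: {
  title: "Upload file to URL",
  subtitle: "Upload a file to Azure Blob Storage via a friendly URL",
  description: lambda do |_input, _picklist_label|
    "Upload <span class='provider'>file</span> to " \
      "<span class='provider'>Azure Blob Storage</span>"
  end,
  help: "Streams a file to the given URL in chunks using Azure's Put Block API.",

  # input_fields, execute, and output_fields follow in the steps below.
}
```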
# Step 2 - Define input fields
input_fields: lambda do |_object_definitions|
  [
    { name: "file", type: "stream" }, # field type must be stream
    { name: "url", label: "Any friendly URL" }
  ]
end,
This component tells Workato what fields to show to a user trying to upload an object. In the case of this connector, we collect the `file` input, which must be defined with `type` set to `stream`, and the `url` input, a friendly URL that we can upload this file to.
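If you want these fields to be more self-explanatory in the recipe editor, the same definition can also carry labels and hints; the wording below is illustrative:

```ruby
input_fields: lambda do |_object_definitions|
  [
    { name: "file", label: "File contents", type: "stream",
      hint: "Map the file contents datapill from a streaming-enabled download file action." },
    { name: "url", label: "Any friendly URL",
      hint: "Destination URL for the upload, such as an Azure Blob URL with a SAS token." }
  ]
end,
```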
# Step 3 - Defining the execute key
In the execute key, we call `workato.stream.in`, which takes in the `file` stream input.

After calling `workato.stream.in`, you're required to define a block that describes how to upload each chunk of data received. In this block, we create a unique UUID that we base64 encode, as per Azure's requirements. We save this `block_id` in an array of blocks, which we send at the end to commit the entire file.

We then send a PUT request to the friendly Azure URL alongside this `block_id`. `workato.stream.in` continues to loop over this block until the stream signals that the file has ended (`eof`).

After the stream is consumed, we send a final PUT request with the entire block list. This is in XML format, as dictated by Azure Blob's API.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
  block_list = []
  # Calling workato.stream.in runs in a loop where the input should be the file.
  # It accepts either an entire file or the output of a streaming-enabled download file action.
  workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
    block_id = workato.uuid.encode_base64
    block_list << block_id
    put(input['url']).
      params("comp": "block", "blockid": block_id).
      request_body(chunk).
      presence # presence is required as a way to force the HTTP request to be sent.
  end

  payload = {
    "Latest": block_list
  }

  {
    "Etag" => put(input['url']).
      params("comp": "blocklist").
      payload(payload).
      request_format_xml("BlockList").
      response_format_raw.
      after_response do |code, body, header|
        header['Etag']
      end
  }
end,
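To visualize what that final commit carries, the following plain-Ruby sketch (not connector code) builds the kind of XML body that the `request_format_xml("BlockList")` call above produces from the payload; the block IDs are placeholders, and the exact XML declaration may differ:

```ruby
# Plain-Ruby illustration of Azure's Put Block List body for two blocks.
block_list = ["aWQtMDAwMQ==", "aWQtMDAwMg=="] # placeholder base64 block IDs
latest = block_list.map { |id| "<Latest>#{id}</Latest>" }.join
xml_body = %(<?xml version="1.0" encoding="utf-8"?><BlockList>#{latest}</BlockList>)
puts xml_body
# => <?xml version="1.0" encoding="utf-8"?><BlockList><Latest>aWQtMDAwMQ==</Latest><Latest>aWQtMDAwMg==</Latest></BlockList>
```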
# Step 4 - Defining output fields
This section tells us what datapills to show as the output of the action. The `name` attribute of each datapill should match the keys in the output of the `execute` key.
output_fields: lambda do |_object_definitions|
  [
    { name: "Etag", type: "string" }
  ]
end
# Variations
# Using our multistep framework to extend upload times
When calling the `workato.stream.in` method, you can pass an additional named parameter, `from`, which can be used in conjunction with the `checkpoint!` method to extend the timeout of your upload action beyond Workato's limit of 180 seconds.

When `checkpoint!` is called, it checks whether the action's current execution time is greater than 120 seconds and, if so, refreshes the action timeout after a short waiting period. This works together with the `from` argument to tell Workato's streaming library to continue from the last byte offset.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
  block_list = closure["block_list"].presence || []
  next_from = closure["next_from"].presence || 0
  # Calling workato.stream.in runs in a loop where the input should be the file.
  # It accepts either an entire file or the output of a streaming-enabled download file action.
  workato.stream.in(input["file"], from: next_from) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
    block_id = workato.uuid.encode_base64
    block_list << block_id
    put(input['url']).
      params("comp": "block", "blockid": block_id).
      request_body(chunk).
      presence # presence is required as a way to force the HTTP request to be sent.
    # Call checkpoint unless it is the end of the file.
    checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
  end

  payload = {
    "Latest": block_list
  }

  {
    "Etag" => put(input['url']).
      params("comp": "blocklist").
      payload(payload).
      request_format_xml("BlockList").
      response_format_raw.
      after_response do |code, body, header|
        header['Etag']
      end
  }
end
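Stripped of the connector DSL, the resumption pattern is easier to see. Below is a plain-Ruby sketch of the same idea; `upload_chunk` and `commit` are hypothetical stand-ins for the two PUT requests above, and the 120-second budget mirrors the `checkpoint!` behavior described earlier:

```ruby
FRAME = 10 * 1024 * 1024 # Workato's default 10MB frame size

# upload_chunk and commit are hypothetical stand-ins for the PUT requests.
def execute_once(data, closure)
  offset = closure.fetch("next_from", 0)
  blocks = closure.fetch("block_list", [])
  started = Time.now

  while offset < data.bytesize
    chunk = data.byteslice(offset, FRAME)
    blocks << upload_chunk(chunk)
    offset += chunk.bytesize
    eof = offset >= data.bytesize
    # checkpoint!-style behavior: once past the time budget, persist the
    # offset and block list so a re-invocation can resume from here.
    return { "next_from" => offset, "block_list" => blocks } if !eof && Time.now - started > 120
  end

  commit(blocks) # final Put Block List commit
end
```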
# Adjusting the default 10MB chunk size
When Workato retrieves a file chunk from an API, it requests a 10MB chunk by default. If your API requires a larger minimum chunk size, you can override this default by declaring your own chunk size with the `frame_size` argument.

Take note that this does not guarantee that every producer will deliver 20MB chunks. As a precaution, you can accumulate incoming chunks in a temporary buffer, as shown below.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
  # 20MB in bytes (20 * 1024 * 1024)
  frame_size = 20971520
  block_list = closure["block_list"].presence || []
  next_from = closure["next_from"].presence || 0
  buffer = ""
  # Calling workato.stream.in runs in a loop where the input should be the file.
  # It accepts either an entire file or the output of a streaming-enabled download file action.
  workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
    # Save the chunk to the buffer.
    buffer << chunk
    # Keep accumulating until the buffer reaches frame_size or the file ends.
    if !eof && buffer.size < frame_size
      next
    end
    block_id = workato.uuid.encode_base64
    block_list << block_id
    put(input['url']).
      params("comp": "block", "blockid": block_id).
      request_body(buffer).
      presence # presence is required as a way to force the HTTP request to be sent.
    # Reset the buffer.
    buffer = ""
    # Call checkpoint unless it is the end of the file.
    checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
  end

  payload = {
    "Latest": block_list
  }

  {
    "Etag" => put(input['url']).
      params("comp": "blocklist").
      payload(payload).
      request_format_xml("BlockList").
      response_format_raw.
      after_response do |code, body, header|
        header['Etag']
      end
  }
end