# How-to guides - Upload file via file streaming (Content-Range headers)
In this segment, we will be going through the creation of actions that uploads files in a target application through file streaming and utilizing Content-Range Headers.
# Sample connector
{
title: 'Upload file to URL',
# More connector code here
actions: {
upload_to_url: {
input_fields: lambda do |_object_definitions|
[
{ name: "file_name", type: "string" },
{ name: "file", type: "stream" }, # field type must be stream
{ name: "url", label: "Any friendly URL" }
]
end,
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
put(input['url']).
headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
request_body(chunk).presence
end
# This commits the upload
post(input['url'], { "commit": true } )
end,
output_fields: lambda do |object_definitions|
[
{ name: "file_name", type: "string" },
{ name: "file_path", type: "string" },
{ name: "file_size", type: "integer" }
]
end
}
}
# More connector code here
}
# Step 1 - Action title, subtitles, description, and help
The first step to making a good action is to properly communicate what the actions does, how it does it and to provide additional help to users. To do so, Workato allows you to define the title, subtitles, description, and provide hints for an action. Quite simply, the title is the title of an action and the subtitle provides further details of the action. The description of the action then contains specifications and explanation on what the action accomplishes and in the context of the application it connects to. Finally, the help segment provides users any additional information required to make the action work.
To know more about this step, take a look at our SDK reference
# Step 2 - Define input fields
input_fields: lambda do |object_definitions|
[
{ name: "file_name", type: "string" },
{ name: "file", type: "stream" }, # field type must be stream
{ name: "url", label: "Any friendly URL" }
]
end,
This component tells Workato what fields to show to a user trying to upload an object. In the case of this connector, we collect the file_name
, the file
which must be defined with type
as stream
and the url
input for a friendly URL that we can upload this file to.
# Step 3 - Defining the execute key
In the execute action, we define the workato.stream.in
which takes in the file
stream input.
After calling workato.stream.in
you're required to define a block that signifies how to upload this particular chunk of data received. In this block, we send a PUT request to the URL alongside standard Content-Range
headers to denote the part of the file we are uploading. workato.stream.in
continues to loop over this block until the stream
consumer dictates that the file has ended.
After the stream is consumed, we send a POST request to commit the entire upload as a new file.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
put(input['url']).
headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
request_body(chunk).
presence # presence is required as a way to force the HTTP request to be sent.
end
# This commits the upload
post(input['url'], { "commit": true } )
end,
NOTE
Take note that we assume the API accepts *
as a wildcard range, indicating that the total file size is not yet known. In some cases, APIs may not accept a wildcard range and you should add an input field to collect the total file size from the recipe builder.
# Step 4 - Defining output fields
This section tells us what datapills to show as the output of the trigger. The name
attributes of each datapill should match the keys in the output of the execute
key. Here, we assume the response from the final POST request returns the file_name
, file_path
and file_size
.
output_fields: lambda do |object_definitions|
[
{ name: "file_name", type: "string" },
{ name: "file_path", type: "string" },
{ name: "file_size", type: "integer" }
]
end
# Variations
# Using our multistep framework to extend upload times
When defining the workato.stream.in
method, you are able to define an additional named parameter for from
, which can be used in conjunction with the checkpoint!
method to extend the timeout of your upload action beyond Workato's limit of 180 seconds.
When checkpoint!
is called, it checks if action's current execution time is larger than 120 seconds, and if so, refreshes the action timeout after a short waiting period. This can be used in conjunction with the from
argument to tell Workato's streaming library where to continue from the last byte offset.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
next_from = closure["next_from"].presence || 0
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"], from: next_from) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
put(input['url']).
headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
request_body(chunk).
presence # presence is required as a way to force the HTTP request to be sent.
# Call checkpoint unless it is the end of file.
checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
end
# This commits the upload
post(input['url'], { "commit": true } )
end
# Adjusting the default 10MB chunk size
When Workato attempts to retrieve a file chunk from an API, it defaults to requesting a 10MB chunk. In some cases, your API may require a larger minimum chunk size and you can override this default by declaring your own chunk size using the frame_size
argument.
Take note that this does not guarantee that you will receive a chunk size of 20MB from all producers. You can make necessary precautions by storing a temporary buffer as well.
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
# 20MB in bytes
frame_size = 20971520
next_from = closure["next_from"].presence || 0
buffer = ""
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
# save chunk to buffer
buffer << chunk
# If not end of file and buffer is less than require size, skip to next iteration
if !eof && buffer.size < frame_size
next
end
put(input['url']).
headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
request_body(buffer).
presence # presence is required as a way to force the HTTP request to be sent.
#reset buffer
buffer = ""
# Call checkpoint unless it is the end of file.
checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
end
# This commits the upload
post(input['url'], { "commit": true } )
end
Last updated: 4/5/2023, 11:28:53 AM