ハウツーガイド: ファイルストリーミングによるファイルのアップロード(チャンクID)
このセグメントでは、ファイルストリーミングを通じてターゲットアプリケーションにファイルをアップロードし、チャンクにIDを割り当てるアクションの作成について説明します。 ここではAzure BlobのAPIを取り上げますが、この方法は、他の多くの高品質なクラウドストレージソリューションにも拡張できます。
アクションのタイムアウト
SDKアクションには180秒のタイムアウト制限があります。
タイムアウト制限を超過するファイルを転送するには、ファイルストリーミングアクションでcheckpoint!メソッドを使用できます。 追加情報については、マルチステップフレームワークを使用してアップロード時間を延長するセクションを参照してください。
サンプルコネクター
{
title: 'Upload to Azure Blob Friend URL',
# More connector code here
actions: {
upload_to_url: {
input_fields: lambda do |_object_definitions|
[
{ name: "file", type: "stream" }, # field type must be stream
{ name: "url", label: "Any friendly URL" }
]
end,
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
block_list = []
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
block_id = workato.uuid.encode_base64
block_list << block_id
put(input['url']).
params("comp": "block", "blockid": block_id).
request_body(chunk).
presence # presence is required as a way to force the HTTP request to be sent.
end
payload = {
"Latest": block_list
}
{
"Etag" => put(input['url']).
params("comp": "blocklist").
payload(payload).
request_format_xml("BlockList").
response_format_raw.
after_response do |code, body, header|
header['Etag']
end
}
end,
output_fields: lambda do |object_definitions|
[
{ name: "Etag", type: "string" }
]
end
}
}
# More connector code here
}ステップ1: アクションのタイトル、サブタイトル、説明、ヘルプ
優れたアクションを作成するための最初のステップは、アクションが何を行い、どのように実行するかを適切に伝え、ユーザーに追加のヘルプを提供することです。 これを行うために、Workatoではアクションのタイトル、サブタイトル、説明を定義し、ヒントを提供できます。 簡単に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を示します。 アクションの説明には、そのアクションが何を実現するかについての仕様と説明、および接続先アプリケーションのコンテキストが含まれます。 最後に、ヘルプセグメントでは、アクションを機能させるために必要な追加情報をユーザーに提供します。
このステップの詳細については、SDKリファレンスを参照してください
ステップ2 - 入力フィールドの定義
input_fields: lambda do |object_definitions|
[
{ name: "file", type: "stream" }, # field type must be stream
{ name: "url", label: "Any friendly URL" }
]
end,このコンポーネントは、オブジェクトをアップロードしようとしているユーザーに表示するフィールドをWorkatoに伝えます。 このコネクターの場合、file_name、typeをstreamとして定義する必要があるfile、およびこのファイルのアップロード先として使用できるわかりやすいURL用のurl入力を収集します。
ステップ3 - executeキーの定義
executeアクションでは、fileストリーム入力を受け取るworkato.stream.inを定義します。
workato.stream.inを呼び出した後、受信したこの特定のデータチャンクをアップロードする方法を示すブロックを定義する必要があります。 このブロックでは、Azureの要件に従って、Base64エンコードする一意のUUIDを作成します。 このblock_idをブロックの配列に保存し、最後に送信して、このファイル全体をコミットします。
その後、このblock_idとともに、フレンドリなAzure URLにPUTリクエストを送信します。 workato.stream.inは、streamコンシューマーがファイルの終了を示すまで、このブロックをループし続けます。
ストリームが消費された後、ブロックリスト全体を含む最後のPUTリクエストを送信します。 これはAzure BlobのAPIで規定されているXML形式です。
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
block_list = []
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
block_id = workato.uuid.encode_base64
block_list << block_id
put(input['url']).
params("comp": "block", "blockid": block_id).
request_body(chunk).
presence
end
payload = {
"Latest": block_list
}
{
"Etag" => put(input['url']).
params("comp": "blocklist").
payload(payload).
request_format_xml("BlockList").
response_format_raw.
after_response do |code, body, header|
header['Etag']
end
}
end,ステップ4 - 出力フィールドの定義
このセクションでは、トリガーの出力として表示するデータピルを指定します。 各データピルのname属性は、executeキーの出力に含まれるキーと一致している必要があります。
output_fields: lambda do |object_definitions|
output_fields: lambda do |object_definitions|
[
{ name: "Etag", type: "string" }
]
end
endバリエーション
マルチステップフレームワークを使用したアップロード時間の延長
workato.stream.inメソッドを定義するときに、from用の追加の名前付きパラメーターを定義できます。これはcheckpoint!メソッドと組み合わせて使用することで、アップロードアクションのタイムアウトをWorkatoの180秒の制限を超えて延長できます。
checkpoint!が呼び出されると、アクションの現在の実行時間が120秒を超えているかどうかを確認し、超えている場合は短い待機期間の後にアクションのタイムアウトを更新します。 これをfrom引数と組み合わせて使用すると、最後のバイトオフセットからどこで続行するかをWorkatoのストリーミングライブラリに伝えることができます。
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
block_list = closure["block_list"].presence || []
next_from = closure["next_from"].presence || 0
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
block_id = workato.uuid.encode_base64
block_list << block_id
put(input['url']).p
arams("comp": "block", "blockid": block_id).r
equest_body(chunk).
presence # presence is required as a way to force the HTTP request to be sent.
# Call checkpoint unless it is the end of file.
checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
end
payload = {
"Latest": block_list
}
{
"Etag" => put(input['url']).
params("comp": "blocklist").
payload(payload).r
equest_format_xml("BlockList").
response_format_raw.
after_response do |code, body, header|
header['Etag']
end
}
endデフォルトの10MBチャンクサイズの調整
WorkatoがAPIからファイルチャンクを取得しようとすると、デフォルトでは10MBのチャンクをリクエストします。 場合によっては、APIがより大きい最小チャンクサイズを必要とすることがあります。その場合は、frame_size引数を使用して独自のチャンクサイズを宣言することで、このデフォルトを上書きできます。
これにより、すべてのプロデューサーから20MBのチャンクサイズを受け取ることが保証されるわけではない点に注意してください。 一時バッファーも保存することで、必要な予防策を講じることができます。
execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
# 20MB in bytes
frame_size = 20971520
block_list = closure["block_list"].presence || []
next_from = closure["next_from"].presence || 0
buffer = ""
# Calling workato.stream.in runs in a loop where the input should be file.
# It can accept both entire files or the output of a streaming-enabled download file action
workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
# save chunk to buffer
buffer << chunk
if !eof && buffer.size < frame_size
next
end
block_id = workato.uuid.encode_base64
block_list << block_id
put(input['url']).p
arams("comp": "block", "blockid": block_id).r
equest_body(buffer).
presence # presence is required as a way to force the HTTP request to be sent.
#reset buffer
buffer = ""
# Call checkpoint unless it is the end of file.
checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
end
payload = {
"Latest": block_list
}
{
"Etag" => put(input['url']).
params("comp": "blocklist").
payload(payload).r
equest_format_xml("BlockList").
response_format_raw.
after_response do |code, body, header|
header['Etag']
end
}
endLast updated: