# ハウツーガイド - ファイルストリーミングを介してファイルをアップロードする方法(チャンクID)

このセグメントでは、ファイルストリーミングを介してターゲットアプリケーションにファイルをアップロードし、チャンクにIDを割り当てるアクションの作成方法について説明します。Azure BlobのAPI (opens new window)を使用しますが、これは多くの他のクラウドストレージソリューションにも拡張できます。

# サンプルコネクタ

{
  title: 'Azure BlobにアップロードするフレンドURL',

  # その他のコネクタコード

  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file", type: "stream" }, # フィールドタイプはストリームである必要があります
          { name: "url", label: "フレンドリーなURL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        block_list = []
        # workato.stream.inを呼び出すと、入力がファイルであるループが実行されます。
        # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
          block_id = workato.uuid.encode_base64
          block_list << block_id
          put(input['url']).
            params("comp": "block", "blockid": block_id).
            request_body(chunk).
            presence # HTTPリクエストを送信するために必要な存在
        end

        payload = {
          "Latest": block_list
        }
        
        { 
          "Etag" => put(input['url']).
                      params("comp": "blocklist").
                      payload(payload).
                      request_format_xml("BlockList").
                      response_format_raw.
                      after_response do |code, body, header|
                        header['Etag']
                      end
        }

      end,
      
      output_fields: lambda do |object_definitions|
        [
          { name: "Etag", type: "string" }
        ]
      end
    }
  }
  # その他のコネクタコード
}

# ステップ1 - アクションのタイトル、サブタイトル、説明、ヘルプ

良いアクションを作成するための最初のステップは、アクションが何を行うのか、どのように行うのか、ユーザーに追加のヘルプを提供するために適切に伝えることです。そのために、Workatoでは、アクションのタイトル、サブタイトル、説明を定義し、ヒントを提供することができます。単純に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を提供します。アクションの説明には、アクションが達成する仕様と説明、およびそれが接続するアプリケーションのコンテキストでの説明が含まれます。最後に、ヘルプセグメントでは、アクションを動作させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、SDKリファレンスを参照してください。

# ステップ2 - 入力フィールドの定義

  input_fields: lambda do |object_definitions|
    [
      { name: "file", type: "stream" }, # フィールドタイプはストリームである必要があります
      { name: "url", label: "フレンドリーなURL" }
    ]
  end,

このコンポーネントは、オブジェクトをアップロードしようとするユーザーに表示するフィールドをWorkatoに伝えます。このコネクタの場合、file_namefiletypestreamとして定義する必要があります)、およびこのファイルをアップロードするためのフレンドリーなURLのurl入力を収集します。

# ステップ3 - executeキーの定義

executeアクションでは、workato.stream.inを定義し、fileストリーム入力を受け取ります。

workato.stream.inを呼び出した後は、受け取ったデータのこの特定のチャンクをどのようにアップロードするかを示すブロックを定義する必要があります。このブロックでは、一意のUUIDを作成し、Azureの要件に従ってBase64エンコードします。このblock_idをブロックの配列に保存し、最後にこのファイル全体を確定するために送信します。

次に、このブロック_idと共にフレンドリーなAzure URLにPUTリクエストを送信します。workato.stream.inは、streamコンシューマがファイルの終了を指示するまで、このブロックを繰り返し実行し続けます。

ストリームが消費された後、最終的なPUTリクエストを送信し、ブロックリスト全体を送信します。これはAzure BlobのAPIによって指定されたXML形式です。 execute: lambda do |_connection, input, _input_schema, _output_schema, closure

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    block_list = []
    # workato.stream.inを呼び出すと、入力がファイルであるループが実行されます。
    # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます
    workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|  
      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).
        params("comp": "block", "blockid": block_id).
        request_body(chunk).
        presence
    end

    payload = {
      "Latest": block_list
    }
    
    { 
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).
                  request_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    }

  end,

# ステップ4 - 出力フィールドの定義

このセクションでは、トリガーの出力として表示するデータピルを定義します。各データピルのname属性は、executeキーの出力のキーと一致する必要があります。

output_fields: lambda do |object_definitions|
    output_fields: lambda do |object_definitions|
      [
        { name: "Etag", type: "string" }
      ]
    end
end

# バリエーション

# アップロード時間を延長するためのマルチステップフレームワークの使用

workato.stream.inメソッドを定義する際に、fromの追加の名前付きパラメータを定義することができます。これは、checkpoint!メソッドと組み合わせて、Workatoの180秒の制限を超えるアップロードアクションのタイムアウトを延長するために使用できます。

checkpoint!が呼び出されると、アクションの現在の実行時間が120秒を超えているかどうかをチェックし、短い待機期間の後にアクションのタイムアウトを更新します。これは、from引数と組み合わせて、Workatoのストリーミングライブラリに最後のバイトオフセットから続行する場所を伝えるために使用できます。

execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
  block_list = closure["block_list"].presence || []
  next_from = closure["next_from"].presence || 0

  # workato.stream.inを呼び出すと、入力がファイルであるループで実行されます。
  # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます。
  workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 

    block_id = workato.uuid.encode_base64
    block_list << block_id
    put(input['url']).p
      arams("comp": "block", "blockid": block_id).r
      equest_body(chunk).
      presence # presenceは、HTTPリクエストを送信するための方法として必要です。

    # ファイルの終わりでない場合は、checkpointを呼び出します。
    checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
  end

  payload = {
    "Latest": block_list
  }
  
  { 
    "Etag" => put(input['url']).
                params("comp": "blocklist").
                payload(payload).r
                equest_format_xml("BlockList").
                response_format_raw.
                after_response do |code, body, header|
                  header['Etag']
                end
  } 
end

# デフォルトの10MBのチャンクサイズの調整

WorkatoがAPIからファイルのチャンクを取得しようとすると、デフォルトで10MBのチャンクをリクエストします。一部の場合では、APIがより大きな最小チャンクサイズを要求する可能性があり、frame_size引数を使用してこのデフォルトを上書きすることができます。

ただし、これによってすべてのプロデューサーから20MBのチャンクサイズを受け取ることが保証されるわけではありません。必要な予防措置を取るために、一時バッファを保存することもできます。

execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
  # 20MBをバイト単位で指定
  frame_size = 20971520 
  block_list = closure["block_list"].presence || []
  next_from = closure["next_from"].presence || 0
  buffer = ""

  # workato.stream.inを呼び出すと、入力がファイルであるループで実行されます。
  # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます。
  workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
    # チャンクをバッファに保存
    buffer << chunk
    if !eof && buffer.size < frame_size
      next
    end
    block_id = workato.uuid.encode_base64
    block_list << block_id
    put(input['url']).p
      arams("comp": "block", "blockid": block_id).r
      equest_body(buffer).
      presence # presenceは、HTTPリクエストを送信するための方法として必要です。

    # バッファをリセット
    buffer = ""

    # ファイルの終わりでない場合は、checkpointを呼び出します。
    checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
  end

  payload = {
    "Latest": block_list
  }
  
  { 
    "Etag" => put(input['url']).
                params("comp": "blocklist").
                payload(payload).r
                equest_format_xml("BlockList").
                response_format_raw.
                after_response do |code, body, header|
                  header['Etag']
                end
  } 
end


Last updated: 2024/2/13 16:59:53