ハウツーガイド: ファイルストリーミングによるファイルのアップロード(チャンクID)

このページは機械翻訳により提供されています。翻訳内容と英語版に相違がある場合は、英語版が優先されます。

このセグメントでは、ファイルストリーミングを通じてターゲットアプリケーションにファイルをアップロードし、チャンクにIDを割り当てるアクションの作成について説明します。 ここではAzure BlobのAPIを取り上げますが、この方法は、他の多くの高品質なクラウドストレージソリューションにも拡張できます。

アクションのタイムアウト

SDKアクションには180秒のタイムアウト制限があります。

タイムアウト制限を超過するファイルを転送するには、ファイルストリーミングアクションでcheckpoint!メソッドを使用できます。 追加情報については、マルチステップフレームワークを使用してアップロード時間を延長するセクションを参照してください。

サンプルコネクター

ruby
{
  title: 'Upload to Azure Blob Friend URL',

  # More connector code here
  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file", type: "stream" }, # field type must be stream
          { name: "url", label: "Any friendly URL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        block_list = []
        # Calling workato.stream.in runs in a loop where the input should be file.
        # It can accept both entire files or the output of a streaming-enabled download file action
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
          block_id = workato.uuid.encode_base64
          block_list << block_id
          put(input['url']).
            params("comp": "block", "blockid": block_id).
            request_body(chunk).
            presence # presence is required as a way to force the HTTP request to be sent.
        end

        payload = {
          "Latest": block_list
        }

        {
          "Etag" => put(input['url']).
                      params("comp": "blocklist").
                      payload(payload).
                      request_format_xml("BlockList").
                      response_format_raw.
                      after_response do |code, body, header|
                        header['Etag']
                      end
        }

      end,

      output_fields: lambda do |object_definitions|
        [
          { name: "Etag", type: "string" }
        ]
      end
    }
  }
  # More connector code here
}

ステップ1: アクションのタイトル、サブタイトル、説明、ヘルプ

優れたアクションを作成するための最初のステップは、アクションが何を行い、どのように実行するかを適切に伝え、ユーザーに追加のヘルプを提供することです。 これを行うために、Workatoではアクションのタイトル、サブタイトル、説明を定義し、ヒントを提供できます。 簡単に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を示します。 アクションの説明には、そのアクションが何を実現するかについての仕様と説明、および接続先アプリケーションのコンテキストが含まれます。 最後に、ヘルプセグメントでは、アクションを機能させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、SDKリファレンスを参照してください

ステップ2 - 入力フィールドの定義

ruby
  input_fields: lambda do |object_definitions|
    [
      { name: "file", type: "stream" }, # field type must be stream
      { name: "url", label: "Any friendly URL" }
    ]
  end,

このコンポーネントは、オブジェクトをアップロードしようとしているユーザーに表示するフィールドをWorkatoに伝えます。 このコネクターの場合、file_nametypestreamとして定義する必要があるfile、およびこのファイルのアップロード先として使用できるわかりやすいURL用のurl入力を収集します。

ステップ3 - executeキーの定義

executeアクションでは、fileストリーム入力を受け取るworkato.stream.inを定義します。

workato.stream.inを呼び出した後、受信したこの特定のデータチャンクをアップロードする方法を示すブロックを定義する必要があります。 このブロックでは、Azureの要件に従って、Base64エンコードする一意のUUIDを作成します。 このblock_idをブロックの配列に保存し、最後に送信して、このファイル全体をコミットします。

その後、このblock_idとともに、フレンドリなAzure URLにPUTリクエストを送信します。 workato.stream.inは、streamコンシューマーがファイルの終了を示すまで、このブロックをループし続けます。

ストリームが消費された後、ブロックリスト全体を含む最後のPUTリクエストを送信します。 これはAzure BlobのAPIで規定されているXML形式です。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    block_list = []
    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).
        params("comp": "block", "blockid": block_id).
        request_body(chunk).
        presence
    end

    payload = {
      "Latest": block_list
    }

    {
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).
                  request_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    }

  end,

ステップ4 - 出力フィールドの定義

このセクションでは、トリガーの出力として表示するデータピルを指定します。 各データピルのname属性は、executeキーの出力に含まれるキーと一致している必要があります。

ruby
  output_fields: lambda do |object_definitions|
      output_fields: lambda do |object_definitions|
        [
          { name: "Etag", type: "string" }
        ]
      end
  end

バリエーション

マルチステップフレームワークを使用したアップロード時間の延長

workato.stream.inメソッドを定義するときに、from用の追加の名前付きパラメーターを定義できます。これはcheckpoint!メソッドと組み合わせて使用することで、アップロードアクションのタイムアウトをWorkatoの180秒の制限を超えて延長できます。

checkpoint!が呼び出されると、アクションの現在の実行時間が120秒を超えているかどうかを確認し、超えている場合は短い待機期間の後にアクションのタイムアウトを更新します。 これをfrom引数と組み合わせて使用すると、最後のバイトオフセットからどこで続行するかをWorkatoのストリーミングライブラリに伝えることができます。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    block_list = closure["block_list"].presence || []
    next_from = closure["next_from"].presence || 0

    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|

      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).p
        arams("comp": "block", "blockid": block_id).r
        equest_body(chunk).
        presence # presence is required as a way to force the HTTP request to be sent.

      # Call checkpoint unless it is the end of file.
      checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
    end

    payload = {
      "Latest": block_list
    }

    {
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).r
                  equest_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    }
  end

デフォルトの10MBチャンクサイズの調整

WorkatoがAPIからファイルチャンクを取得しようとすると、デフォルトでは10MBのチャンクをリクエストします。 場合によっては、APIがより大きい最小チャンクサイズを必要とすることがあります。その場合は、frame_size引数を使用して独自のチャンクサイズを宣言することで、このデフォルトを上書きできます。

これにより、すべてのプロデューサーから20MBのチャンクサイズを受け取ることが保証されるわけではない点に注意してください。 一時バッファーも保存することで、必要な予防策を講じることができます。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # 20MB in bytes
    frame_size = 20971520
    block_list = closure["block_list"].presence || []
    next_from = closure["next_from"].presence || 0
    buffer = ""

    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
      # save chunk to buffer
      buffer << chunk
      if !eof && buffer.size < frame_size
        next
      end
      block_id = workato.uuid.encode_base64
      block_list << block_id
      put(input['url']).p
        arams("comp": "block", "blockid": block_id).r
        equest_body(buffer).
        presence # presence is required as a way to force the HTTP request to be sent.

      #reset buffer
      buffer = ""

      # Call checkpoint unless it is the end of file.
      checkpoint!(continue: { next_from: next_starting_byte_range, block_list: block_list }) unless eof
    end

    payload = {
      "Latest": block_list
    }

    {
      "Etag" => put(input['url']).
                  params("comp": "blocklist").
                  payload(payload).r
                  equest_format_xml("BlockList").
                  response_format_raw.
                  after_response do |code, body, header|
                    header['Etag']
                  end
    }
  end

Last updated: