ハウツーガイド: ファイルストリーミングによるファイルのアップロード(Content-Rangeヘッダー)

このページは機械翻訳により提供されています。翻訳内容と英語版に相違がある場合は、英語版が優先されます。

このセグメントでは、ファイルストリーミングを通じてContent-Rangeヘッダーを利用し、ターゲットアプリケーションにファイルをアップロードするアクションの作成について説明します。

アクションのタイムアウト

SDKアクションには180秒のタイムアウト制限があります。

タイムアウト制限を超過するファイルを転送するには、ファイルストリーミングアクションでcheckpoint!メソッドを使用できます。 追加情報については、マルチステップフレームワークを使用してアップロード時間を延長するセクションを参照してください。

サンプルコネクター

ruby
{
  title: 'Upload file to URL',

  # More connector code here
  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file_name", type: "string" },
          { name: "file", type: "stream" }, # field type must be stream
          { name: "url", label: "Any friendly URL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        # Calling workato.stream.in runs in a loop where the input should be file.
        # It can accept both entire files or the output of a streaming-enabled download file action
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
          put(input['url']).
            headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
            request_body(chunk).presence
        end

        # This commits the upload
        post(input['url'], { "commit": true } )

      end,

      output_fields: lambda do |object_definitions|
        [
          { name: "file_name", type: "string" },
          { name: "file_path", type: "string" },
          { name: "file_size", type: "integer" }
        ]
      end
    }
  }
  # More connector code here
}

ステップ1: アクションのタイトル、サブタイトル、説明、ヘルプ

優れたアクションを作成するための最初のステップは、アクションが何を行い、どのように実行するかを適切に伝え、ユーザーに追加のヘルプを提供することです。 これを行うために、Workatoではアクションのタイトル、サブタイトル、説明を定義し、ヒントを提供できます。 簡単に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を示します。 アクションの説明には、そのアクションが何を実現するかについての仕様と説明、および接続先アプリケーションのコンテキストが含まれます。 最後に、ヘルプセグメントでは、アクションを機能させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、SDKリファレンスを参照してください

ステップ2 - 入力フィールドの定義

ruby
  input_fields: lambda do |object_definitions|
    [
      { name: "file_name", type: "string" },
      { name: "file", type: "stream" }, # field type must be stream
      { name: "url", label: "Any friendly URL" }
    ]
  end,

このコンポーネントは、オブジェクトをアップロードしようとしているユーザーに表示するフィールドをWorkatoに伝えます。 このコネクターの場合、file_nametypestreamとして定義する必要があるfile、およびこのファイルのアップロード先として使用できるわかりやすいURL用のurl入力を収集します。

ステップ3 - executeキーの定義

executeアクションでは、fileストリーム入力を受け取るworkato.stream.inを定義します。

workato.stream.inを呼び出した後、受信したこの特定のデータチャンクをアップロードする方法を示すブロックを定義する必要があります。 このブロックでは、アップロードしているファイルの部分を示すために、標準のContent-RangeヘッダーとともにURLにPUTリクエストを送信します。 workato.stream.inは、streamコンシューマーがファイルの終了を示すまで、このブロックをループし続けます。

ストリームが消費された後、アップロード全体を新しいファイルとしてコミットするためにPOSTリクエストを送信します。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(chunk).
        presence # presence is required as a way to force the HTTP request to be sent.
    end

    # This commits the upload
    post(input['url'], { "commit": true } )

  end,

INFO

APIが*をワイルドカード範囲として受け入れ、合計ファイルサイズがまだ不明であることを示すものと想定している点に注意してください。 場合によっては、APIがワイルドカード範囲を受け入れないことがあります。その場合は、レシピビルダーから合計ファイルサイズを収集する入力フィールドを追加する必要があります。

ステップ4 - 出力フィールドの定義

このセクションでは、トリガーの出力として表示するデータピルを指定します。 各データピルのname属性は、executeキーの出力に含まれるキーと一致している必要があります。 ここでは、最後のPOSTリクエストからのレスポンスがfile_namefile_pathfile_sizeを返すものと想定しています。

ruby
  output_fields: lambda do |object_definitions|
    [
      { name: "file_name", type: "string" },
      { name: "file_path", type: "string" },
      { name: "file_size", type: "integer" }
    ]
  end

バリエーション

マルチステップフレームワークを使用したアップロード時間の延長

workato.stream.inメソッドを定義するときに、from用の追加の名前付きパラメーターを定義できます。これはcheckpoint!メソッドと組み合わせて使用することで、アップロードアクションのタイムアウトをWorkatoの180秒の制限を超えて延長できます。

checkpoint!が呼び出されると、アクションの現在の実行時間が120秒を超えているかどうかを確認し、超えている場合は短い待機期間の後にアクションのタイムアウトを更新します。 これをfrom引数と組み合わせて使用すると、最後のバイトオフセットからどこで続行するかをWorkatoのストリーミングライブラリに伝えることができます。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    next_from = closure["next_from"].presence || 0
    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(chunk).
        presence # presence is required as a way to force the HTTP request to be sent.

        # Call checkpoint unless it is the end of file.
        checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
    end

    # This commits the upload
    post(input['url'], { "commit": true } )

  end

デフォルトの10MBチャンクサイズの調整

WorkatoがAPIからファイルチャンクを取得しようとすると、デフォルトでは10MBのチャンクをリクエストします。 場合によっては、APIがより大きい最小チャンクサイズを必要とすることがあります。その場合は、frame_size引数を使用して独自のチャンクサイズを宣言することで、このデフォルトを上書きできます。

これにより、すべてのプロデューサーから20MBのチャンクサイズを受け取ることが保証されるわけではない点に注意してください。 一時バッファーも保存することで、必要な予防策を講じることができます。

ruby
  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # 20MB in bytes
    frame_size = 20971520
    next_from = closure["next_from"].presence || 0
    buffer = ""
    # Calling workato.stream.in runs in a loop where the input should be file.
    # It can accept both entire files or the output of a streaming-enabled download file action
    workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range|
      # save chunk to buffer
      buffer << chunk

      # If not end of file and buffer is less than require size, skip to next iteration
      if !eof && buffer.size < frame_size
        next
      end
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(buffer).
        presence # presence is required as a way to force the HTTP request to be sent.

      #reset buffer
      buffer = ""

      # Call checkpoint unless it is the end of file.
      checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
    end

    # This commits the upload
    post(input['url'], { "commit": true } )

  end

Last updated: