# How-toガイド - ファイルストリーミングを介したファイルのアップロード(Content-Rangeヘッダー)

このセグメントでは、ファイルストリーミングを介してターゲットアプリケーションにファイルをアップロードし、Content-Rangeヘッダーを利用するアクションの作成方法について説明します。

# サンプルコネクタ

{
  title: 'URLへのファイルアップロード',

  # 他のコネクタコードがここに入ります
  actions: {
    upload_to_url: {
      input_fields: lambda do |_object_definitions|
        [
          { name: "file_name", type: "string" },
          { name: "file", type: "stream" }, # フィールドタイプはstreamである必要があります
          { name: "url", label: "任意のURL" }
        ]
      end,

      execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
        # workato.stream.inを呼び出すと、入力がファイルであるループが実行されます。
        # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます
        workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
          put(input['url']).
            headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
            request_body(chunk).presence
        end

        # これにより、アップロードが確定します
        post(input['url'], { "commit": true } )
        
      end,
      
      output_fields: lambda do |object_definitions|
        [
          { name: "file_name", type: "string" },
          { name: "file_path", type: "string" },
          { name: "file_size", type: "integer" }
        ]
      end
    }
  }
  # 他のコネクタコードがここに入ります
}

# Step 1 - アクションのタイトル、サブタイトル、説明、およびヘルプの定義

良いアクションを作成するための最初のステップは、アクションが何を行い、どのように行うか、およびユーザーに追加のヘルプを提供することを適切に伝えることです。そのため、Workatoでは、アクションのタイトル、サブタイトル、説明を定義し、アクションの動作に必要な追加情報をユーザーに提供することができます。単純に言えば、タイトルはアクションのタイトルであり、サブタイトルはアクションの詳細を提供します。アクションの説明には、アクションが達成する仕様と説明が含まれ、それが接続するアプリケーションのコンテキストでどのように機能するかが説明されています。最後に、ヘルプセグメントでは、アクションを動作させるために必要な追加情報をユーザーに提供します。

このステップの詳細については、SDKリファレンスをご覧ください。

# Step 2 - 入力フィールドの定義

  input_fields: lambda do |object_definitions|
    [
      { name: "file_name", type: "string" },
      { name: "file", type: "stream" }, # フィールドタイプはstreamである必要があります
      { name: "url", label: "任意のURL" }
    ]
  end,

このコンポーネントは、オブジェクトをアップロードしようとするユーザーに表示するフィールドをWorkatoに伝えます。このコネクタの場合、file_namefiletypestreamで定義されている必要があります)、およびこのファイルをアップロードするためのフレンドリーなURLであるurlの入力を収集します。

# Step 3 - executeキーの定義

executeアクションでは、workato.stream.inを定義し、fileストリーム入力を受け取ります。

workato.stream.inを呼び出した後は、この特定のデータチャンクをアップロードする方法を示すブロックを定義する必要があります。このブロックでは、PUTリクエストをURLに送信し、標準のContent-Rangeヘッダーを使用してアップロードするファイルの一部を示します。workato.stream.inは、streamコンシューマがファイルの終了を指示するまで、このブロックを繰り返し実行し続けます。

ストリームが消費された後、新しいファイルとしてアップロード全体を確定するために、POSTリクエストを送信します。

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # workato.stream.inを呼び出すと、入力がファイルであるループが実行されます。
    # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力のいずれかを受け入れることができます
    workato.stream.in(input["file"]) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(chunk).
        presence # presenceは、HTTPリクエストを送信するための方法として必要です。
    end

    # これにより、アップロードが確定します
    post(input['url'], { "commit": true } )
    
  end,

NOTE

APIがワイルドカード範囲として*を受け入れることを前提としていることに注意してください。ファイルの合計サイズがまだわからない場合、APIがワイルドカード範囲を受け入れない場合は、レシピビルダーから合計ファイルサイズを収集するための入力フィールドを追加する必要があります。

# Step 4 - 出力フィールドの定義

このセクションでは、トリガーの出力として表示するデータピルを示します。各データピルのname属性は、executeキーの出力のキーと一致する必要があります。ここでは、最終的なPOSTリクエストの応答がfile_namefile_path、およびfile_sizeを返すと想定しています。

  output_fields: lambda do |object_definitions|
    [
    ```
{ name: "file_name", type: "string" },
      { name: "file_path", type: "string" },
      { name: "file_size", type: "integer" }
    ]
  end

# バリエーション

# アップロード時間を延長するためにマルチステップフレームワークを使用する

workato.stream.in メソッドを定義する際に、from の追加の名前付きパラメータを定義することができます。これは、checkpoint! メソッドと組み合わせて、Workato の180秒の制限を超えるアップロードアクションのタイムアウトを延長するために使用することができます。

checkpoint! が呼び出されると、アクションの現在の実行時間が120秒を超えているかどうかをチェックし、そうであれば短い待機時間の後にアクションのタイムアウトを更新します。これは、from 引数と組み合わせて、Workato のストリーミングライブラリに最後のバイトオフセットから続行する場所を伝えるために使用することができます。

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    next_from = closure["next_from"].presence || 0
    # workato.stream.in を呼び出すと、入力がファイルであるループ内で実行されます。
    # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力の両方を受け入れることができます。
    workato.stream.in(input["file"], from: next_from) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(chunk).
        presence # presence は、HTTPリクエストを送信するための方法として必要です。

        # ファイルの終わりでない場合は、checkpoint を呼び出します。
        checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
    end

    # これにより、アップロードが確定します
    post(input['url'], { "commit": true } )
    
  end

# デフォルトの10MBのチャンクサイズの調整

Workato がAPIからファイルのチャンクを取得しようとすると、デフォルトで10MBのチャンクをリクエストします。一部の場合では、APIがより大きな最小チャンクサイズを要求する場合があり、frame_size 引数を使用して独自のチャンクサイズを宣言することで、このデフォルトを上書きすることができます。

ただし、これによってすべてのプロデューサから20MBのチャンクサイズを受け取ることが保証されるわけではありません。必要な予防措置を取るために、一時バッファを保存することもできます。

  execute: lambda do |_connection, input, _input_schema, _output_schema, closure|
    # 20MB をバイト単位で表したもの
    frame_size = 20971520 
    next_from = closure["next_from"].presence || 0
    buffer = ""
    # workato.stream.in を呼び出すと、入力がファイルであるループ内で実行されます。
    # ファイル全体またはストリーミング対応のダウンロードファイルアクションの出力の両方を受け入れることができます。
    workato.stream.in(input["file"], from: next_from, frame_size: frame_size) do |chunk, starting_byte_range, ending_byte_range, eof, next_starting_byte_range| 
      # チャンクをバッファに保存
      buffer << chunk

      # ファイルの終わりでなく、バッファが必要なサイズよりも小さい場合は、次の繰り返しにスキップします
      if !eof && buffer.size < frame_size
        next
      end
      put(input['url']).
        headers("Content-Range": "bytes #{starting_byte_range}-#{ending_byte_range}/*").
        request_body(buffer).
        presence # presence は、HTTPリクエストを送信するための方法として必要です。

      # バッファをリセット
      buffer = ""

      # ファイルの終わりでない場合は、checkpoint を呼び出します。
      checkpoint!(continue: { next_from: next_starting_byte_range }) unless eof
    end

    # これにより、アップロードが確定します
    post(input['url'], { "commit": true } )
    
  end


Last updated: 2024/2/13 16:59:53