Apache Kafka - New messages in topicトリガー(バッチ)

このページは機械翻訳により提供されています。翻訳内容と英語版に相違がある場合は、英語版が優先されます。

New messages in topicバッチトリガーは、選択したKafkaトピックの新規Kafkaメッセージをバッチで取得します。 AVROまたはProtobuf形式でKafkaメッセージおよびキーのスキーマを選択できます。

さらに、Kafkaメッセージの消費を開始する位置を定義するために初期オフセットを設定できます。 バッチサイズは、ジョブあたり1~100メッセージに設定できます。

更新済みトリガーバージョン

このトリガーには、オンプレミスエージェントバージョン2.20.0以降が必要です。このトリガーの以前のバージョンは非推奨です。

古いバージョンを使用している場合は、エージェントをアップグレードし、このトリガーを使用するようにレシピを更新します。 このバージョンでは、次が提供されます:

  • キューに格納されたメッセージがある状態でレシピが停止された場合の信頼性の向上
  • レシピのダウンタイム後のメッセージ消費処理の改善

入力

入力フィールド説明
トピックサブスクライブするKafkaトピックを選択します。 トリガーは、選択したトピックとそのすべてのパーティションからすべてのメッセージを消費します。 レシピ内でパーティション別にメッセージをフィルタリングできます。
メッセージスキーマソースメッセージスキーマソースを定義します。 オプションは次のとおりです:

  • Common data model - Workatoで定義されたスキーマを使用します
  • Schema registry - Kafkaクラスターで定義されたスキーマを使用します。 schema.registry.urlプロパティを設定する必要があります
  • Common data model - Protobuf - Workatoで定義されたProtobufスキーマを使用します
WorkatoはAVROおよびProtobuf形式のスキーマをサポートしています。
メッセージスキーマMessage schema sourceの選択内容に基づいてメッセージスキーマを選択します。
ProtobufメッセージスキーマスキーマレジストリからProtobufメッセージスキーマを選択します。

このフィールドは、メッセージスキーマソースとしてCommon data model - Protobufが選択されている場合に表示されます。
キースキーマタイプメッセージキーのスキーマタイプを設定します。 オプションは次のとおりです:

  • Plaintext - 単純な文字列キーを使用します
  • Schema registry - Kafkaクラスターのスキーマを使用します。 schema.registry.urlプロパティを設定する必要があります
WorkatoはAVROおよびProtobuf形式のスキーマをサポートしています。
キースキーマ使用可能なスキーマからキースキーマを選択します。

このフィールドは、キースキーマタイプとしてSchema registryが選択されている場合に表示されます。
初期オフセット選択したトピックからKafkaメッセージを消費する開始点を選択します。 オプションは次のとおりです:

  • Earliest - Kafkaクラスター内の使用可能なすべてのメッセージを取得します
  • Latest - すべての履歴メッセージを無視し、選択したKafkaトピックから新規メッセージのみを消費します
バッチサイズバッチごとに取得するメッセージ数を指定します。 バッチサイズは1~100メッセージの範囲です。 デフォルトは100です。

初期オフセットはトピックごとに1回のみ適用されます

Initial offsetフィールドは、選択したTopicに対してレシピを初めて実行する場合にのみ適用されます。

たとえば、レシピを停止し、ダウンタイム中に生成されたすべてのメッセージを無視するには、次の手順を実行します:

1

レシピを停止します。

2

Topicを変更するか、レシピを複製します。

3

Initial offsetフィールドでLatestを選択します。

4

編集済みまたは複製済みのレシピを開始します。

出力

このトリガーの出力はKafkaメッセージのバッチです。 バッチサイズは、Batch size入力設定によって決まります。 ダウンストリームステップで出力を使用するには、関連するデータピルをマッピングします。

出力フィールド説明
レコードKafkaトピックから消費されたメッセージのバッチを含むリスト。
メッセージ(レコード)選択したKafkaトピックから消費されたデータを含むメッセージフィールド。
キー(レコード)データを含むKafkaメッセージキーのフィールド。
メッセージヘッダー(レコード)メッセージとともにヘッダーとして保存されるキーと値のペアのリスト。
キー(メッセージヘッダー)ヘッダーキー名。
値(メッセージヘッダー)ヘッダー値。
サイズ(レコード)消費されたKafkaメッセージのサイズ。
タイムスタンプ(レコード)Kafkaメッセージのタイムスタンプ。
パーティション(レコード)Kafkaメッセージが消費されたパーティションID。
オフセット(レコード)Kafka内の一意のメッセージ識別子であるメッセージオフセット。

Last updated: