Apache Kafka - New messages in topicトリガー(バッチ)
New messages in topicバッチトリガーは、選択したKafkaトピックの新規Kafkaメッセージをバッチで取得します。 AVROまたはProtobuf形式でKafkaメッセージおよびキーのスキーマを選択できます。
さらに、Kafkaメッセージの消費を開始する位置を定義するために初期オフセットを設定できます。 バッチサイズは、ジョブあたり1~100メッセージに設定できます。
更新済みトリガーバージョン
このトリガーには、オンプレミスエージェントバージョン2.20.0以降が必要です。このトリガーの以前のバージョンは非推奨です。
古いバージョンを使用している場合は、エージェントをアップグレードし、このトリガーを使用するようにレシピを更新します。 このバージョンでは、次が提供されます:
- キューに格納されたメッセージがある状態でレシピが停止された場合の信頼性の向上
- レシピのダウンタイム後のメッセージ消費処理の改善
入力
| 入力フィールド | 説明 |
|---|---|
| トピック | サブスクライブするKafkaトピックを選択します。 トリガーは、選択したトピックとそのすべてのパーティションからすべてのメッセージを消費します。 レシピ内でパーティション別にメッセージをフィルタリングできます。 |
| メッセージスキーマソース | メッセージスキーマソースを定義します。 オプションは次のとおりです:
|
| メッセージスキーマ | Message schema sourceの選択内容に基づいてメッセージスキーマを選択します。 |
| Protobufメッセージスキーマ | スキーマレジストリからProtobufメッセージスキーマを選択します。 このフィールドは、メッセージスキーマソースとしてCommon data model - Protobufが選択されている場合に表示されます。 |
| キースキーマタイプ | メッセージキーのスキーマタイプを設定します。 オプションは次のとおりです:
|
| キースキーマ | 使用可能なスキーマからキースキーマを選択します。 このフィールドは、キースキーマタイプとしてSchema registryが選択されている場合に表示されます。 |
| 初期オフセット | 選択したトピックからKafkaメッセージを消費する開始点を選択します。 オプションは次のとおりです:
|
| バッチサイズ | バッチごとに取得するメッセージ数を指定します。 バッチサイズは1~100メッセージの範囲です。 デフォルトは100です。 |
初期オフセットはトピックごとに1回のみ適用されます
Initial offsetフィールドは、選択したTopicに対してレシピを初めて実行する場合にのみ適用されます。
たとえば、レシピを停止し、ダウンタイム中に生成されたすべてのメッセージを無視するには、次の手順を実行します:
1
レシピを停止します。
2
Topicを変更するか、レシピを複製します。
3
Initial offsetフィールドでLatestを選択します。
4
編集済みまたは複製済みのレシピを開始します。
出力
このトリガーの出力はKafkaメッセージのバッチです。 バッチサイズは、Batch size入力設定によって決まります。 ダウンストリームステップで出力を使用するには、関連するデータピルをマッピングします。
| 出力フィールド | 説明 |
|---|---|
| レコード | Kafkaトピックから消費されたメッセージのバッチを含むリスト。 |
| メッセージ(レコード) | 選択したKafkaトピックから消費されたデータを含むメッセージフィールド。 |
| キー(レコード) | データを含むKafkaメッセージキーのフィールド。 |
| メッセージヘッダー(レコード) | メッセージとともにヘッダーとして保存されるキーと値のペアのリスト。 |
| キー(メッセージヘッダー) | ヘッダーキー名。 |
| 値(メッセージヘッダー) | ヘッダー値。 |
| サイズ(レコード) | 消費されたKafkaメッセージのサイズ。 |
| タイムスタンプ(レコード) | Kafkaメッセージのタイムスタンプ。 |
| パーティション(レコード) | Kafkaメッセージが消費されたパーティションID。 |
| オフセット(レコード) | Kafka内の一意のメッセージ識別子であるメッセージオフセット。 |
Last updated: