Apache Kafka - トピック内の新規メッセージトリガー
New message in topicトリガーは、選択したKafkaトピックの新しいKafkaメッセージを取得します。 各メッセージで1つのジョブが作成されます。 AVROまたはProtobuf形式でKafkaメッセージおよびキーのスキーマを選択できます。
初期オフセットを設定して、Kafkaメッセージの消費を開始する位置を定義できます。 1つのジョブで複数のメッセージを処理するバッチ処理には、トピック内の新規メッセージトリガー(バッチ)を使用します。
更新済みトリガーバージョン
このトリガーには、オンプレミスエージェントバージョン2.20.0以降が必要です。このトリガーの以前のバージョンは非推奨です。
古いバージョンを使用している場合は、エージェントをアップグレードし、このトリガーを使用するようにレシピを更新します。 このバージョンでは、次が提供されます:
- キューに格納されたメッセージがある状態でレシピが停止された場合の信頼性の向上
- レシピのダウンタイム後のメッセージ消費処理の改善
入力
| 入力フィールド | 説明 |
|---|---|
| トピック | サブスクライブするKafkaトピックを選択します。 トリガーは、選択したトピックとそのすべてのパーティションからすべてのメッセージを消費します。 レシピ内でパーティション別にメッセージをフィルタリングできます。 |
| メッセージスキーマソース | メッセージスキーマソースを定義します。 オプションは次のとおりです:
|
| メッセージスキーマ | Message schema sourceの選択内容に基づいてメッセージスキーマを選択します。 |
| Protobufメッセージスキーマ | スキーマレジストリからProtobufメッセージスキーマを選択します。 このフィールドは、メッセージスキーマソースとしてCommon data model - Protobufが選択されている場合に表示されます。 |
| キースキーマタイプ | メッセージキーのスキーマタイプを設定します。 オプションは次のとおりです:
|
| キースキーマ | 使用可能なスキーマからキースキーマを選択します。 このフィールドは、キースキーマタイプとしてSchema registryが選択されている場合に表示されます。 |
| 初期オフセット | 選択したトピックからKafkaメッセージを消費する開始点を選択します。 オプションは次のとおりです:
|
初期オフセットはトピックごとに1回のみ適用されます
Initial offsetフィールドは、選択したTopicに対してレシピを初めて実行する場合にのみ適用されます。
たとえば、レシピを停止し、ダウンタイム中に生成されたすべてのメッセージを無視するには、次の手順を実行します:
1
レシピを停止します。
2
Topicを変更するか、レシピを複製します。
3
Initial offsetフィールドでLatestを選択します。
4
編集済みまたは複製済みのレシピを開始します。
出力
このトリガーの出力は、1つのKafkaメッセージです。 ダウンストリームステップで出力を使用するには、関連するデータピルをマッピングします。
| 出力フィールド | 説明 |
|---|---|
| メッセージ | 選択したKafkaトピックから消費されたデータを含むメッセージフィールド。 |
| メッセージヘッダー | メッセージとともにヘッダーとして保存されるキーと値のペアのリスト。 |
| サイズ | 消費されたKafkaメッセージのサイズ。 |
| タイムスタンプ | Kafkaメッセージのタイムスタンプ。 |
| キー | データを含むKafkaメッセージキーのフィールド。 |
| パーティション | Kafkaメッセージが消費されたパーティションID。 |
| オフセット | Kafka内の一意のメッセージ識別子であるメッセージオフセット。 |
Last updated: