Apache Kafka - トピック内の新規メッセージトリガー

このページは機械翻訳により提供されています。翻訳内容と英語版に相違がある場合は、英語版が優先されます。

New message in topicトリガーは、選択したKafkaトピックの新しいKafkaメッセージを取得します。 各メッセージで1つのジョブが作成されます。 AVROまたはProtobuf形式でKafkaメッセージおよびキーのスキーマを選択できます。

初期オフセットを設定して、Kafkaメッセージの消費を開始する位置を定義できます。 1つのジョブで複数のメッセージを処理するバッチ処理には、トピック内の新規メッセージトリガー(バッチ)を使用します。

更新済みトリガーバージョン

このトリガーには、オンプレミスエージェントバージョン2.20.0以降が必要です。このトリガーの以前のバージョンは非推奨です。

古いバージョンを使用している場合は、エージェントをアップグレードし、このトリガーを使用するようにレシピを更新します。 このバージョンでは、次が提供されます:

  • キューに格納されたメッセージがある状態でレシピが停止された場合の信頼性の向上
  • レシピのダウンタイム後のメッセージ消費処理の改善

入力

入力フィールド説明
トピックサブスクライブするKafkaトピックを選択します。 トリガーは、選択したトピックとそのすべてのパーティションからすべてのメッセージを消費します。 レシピ内でパーティション別にメッセージをフィルタリングできます。
メッセージスキーマソースメッセージスキーマソースを定義します。 オプションは次のとおりです:

  • Common data model - Workatoで定義されたスキーマを使用します
  • Schema registry - Kafkaクラスターで定義されたスキーマを使用します。 schema.registry.urlプロパティを設定する必要があります
  • Common data model - Protobuf - Workatoで定義されたProtobufスキーマを使用します
WorkatoはAVROおよびProtobuf形式のスキーマをサポートしています。
メッセージスキーマMessage schema sourceの選択内容に基づいてメッセージスキーマを選択します。
ProtobufメッセージスキーマスキーマレジストリからProtobufメッセージスキーマを選択します。

このフィールドは、メッセージスキーマソースとしてCommon data model - Protobufが選択されている場合に表示されます。
キースキーマタイプメッセージキーのスキーマタイプを設定します。 オプションは次のとおりです:

  • Plaintext - 単純な文字列キーを使用します
  • Schema registry - Kafkaクラスターのスキーマを使用します。 schema.registry.urlプロパティを設定する必要があります
WorkatoはAVROおよびProtobuf形式のスキーマをサポートしています。
キースキーマ使用可能なスキーマからキースキーマを選択します。

このフィールドは、キースキーマタイプとしてSchema registryが選択されている場合に表示されます。
初期オフセット選択したトピックからKafkaメッセージを消費する開始点を選択します。 オプションは次のとおりです:

  • Earliest - Kafkaクラスター内の使用可能なすべてのメッセージを取得します
  • Latest - すべての履歴メッセージを無視し、選択したKafkaトピックから新規メッセージのみを消費します

初期オフセットはトピックごとに1回のみ適用されます

Initial offsetフィールドは、選択したTopicに対してレシピを初めて実行する場合にのみ適用されます。

たとえば、レシピを停止し、ダウンタイム中に生成されたすべてのメッセージを無視するには、次の手順を実行します:

1

レシピを停止します。

2

Topicを変更するか、レシピを複製します。

3

Initial offsetフィールドでLatestを選択します。

4

編集済みまたは複製済みのレシピを開始します。

出力

このトリガーの出力は、1つのKafkaメッセージです。 ダウンストリームステップで出力を使用するには、関連するデータピルをマッピングします。

出力フィールド説明
メッセージ選択したKafkaトピックから消費されたデータを含むメッセージフィールド。
メッセージヘッダーメッセージとともにヘッダーとして保存されるキーと値のペアのリスト。
サイズ消費されたKafkaメッセージのサイズ。
タイムスタンプKafkaメッセージのタイムスタンプ。
キーデータを含むKafkaメッセージキーのフィールド。
パーティションKafkaメッセージが消費されたパーティションID。
オフセットKafka内の一意のメッセージ識別子であるメッセージオフセット。

Last updated: