Apache NiFi 2.x处理器:PublishKafka 2.2.0
包
org.apache.nifi | nifi-kafka-nar
描述
使用Kafka Producer API将FlowFile的内容作为消息或单个记录发送到Apache Kafka。要发送的消息可以是单个FlowFile,也可以使用用户指定的分隔符(如换行符)进行分隔,或者是配置的记录读取器能够读取的面向记录的数据。用于获取消息的互补NiFi处理器是ConsumeKafka。在使用
PublishStrategy.USE_WRAPPER时,要生成Kafka墓碑消息,只需将记录的值设置为“null”即可。
标签
Apache、Kafka、消息、发布/订阅、放置、记录、发送、avro、csv、json、日志
输入要求
必填
支持敏感动态属性
否
属性
交付保证
指定确保消息发送到Kafka的要求。对应Kafka客户端的acks属性。
显示名称 | 交付保证 |
描述 | 指定确保消息发送到Kafka的要求。对应Kafka客户端的acks属性。 |
API名称 | acks |
默认值 | all |
允许值 | 保证复制交付 |
表达式语言作用域
不支持
敏感
否
必填
是
压缩类型
指定发送到Kafka的记录的压缩策略。对应Kafka客户端的compression.type属性。
显示名称 | 压缩类型 |
描述 | 指定发送到Kafka的记录的压缩策略。对应Kafka客户端的compression.type属性。 |
API名称 | compression.type |
默认值 | none |
允许值 | none |
表达式语言作用域
不支持
敏感
否
必填
是
失败策略
指定处理器在无法将数据发布到Kafka时如何处理FlowFile。
显示名称 | 失败策略 |
描述 | 指定处理器在无法将数据发布到Kafka时如何处理FlowFile。 |
API名称 | Failure Strategy |
默认值 | Route to Failure |
允许值 | 路由到失败 |
表达式语言作用域
不支持
敏感
否
必填
是
FlowFile属性头模式
一个正则表达式,用于与所有FlowFile属性名称进行匹配。任何名称与该模式匹配的属性都将作为消息头添加到Kafka消息中。如果未指定,则不会将FlowFile属性添加为消息头。
显示名称 | FlowFile属性头模式 |
描述 | 一个正则表达式,用于与所有FlowFile属性名称进行匹配。任何名称与该模式匹配的属性都将作为消息头添加到Kafka消息中。如果未指定,则不会将FlowFile属性添加为消息头。 |
API名称 | FlowFile Attribute Header Pattern |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 否 |
依赖项
发布策略设置为[USE_VALUE]中的任意一个。
头编码
对于作为Kafka记录头添加的任何属性,此属性指示用于序列化消息头的字符编码。
显示名称 | 头编码 |
描述 | 对于作为Kafka记录头添加的任何属性,此属性指示用于序列化消息头的字符编码。 |
API名称 | Header Encoding |
默认值 | UTF-8 |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 是 |
依赖项
FlowFile属性头模式设置为任何指定值。
Kafka连接服务
为发布Kafka记录提供到Kafka代理的连接。
显示名称 | Kafka连接服务 |
描述 | 为发布Kafka记录提供到Kafka代理的连接。 |
API名称 | Kafka Connection Service |
服务接口 | org.apache.nifi.kafka.service.api.KafkaConnectionService |
服务实现 | org.apache.nifi.kafka.service.Kafka3ConnectionService |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 是 |
Kafka键
用于消息的键。如果未指定,则在FlowFile属性“kafka.key”存在时,将其用作消息键。请注意,同时设置Kafka键和分隔符可能会导致许多Kafka消息具有相同的键。通常这不是问题,因为Kafka不强制或假设消息和键的唯一性。但是,同时设置分隔符和Kafka键会在Kafka上存在数据丢失的风险。在Kafka进行主题压缩时,消息将基于此键进行去重。
显示名称 | Kafka键 |
描述 | 用于消息的键。如果未指定,则在FlowFile属性“kafka.key”存在时,将其用作消息键。请注意,同时设置Kafka键和分隔符可能会导致许多Kafka消息具有相同的键。通常这不是问题,因为Kafka不强制或假设消息和键的唯一性。但是,同时设置分隔符和Kafka键会在Kafka上存在数据丢失的风险。在Kafka进行主题压缩时,消息将基于此键进行去重。 |
API名称 | Kafka Key |
表达式语言作用域 | 环境变量和FlowFile属性 |
敏感 | 否 |
必填 | 否 |
Kafka键属性编码
发出的FlowFile具有名为“kafka.key”的属性。此属性规定了该属性的值应如何编码。
显示名称 | Kafka键属性编码 |
描述 | 发出的FlowFile具有名为“kafka.key”的属性。此属性规定了该属性的值应如何编码。 |
API名称 | Kafka Key Attribute Encoding |
默认值 | utf-8 |
允许值 | UTF-8编码 |
表达式语言作用域
不支持
敏感
否
必填
是
依赖项
发布策略设置为[USE_WRAPPER]中的任意一个。
最大请求大小
请求的最大字节数。对应Kafka客户端的max.request.size属性。
显示名称 | 最大请求大小 |
描述 | 请求的最大字节数。对应Kafka客户端的max.request.size属性。 |
API名称 | max.request.size |
默认值 | 1 MB |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 是 |
消息分隔符
指定用于在单个FlowFile中分隔多个消息的字符串(解释为UTF-8)。如果未指定,则FlowFile的整个内容将作为单个消息使用。如果指定,则FlowFile的内容将根据此分隔符进行拆分,并将每个部分作为单独的Kafka消息发送。要输入特殊字符(如换行符),请根据操作系统使用CTRL+Enter或Shift+Enter。
显示名称 | 消息分隔符 |
描述 | 指定用于在单个FlowFile中分隔多个消息的字符串(解释为UTF-8)。如果未指定,则FlowFile的整个内容将作为单个消息使用。如果指定,则FlowFile的内容将根据此分隔符进行拆分,并将每个部分作为单独的Kafka消息发送。要输入特殊字符(如换行符),请根据操作系统使用CTRL+Enter或Shift+Enter。 |
API名称 | Message Demarcator |
表达式语言作用域 | 环境变量和FlowFile属性 |
敏感 | 否 |
必填 | 否 |
消息键字段
输入记录中应作为Kafka消息键使用的字段名称。
显示名称 | 消息键字段 |
描述 | 输入记录中应作为Kafka消息键使用的字段名称。 |
API名称 | Message Key Field |
表达式语言作用域 | 环境变量和FlowFile属性 |
敏感 | 否 |
必填 | 否 |
依赖项
发布策略设置为[USE_VALUE]中的任意一个。
分区
指定记录的Kafka分区目标。
显示名称 | 分区 |
描述 | 指定记录的Kafka分区目标。 |
API名称 | partition |
表达式语言作用域 | 环境变量和FlowFile属性 |
敏感 | 否 |
必填 | 否 |
分区器类
指定用于计算消息分区ID的类。对应Kafka客户端的partitioner.class属性。
显示名称 | 分区器类 |
描述 | 指定用于计算消息分区ID的类。对应Kafka客户端的partitioner.class属性。 |
API名称 | partitioner.class |
默认值 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
允许值 | RoundRobinPartitioner |
表达式语言作用域
不支持
敏感
否
必填
是
发布策略
用于将传入的FlowFile记录发布到Kafka的格式。
显示名称 | 发布策略 |
描述 | 用于将传入的FlowFile记录发布到Kafka的格式。 |
API名称 | Publish Strategy |
默认值 | USE_VALUE |
允许值 | 使用内容作为记录值 |
表达式语言作用域
不支持
敏感
否
必填
是
依赖项
记录读取器设置为任何指定值。
记录键写入器
用于输出FlowFile的记录键写入器。
显示名称 | 记录键写入器 |
描述 | 用于输出FlowFile的记录键写入器。 |
API名称 | Record Key Writer |
服务接口 | org.apache.nifi.serialization.RecordSetWriterFactoryService |
服务实现 | org.apache.nifi.avro.AvroRecordSetWriter |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 否 |
依赖项
发布策略设置为[USE_WRAPPER]中的任意一个。
记录元数据策略
指定记录的元数据(主题和分区)应来自记录的元数据字段,还是来自配置的主题名称和分区/分区器类属性。
显示名称 | 记录元数据策略 |
描述 | 指定记录的元数据(主题和分区)应来自记录的元数据字段,还是来自配置的主题名称和分区/分区器类属性。 |
API名称 | Record Metadata Strategy |
默认值 | FROM_PROPERTIES |
允许值 | 从记录获取元数据 |
表达式语言作用域
不支持
敏感
否
必填
是
依赖项
发布策略设置为[USE_WRAPPER]中的任意一个。
记录读取器
用于传入FlowFile的记录读取器。
显示名称 | 记录读取器 |
描述 | 用于传入FlowFile的记录读取器。 |
API名称 | Record Reader |
服务接口 | org.apache.nifi.serialization.RecordReaderFactoryService |
服务实现 | org.apache.nifi.avro.AvroReader |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 否 |
记录写入器
用于在发送到Kafka之前序列化数据的记录写入器。
显示名称 | 记录写入器 |
描述 | 用于在发送到Kafka之前序列化数据的记录写入器。 |
API名称 | Record Writer |
服务接口 | org.apache.nifi.serialization.RecordSetWriterFactoryService |
服务实现 | org.apache.nifi.avro.AvroRecordSetWriter |
表达式语言作用域 | 不支持 |
敏感 | 否 |
必填 | 否 |
主题名称
处理器向其发布Kafka记录的Kafka主题的名称。
显示名称 | 主题名称 |
描述 | 处理器向其发布Kafka记录的Kafka主题的名称。 |
API名称 | Topic Name |
表达式语言作用域 | 环境变量和FlowFile属性 |
敏感 | 否 |
必填 | 是 |
事务ID前缀
指定KafkaProducer配置的transactional.id将是一个生成的UUID,并将使用配置的字符串作为前缀。
显示名称 | 事务ID前缀 |
描述 | 指定KafkaProducer配置的transactional.id将是一个生成的UUID,并将使用配置的字符串作为前缀。 |
API名称 | Transactional ID Prefix |
表达式语言作用域 | 在JVM级别定义的环境变量和系统属性 |
敏感 | 否 |
必填 | 否 |
依赖项
事务启用设置为[true]中的任意一个。
事务启用
指定在与Kafka通信时是否提供事务保证。如果在发送数据到Kafka时出现问题,并且此属性设置为false,则已发送到Kafka的消息将继续传递给消费者。如果设置为true,则Kafka事务将回滚,以便这些消息不会提供给消费者。将此属性设置为true要求将[交付保证]属性设置为[保证复制交付]。
显示名称 | 事务启用 |
描述 | 指定在与Kafka通信时是否提供事务保证。如果在发送数据到Kafka时出现问题,并且此属性设置为false,则已发送到Kafka的消息将继续传递给消费者。如果设置为true,则Kafka事务将回滚,以便这些消息不会提供给消费者。将此属性设置为true要求将[交付保证]属性设置为[保证复制交付]。 |
API名称 | Transactions Enabled |
默认值 | true |
允许值 | true |
表达式语言作用域
不支持
敏感
否
必填
是
关系
名称 | 描述 |
success | 所有内容都已发送到Kafka的FlowFile。 |
failure | 任何无法发送到Kafka的FlowFile都将被路由到此关系。 |
读取属性
名称 | 描述 |
kafka.tombstone | 如果此属性设置为“true”,并且处理器未配置分隔符,且FlowFile的内容为空,则将向Kafka发送一个零字节的墓碑消息。 |
写入属性
名称 | 描述 |
msg.count | 为该FlowFile发送到Kafka的消息数量。此属性仅添加到路由到success的FlowFile中。 |
另请参阅
org.apache.nifi.kafka.processors.ConsumeKafka