0

0

如何将键值列表拆分为独立的 Kafka 消息并写入 Topic

碧海醫心

碧海醫心

发布时间:2026-01-20 20:33:08

|

853人浏览过

|

来源于php中文网

原创

如何将键值列表拆分为独立的 Kafka 消息并写入 Topic

本文介绍在 kafka streams 中,如何将包含多个键和值的 list 结构(如 `list` 和 `list`)逐对展开为独立的 `(k, v)` 消息,并分别序列化后写入目标 topic。核心方案是使用 `process()`(v3.0+)或 `transform()`(旧版)自定义处理器实现流式扁平化。

在 Kafka Streams 应用中,当上游数据以批量形式组织(例如一个事件携带 List keys 和 List values),而下游消费者期望接收单键单值的原子消息时,必须对数据流进行「扁平化」(flattening)。原代码中直接调用 selectKey() 和 mapValues() 仅能替换或转换当前记录的键/值,无法生成多条新记录——这是 Kafka Streams 的“一进一出”语义限制。

✅ 正确做法是使用有状态处理算子:process()(推荐,Kafka Streams ≥ 3.0)transform()(旧版),它们允许在 ProcessorContext 中多次调用 context.forward(),从而将单条输入记录映射为多条输出记录。

以下是以 Kafka Streams 3.4+ 为例的完整实现:

Teleporthq
Teleporthq

一体化AI网站生成器,能够快速设计和部署静态网站

下载
// 定义 ProcessorSupplier(推荐使用 lambda + anonymous class 简化)
stream.process(
    () -> new Processor() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, GenericRecord value) {
            // 假设 util.fetchKeys/fetchValues 接收原始 value 并返回对应列表
            List keys   = util.fetchKeys(key, value);   // 或仅传 value,依业务而定
            List values = util.fetchValues(value);

            // 安全校验:确保长度一致,避免 IndexOutOfBoundsException
            int size = Math.min(keys.size(), values.size());
            for (int i = 0; i < size; i++) {
                context.forward(
                    keys.get(i),
                    values.get(i),
                    To.all().withTimestamp(context.timestamp()) // 可选:继承原始时间戳
                );
            }
        }
    },
    Named.as("flatten-processor")
).to("out-topic", 
    Produced.with(Serdes.String(), yourAvroValueSerde) // keySerde 与 valueSerde 需匹配实际类型
);

⚠️ 注意事项:

  • 序列化器一致性:Produced.with(...) 中指定的 keySerde 和 valueSerde 必须与 context.forward() 所传对象的实际类型严格匹配(如 String 键配 Serdes.String(),Avro GenericRecord 值配对应的 SpecificAvroSerde 或自定义 Avro Serde)。
  • 空值/长度不匹配防护:务必校验 keys 和 values 列表非空且长度兼容,否则可能抛出 IndexOutOfBoundsException 或静默丢弃数据。
  • 时间戳处理:默认 forward() 使用系统当前时间,若需保留原始事件时间戳,请显式调用 To.all().withTimestamp(context.timestamp())。
  • 状态与容错:该 Processor 无本地状态,因此无需注册 StateStore;若后续需聚合或去重,可扩展为 Transformer 并启用 RocksDB 存储。

? 总结:Kafka Streams 不支持开箱即用的“一对多”映射,但通过 process() 自定义处理器可精准控制每条输入记录产生的输出数量与内容。这是处理嵌套结构、批量解包、协议转换等场景的标准实践。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

201

2024.02.23

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

318

2023.08.02

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

13

2026.01.20

PS使用蒙版相关教程
PS使用蒙版相关教程

本专题整合了ps使用蒙版相关教程,阅读专题下面的文章了解更多详细内容。

60

2026.01.19

java用途介绍
java用途介绍

本专题整合了java用途功能相关介绍,阅读专题下面的文章了解更多详细内容。

87

2026.01.19

java输出数组相关教程
java输出数组相关教程

本专题整合了java输出数组相关教程,阅读专题下面的文章了解更多详细内容。

39

2026.01.19

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

10

2026.01.19

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Django 教程
Django 教程

共28课时 | 3.3万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.2万人学习

Sass 教程
Sass 教程

共14课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号