首页 > Java > java教程 > 正文

在 Apache Flink 中消费带键 Kafka 记录的实践教程

心靈之曲
发布: 2025-11-05 17:43:14
原创
900人浏览过

在 Apache Flink 中消费带键 Kafka 记录的实践教程

本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。

1. 理解带键 Kafka 记录及其重要性

在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:

  • 消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。
  • 状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。
  • 数据路由:消费者可以根据键来过滤或路由消息。

当使用 kafka-console-producer.sh 并指定 --property "parse.key=true" --property "key.separator=:" 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。

2. Flink KafkaSource 的默认行为与限制

Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。

以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlinkValueOnlyKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("test3")
                .setGroupId("1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();

        env.execute("Flink Value-Only Kafka Consumer");
    }
}
登录后复制

3. 自定义 KafkaRecordDeserializationSchema 读取带键记录

要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。

3.1 定义自定义反序列化器

首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3 对象中输出。

Upscale
Upscale

AI图片放大工具

Upscale 85
查看详情 Upscale
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import java.io.IOException;

/**
 * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。
 * 输出类型为 Tuple3<Key, Value, Timestamp>
 */
public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Tuple3<String, String, Long>> {

    // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化
    private transient StringDeserializer keyDeserializer;
    private transient StringDeserializer valueDeserializer;

    /**
     * 在反序列化器初始化时调用,用于设置内部状态。
     * 通常在这里初始化 Kafka 客户端的反序列化器。
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器
        // 假设键和值都是字符串,使用 StringDeserializer
        keyDeserializer = new StringDeserializer();
        valueDeserializer = new StringDeserializer();
    }

    /**
     * 核心反序列化逻辑。
     *
     * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。
     * @param out    用于收集反序列化结果的 Collector。
     * @throws IOException 如果反序列化过程中发生 I/O 错误。
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple3<String, String, Long>> out) throws IOException {
        // 反序列化键
        String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null;
        // 反序列化值
        String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null;
        // 获取时间戳
        long timestamp = record.timestamp();

        // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出
        out.collect(new Tuple3<>(key, value, timestamp));
    }

    /**
     * 返回此反序列化器生产的数据类型信息。
     * Flink 使用此信息进行类型检查和序列化。
     */
    @Override
    public TypeInformation<Tuple3<String, String, Long>> getProducedType() {
        // 使用 TypeHint 来获取泛型类型信息
        return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint<Tuple3<String, String, Long>>() {});
    }
}
登录后复制

注意事项:

  • open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。
  • deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。
  • getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。

3.2 在 Flink KafkaSource 中使用自定义反序列化器

接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkKeyedKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址
        String topic = "test3";
        String groupId = "1";

        // 构建 KafkaSource,并指定我们自定义的反序列化器
        KafkaSource<Tuple3<String, String, Long>> source = KafkaSource.<Tuple3<String, String, Long>>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器
                .build();

        // 从 KafkaSource 创建数据流
        DataStream<Tuple3<String, String, Long>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");

        // 对数据流进行操作,现在可以访问键、值和时间戳
        stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)
              .print();

        // 执行 Flink 作业
        env.execute("Flink Keyed Kafka Consumer");
    }
}
登录后复制

3.3 Kafka 生产者示例(用于测试)

为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
登录后复制

然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。

4. 总结

通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。

以上就是在 Apache Flink 中消费带键 Kafka 记录的实践教程的详细内容,更多请关注php中文网其它相关文章!

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号