
在CentOS系统上整合Hadoop分布式文件系统(HDFS)与Apache Kafka,通常会把Kafka当作数据的生成器或接收器,并且将数据存储到HDFS或者从HDFS获取数据。下面是一个简化版的应用场景,演示了怎样利用Kafka把数据存储到HDFS。
<code>kafka-topics.sh --create --topic order-created-topic --partitions 12 --replication-factor 3 --bootstrap-server localhost:9092</code>
<code>Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<string string=""> producer = new KafkaProducer(props);
producer.send(new ProducerRecord("order-created-topic", orderId, orderJson));
producer.close();
</string></code><code>SparkConf conf = new SparkConf().setAppName("Kafka to HDFS");
conf.setMaster("local[*]");
JavaPairRDD<string string=""> lines = KafkaUtils.createDirectStream(
conf,
"order-created-topic",
new StringDeserializer(),
new StringDeserializer()
).mapToPair(record -> new Tuple2(record.value(), record.key()));
lines.saveAsHadoopFile("/path/to/hdfs/directory",
new TextOutputFormat<string string="">(),
"org.apache.hadoop.mapred.lib.MultipleTextOutputFormat",
new Configuration(false)
);
</string></string></code>请记住,上述代码样本和配置或许得依据实际环境做出改动。在真实应用里,还需要顾及到数据的序列化方法、错误处理、资源配置等细节。另外,对于生产环境,还需考量安全配置,例如SSL/TLS加密以及认证。
以上就是CentOS HDFS与Kafka集成应用案例的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号