
本文旨在解决在使用 Docker Compose 部署 Kafka 集群时,应用程序无法向 Kafka 主题发送消息的问题。我们将分析常见的配置错误,并提供修改建议,确保生产者能够正确连接到 Kafka Broker,从而成功发送消息。通过调整 Kafka 的监听器配置以及生产者端的 Broker 地址,可以有效解决此类连接问题。
在使用 Docker Compose 部署 Kafka 集群时,生产者无法发送消息到 Kafka 主题,并出现类似 Topic general-events not present in metadata after 60000 ms 的错误,通常是由于以下原因导致:
针对上述问题,可以采取以下步骤进行排查和解决:
Kafka 的 KAFKA_ADVERTISED_LISTENERS 环境变量至关重要,它决定了 Kafka Broker 如何向客户端公布自己的地址。确保该配置正确反映了客户端访问 Kafka 的方式。
在 docker-compose.yml 文件中,检查 Kafka 服务的 environment 部分:
kafka:
image: confluentinc/cp-kafka:6.1.1
container_name: kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
expose:
- '29092'
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_MIN_INSYNC_REPLICAS: '1'注意事项:
生产者需要使用正确的 Broker 地址才能成功连接到 Kafka 集群。如果 Kafka Broker 的地址发生变化,或者生产者使用了错误的地址,就会导致连接失败。
在生产者代码中,检查 bootstrap.servers 配置项。确保它指向正确的 Kafka Broker 地址。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable {
private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());
private static final String TOPIC_NAME = "general-events";
private KafkaProducer<Long, String> kafkaProducer = null;
private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER";
public Producer() {
LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
Properties kafkaProps = new Properties();
// 使用环境变量或默认值配置 Kafka Broker 地址
String defaultClusterValue = "kafka:29092"; // 修改为 kafka:29092,容器内部访问
String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");
this.kafkaProducer = new KafkaProducer<>(kafkaProps);
}
}注意事项:
确保生产者容器和 Kafka 容器在同一个 Docker 网络中。可以使用 docker network inspect <network_name> 命令来检查容器的网络配置。
如果生产者容器和 Kafka 容器不在同一个网络中,可以使用 docker network connect <network_name> <container_name> 命令将生产者容器连接到 Kafka 容器所在的网络。
虽然 init-kafka 服务创建了 topic,但仍建议再次确认 topic 是否成功创建。可以在 Kafka 容器内部执行以下命令:
docker exec -it kafka bash kafka-topics --bootstrap-server kafka:29092 --list
确认列表中包含 general-events topic。
解决 Docker Compose 中 Kafka 消息发送失败的问题,关键在于确保 Kafka 的监听器配置正确,生产者使用正确的 Broker 地址,以及容器之间的网络连接正常。通过仔细检查这些配置,可以有效地解决此类连接问题,确保应用程序能够成功地向 Kafka 主题发送消息。
以上就是解决 Docker Compose 中 Kafka 消息发送失败问题的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号