首先启动Zookeeper,再启动Kafka服务,通过jps检查进程,并使用kafka-topics.sh创建主题,通过生产者和消费者命令行工具测试消息收发,确认服务正常;随后安装librdkafka库和rdkafka PHP扩展,配置php.ini启用扩展,最后通过PHP代码使用RdKafka\Producer和RdKafka\KafkaConsumer类实现消息的发送与接收,关键参数包括bootstrap.servers、group.id、acks、enable.auto.commit等,确保通信正常与消息可靠性。

在CentOS上配置Kafka并搭建PHP客户端,核心步骤包括安装Java环境、部署Zookeeper和Kafka服务器本身,随后在PHP环境中安装并配置
librdkafka
rdkafka
搭建Kafka和PHP客户端,我们通常会按部就班地来。我个人觉得,先搞定服务端,再考虑客户端,这样思路会比较清晰。
1. Java环境准备 Kafka是基于JVM运行的,所以Java是必不可少的。我一般会直接用OpenJDK。
sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y java -version # 检查版本,确保是Java 8或更高
确保
JAVA_HOME
2. Zookeeper安装与配置 Kafka依赖Zookeeper来管理集群元数据。
# 下载Zookeeper,我通常会去Apache官网找最新的稳定版 wget https://downloads.apache.org/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz sudo mv apache-zookeeper-3.8.3-bin /opt/zookeeper cd /opt/zookeeper/conf cp zoo_sample.cfg zoo.cfg
编辑
zoo.cfg
dataDir
/var/lib/zookeeper
立即学习“PHP免费学习笔记(深入)”;
# zoo.cfg dataDir=/var/lib/zookeeper clientPort=2181
创建数据目录:
sudo mkdir -p /var/lib/zookeeper
启动Zookeeper:
/opt/zookeeper/bin/zkServer.sh start
你可以通过
jps
QuorumPeerMain
zkCli.sh -server 127.0.0.1:2181
3. Kafka安装与配置 现在轮到Kafka本身了。
# 下载Kafka,同样去官网找最新稳定版 wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar -zxvf kafka_2.13-3.6.1.tgz sudo mv kafka_2.13-3.6.1 /opt/kafka
编辑Kafka的配置文件
server.properties
cd /opt/kafka/config vim server.properties
几个关键配置:
broker.id
listeners
PLAINTEXT://:9092
PLAINTEXT://你的IP地址:9092
0.0.0.0:9092
log.dirs
zookeeper.connect
localhost:2181
启动Kafka:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
同样可以用
jps
Kafka
4. PHP客户端安装与配置 PHP与Kafka交互主要通过
librdkafka
rdkafka
安装librdkafka
# 安装依赖 sudo yum install gcc-c++ make zlib-devel openssl-devel -y # 下载librdkafka,通常用最新稳定版 git clone https://github.com/confluentinc/librdkafka.git cd librdkafka ./configure make sudo make install
如果遇到权限问题,
sudo make install
/usr/local/lib
sudo ldconfig
安装rdkafka
# 确保你安装了php-devel,这是编译PHP扩展的必要组件 sudo yum install php-devel -y # 通过PECL安装rdkafka sudo pecl install rdkafka
安装过程中,它可能会问你
librdkafka
安装成功后,需要在
php.ini
echo "extension=rdkafka.so" | sudo tee -a /etc/php.ini # 或者你的php.ini路径
重启PHP-FPM或Apache/Nginx服务,让配置生效。
sudo systemctl restart php-fpm # 或 apache/nginx
检查扩展是否加载成功:
php -m | grep rdkafka
如果能看到
rdkafka
启动Kafka服务,说起来不复杂,但需要注意顺序。首先,Zookeeper必须先跑起来,因为Kafka启动时会去连接它。我通常的习惯是先确认Zookeeper服务是健康的。
你可以用
zkServer.sh status
Mode: standalone
Mode: follower/leader
我们之前已经通过
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
jps
如果看到
QuorumPeerMain
Kafka
命令行测试Kafka: Kafka自带了一些命令行工具,非常适合做初步的功能测试。
创建Topic:
/opt/kafka/bin/kafka-topics.sh --create --topic my_test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
这里
--bootstrap-server
查看Topic列表:
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
你应该能看到
my_test_topic
生产者测试:
/opt/kafka/bin/kafka-console-producer.sh --topic my_test_topic --bootstrap-server localhost:9092
输入一些消息,比如
Hello Kafka from Producer!
消费者测试: 打开另一个终端窗口,运行消费者:
/opt/kafka/bin/kafka-console-consumer.sh --topic my_test_topic --bootstrap-server localhost:9092 --from-beginning
你将看到生产者发送的消息。如果这些都正常工作,那么恭喜你,Kafka服务在CentOS上已经成功启动并可以进行基本的数据交互了。这套测试流程,我每次部署都会走一遍,确保基础功能没问题。
PHP应用与Kafka的交互,核心就是使用
rdkafka
PHP生产者示例:
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092'); // Kafka broker地址
$conf->set('client.id', 'php-producer-app'); // 客户端ID,方便调试
// 设置消息发送失败时的回调,这对于生产环境很重要
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err) {
// 这里可以记录日志,或者进行重试逻辑
error_log(sprintf("Message delivery failed: %s (%s)", $message->errstr(), $message->err));
} else {
// error_log(sprintf("Message delivered to topic %s [%d] at offset %d", $message->topic_name, $message->partition, $message->offset));
}
});
// 创建生产者实例
$producer = new RdKafka\Producer($conf);
// 获取一个Topic的生产器
$topic = $producer->newTopic("my_test_topic"); // 替换成你的Topic名称
$message = "Hello from PHP Producer! " . date('Y-m-d H:i:s');
$key = "my_key"; // 消息的key,用于分区
// 发送消息
// RD_KAFKA_PARTITION_UA 表示由librdkafka自动选择分区
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $key);
echo "Sent message: " . $message . "\n";
// 循环调用poll,等待消息发送结果回调
// 这是一个非阻塞的等待,实际应用中可能需要更复杂的循环或异步处理
for ($i = 0; $i < 10; $i++) {
$producer->poll(100); // Poll for 100ms
if ($producer->getOutQLen() === 0) { // 如果发送队列为空,说明消息已发出
break;
}
}
if ($producer->getOutQLen() > 0) {
echo "Still " . $producer->getOutQLen() . " messages in queue, waiting for delivery.\n";
// 强制等待所有消息发送完成
$producer->flush(10000); // 最多等待10秒
}
if ($producer->getOutQLen() === 0) {
echo "All messages delivered.\n";
} else {
echo "Some messages failed to deliver.\n";
}
?>这里
$producer->poll()
$producer->flush()
poll
PHP消费者示例:
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'php-consumer-group'); // 消费者组ID
// 设置自动提交offset,也可以手动控制
$conf->set('enable.auto.commit', 'true');
$conf->set('auto.offset.reset', 'earliest'); // 如果没有offset,从最早的消息开始消费
// 设置错误回调
$conf->setErrorCb(function ($kafka, $err, $reason) {
error_log(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
// 创建消费者实例
$consumer = new RdKafka\KafkaConsumer($conf);
// 订阅Topic
$consumer->subscribe(['my_test_topic']); // 可以订阅多个Topic
echo "Waiting for messages... (Press Ctrl+C to stop)\n";
while (true) {
$message = $consumer->consume(120*1000); // 最多等待120秒
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
// 收到消息
echo sprintf("Message received: Topic %s, Partition %d, Offset %d, Key %s, Payload: %s\n",
$message->topic_name,
$message->partition,
$message->offset,
$message->key,
$message->payload
);
// 如果是手动提交offset,这里需要调用 $consumer->commit($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 到达分区末尾,但没有新消息
echo "No more messages, waiting for new ones...\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// 等待超时,没有收到消息
echo "Waiting for messages timed out...\n";
break;
default:
// 其他错误
error_log(sprintf("Consumer error: %s (%s)", $message->errstr(), $message->err));
break;
}
}
?>消费者这边,
$consumer->consume()
group.id
配置
rdkafka
bootstrap.servers
host1:port1,host2:port2
client.id
php-web-order-producer
php-analytics-consumer
生产者相关参数:
acks
0
1
all
-1
all
message.timeout.ms
retries
message.send.max.retries
retry.backoff.ms
compression.codec
gzip
snappy
lz4
zstd
消费者相关参数:
group.id
group.id
group.id
group.id
enable.auto.commit
true
rdkafka
false
auto.offset.reset
earliest
latest
none
max.poll.interval.ms
consume()
consume()
这些参数的调整,往往需要结合实际的业务场景和对Kafka的理解。没有一劳永逸的配置,只有最适合你当前系统的配置。我建议在开发和测试阶段就多尝试不同的参数组合,观察其对系统行为的影响。
以上就是CentOS怎么配置Kafka PHP_CentOS搭建Kafka并配置PHP客户端教程的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号