首页 > 运维 > CentOS > 正文

CentOS怎么配置Kafka PHP_CentOS搭建Kafka并配置PHP客户端教程

蓮花仙者
发布: 2025-09-03 12:22:01
原创
660人浏览过
首先启动Zookeeper,再启动Kafka服务,通过jps检查进程,并使用kafka-topics.sh创建主题,通过生产者和消费者命令行工具测试消息收发,确认服务正常;随后安装librdkafka库和rdkafka PHP扩展,配置php.ini启用扩展,最后通过PHP代码使用RdKafka\Producer和RdKafka\KafkaConsumer类实现消息的发送与接收,关键参数包括bootstrap.servers、group.id、acks、enable.auto.commit等,确保通信正常与消息可靠性。

centos怎么配置kafka php_centos搭建kafka并配置php客户端教程

在CentOS上配置Kafka并搭建PHP客户端,核心步骤包括安装Java环境、部署Zookeeper和Kafka服务器本身,随后在PHP环境中安装并配置

librdkafka
登录后复制
库以及
rdkafka
登录后复制
PHP扩展。这是一个相对直接的过程,但细节处理不当很容易遇到各种坑。

解决方案

搭建Kafka和PHP客户端,我们通常会按部就班地来。我个人觉得,先搞定服务端,再考虑客户端,这样思路会比较清晰。

1. Java环境准备 Kafka是基于JVM运行的,所以Java是必不可少的。我一般会直接用OpenJDK。

sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
java -version # 检查版本,确保是Java 8或更高
登录后复制

确保

JAVA_HOME
登录后复制
环境变量也设置好了,虽然很多时候系统会自动处理,但明确一下总是好的。

2. Zookeeper安装与配置 Kafka依赖Zookeeper来管理集群元数据。

# 下载Zookeeper,我通常会去Apache官网找最新的稳定版
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz
sudo mv apache-zookeeper-3.8.3-bin /opt/zookeeper
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
登录后复制

编辑

zoo.cfg
登录后复制
,主要关注
dataDir
登录后复制
,我习惯放在
/var/lib/zookeeper
登录后复制

立即学习PHP免费学习笔记(深入)”;

# zoo.cfg
dataDir=/var/lib/zookeeper
clientPort=2181
登录后复制

创建数据目录:

sudo mkdir -p /var/lib/zookeeper
登录后复制

启动Zookeeper:

/opt/zookeeper/bin/zkServer.sh start
登录后复制

你可以通过

jps
登录后复制
命令查看是否有
QuorumPeerMain
登录后复制
进程,或者用
zkCli.sh -server 127.0.0.1:2181
登录后复制
连接测试。

3. Kafka安装与配置 现在轮到Kafka本身了。

# 下载Kafka,同样去官网找最新稳定版
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -zxvf kafka_2.13-3.6.1.tgz
sudo mv kafka_2.13-3.6.1 /opt/kafka
登录后复制

编辑Kafka的配置文件

server.properties
登录后复制

cd /opt/kafka/config
vim server.properties
登录后复制

几个关键配置:

  • broker.id
    登录后复制
    : 每个Kafka实例的唯一ID,集群中不能重复。
  • listeners
    登录后复制
    : 监听地址,我通常设成
    PLAINTEXT://:9092
    登录后复制
    PLAINTEXT://你的IP地址:9092
    登录后复制
    。如果只是本机测试,
    0.0.0.0:9092
    登录后复制
    也行。
  • log.dirs
    登录后复制
    : Kafka存储消息日志的目录,建议独立挂载磁盘。
  • zookeeper.connect
    登录后复制
    : Zookeeper地址,比如
    localhost:2181
    登录后复制

启动Kafka:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
登录后复制

同样可以用

jps
登录后复制
查看是否有
Kafka
登录后复制
进程。

4. PHP客户端安装与配置 PHP与Kafka交互主要通过

librdkafka
登录后复制
C库和
rdkafka
登录后复制
PHP扩展。

安装

librdkafka
登录后复制
这个库是核心,PHP扩展只是它的一个包装。

# 安装依赖
sudo yum install gcc-c++ make zlib-devel openssl-devel -y

# 下载librdkafka,通常用最新稳定版
git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install
登录后复制

如果遇到权限问题,

sudo make install
登录后复制
可能会报错,确保
/usr/local/lib
登录后复制
等路径是可写的,或者调整安装路径。 安装完成后,更新共享库缓存:

sudo ldconfig
登录后复制

安装

rdkafka
登录后复制
PHP扩展:

# 确保你安装了php-devel,这是编译PHP扩展的必要组件
sudo yum install php-devel -y

# 通过PECL安装rdkafka
sudo pecl install rdkafka
登录后复制

安装过程中,它可能会问你

librdkafka
登录后复制
的安装路径,通常直接回车默认即可,因为它会尝试在标准路径找到。

安装成功后,需要在

php.ini
登录后复制
中启用这个扩展。

echo "extension=rdkafka.so" | sudo tee -a /etc/php.ini # 或者你的php.ini路径
登录后复制

重启PHP-FPM或Apache/Nginx服务,让配置生效。

sudo systemctl restart php-fpm # 或 apache/nginx
登录后复制

检查扩展是否加载成功:

php -m | grep rdkafka
登录后复制

如果能看到

rdkafka
登录后复制
,那就说明客户端环境也搞定了。

如何在CentOS系统上启动并测试Kafka服务?

启动Kafka服务,说起来不复杂,但需要注意顺序。首先,Zookeeper必须先跑起来,因为Kafka启动时会去连接它。我通常的习惯是先确认Zookeeper服务是健康的。

你可以用

zkServer.sh status
登录后复制
命令来检查Zookeeper的状态,确保它处于
Mode: standalone
登录后复制
Mode: follower/leader
登录后复制
。如果Zookeeper没问题,接着就可以启动Kafka了。

我们之前已经通过

kafka-server-start.sh -daemon /opt/kafka/config/server.properties
登录后复制
让Kafka在后台运行了。要确认它是否真的跑起来,最直接的方法是看进程:

琅琅配音
琅琅配音

全能AI配音神器

琅琅配音208
查看详情 琅琅配音
jps
登录后复制

如果看到

QuorumPeerMain
登录后复制
(Zookeeper)和
Kafka
登录后复制
这两个进程,那基本上就没问题了。

命令行测试Kafka: Kafka自带了一些命令行工具,非常适合做初步的功能测试。

创建Topic:

/opt/kafka/bin/kafka-topics.sh --create --topic my_test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
登录后复制

这里

--bootstrap-server
登录后复制
就是你Kafka监听的地址。

查看Topic列表:

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
登录后复制

你应该能看到

my_test_topic
登录后复制

生产者测试:

/opt/kafka/bin/kafka-console-producer.sh --topic my_test_topic --bootstrap-server localhost:9092
登录后复制

输入一些消息,比如

Hello Kafka from Producer!
登录后复制
,然后回车。

消费者测试: 打开另一个终端窗口,运行消费者:

/opt/kafka/bin/kafka-console-consumer.sh --topic my_test_topic --bootstrap-server localhost:9092 --from-beginning
登录后复制

你将看到生产者发送的消息。如果这些都正常工作,那么恭喜你,Kafka服务在CentOS上已经成功启动并可以进行基本的数据交互了。这套测试流程,我每次部署都会走一遍,确保基础功能没问题。

PHP应用如何与Kafka进行生产者和消费者交互?

PHP应用与Kafka的交互,核心就是使用

rdkafka
登录后复制
扩展提供的API。它的设计理念其实很直接,无非就是初始化一个生产者或消费者实例,然后进行消息的发送或接收。

PHP生产者示例:

<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092'); // Kafka broker地址
$conf->set('client.id', 'php-producer-app'); // 客户端ID,方便调试

// 设置消息发送失败时的回调,这对于生产环境很重要
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        // 这里可以记录日志,或者进行重试逻辑
        error_log(sprintf("Message delivery failed: %s (%s)", $message->errstr(), $message->err));
    } else {
        // error_log(sprintf("Message delivered to topic %s [%d] at offset %d", $message->topic_name, $message->partition, $message->offset));
    }
});

// 创建生产者实例
$producer = new RdKafka\Producer($conf);

// 获取一个Topic的生产器
$topic = $producer->newTopic("my_test_topic"); // 替换成你的Topic名称

$message = "Hello from PHP Producer! " . date('Y-m-d H:i:s');
$key = "my_key"; // 消息的key,用于分区

// 发送消息
// RD_KAFKA_PARTITION_UA 表示由librdkafka自动选择分区
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $key);

echo "Sent message: " . $message . "\n";

// 循环调用poll,等待消息发送结果回调
// 这是一个非阻塞的等待,实际应用中可能需要更复杂的循环或异步处理
for ($i = 0; $i < 10; $i++) {
    $producer->poll(100); // Poll for 100ms
    if ($producer->getOutQLen() === 0) { // 如果发送队列为空,说明消息已发出
        break;
    }
}

if ($producer->getOutQLen() > 0) {
    echo "Still " . $producer->getOutQLen() . " messages in queue, waiting for delivery.\n";
    // 强制等待所有消息发送完成
    $producer->flush(10000); // 最多等待10秒
}

if ($producer->getOutQLen() === 0) {
    echo "All messages delivered.\n";
} else {
    echo "Some messages failed to deliver.\n";
}

?>
登录后复制

这里

$producer->poll()
登录后复制
$producer->flush()
登录后复制
是关键,它们负责处理内部事件和消息的回调。我第一次用的时候,就因为没调用
poll
登录后复制
,导致消息一直发不出去,排查了好久。

PHP消费者示例:

<?php

$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('group.id', 'php-consumer-group'); // 消费者组ID

// 设置自动提交offset,也可以手动控制
$conf->set('enable.auto.commit', 'true');
$conf->set('auto.offset.reset', 'earliest'); // 如果没有offset,从最早的消息开始消费

// 设置错误回调
$conf->setErrorCb(function ($kafka, $err, $reason) {
    error_log(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});

// 创建消费者实例
$consumer = new RdKafka\KafkaConsumer($conf);

// 订阅Topic
$consumer->subscribe(['my_test_topic']); // 可以订阅多个Topic

echo "Waiting for messages... (Press Ctrl+C to stop)\n";

while (true) {
    $message = $consumer->consume(120*1000); // 最多等待120秒

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // 收到消息
            echo sprintf("Message received: Topic %s, Partition %d, Offset %d, Key %s, Payload: %s\n",
                $message->topic_name,
                $message->partition,
                $message->offset,
                $message->key,
                $message->payload
            );
            // 如果是手动提交offset,这里需要调用 $consumer->commit($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // 到达分区末尾,但没有新消息
            echo "No more messages, waiting for new ones...\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // 等待超时,没有收到消息
            echo "Waiting for messages timed out...\n";
            break;
        default:
            // 其他错误
            error_log(sprintf("Consumer error: %s (%s)", $message->errstr(), $message->err));
            break;
    }
}

?>
登录后复制

消费者这边,

$consumer->consume()
登录后复制
是阻塞的,它会一直等待直到有消息或者超时。
group.id
登录后复制
非常重要,它决定了消费者在哪个组里,以及消息的分配策略。我建议在生产环境中使用手动提交offset,这样可以更好地控制消息的处理逻辑,避免重复消费或消息丢失。

配置Kafka PHP客户端时,有哪些关键参数需要注意?

配置

rdkafka
登录后复制
PHP客户端时,有些参数确实是“重中之重”,它们直接影响到客户端的行为、性能以及消息的可靠性。我个人在实践中,最常关注和调整的就是下面这些:

  1. bootstrap.servers
    登录后复制
    : 这是最基础也最重要的参数,指定Kafka broker的地址列表。格式通常是
    host1:port1,host2:port2
    登录后复制
    。如果只配置一个,当这个broker挂掉时,客户端就无法连接了。所以,我总是建议配置至少两个或更多的broker地址,即使是单机测试,也尽量模拟集群环境,这样客户端就能自动发现并连接到可用的broker。

  2. client.id
    登录后复制
    : 客户端ID,一个任意字符串,用于在Kafka日志和监控中识别你的客户端。虽然不是强制的,但强烈建议设置一个有意义的ID,尤其是在复杂的系统里,这对于排查问题简直是救命稻草。比如
    php-web-order-producer
    登录后复制
    或者
    php-analytics-consumer
    登录后复制

  3. 生产者相关参数:

    • acks
      登录后复制
      : 这个参数控制生产者发送消息的可靠性。
      • 0
        登录后复制
        : 生产者发送后不等待任何确认。性能最好,但消息丢失风险最高。
      • 1
        登录后复制
        : Leader副本收到消息后即确认。平衡了性能和可靠性。
      • all
        登录后复制
        (或
        -1
        登录后复制
        ): Leader副本收到消息,并等待所有ISR(In-Sync Replicas)副本都同步完成后才确认。可靠性最高,但性能最低。 我通常会根据业务对消息丢失的容忍度来选择,对于核心业务,
        all
        登录后复制
        是首选。
    • message.timeout.ms
      登录后复制
      : 消息发送的超时时间,超过这个时间还没收到确认,就会被认为是失败。
    • retries
      登录后复制
      /
      message.send.max.retries
      登录后复制
      : 消息发送失败后的重试次数。配合
      retry.backoff.ms
      登录后复制
      (重试间隔)一起使用,可以提高消息的发送成功率。但要注意,重试可能导致消息重复,下游消费者需要具备幂等性处理能力。
    • compression.codec
      登录后复制
      : 消息压缩算法,如
      gzip
      登录后复制
      snappy
      登录后复制
      lz4
      登录后复制
      zstd
      登录后复制
      。选择合适的压缩算法可以在网络传输和存储上节省资源,但会增加CPU开销。
  4. 消费者相关参数:

    • group.id
      登录后复制
      : 消费者组ID。这是Kafka实现消息广播和负载均衡的关键。同一
      group.id
      登录后复制
      下的消费者会共同消费一个Topic的不同分区,实现负载均衡;不同
      group.id
      登录后复制
      下的消费者则会收到所有消息,实现消息广播。理解并正确使用
      group.id
      登录后复制
      至关重要。
    • enable.auto.commit
      登录后复制
      : 是否自动提交offset。设置为
      true
      登录后复制
      时,
      rdkafka
      登录后复制
      会定期自动提交消费者已处理的offset。这很方便,但如果程序在提交前崩溃,可能导致消息重复消费。我个人更倾向于设置为
      false
      登录后复制
      ,然后手动控制offset的提交,这样可以更精确地控制消息处理的事务性。
    • auto.offset.reset
      登录后复制
      : 当消费者组第一次启动,或者之前提交的offset无效(比如数据已过期)时,如何处理。
      • earliest
        登录后复制
        : 从Topic的最早可用offset开始消费。
      • latest
        登录后复制
        : 从Topic的最新offset开始消费。
      • none
        登录后复制
        : 如果没有有效offset,直接抛出错误。 这个参数的选择取决于你的业务需求,是希望重新处理所有历史消息,还是只关心新消息。
    • max.poll.interval.ms
      登录后复制
      : 消费者两次
      consume()
      登录后复制
      调用之间的最大允许间隔。如果超过这个时间没有调用
      consume()
      登录后复制
      ,Kafka会认为这个消费者已经挂了,并触发Rebalance,将它的分区分配给组内其他消费者。对于长时间处理消息的场景,需要注意调整这个值。

这些参数的调整,往往需要结合实际的业务场景和对Kafka的理解。没有一劳永逸的配置,只有最适合你当前系统的配置。我建议在开发和测试阶段就多尝试不同的参数组合,观察其对系统行为的影响。

以上就是CentOS怎么配置Kafka PHP_CentOS搭建Kafka并配置PHP客户端教程的详细内容,更多请关注php中文网其它相关文章!

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号