首页 > php框架 > Swoole > 正文

Swoole如何实现一个简单的MQTT服务器

裘德小鎮的故事
发布: 2025-09-30 21:30:04
原创
726人浏览过
Swoole可通过TCP服务器实现MQTT协议解析,核心包括处理CONNECT、PUBLISH、SUBSCRIBE等报文,管理客户端订阅关系与消息转发,需手动解析变长头部与主题长度,支持PINGREQ心跳与连接状态维护,配合mosquitto工具测试基础通信,适用于轻量级物联网场景,但生产环境需扩展QoS、TLS、持久化等机制。

swoole如何实现一个简单的mqtt服务器

Swoole 是一个强大的 PHP 扩展,支持异步、并发和长连接网络编程,非常适合用来构建自定义协议的服务器,比如 MQTT 服务。虽然 Swoole 不直接内置 MQTT 协议解析,但你可以基于其 TCP 或 WebSocket 服务器功能,手动实现一个轻量级的 MQTT 服务器。

理解 MQTT 协议基础

MQTT 是一种基于发布/订阅模式的轻量级消息协议,常用于物联网场景。它使用二进制报文格式,通过 TCP 传输。关键报文类型包括:CONNECT、PUBLISH、SUBSCRIBE、PINGREQ、DISCONNECT 等。

要实现简单 MQTT 服务器,你至少需要:

  • 解析 MQTT 固定头部(1字节控制类型+标志 + 变长长度)
  • 解析 CONNECT 报文中的客户端 ID、用户名、密码等
  • 支持客户端订阅主题(SUBSCRIBE)
  • 支持消息发布(PUBLISH)并转发给订阅者
  • 维护客户端连接状态和订阅关系

使用 Swoole 创建 TCP 服务器

下面是一个基于 Swoole 实现的极简 MQTT 服务器骨架:

$server = new Swoole\Server('0.0.0.0', 1883);

// 存储客户端订阅的主题,格式:topic => [fd1, fd2, ...]
$subscribers = [];

$server->on('connect', function ($serv, $fd) {
    echo "Client: {$fd} connected.\n";
});

$server->on('receive', function ($serv, $fd, $reactorId, $data) use (&$subscribers) {
    if (strlen($data) < 2) return;

    $firstByte = ord($data[0]);
    $msgType = ($firstByte & 0xF0) >> 4;
    // 解析剩余长度(MQTT 变长编码)
    $i = 1;
    $remainingLength = 0;
    $multiplier = 1;
    do {
        if ($i >= strlen($data)) return;
        $byte = ord($data[$i]);
        $remainingLength += ($byte & 127) * $multiplier;
        $multiplier *= 128;
        $i++;
    } while (($byte & 128) > 0);

    $payload = substr($data, $i, $remainingLength);

    switch ($msgType) {
        case 1: // CONNECT
            handleConnect($serv, $fd, $payload);
            break;
        case 3: // PUBLISH
            handlePublish($serv, $payload, $subscribers);
            break;
        case 8: // SUBSCRIBE
            handleSubscribe($serv, $fd, $payload, $subscribers);
            break;
        case 12: // PINGREQ
            $serv->send($fd, chr(0xC0) . chr(0x00)); // 返回 PINGRESP
            break;
        case 14: // DISCONNECT
            $serv->close($fd);
            break;
    }
});

$server->on('close', function ($serv, $fd) use (&$subscribers) {
    // 清理该客户端的订阅
    foreach ($subscribers as $topic => $clients) {
        $subscribers[$topic] = array_filter($clients, function ($clientFd) use ($fd) {
            return $clientFd !== $fd;
        });
    }
    echo "Client: {$fd} closed.\n";
});

$server->start();

function handleConnect($serv, $fd, $payload) {
    // 跳过协议名(通常是'MQTT')
    $offset = 0;
    $protocolNameLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2 + $protocolNameLen;

    $protocolLevel = $payload[$offset];
    $offset += 1;

    $connectFlags = ord($payload[$offset]);
    $offset += 1;

    $keepAlive = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;

    // 解析 Client ID
    $clientIdLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $clientId = substr($payload, $offset, $clientIdLen);
    $offset += $clientIdLen;

    echo "Client ID: {$clientId} connected.\n";

    // 发送 CONNACK (连接确认)
    $connack = chr(0x20) . chr(0x02) . chr(0x00) . chr(0x00); // 0x00 表示连接成功
    $serv->send($fd, $connack);
}

function handleSubscribe($serv, $fd, $payload, &$subscribers) {
    $packetId = (ord($payload[0]) << 8) | ord($payload[1]);

    $offset = 2;
    $topics = [];
    while ($offset < strlen($payload)) {
        $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
        $offset += 2;
        $topic = substr($payload, $offset, $topicLen);
        $offset += $topicLen;
        $qos = $payload[$offset] & 0x03;
        $offset += 1;

        $topics[] = $topic;
        if (!isset($subscribers[$topic])) {
            $subscribers[$topic] = [];
        }
        $subscribers[$topic][] = $fd;
    }

    // 返回 SUBACK
    $suback = chr(0x90) . chr(count($topics) + 2) . chr($packetId >> 8) . chr($packetId & 0xFF);
    foreach ($topics as $t) {
        $suback .= chr(0x01); // 返回 QoS 1
    }
    $serv->send($fd, $suback);
}

function handlePublish($serv, $payload, &$subscribers) {
    $offset = 0;
    $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $topic = substr($payload, $offset, $topicLen);
    $offset += $topicLen;

    $message = substr($payload, $offset);

    // 查找订阅了该主题的客户端并发送消息
    if (isset($subscribers[$topic])) {
        $pubMsg = chr(0x30) . pack('C', 2 + $topicLen + strlen($message))
                . chr($topicLen >> 8) . chr($topicLen & 0xFF)
                . $topic . $message;

        foreach ($subscribers[$topic] as $clientFd) {
            $serv->send($clientFd, $pubMsg);
        }
    }
}
登录后复制

测试你的 MQTT 服务器

使用 mosquitto_pubmosquitto_sub 工具测试:

Giiso写作机器人
Giiso写作机器人

Giiso写作机器人,让写作更简单

Giiso写作机器人56
查看详情 Giiso写作机器人
  • mosquitto_sub -h 127.0.0.1 -p 1883 -t 'test/topic'
  • mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'Hello Swoole MQTT'

如果一切正常,订阅端会收到消息。

注意事项与扩展建议

这个实现非常基础,仅用于学习。生产环境需考虑:

  • 完整的 MQTT 协议解析(QoS 0/1/2、遗嘱消息、Clean Session 等)
  • 心跳机制和超时断开
  • 主题通配符(+ 和 #)支持
  • 持久化会话和消息队列
  • 安全性:认证、加密(TLS)
  • 集群和负载均衡

也可以考虑基于开源项目如 EMQX 或使用 Swoole 配合 Workerman + GatewayWorker 构建更复杂的 MQTT 代理。

基本上就这些。用 Swoole 写 MQTT 服务器核心是解析二进制协议并管理连接状态,难点在协议细节处理。

以上就是Swoole如何实现一个简单的MQTT服务器的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号