Swoole可通过TCP服务器实现MQTT协议解析,核心包括处理CONNECT、PUBLISH、SUBSCRIBE等报文,管理客户端订阅关系与消息转发,需手动解析变长头部与主题长度,支持PINGREQ心跳与连接状态维护,配合mosquitto工具测试基础通信,适用于轻量级物联网场景,但生产环境需扩展QoS、TLS、持久化等机制。

Swoole 是一个强大的 PHP 扩展,支持异步、并发和长连接网络编程,非常适合用来构建自定义协议的服务器,比如 MQTT 服务。虽然 Swoole 不直接内置 MQTT 协议解析,但你可以基于其 TCP 或 WebSocket 服务器功能,手动实现一个轻量级的 MQTT 服务器。
MQTT 是一种基于发布/订阅模式的轻量级消息协议,常用于物联网场景。它使用二进制报文格式,通过 TCP 传输。关键报文类型包括:CONNECT、PUBLISH、SUBSCRIBE、PINGREQ、DISCONNECT 等。
要实现简单 MQTT 服务器,你至少需要:
下面是一个基于 Swoole 实现的极简 MQTT 服务器骨架:
$server = new Swoole\Server('0.0.0.0', 1883);
// 存储客户端订阅的主题,格式:topic => [fd1, fd2, ...]
$subscribers = [];
$server->on('connect', function ($serv, $fd) {
    echo "Client: {$fd} connected.\n";
});
$server->on('receive', function ($serv, $fd, $reactorId, $data) use (&$subscribers) {
    if (strlen($data) < 2) return;
    $firstByte = ord($data[0]);
    $msgType = ($firstByte & 0xF0) >> 4;
    // 解析剩余长度(MQTT 变长编码)
    $i = 1;
    $remainingLength = 0;
    $multiplier = 1;
    do {
        if ($i >= strlen($data)) return;
        $byte = ord($data[$i]);
        $remainingLength += ($byte & 127) * $multiplier;
        $multiplier *= 128;
        $i++;
    } while (($byte & 128) > 0);
    $payload = substr($data, $i, $remainingLength);
    switch ($msgType) {
        case 1: // CONNECT
            handleConnect($serv, $fd, $payload);
            break;
        case 3: // PUBLISH
            handlePublish($serv, $payload, $subscribers);
            break;
        case 8: // SUBSCRIBE
            handleSubscribe($serv, $fd, $payload, $subscribers);
            break;
        case 12: // PINGREQ
            $serv->send($fd, chr(0xC0) . chr(0x00)); // 返回 PINGRESP
            break;
        case 14: // DISCONNECT
            $serv->close($fd);
            break;
    }
});
$server->on('close', function ($serv, $fd) use (&$subscribers) {
    // 清理该客户端的订阅
    foreach ($subscribers as $topic => $clients) {
        $subscribers[$topic] = array_filter($clients, function ($clientFd) use ($fd) {
            return $clientFd !== $fd;
        });
    }
    echo "Client: {$fd} closed.\n";
});
$server->start();
function handleConnect($serv, $fd, $payload) {
    // 跳过协议名(通常是'MQTT')
    $offset = 0;
    $protocolNameLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2 + $protocolNameLen;
    $protocolLevel = $payload[$offset];
    $offset += 1;
    $connectFlags = ord($payload[$offset]);
    $offset += 1;
    $keepAlive = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    // 解析 Client ID
    $clientIdLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $clientId = substr($payload, $offset, $clientIdLen);
    $offset += $clientIdLen;
    echo "Client ID: {$clientId} connected.\n";
    // 发送 CONNACK (连接确认)
    $connack = chr(0x20) . chr(0x02) . chr(0x00) . chr(0x00); // 0x00 表示连接成功
    $serv->send($fd, $connack);
}
function handleSubscribe($serv, $fd, $payload, &$subscribers) {
    $packetId = (ord($payload[0]) << 8) | ord($payload[1]);
    $offset = 2;
    $topics = [];
    while ($offset < strlen($payload)) {
        $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
        $offset += 2;
        $topic = substr($payload, $offset, $topicLen);
        $offset += $topicLen;
        $qos = $payload[$offset] & 0x03;
        $offset += 1;
        $topics[] = $topic;
        if (!isset($subscribers[$topic])) {
            $subscribers[$topic] = [];
        }
        $subscribers[$topic][] = $fd;
    }
    // 返回 SUBACK
    $suback = chr(0x90) . chr(count($topics) + 2) . chr($packetId >> 8) . chr($packetId & 0xFF);
    foreach ($topics as $t) {
        $suback .= chr(0x01); // 返回 QoS 1
    }
    $serv->send($fd, $suback);
}
function handlePublish($serv, $payload, &$subscribers) {
    $offset = 0;
    $topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
    $offset += 2;
    $topic = substr($payload, $offset, $topicLen);
    $offset += $topicLen;
    $message = substr($payload, $offset);
    // 查找订阅了该主题的客户端并发送消息
    if (isset($subscribers[$topic])) {
        $pubMsg = chr(0x30) . pack('C', 2 + $topicLen + strlen($message))
                . chr($topicLen >> 8) . chr($topicLen & 0xFF)
                . $topic . $message;
        foreach ($subscribers[$topic] as $clientFd) {
            $serv->send($clientFd, $pubMsg);
        }
    }
}使用 mosquitto_pub 和 mosquitto_sub 工具测试:
mosquitto_sub -h 127.0.0.1 -p 1883 -t 'test/topic'mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'Hello Swoole MQTT'如果一切正常,订阅端会收到消息。
这个实现非常基础,仅用于学习。生产环境需考虑:
也可以考虑基于开源项目如 EMQX 或使用 Swoole 配合 Workerman + GatewayWorker 构建更复杂的 MQTT 代理。
基本上就这些。用 Swoole 写 MQTT 服务器核心是解析二进制协议并管理连接状态,难点在协议细节处理。
以上就是Swoole如何实现一个简单的MQTT服务器的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号