Python连接MQ核心是选对客户端库、建立可靠连接、正确收发消息并做好异常与确认处理;主流MQ对应库包括RabbitMQ用pika、Kafka用kafka-python、Redis用redis-py、RocketMQ用rocketmq-client-python。

Python连接消息队列(MQ)系统,核心是选对客户端库、建立可靠连接、正确收发消息,并做好异常与确认处理。不同MQ系统协议和API略有差异,但通用逻辑一致。
主流MQ及推荐库:
安装示例(以RabbitMQ为例):
pip install pika
典型步骤:建立连接 → 创建信道 → 声明交换机/队列 → 发布消息
立即学习“Python免费学习笔记(深入)”;
import pika
<h1>1. 连接RabbitMQ服务器</h1><p>connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()</p><h1>2. 确保队列存在(不存在则自动创建)</h1><p>channel.queue_declare(queue='task_queue', durable=True)</p>
<div class="aritcle_card">
<a class="aritcle_card_img" href="/ai/1337">
<img src="https://img.php.cn/upload/ai_manual/001/431/639/68b6d64b79043646.png" alt="盘古大模型">
</a>
<div class="aritcle_card_info">
<a href="/ai/1337">盘古大模型</a>
<p>华为云推出的一系列高性能人工智能大模型</p>
<div class="">
<img src="/static/images/card_xiazai.png" alt="盘古大模型">
<span>207</span>
</div>
</div>
<a href="/ai/1337" class="aritcle_card_btn">
<span>查看详情</span>
<img src="/static/images/cardxiayige-3.png" alt="盘古大模型">
</a>
</div>
<h1>3. 发送消息(持久化 + 消息确认)</h1><p>channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化,重启后不丢失
)
)
print(" [x] Sent 'Hello World!'")
connection.close()
避免消息丢失的关键是关闭自动确认(auto_ack=False),并显式调用 basic_ack。
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模拟处理耗时任务
import time
time.sleep(2)
# 手动确认消息已处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
<h1>关闭自动确认</h1><p>channel.basic_consume(queue='task_queue', on_message_callback=callback)</p><p>print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
注意:若处理中崩溃且未ack,RabbitMQ会将消息重新入队(前提是队列和消息都设为durable,且消费者未设置requeue=False)。
基本上就这些。MQ不是黑盒,理解“连接-声明-发/收-确认-异常兜底”这条主线,就能稳住大多数业务场景。
以上就是Python如何连接消息队列系统_MQ消息处理步骤详解【教程】的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号