使用python通过pika操作rabbitmq的核心步骤为:1. 建立连接(blockingconnection);2. 创建通道(channel);3. 声明持久化队列(queue_declare,durable=true);4. 发布消息时设置消息持久化(delivery_mode=2);5. 消费者手动确认消息(auto_ack=false,basic_ack)。选择rabbitmq因其基于amqp协议,具备高可靠性、丰富的交换机类型和成熟生态,适合需要复杂路由与消息不丢失的场景。pika的同步模式(blockingconnection)适用于简单脚本,逻辑直观但阻塞线程;异步模式(如selectconnection)适用于高并发服务,通过事件循环提升吞吐量,但编程复杂度更高。消息持久化需同时设置队列和消息的durable与delivery_mode=2,确保服务重启后消息可恢复;确认机制通过关闭auto_ack并手动调用basic_ack实现,保证消息被成功处理前不会丢失,支持“至少一次”投递,要求消费者具备幂等性。完整实现包括生产者发送5条消息并休眠,消费者接收后模拟处理耗时并发送确认,确保消息可靠传递与处理。

Python操作消息队列,Pika连接RabbitMQ,这组合在很多后端系统里简直是标配。它提供了一种可靠的异步通信机制,让不同服务间解耦,处理高并发任务变得游刃有余。通过Pika库,Python应用可以轻松地发布消息到队列,也能消费队列中的消息,实现服务间的有效协作。
要用Python通过Pika操作RabbitMQ,核心步骤围绕着连接(Connection)、通道(Channel)、声明队列/交换机、发布消息和消费消息。最直接的方式是使用Pika的
BlockingConnection
生产者(发布消息)示例:
立即学习“Python免费学习笔记(深入)”;
import pika
import time
# 连接RabbitMQ服务器
# 这里假设RabbitMQ运行在本地,没有用户名密码
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列,如果队列不存在则创建。durable=True表示队列持久化
# 即使RabbitMQ重启,队列也不会丢失
channel.queue_declare(queue='my_queue', durable=True)
message_count = 0
while message_count < 5:
message = f"Hello World! Message number {message_count}"
# 发布消息到默认交换机,路由键为队列名
# delivery_mode=2表示消息持久化,即使RabbitMQ重启,消息也不会丢失
channel.basic_publish(
exchange='',
routing_key='my_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
)
)
print(f" [x] Sent '{message}'")
message_count += 1
time.sleep(1) # 模拟发送间隔
connection.close()消费者(消费消息)示例:
import pika
import time
# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明相同的队列,确保消费者知道要从哪个队列取消息
channel.queue_declare(queue='my_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
"""
消息处理回调函数
ch: channel对象
method: 包含消息的 delivery tag 等信息
properties: 消息属性
body: 消息体
"""
print(f" [x] Received '{body.decode()}'")
time.sleep(body.count(b'.')) # 模拟处理消息的耗时
# 消息处理完成后,发送确认回执
ch.basic_ack(delivery_tag=method.delivery_tag)
print(" [x] Done")
# 设置QoS (Quality of Service),每次只分发一条消息给消费者
# 这样可以防止一个消费者处理速度慢,导致所有消息堆积在它那里
channel.basic_qos(prefetch_count=1)
# 开始消费消息,no_ack=False表示需要手动发送确认回执
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
# 启动消费循环,会一直阻塞直到连接关闭
channel.start_consuming()我个人觉得,RabbitMQ就像是消息队列界的“老黄牛”,它稳定、可靠、功能全面,是很多企业级应用的首选。你可能听说过Kafka在高吞吐量大数据场景的优势,但对于需要复杂路由、高可靠性、并且消息处理量并非天文数字的业务,RabbitMQ的优势就凸显出来了。
它基于AMQP(Advanced Message Queuing Protocol)协议,这个协议本身就为消息的可靠传输、事务性、路由等提供了强大的保障。这意味着,当你的系统对消息丢失零容忍时,RabbitMQ能给你足够的信心。它的交换机(Exchange)类型非常丰富,比如直连(Direct)、扇出(Fanout)、主题(Topic)、头部(Headers),可以满足各种复杂的路由需求。想象一下,你有一个订单系统,新订单消息可能需要同时通知库存、物流和客户服务部门,通过RabbitMQ的Topic交换机,一条消息就能精准地分发给所有相关方,这可比你手动维护多个HTTP请求或者数据库触发器要优雅和高效得多。
而且,RabbitMQ的社区非常活跃,文档也相当完善,遇到问题很容易找到解决方案。对于Python开发者来说,Pika库的支持也很好,虽然Pika的API有时候看起来有点“原生”,需要对AMQP概念有一定理解,但这正是它强大和灵活的体现。它的成熟度,让它在很多关键业务场景下,成为一个让人放心的选择。
Pika库提供了两种主要的工作模式:同步模式(
BlockingConnection
SelectConnection
TornadoConnection
想象一下,你是个餐厅服务员。
同步模式 (BlockingConnection
BlockingConnection
BlockingConnection
异步模式 (SelectConnection
TornadoConnection
在生产环境中,消息的持久化和确认机制是确保消息不丢失的关键。这两点在Pika中都有明确的实现方式,它们共同构筑了RabbitMQ“至少一次”消息投递的可靠性保障。
消息持久化:
消息持久化分为两个层面:队列持久化和消息持久化。
队列持久化: 当你声明队列时,将
durable
True
channel.queue_declare(queue='my_queue', durable=True)
这样做是为了防止RabbitMQ服务器重启后,你创建的队列消失。如果队列是非持久化的,服务器一重启,队列就不在了,即使里面有持久化的消息,也无处可寻了。
消息持久化: 当你发布消息时,通过
pika.BasicProperties
delivery_mode=2
channel.basic_publish(
exchange='',
routing_key='my_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 2表示消息持久化
)
)这告诉RabbitMQ,这条消息需要写入磁盘。这样,即使在消息到达消费者并被确认之前,RabbitMQ服务器突然崩溃,重启后这条消息也能从磁盘中恢复,并重新投递。
需要注意的是,即使消息和队列都持久化了,也不能保证100%不丢消息。比如,在消息到达RabbitMQ并写入磁盘的极短时间内,如果服务器崩溃,消息可能还是会丢失。对于极端高可靠性的场景,你可能还需要结合发布者确认(Publisher Confirms)机制。
消息确认机制(Acknowledgements):
这是消费者端确保消息被成功处理的关键。当消费者从队列中获取一条消息后,它需要向RabbitMQ发送一个“确认回执”(Acknowledgement)。
关闭自动确认: 在
basic_consume
auto_ack
False
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
默认情况下,
auto_ack
True
手动发送确认回执: 在你的消息回调函数中,当消息被成功处理后,调用
channel.basic_ack()
def callback(ch, method, properties, body):
# ... 处理消息的逻辑 ...
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息delivery_tag
如果消费者在处理消息过程中崩溃,或者没有发送确认回执,RabbitMQ会认为这条消息没有被成功处理,并在消费者重新连接或有其他消费者可用时,将这条消息重新投递给其他消费者。这保证了消息的“至少一次”投递:消息可能被投递多次,但绝不会丢失。当然,这也意味着你的消费者需要具备幂等性,即多次处理同一条消息不会产生副作用。
你也可以使用
basic_nack
basic_reject
basic_nack
requeue=True/False
basic_reject
以上就是Python怎样操作消息队列?pika连接RabbitMQ的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号