0

0

Python怎样操作消息队列?pika连接RabbitMQ

蓮花仙者

蓮花仙者

发布时间:2025-08-04 14:37:01

|

271人浏览过

|

来源于php中文网

原创

使用python通过pika操作rabbitmq的核心步骤为:1. 建立连接(blockingconnection);2. 创建通道(channel);3. 声明持久化队列(queue_declare,durable=true);4. 发布消息时设置消息持久化(delivery_mode=2);5. 消费者手动确认消息(auto_ack=false,basic_ack)。选择rabbitmq因其基于amqp协议,具备高可靠性、丰富的交换机类型和成熟生态,适合需要复杂路由与消息不丢失的场景。pika的同步模式(blockingconnection)适用于简单脚本,逻辑直观但阻塞线程;异步模式(如selectconnection)适用于高并发服务,通过事件循环提升吞吐量,但编程复杂度更高。消息持久化需同时设置队列和消息的durable与delivery_mode=2,确保服务重启后消息可恢复;确认机制通过关闭auto_ack并手动调用basic_ack实现,保证消息被成功处理前不会丢失,支持“至少一次”投递,要求消费者具备幂等性。完整实现包括生产者发送5条消息并休眠,消费者接收后模拟处理耗时并发送确认,确保消息可靠传递与处理。

Python怎样操作消息队列?pika连接RabbitMQ

Python操作消息队列,Pika连接RabbitMQ,这组合在很多后端系统里简直是标配。它提供了一种可靠的异步通信机制,让不同服务间解耦,处理高并发任务变得游刃有余。通过Pika库,Python应用可以轻松地发布消息到队列,也能消费队列中的消息,实现服务间的有效协作。

解决方案

要用Python通过Pika操作RabbitMQ,核心步骤围绕着连接(Connection)、通道(Channel)、声明队列/交换机、发布消息和消费消息。最直接的方式是使用Pika的

BlockingConnection
,它简单易用,适合快速开发和非高并发场景。

生产者(发布消息)示例:

立即学习Python免费学习笔记(深入)”;

import pika
import time

# 连接RabbitMQ服务器
# 这里假设RabbitMQ运行在本地,没有用户名密码
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列,如果队列不存在则创建。durable=True表示队列持久化
# 即使RabbitMQ重启,队列也不会丢失
channel.queue_declare(queue='my_queue', durable=True)

message_count = 0
while message_count < 5:
    message = f"Hello World! Message number {message_count}"
    # 发布消息到默认交换机,路由键为队列名
    # delivery_mode=2表示消息持久化,即使RabbitMQ重启,消息也不会丢失
    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 使消息持久化
        )
    )
    print(f" [x] Sent '{message}'")
    message_count += 1
    time.sleep(1) # 模拟发送间隔

connection.close()

消费者(消费消息)示例:

import pika
import time

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明相同的队列,确保消费者知道要从哪个队列取消息
channel.queue_declare(queue='my_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """
    消息处理回调函数
    ch: channel对象
    method: 包含消息的 delivery tag 等信息
    properties: 消息属性
    body: 消息体
    """
    print(f" [x] Received '{body.decode()}'")
    time.sleep(body.count(b'.')) # 模拟处理消息的耗时
    # 消息处理完成后,发送确认回执
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Done")

# 设置QoS (Quality of Service),每次只分发一条消息给消费者
# 这样可以防止一个消费者处理速度慢,导致所有消息堆积在它那里
channel.basic_qos(prefetch_count=1)

# 开始消费消息,no_ack=False表示需要手动发送确认回执
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

# 启动消费循环,会一直阻塞直到连接关闭
channel.start_consuming()

为什么选择RabbitMQ作为消息队列?

我个人觉得,RabbitMQ就像是消息队列界的“老黄牛”,它稳定、可靠、功能全面,是很多企业级应用的首选。你可能听说过Kafka在高吞吐量大数据场景的优势,但对于需要复杂路由、高可靠性、并且消息处理量并非天文数字的业务,RabbitMQ的优势就凸显出来了。

它基于AMQP(Advanced Message Queuing Protocol)协议,这个协议本身就为消息的可靠传输、事务性、路由等提供了强大的保障。这意味着,当你的系统对消息丢失零容忍时,RabbitMQ能给你足够的信心。它的交换机(Exchange)类型非常丰富,比如直连(Direct)、扇出(Fanout)、主题(Topic)、头部(Headers),可以满足各种复杂的路由需求。想象一下,你有一个订单系统,新订单消息可能需要同时通知库存、物流和客户服务部门,通过RabbitMQ的Topic交换机,一条消息就能精准地分发给所有相关方,这可比你手动维护多个HTTP请求或者数据库触发器要优雅和高效得多。

而且,RabbitMQ的社区非常活跃,文档也相当完善,遇到问题很容易找到解决方案。对于Python开发者来说,Pika库的支持也很好,虽然Pika的API有时候看起来有点“原生”,需要对AMQP概念有一定理解,但这正是它强大和灵活的体现。它的成熟度,让它在很多关键业务场景下,成为一个让人放心的选择。

Pika库的异步与同步模式有何不同?

Pika库提供了两种主要的工作模式:同步模式(

BlockingConnection
)和异步模式(如
SelectConnection
TornadoConnection
等)。这两种模式的选择,很大程度上取决于你的应用场景和对性能、并发处理的需求。

想象一下,你是个餐厅服务员。

同步模式 (

BlockingConnection
) 就像你一次只服务一位客人。你接到一个点餐请求,就一直等到菜做好、客人吃完、结账,你才去接下一个客人的请求。这种模式简单直接,逻辑清晰,不容易出错。对于那些一次只处理少量消息、或者在脚本中一次性发送一批消息然后退出的场景,
BlockingConnection
是完美的。它会阻塞当前线程,直到操作完成。比如,一个简单的日志收集脚本,把日志发到RabbitMQ,用
BlockingConnection
就足够了,写起来也很顺手。

异步模式 (

SelectConnection
,
TornadoConnection
等)
则像你同时服务多位客人。你接到点餐请求后,不是傻等,而是把点餐单交给厨房,然后立刻去接其他客人的请求,或者去处理其他事务(比如倒水、收拾桌子)。当厨房通知你菜好了,你再回来处理之前的点餐。这种模式复杂一些,因为你需要管理多个并发的事件,但它的效率非常高,不会因为一个耗时操作而阻塞整个应用。对于Web服务(如Django、Flask应用)、长连接服务、或者需要处理大量并发消息的消费者来说,异步模式是必不可少的。它能让你的应用在等待I/O(比如网络传输)的时候,去处理其他事情,大大提升了吞吐量和响应速度。当然,这也意味着你需要更深入地理解Python的异步编程模型,比如回调函数、事件循环等。虽然学习曲线可能陡峭一点,但一旦掌握,它能让你的Python应用在处理消息队列时如虎添翼。

消息的持久化与确认机制在Pika中如何实现?

在生产环境中,消息的持久化和确认机制是确保消息不丢失的关键。这两点在Pika中都有明确的实现方式,它们共同构筑了RabbitMQ“至少一次”消息投递的可靠性保障。

消息持久化:

豆包手机助手
豆包手机助手

豆包推出的手机系统服务级AI助手

下载

消息持久化分为两个层面:队列持久化消息持久化

  1. 队列持久化: 当你声明队列时,将

    durable
    参数设为
    True

    channel.queue_declare(queue='my_queue', durable=True)

    这样做是为了防止RabbitMQ服务器重启后,你创建的队列消失。如果队列是非持久化的,服务器一重启,队列就不在了,即使里面有持久化的消息,也无处可寻了。

  2. 消息持久化: 当你发布消息时,通过

    pika.BasicProperties
    设置
    delivery_mode=2

    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 2表示消息持久化
        )
    )

    这告诉RabbitMQ,这条消息需要写入磁盘。这样,即使在消息到达消费者并被确认之前,RabbitMQ服务器突然崩溃,重启后这条消息也能从磁盘中恢复,并重新投递。

需要注意的是,即使消息和队列都持久化了,也不能保证100%不丢消息。比如,在消息到达RabbitMQ并写入磁盘的极短时间内,如果服务器崩溃,消息可能还是会丢失。对于极端高可靠性的场景,你可能还需要结合发布者确认(Publisher Confirms)机制。

消息确认机制(Acknowledgements):

这是消费者端确保消息被成功处理的关键。当消费者从队列中获取一条消息后,它需要向RabbitMQ发送一个“确认回执”(Acknowledgement)。

  1. 关闭自动确认:

    basic_consume
    时,将
    auto_ack
    参数设为
    False

    channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

    默认情况下,

    auto_ack
    True
    ,这意味着RabbitMQ一旦把消息发给消费者,就认为消息已经被成功处理并从队列中删除。这显然是不安全的。

  2. 手动发送确认回执: 在你的消息回调函数中,当消息被成功处理后,调用

    channel.basic_ack()

    def callback(ch, method, properties, body):
        # ... 处理消息的逻辑 ...
        ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息

    delivery_tag
    是RabbitMQ分配给每条消息的唯一标识。通过发送确认回执,RabbitMQ就知道这条消息已经被消费者成功处理,可以安全地从队列中删除了。

如果消费者在处理消息过程中崩溃,或者没有发送确认回执,RabbitMQ会认为这条消息没有被成功处理,并在消费者重新连接或有其他消费者可用时,将这条消息重新投递给其他消费者。这保证了消息的“至少一次”投递:消息可能被投递多次,但绝不会丢失。当然,这也意味着你的消费者需要具备幂等性,即多次处理同一条消息不会产生副作用。

你也可以使用

basic_nack
(否定确认)或
basic_reject
来拒绝消息。
basic_nack
更灵活,可以指定是否将消息重新放回队列(
requeue=True/False
),而
basic_reject
通常用于彻底拒绝一条消息,且只能拒绝一条。在实际应用中,根据业务逻辑选择合适的确认方式,是构建健壮消息系统的关键。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

754

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

636

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

758

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

618

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1262

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

577

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

707

2023.08.11

Golang gRPC 服务开发与Protobuf实战
Golang gRPC 服务开发与Protobuf实战

本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

8

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 0.8万人学习

Django 教程
Django 教程

共28课时 | 3.1万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号