消息队列实战:使用 RabbitMQ 实现异步处理

环境准备
开始之前,我们需要安装 RabbitMQ 服务器和 Python 客户端库。本教程使用 RabbitMQ 3.12 和 Python 3.10。最简单的部署方式是使用 Docker,确保你的系统已安装 Docker。
# 使用 Docker 启动 RabbitMQ 容器
# -d 表示后台运行
# -p 15672:15672 映射管理界面端口
# -p 5672:5672 映射 AMQP 协议端口
# --name rabbitmq 为容器命名
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.12-management
启动 RabbitMQ 容器
容器启动后,可以通过浏览器访问 http://localhost:15672 查看管理界面,默认用户名和密码都是 guest。接下来,安装 Python 的 RabbitMQ 客户端库 pika。
# 安装 pika 库,这是 RabbitMQ 官方推荐的 Python 客户端
pip install pika==1.3.2
安装 Python 客户端库
步骤拆解:实现生产者与消费者
我们将创建一个简单的任务队列。生产者发送任务消息,消费者接收并处理。这遵循了 RabbitMQ 的基本工作模式 [来源#1]。首先,创建项目目录结构。
# 创建项目目录
mkdir rabbitmq-tutorial
cd rabbitmq-tutorial
# 创建生产者和消费者文件
touch producer.py consumer.py
创建项目目录结构
现在,编写生产者代码。生产者将连接到 RabbitMQ,声明一个名为 'task_queue' 的队列,并发送一条消息。这里我们使用持久化队列,确保消息在 RabbitMQ 重启后不丢失 [来源#2]。
# producer.py
import pika
import sys
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个持久化队列
# durable=True 确保队列在 RabbitMQ 重启后依然存在
channel.queue_declare(queue='task_queue', durable=True)
# 获取命令行参数作为消息内容
message = ' '.join(sys.argv[1:]) or "Hello World!"
# 发送消息
# delivery_mode=2 使消息持久化
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
))
print(f" [x] Sent '{message}'")
connection.close()
生产者代码 (producer.py)
接下来,编写消费者代码。消费者连接到同一个队列,接收消息并处理。为了确保消息不丢失,我们使用手动消息确认机制。当消费者成功处理消息后,会向 RabbitMQ 发送确认信号 [来源#2]。
# consumer.py
import pika
import time
def callback(ch, method, properties, body):
"""处理接收到的消息"""
print(f" [x] Received {body.decode()}")
# 模拟任务处理耗时
# 消息中包含几个点就休眠几秒
dots = body.count(b'.')
time.sleep(dots)
print(" [x] Done")
# 手动确认消息,告诉 RabbitMQ 消息已处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列,确保队列存在
channel.queue_declare(queue='task_queue', durable=True)
# 设置 QoS,一次只处理一条消息
# 这确保了消费者不会被淹没,也保证了消息能被公平分发
channel.basic_qos(prefetch_count=1)
# 注册回调函数
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消费者代码 (consumer.py)
结果验证
现在,我们来验证整个流程。打开两个终端窗口。
- 在第一个终端启动消费者:运行 `python consumer.py`。你会看到等待消息的提示 `[] Waiting for messages. To exit press CTRL+C`。
- 在第二个终端启动生产者并发送消息:运行 `python producer.py Hello RabbitMQ`。预期输出:`[x] Sent 'Hello RabbitMQ'`。
- 观察第一个终端:你会看到消费者接收到消息并处理,输出 `[x] Received Hello RabbitMQ` 和 `[x] Done`。
- 再次运行生产者发送多条消息:`python producer.py Message.with.dots`。预期输出:`[x] Sent 'Message.with.dots'`。
- 观察第一个终端:消费者会处理这条消息,由于消息中包含三个点,它会模拟耗时 3 秒,然后输出 `[x] Done`。
- 检查 RabbitMQ 管理界面(http://localhost:15672):进入 Queues 标签页,找到 'task_queue' 队列。你会看到队列中消息数量为 0,因为消息已被消费确认。
常见错误与排查
在实践过程中,你可能会遇到以下问题。这里提供排查思路和解决方案。
- 错误 1:连接被拒绝。运行生产者或消费者时,报错 `pika.exceptions.AMQPConnectionError`。原因:RabbitMQ 服务未启动或端口未正确映射。排查:检查 Docker 容器状态 `docker ps`,确认 RabbitMQ 容器正在运行。检查端口映射 `docker port rabbitmq`,确保 5672 端口已映射到宿主机。如果使用非本地连接,需修改连接参数中的主机地址。
- 错误 2:消息未被消费。生产者发送消息后,消费者终端无任何输出。原因:队列名称不匹配或消费者未正确启动。排查:确保生产者和消费者代码中的队列名 `task_queue` 完全一致。确认消费者在发送消息前已启动并处于等待状态。检查 RabbitMQ 管理界面,查看队列中是否有消息积压(Ready 列)。
- 错误 3:消息确认失败。消费者处理消息后,报错 `pika.exceptions.ChannelClosedByBroker`,消息可能被重新投递。原因:消费者在处理消息时发生异常,未调用 `ch.basic_ack`。排查:确保 `callback` 函数中的逻辑能正常执行,且 `ch.basic_ack` 在 `try...except` 块之外或确保异常被捕获后仍能发送确认。对于需要重试的场景,可以考虑使用 `ch.basic_nack`。
通过以上步骤,你已经成功搭建了一个基于 RabbitMQ 的异步任务处理系统。这个系统实现了生产者与消费者的解耦,通过持久化和消息确认机制保证了消息的可靠性。你可以在此基础上扩展,例如增加多个消费者实现负载均衡,或使用不同的交换机类型来实现更复杂的路由逻辑 [来源#1]。
参考链接
- https://www.rabbitmq.com/getstarted.html
- https://www.rabbitmq.com/tutorials/tutorial-one-python.html
阅读剩余
THE END