消息队列实战:使用 RabbitMQ 实现异步处理

配图:标题:消息队列实战:使用 RabbitMQ 实现异步处理;副标题:从零搭

环境准备

开始之前,我们需要安装 RabbitMQ 服务器和 Python 客户端库。本教程使用 RabbitMQ 3.12 和 Python 3.10。最简单的部署方式是使用 Docker,确保你的系统已安装 Docker。

# 使用 Docker 启动 RabbitMQ 容器
# -d 表示后台运行
# -p 15672:15672 映射管理界面端口
# -p 5672:5672 映射 AMQP 协议端口
# --name rabbitmq 为容器命名

docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.12-management

启动 RabbitMQ 容器

容器启动后,可以通过浏览器访问 http://localhost:15672 查看管理界面,默认用户名和密码都是 guest。接下来,安装 Python 的 RabbitMQ 客户端库 pika。

# 安装 pika 库,这是 RabbitMQ 官方推荐的 Python 客户端
pip install pika==1.3.2

安装 Python 客户端库

步骤拆解:实现生产者与消费者

我们将创建一个简单的任务队列。生产者发送任务消息,消费者接收并处理。这遵循了 RabbitMQ 的基本工作模式 [来源#1]。首先,创建项目目录结构。

# 创建项目目录
mkdir rabbitmq-tutorial
cd rabbitmq-tutorial

# 创建生产者和消费者文件
touch producer.py consumer.py

创建项目目录结构

现在,编写生产者代码。生产者将连接到 RabbitMQ,声明一个名为 'task_queue' 的队列,并发送一条消息。这里我们使用持久化队列,确保消息在 RabbitMQ 重启后不丢失 [来源#2]。

# producer.py
import pika
import sys

# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个持久化队列
# durable=True 确保队列在 RabbitMQ 重启后依然存在
channel.queue_declare(queue='task_queue', durable=True)

# 获取命令行参数作为消息内容
message = ' '.join(sys.argv[1:]) or "Hello World!"

# 发送消息
# delivery_mode=2 使消息持久化
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 使消息持久化
    ))

print(f" [x] Sent '{message}'")
connection.close()

生产者代码 (producer.py)

接下来,编写消费者代码。消费者连接到同一个队列,接收消息并处理。为了确保消息不丢失,我们使用手动消息确认机制。当消费者成功处理消息后,会向 RabbitMQ 发送确认信号 [来源#2]。

# consumer.py
import pika
import time

def callback(ch, method, properties, body):
    """处理接收到的消息"""
    print(f" [x] Received {body.decode()}")
    
    # 模拟任务处理耗时
    # 消息中包含几个点就休眠几秒
    dots = body.count(b'.')
    time.sleep(dots)
    
    print(" [x] Done")
    # 手动确认消息,告诉 RabbitMQ 消息已处理完成
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列,确保队列存在
channel.queue_declare(queue='task_queue', durable=True)

# 设置 QoS,一次只处理一条消息
# 这确保了消费者不会被淹没,也保证了消息能被公平分发
channel.basic_qos(prefetch_count=1)

# 注册回调函数
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者代码 (consumer.py)

结果验证

现在,我们来验证整个流程。打开两个终端窗口。

  • 在第一个终端启动消费者:运行 `python consumer.py`。你会看到等待消息的提示 `[] Waiting for messages. To exit press CTRL+C`。
  • 在第二个终端启动生产者并发送消息:运行 `python producer.py Hello RabbitMQ`。预期输出:`[x] Sent 'Hello RabbitMQ'`。
  • 观察第一个终端:你会看到消费者接收到消息并处理,输出 `[x] Received Hello RabbitMQ` 和 `[x] Done`。
  • 再次运行生产者发送多条消息:`python producer.py Message.with.dots`。预期输出:`[x] Sent 'Message.with.dots'`。
  • 观察第一个终端:消费者会处理这条消息,由于消息中包含三个点,它会模拟耗时 3 秒,然后输出 `[x] Done`。
  • 检查 RabbitMQ 管理界面(http://localhost:15672):进入 Queues 标签页,找到 'task_queue' 队列。你会看到队列中消息数量为 0,因为消息已被消费确认。

常见错误与排查

在实践过程中,你可能会遇到以下问题。这里提供排查思路和解决方案。

  • 错误 1:连接被拒绝。运行生产者或消费者时,报错 `pika.exceptions.AMQPConnectionError`。原因:RabbitMQ 服务未启动或端口未正确映射。排查:检查 Docker 容器状态 `docker ps`,确认 RabbitMQ 容器正在运行。检查端口映射 `docker port rabbitmq`,确保 5672 端口已映射到宿主机。如果使用非本地连接,需修改连接参数中的主机地址。
  • 错误 2:消息未被消费。生产者发送消息后,消费者终端无任何输出。原因:队列名称不匹配或消费者未正确启动。排查:确保生产者和消费者代码中的队列名 `task_queue` 完全一致。确认消费者在发送消息前已启动并处于等待状态。检查 RabbitMQ 管理界面,查看队列中是否有消息积压(Ready 列)。
  • 错误 3:消息确认失败。消费者处理消息后,报错 `pika.exceptions.ChannelClosedByBroker`,消息可能被重新投递。原因:消费者在处理消息时发生异常,未调用 `ch.basic_ack`。排查:确保 `callback` 函数中的逻辑能正常执行,且 `ch.basic_ack` 在 `try...except` 块之外或确保异常被捕获后仍能发送确认。对于需要重试的场景,可以考虑使用 `ch.basic_nack`。

通过以上步骤,你已经成功搭建了一个基于 RabbitMQ 的异步任务处理系统。这个系统实现了生产者与消费者的解耦,通过持久化和消息确认机制保证了消息的可靠性。你可以在此基础上扩展,例如增加多个消费者实现负载均衡,或使用不同的交换机类型来实现更复杂的路由逻辑 [来源#1]。

参考链接

阅读剩余
THE END