消息队列与事件驱动
What — 是什么
消息队列是异步通信的核心基础设施,生产者发送消息到队列,消费者异步处理。事件驱动架构基于消息的发布/订阅模式,实现服务间松耦合。
核心概念:
- RabbitMQ:功能最全的消息代理,支持多种交换机类型、消息确认、死信队列
- Kafka:分布式事件流平台,高吞吐、持久化、支持回放,适合日志和数据管道
- 发布/订阅(Pub/Sub):生产者发布消息到 Topic/Exchange,多个消费者订阅
- 消息确认(ACK):消费者处理完消息后确认,未确认则重投递
- 死信队列(DLX):处理失败的消息被路由到死信队列,待人工处理或重试
- 背压(Backpressure):消费者处理不过来时通知生产者减速
关键特性:
- RabbitMQ 支持精确路由(direct/topic/fanout/headers 交换机)
- Kafka 支持消息回放和分区并行消费
- 消息持久化确保不丢失
- 事件驱动架构实现服务间异步解耦
Why — 为什么
适用场景:
- 异步任务:发送邮件/生成报表/图片处理
- 服务解耦:订单服务发消息,库存/通知/积分服务各自动处理
- 流量削峰:高并发请求先入队列,消费者按能力处理
- 事件溯源:所有状态变更以事件形式记录
对比方案:
| 维度 | RabbitMQ | Kafka | Redis Pub/Sub |
|---|---|---|---|
| 模式 | 消息代理 | 事件流 | 发布订阅 |
| 持久化 | 支持 | 强支持 | 不持久化 |
| 回放 | 不支持 | 支持 | 不支持 |
| 吞吐量 | 万级 | 百万级 | 万级 |
| 适用 | 任务队列/RPC | 数据管道/事件流 | 简单通知 |
How — 怎么用
代码示例
// RabbitMQ(amqplib)
const amqp = require('amqplib');
async function startConsumer() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('task_queue', { durable: true });
ch.prefetch(1); // 每次只取1条,处理完再取
ch.consume('task_queue', async (msg) => {
try {
const task = JSON.parse(msg.content.toString());
await processTask(task);
ch.ack(msg); // 确认处理成功
} catch (err) {
ch.nack(msg, false, false); // 处理失败,不入队(进死信队列)
}
});
}
async function publish(queue, message) {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue(queue, { durable: true });
ch.sendToQueue(queue, Buffer.from(JSON.stringify(message)), { persistent: true });
}
// 死信队列配置
await ch.assertExchange('dlx', 'direct');
await ch.assertQueue('tasks.dlq', { durable: true });
await ch.bindQueue('tasks.dlq', 'dlx', 'task_failed');
await ch.assertQueue('tasks', {
durable: true,
arguments: { 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'task_failed' }
});
常见问题与踩坑
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 消息丢失 | 未持久化+未确认 | durable + persistent + ack |
| 消息重复消费 | 网络抖动导致重复投递 | 幂等消费(唯一ID去重) |
| 队列积压 | 消费速度慢于生产 | 增加消费者 + prefetch 调优 |
| 消费者崩溃消息丢失 | 自动确认后崩溃 | 手动确认(noAck: false) |
最佳实践
- 生产者发送持久化消息,消费者手动 ACK
- 消费逻辑幂等设计(唯一 ID 去重)
- 设置 prefetch 控制消费速率
- 死信队列处理失败消息
- 监控队列深度和消费延迟
面试题
Q1: 如何保证消息不丢失?
三个环节保障:① 生产者确认——
publisher confirms确保消息到达队列;② 队列持久化——durable: true队列 +persistent: true消息;③ 消费者手动 ACK——处理完才确认,异常则 nack 重回队列或进死信队列。Kafka 额外需要:acks=all(所有副本确认)、min.insync.replicas=2、消费者手动提交 offset。
Q2: RabbitMQ 和 Kafka 如何选择?
RabbitMQ:任务队列、RPC、路由复杂(多种 Exchange)、消息需要精确路由和优先级、中小吞吐量(万级/秒)。Kafka:事件流、数据管道、日志收集、高吞吐量(百万级/秒)、需要消息回放、消费者需要重新消费历史数据。简单规则:需要路由和任务分发选 RabbitMQ,需要高吞吐和事件流选 Kafka。
相关链接: