消息队列与事件驱动

What — 是什么

消息队列是异步通信的核心基础设施,生产者发送消息到队列,消费者异步处理。事件驱动架构基于消息的发布/订阅模式,实现服务间松耦合。

核心概念:

  • RabbitMQ:功能最全的消息代理,支持多种交换机类型、消息确认、死信队列
  • Kafka:分布式事件流平台,高吞吐、持久化、支持回放,适合日志和数据管道
  • 发布/订阅(Pub/Sub):生产者发布消息到 Topic/Exchange,多个消费者订阅
  • 消息确认(ACK):消费者处理完消息后确认,未确认则重投递
  • 死信队列(DLX):处理失败的消息被路由到死信队列,待人工处理或重试
  • 背压(Backpressure):消费者处理不过来时通知生产者减速

关键特性:

  • RabbitMQ 支持精确路由(direct/topic/fanout/headers 交换机)
  • Kafka 支持消息回放和分区并行消费
  • 消息持久化确保不丢失
  • 事件驱动架构实现服务间异步解耦

Why — 为什么

适用场景:

  • 异步任务:发送邮件/生成报表/图片处理
  • 服务解耦:订单服务发消息,库存/通知/积分服务各自动处理
  • 流量削峰:高并发请求先入队列,消费者按能力处理
  • 事件溯源:所有状态变更以事件形式记录

对比方案:

维度RabbitMQKafkaRedis Pub/Sub
模式消息代理事件流发布订阅
持久化支持强支持不持久化
回放不支持支持不支持
吞吐量万级百万级万级
适用任务队列/RPC数据管道/事件流简单通知

How — 怎么用

代码示例

// RabbitMQ(amqplib)
const amqp = require('amqplib');

async function startConsumer() {
    const conn = await amqp.connect('amqp://localhost');
    const ch = await conn.createChannel();
    await ch.assertQueue('task_queue', { durable: true });
    ch.prefetch(1); // 每次只取1条,处理完再取

    ch.consume('task_queue', async (msg) => {
        try {
            const task = JSON.parse(msg.content.toString());
            await processTask(task);
            ch.ack(msg); // 确认处理成功
        } catch (err) {
            ch.nack(msg, false, false); // 处理失败,不入队(进死信队列)
        }
    });
}

async function publish(queue, message) {
    const conn = await amqp.connect('amqp://localhost');
    const ch = await conn.createChannel();
    await ch.assertQueue(queue, { durable: true });
    ch.sendToQueue(queue, Buffer.from(JSON.stringify(message)), { persistent: true });
}

// 死信队列配置
await ch.assertExchange('dlx', 'direct');
await ch.assertQueue('tasks.dlq', { durable: true });
await ch.bindQueue('tasks.dlq', 'dlx', 'task_failed');
await ch.assertQueue('tasks', {
    durable: true,
    arguments: { 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'task_failed' }
});

常见问题与踩坑

问题原因解决方案
消息丢失未持久化+未确认durable + persistent + ack
消息重复消费网络抖动导致重复投递幂等消费(唯一ID去重)
队列积压消费速度慢于生产增加消费者 + prefetch 调优
消费者崩溃消息丢失自动确认后崩溃手动确认(noAck: false)

最佳实践

  • 生产者发送持久化消息,消费者手动 ACK
  • 消费逻辑幂等设计(唯一 ID 去重)
  • 设置 prefetch 控制消费速率
  • 死信队列处理失败消息
  • 监控队列深度和消费延迟

面试题

Q1: 如何保证消息不丢失?

三个环节保障:① 生产者确认——publisher confirms 确保消息到达队列;② 队列持久化——durable: true 队列 + persistent: true 消息;③ 消费者手动 ACK——处理完才确认,异常则 nack 重回队列或进死信队列。Kafka 额外需要:acks=all(所有副本确认)、min.insync.replicas=2、消费者手动提交 offset。

Q2: RabbitMQ 和 Kafka 如何选择?

RabbitMQ:任务队列、RPC、路由复杂(多种 Exchange)、消息需要精确路由和优先级、中小吞吐量(万级/秒)。Kafka:事件流、数据管道、日志收集、高吞吐量(百万级/秒)、需要消息回放、消费者需要重新消费历史数据。简单规则:需要路由和任务分发选 RabbitMQ,需要高吞吐和事件流选 Kafka。


相关链接: