RabbitMQ 实战指南
# 消息中间件的作用
- 解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。
- 持久化:有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
- 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
- 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
- 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
- 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。(什么是 queue?queue 就是一个顺序的,不管是先进先出,还是先进后出,queue 就是一个有顺序的存在,rabbitmq 也是如此,你先放进去的消息,肯定会被先消费出来,但是有一个关键的前提,只有一个消费者。)
# RabbitMQ 整体架构
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
- Producer:生产者,就是投递消息的一方。
- Consumer:消费者,就是接收消息的一方。
- Broker:消息中间件的服务节点。对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQBroker 看作一台 RabbitMQ 服务器。
- Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。RabbitMQ 中消息都只能存储在队列中,这 一点 和 Katka 这种消息中间件相反 。 Katka 将消息存储在 topic(主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费 。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
- Exchange:交换器。生产者将消息发送到 Exchange,由交换器将消息路由到一个或者多个队列中。
# RabbitMQ 交换机类型
RabbitMQ 常用的交换器类型有 fanout、direct、topic、headers 这四种。
- fanout:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
- direct:direct 类型的交换器路由规则也很简单,它 会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。
- topic:模糊匹配
- RoutingKey 为一个点号
.
分隔的字符串(分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”; - BindingKey 和 RoutingKey 一样也是点号
.
分隔的字符串; - BindingKey 中可以存在两种特殊字符串
*
和#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多规格单词(可以是零个)。
- RoutingKey 为一个点号
- headers:headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
# RabbitMQ 如何保证消息不丢失
- 需要进行持久化设置(durable)
- 保证生产者发送到 Exchange 成功,confirmCallback
- 保存 Exchange 成功路由到 Queue,returnListener
- 消费者成功消费消息进行 ack,建议手动 ack
- 使用镜像队列,单机 broker,单点故障;镜像队列,数据多份更安全
# 持久化
- Exchange 持久化,持久化交换机的元数据(名称、类型、参数......)
- Queue 持久化,持久化元数据及数据相关存储指针(名称、路由键、参数......)
- 消息持久化,消息的
delivery mode
属性设置为 2,这样在 Queue 持久化时才会持久化具体的消息体。每次写磁盘,性能影响。
# 存储机制
持久层是一个逻辑上的概念,实际包含两个部分:队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。rabbit_queue_index
负责维护队列中落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者 ack 等。每个队列都有与之对应的一个 rabbit_queue_index
。rabbit_msg_store
以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。
消息(包括消息体、属性和 headers)可以直接存储在 rabbit_queue_index
中,也可以被保存在 rabbit_msg_store
中。消息大小的界定可以通过 queue_index_embed_msgs_below
来配置,默认大小为 4096 B。当一个消息小于设定的大小阈值时就可以存储在 rabbit_queue_index
中。rabbit_queue_index
中以顺序(文件名从 0 开始累加)的段文件来进行存储,后缀为 .idx
,每个段文件中包含固定的 SEGMENT_ENTRY_COUNT
条记录,SEGMENT_ENTRY_COUNT
默认值为 16384。每个 rabbit_queue_index
从磁盘中读取消息的时候至少要在内存中维护一个段文件,所以设置 queue_index_embed_msgs_below
值的时候要格外谨慎,一点点增大也可能会引起内存爆炸式的增长。
经过 rabbit_msg_store
处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(file_size_limit)后,关闭这个文件再创建一个新的文件以供新的消息写入。文件名(文件后缀是 .rdq
)从 0 开始进行累加,因此文件名最小的文件也是最老的文件。在进行消息的存储时,RabbitMQ 会在 ETS(Erlang Term Storage)表中记录消息在文件中的位置映射(Index)和文件的相关信息(FileSummary)。
消息的删除只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对在文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有 3 个文件存在的情况下)的数据大小的比值超过设置的阈值 GARBAGE_FRACTION
(默认值为 0.5)时才会触发文件合并。
# 发送回调
@Configuration
public class RabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public Exchange exchange() {
Map<String, Object> arguments = new HashMap<>();
return new DirectExchange("myExchange", true, false, arguments);
}
@Bean
public Queue queue() {
Map<String, Object> arguments = new HashMap<>();
return new Queue("myQueue", true, false, false, arguments);
}
@Bean
public Binding binding() {
Map<String, Object> arguments = new HashMap<>();
return new Binding("myQueue", Binding.DestinationType.QUEUE, "myExchange", "myRoutingKey", arguments);
}
@PostConstruct
void init() {
// confirmCallback 消息发送到 Exchange 回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息发送到 Exchange 成功
System.out.println(Thread.currentThread() + "Success to send message to Exchange");
} else {
// 消息发送到 Exchange 失败
System.out.println("Fail to send message to Exchange: " + cause);
}
});
// returnsCallback Exchange 无法路由到 Queue 的回调
rabbitTemplate.setReturnsCallback((returnCallback) -> {
System.out.println(Thread.currentThread() + "Message returned with code: " + returnCallback.getReplyCode()
+ ", text: " + returnCallback.getReplyText()
+ ", exchange: " + returnCallback.getExchange()
+ ", routingKey: " + returnCallback.getRoutingKey());
});
}
}
MessageProperties properties = new MessageProperties();
// 消息持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message2Send = new Message("hello rabbitmq".getBytes(StandardCharsets.UTF_8), properties);
// 路由成功
rabbitTemplate.send("myExchange", "myRoutingKey", message2Send);
// 路由失败
rabbitTemplate.send("myExchange", "myRoutingKey222", message2Send);
spring:
rabbitmq:
username: guest
password: guest
port: 5672
publisher-confirm-type: correlated
# 开启发送到 exchange 回调
publisher-returns: true
# 消息成功路由回调
template:
mandatory: true
listener:
simple:
prefetch: 10
# 镜像队列
如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的 durable
属性也设置为 true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘并执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisher confirm 机制能够确保客户端知道哪些消息已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列(以下简称镜像队列)都包含一个主节点(master)和若干个从节点(slave)。
slave 会准确地按照 master 执行命令的顺序进行动作,故 slave 与 master 上维护的状态应该是相同的。如果 master 由于某种原因失效,那么“资历最老”的 slave 会被提升为新的 master。根据 slave 加入的时间排序,时间最长的 slave 即为“资历最老”。发送到镜像队列的所有消息会被同时发往 master 和所有的 slave 上,如果此时 master 挂掉了,消息还会在 slave 上,这样 slave 提升为 master 的时候消息也不会丢失。
# 备份交换机
如何不设置路由失败回调,消息丢失无法知道。可以使用备份交换机来替代路由失败回调,即路由失败,将消息发送到备份交换机,备份交换机,备份交换机的处理流程和正常交换机一样。
只需要声明路由失败后需要路由的交换机 alternate-exchange
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "myAE");
return new DirectExchange("myExchange", true, false, arguments);
# 订单定时取消
TTL + 死信队列
目前有两种方法可以设置消息的 TTL 。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。 如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时一旦超过设置 TTL 值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息。
对于第一种设置队列属性的方法,一旦消息过期,就会从队列中抹去。
而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。
为什么这两种方法处理的方式不一样?
- 因为第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。
- 而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
通过队列属性设置消息 TTL 的方法是在
channel.queueDeclare
方法中加入x-message-ttl
参数实现的,这个参数的单位是毫秒。
Time-To-Live and Expiration | RabbitMQ (opens new window) 仲裁队列 当死信过期消息到达队列头部时,仲裁将它们排队。
经典队列 经典在几种情况下对死信过期消息进行队列:
- 当消息到达队列头部时
- 当队列收到影响其的策略更改通知时
追溯应用每条消息的 TTL(应用于现有队列) 追溯应用了每条消息 TTL 的队列(当它们已经有消息时)将在发生特定事件时丢弃消息。
只有当过期的消息到达队列的头部时,它们才会真正被丢弃(标记为删除)。消费者不会收到过期的消息。请记住,消息过期和消费者交付之间可能存在自然竞争条件,例如:消息在写入套接字之后但在到达使用者之前可能会过期。
设置每条消息的 TTL 时,过期的消息可以在未过期的消息后面排队,直到后者被消耗或过期。因此,此类过期消息所使用的资源将不会被释放,并且它们将被计入队列统计信息(例如队列中的消息数)。 当追溯应用每条消息的 TTL 策略时,建议让消费者在线以确保更快地丢弃消息。 鉴于现有队列上每消息 TTL 设置的这种行为,当需要删除消息以释放资源时,应改用队列 TTL(或队列清除或队列删除)。
即:对每条消息设置 TTL,最先过期的消息并不会最先消费,只有在队列头的被消费了,后面的才会被消费。
2 条消息,设置为 6s 和 1s 过期,先投递 6s,再投递 1s
public void test() {
MessageProperties properties1 = MessagePropertiesBuilder.newInstance().setExpiration("6000").build();
Message message1 = new Message((LocalDateTime.now() + "hello6").getBytes(), properties1);
rabbitTemplate.send("directExchange", "directRoutingKey", message1);
MessageProperties properties2 = MessagePropertiesBuilder.newInstance().setExpiration("1000").build();
Message message2 = new Message((LocalDateTime.now() + "hello1").getBytes(), properties2);
rabbitTemplate.send("directExchange", "directRoutingKey", message2);
}
查看 listener 的输出,1s 的等到 6s 的到期了才会被消费,所以不推荐在消息上设置 TTL,而是在队列上设置统一的 TTL。
2024-03-03T16:27:01.713received:2024-03-03T16:26:55.648hello6
2024-03-03T16:27:01.714received:2024-03-03T16:26:55.685hello1
TTL(time to live)可以给队列和消息设置过期时间。同时存在,以最小时间为过期时间。
Map<String, Object> arguments = new HashMap<>();
// 队列中的消息存活时间
arguments.put("x-message-ttl", 3000);
return new Queue("myQueue", true, false, false, arguments);
MessageProperties properties = new MessageProperties();
// 消息持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 消息设置过期时间
properties.setExpiration("3000");
Message message2Send = new Message("hello rabbitmq".getBytes(StandardCharsets.UTF_8), properties);
// 路由成功
rabbitTemplate.send("myExchange", "myRoutingKey", message2Send)
消息到期,队列长度达到上限,消息被拒绝会路由到死信交换机。
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 3000);
// 死信交换机
arguments.put("x-dead-letter-exchange", "dlx_exchange");
return new Queue("myQueue", true, false, false, arguments);
# 消息积压
积压 1kw 条消息,发现是消费者异常下线导致,如何解决? 能否直接启动消费者,需要根据 ack 机制和 qos 数量来保证。
- 没有 ack 的话消息就一直堆积在消费者哪里,占用 CPU 和 内存。
- 需要设置 qos 数量,来限制消费者同时处理为 ack 的数量。客户端最多接收未被 ack 的消息的个数。
- 扩容。
spring:
rabbitmq:
username: guest
password: guest
port: 5672
publisher-confirm-type: correlated
# 开启发送到 exchange 回调
publisher-returns: true
# 消息成功路由回调
template:
mandatory: true
listener:
simple:
prefetch: 10
# 顺序消费
- 想要顺序必须单点,保证队列只有一个消费者。
- 业务上进行数据细分,投递到多个不同分类的 queue,每个 queue 对应单独的消费者。
- 加锁。
# 重复消费
根据业务 id 幂等处理