RabbitMQ笔记
# AMQP协议
- Broker:接受和分发消息的应用,RabbitMQ就算Message Broker
- Virtual Host:虚拟Broker,将多个单元隔离开
- Connection:publisher/consumer和broker之间的TCP连接(物理连接)
- Channel:connection内部建立的逻辑连接,通常每个线程创建单独的channel
- Routing Key:路由键,用来指示消息的路由转发,相当于快递的地址
- Exchange:交换机,相当于快递的分拨中心
- Queue:消息队列,消息最终被送到这里等待consumer取走
- Binding:exchange和queue之间的虚拟连接,用于message的分发依据
# Exchange的作用
- Exchange是AMQP协议和RabbitMQ的核心组件
- Exchange的功能是根据绑定关系和路由键为消息提供路由,将消息转发至相应的队列
- Exchange有4中类型:Direct/Topic/Fanout/Headers,其中header很少使用,以前三种为主
http://tryrabbitmq.com/ (opens new window)
# Direct Exchange
Message中的Routing Key如果和Binding Key一致,Direct Exchange就将message发送到对应的queue中
# Fanout Exchange
每个发到Fanout Exchange的message都会被分发到所有绑定的queue上(和Binding Key无关,只要有Queue就发送)
# Topic Exchange
根据Routing Key及统配规则,Topic Exchange将消息分发到目标Queue中
- 全匹配:和Direct类似
- Binding Key中的
#
:匹配任意个数的word - Binding Key中的
*
:匹配任意一个word
比如:咖啡、奶茶、果汁,三种饮料
- 咖啡含有咖啡因,冷热均可,苦甜均可,Binding Key:
caffeine.#
- 奶茶不含有咖啡因,热饮,甜味,Binding Key:
nocaffeine.hot.sweet
- 果汁不含有咖啡因,冷热均可,甜味,Binding Key:
nocaffeine.*.sweet
若条件是:无咖啡因、热饮、甜味
milktea队列和juice队列可以收到消息
若条件是:有咖啡因、冷饮、苦味
只有coffer队列可以收到消息
若条件是:无咖啡因、冷饮、甜味
只有juice队列可以收到消息
# RabbitMQ命令行
sbin目录下
rabbitmqclt --help
- 想看什么就list什么
- 想清空什么就purge什么
- 想删除什么就delete什么
- 有问题使用--help
rabbitmqctl stop_app:关闭应用
rabbitmqctl start_app:启动应用
rabbitmqctl status:节点状态
rabbitmqctl add_user username password:添加用户
rabbitmqctl list_users:列出所有用户
rabbitmqctl delete_user username:删除用户
rabbitmqctl clear_permissions -p vhostpath username:清除用户权限
rabbitmqctl list_user_permissions username:列出用户权限
rabbitmqctl change_password username password:修改密码
rabbitmqctl set_permissions -p vhostpath username "." "." ".*":设置用户权限
rabbitmqctl add_vhostpath vhostpath:创建虚拟主机
rabbitmqctl list_vhosts:列出所有虚拟主机
rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限
rabbitmqctl list_queues:查看所有队列信息
rabbitmqctl -p vhostpath purge_queue queue:清除队列里的消息
rabbitmqctl reset:移除所有数据
rabbitmqctl join_cluster <clusternode> [--ram]:组成集群命令
rabbitmqctl change_cluster_node_type disc | ram:修改集群节点的存储形式
rabbitmqctl forget_cluster_node [--offline]:忘记节点(摘除节点)
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]:修改节点名称
# RabbitMQ高性能原因
- Erlang进行间上下文切换效率远高于C和Java,进一步提高了RabbitMQ的并发性能
- Erlang的网络性能有着和原生Socker一样的延迟,使得RabbitMQ的网络IO性能极高
# RabbitMQ特性
- 发送端确认机制
- 消息返回直接
- 消费端限流机制
- 消费端确认机制
- 消息过期机制
- 死信队列
# 如何保证消息的可靠性
发送方
- 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
- 需要使用RabbitMQ消息返回机制,若发现没目标队列,中间件会通知发送方
消费方
- 需要使用RabbitMQ消费端确认机制,确认处理消息没有发送异常
- 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定
RabbitMQ自身
- 大量堆积的消息会给RabbitMQ产生很大压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
- 过期后会被直接丢弃,无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期信息,以供分析
# 发送端确认机制
即:消息发送后,若中间件收到消息,会给发送端一个应答,生产者接受应答,用来确认这条消息是否正常发送到中间件
三种确认机制
- 单条同步确认 配置channel,开启确认模式:channel.confirmSelect() 每发送一条消息,立即调用channel.waitForConfirms()方法,等待确认
channel.confirmSelect();
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant","key.restaurant",null,messageToSend.getBytes(StandardCharsets.UTF_8));
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}
- 多条同步确认 配置channel,开启确认模式:channel.confirmSelect() 发送多条消息后,调用channel.waitForConfirms()方法,等待确认
channel.confirmSelect();
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
for (int i = 0; i < 3; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes(StandardCharsets.UTF_8));
}
// 如果其中有一条失败就返回false,但是我们无法知道是哪条消息失败
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}
- 异步确认 配置channel,开启确认模式:channel.confirmSelect() 在channel上添加监听:addConfirmListener(),发送消息后,会回调此方法,通知是否发送成功 异步确认有可能是单挑,也有可能 注意这里的deliveryTag不是唯一的,可能会重复,不建议使用异步确认
channel.confirmSelect();
ConfirmListener confirmListener = new ConfirmListener() {
/** 确认成功
* @param deliveryTag 同一线程发送的消息tag 依次递增,不同线程间tag可能相同
* @param multiple 是否确认多条
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("Ack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("Nack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
}
};
// 添加确认监听
channel.addConfirmListener(confirmListener);
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes(StandardCharsets.UTF_8));
}
// 防止发送完,直接退出
Thread.sleep(10000);
2021-12-18 18:48:30.857 INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl : Ack,deliveryTag:2,multiple:true
2021-12-18 18:48:30.857 INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl : Ack,deliveryTag:3,multiple:false
2021-12-18 18:48:30.858 INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl : Ack,deliveryTag:4,multiple:false
2021-12-18 18:48:30.858 INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl : Ack,deliveryTag:10,multiple:true
# 消息返回机制
即:消息发送后,中间件会对消息进行路由,若没有发现目标队列,中间件会通知发送方,ReturnListener会被调用
开启消息返回机制
在RabbitMQ基础配置中有一个关键配置项:Mandatory
- false,RabbitMQ直接丢弃无法路由的消息
- true,RabbitMQ才会处理无法路由的消息
channel.addReturnListener(returnMessage -> {
log.error(String.valueOf(returnMessage.getReplyCode()));
log.error(returnMessage.getReplyText());
log.error(returnMessage.getExchange());
log.error(returnMessage.getRoutingKey());
log.error(returnMessage.getProperties().toString());
log.error(new String(returnMessage.getBody()));
});
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 发送到订单微服务 订单微服务收到数据解析后进行后续操作
channel.basicPublish("exchange.order.restaurant", "key.order",true, null, messageToSend.getBytes());
Thread.sleep(10000);
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : 312
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : NO_ROUTE
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : exchange.order.restaurant
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : key.order
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService : {"productId":2,"orderId":433,"confirmed":true,"accountId":2,"price":23.25}
# 消费端确认机制
//声明消费端 autoAck设置为false
channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
});
// 具体消费逻辑
DeliverCallback deliverCallback = (consumerTag, message) -> {
/*
* 注意这个channel是声明时的channel(获取消息的channel)
* 传入要确认的tag
* 是否为确认多条
* */
// 如果声明时autoAck为false,且不进行手动确认,会在服务结束时,重新回到ready状态
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
/*
* 确认失败,最后一个参数为是否重回队列
* 如果为true,RabbitMQ回重新将消息进行投递,然后消费端由于确认失败,一致这样,死循环,不建议使用
* */
//channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
}
# 消费端限流机制
RabbitMQ开发了QoS(服务质量保证)功能,QoS功能保证了在一定数量的消息未被确认前,不消费新的消息。
Qos的功能前提是不使用自动确认
参数设置
- prefetchCount:针对一个消费端最多推送多少未确认消息
- global:true针对整个消费端限流;false针对当前channel
- prefetchSize:0(单个消息大小限制,一般为0)
global和prefetchSize,RabbitMQ暂时未实现
// 在队列声明时就进行设置
channel.basicQos(2);
channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());
每次只能处理2个消息,其他消息未ready状态,此时可以横向扩展channel,其他channel可以处理ready的消息;但是如果不设置Qos,消息就是Unacked状态,就是消息送到channel了,但是没有进行确认,只有收到消息的channel可以处理,其他channel不能帮忙处理。
# 消息过期机制
RabbitMQ的过期时间分为消息TTL和队列TTL
- 消息TTL设置了单条消息的过期时间
- 队列TTL设置了队列中所有消息的过期时间
TTL应明显长于服务的平均重启时间,防止服务重启消息丢失
建议TTL长于业务高峰期时间
单条消息设置过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes(StandardCharsets.UTF_8));
队列中的消息设置过期时间
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args);
# 死信队列
即:队列被配置了DLX属性(Dead-Letter-Exchange),当一个消息变成死信(dead message)后,能重新被发布到另外一个Exchange,这个Exchange也是一个普通交换机,死信被死信交换机路由后,一般进入一个固定队列
怎么才能变成死信
- 消息被拒绝(reject/nack),并且requeue=false
- 消息过期(TTL到期)
- 队列达到最大长度
死信队列设置方法
设置转发、接受死信的交换机和队列
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
在需要设置死信的队列加入参数
- x-dead-letter-exchange = dlx.exchange
// 设置转发、接受死信的交换机和队列
channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null);
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "#");
// 在需要设置死信的队列加入参数
Map<String, Object> args = new HashMap<>(16);
// 队列中消息过期时间
args.put("x-message-ttl", 15000);
// 队列最大长度
args.put("x-max-length",5)
// 死信投递的地方
args.put("x-dead-letter-exchange", "exchange.dlx");
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args);
// 消息消费,消息被拒绝(reject/nack),并且requeue=false
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
# RabbitAdmin
手动创建和绑定
@Autowired
public void initRabbitMQ() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
RabbitAdmin admin = new RabbitAdmin(factory);
/*-----------restaurant--------------*/
Exchange exchange = new DirectExchange("exchange.order.restaurant");
admin.declareExchange(exchange);
Queue queue = new Queue("queue.order");
admin.declareQueue(queue);
Binding binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.restaurant", "key.order", null);
admin.declareBinding(binding);
/*-----------deliveryman--------------*/
exchange = new DirectExchange("exchange.order.deliveryman");
admin.declareExchange(exchange);
binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.deliveryman", "key.order", null);
admin.declareBinding(binding);
/*-----------settlement--------------*/
exchange = new FanoutExchange("exchange.settlement.order");
admin.declareExchange(exchange);
binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.settlement.order", "key.order", null);
admin.declareBinding(binding);
/*-----------reward--------------*/
exchange = new TopicExchange("exchange.order.reward");
admin.declareExchange(exchange);
binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.reward", "key.order", null);
admin.declareBinding(binding);
}
Spring自动创建和绑定
/*---------------------restaurant---------------------*/
@Bean
public Exchange exchange1() {
return new DirectExchange("exchange.order.restaurant");
}
@Bean
public Queue queue1() {
return new Queue("queue.order");
}
@Bean
public Binding binding1() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.restaurant",
"key.order",
null);
}
/*---------------------deliveryman---------------------*/
@Bean
public Exchange exchange2() {
return new DirectExchange("exchange.order.deliveryman");
}
@Bean
public Binding binding2() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.deliveryman",
"key.order",
null);
}
/*---------settlement---------*/
@Bean
public Exchange exchange4() {
return new FanoutExchange("exchange.settlement.order");
}
@Bean
public Binding binding3() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.settlement.order",
"key.order",
null);
}
/*--------------reward----------------*/
@Bean
public Exchange exchange5() {
return new TopicExchange("exchange.order.reward");
}
@Bean
public Binding binding4() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.reward",
"key.order",
null);
}
/**
* 这里connectionFactory spring会自动在容器中寻找并注入
* @param connectionFactory
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
/*
* 由于RabbitAdmin是懒加载
* 第一次操作时,才会执行自动将容器中Exchange、Queue进行Bing
* 所以我们需要在启动时就操作一下,让spring自动帮我们声明和绑定
* */
factory.createConnection();
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
// 是否自动将容器中Exchange、Queue进行Bing,默认为true
// admin.setAutoStartup(true);
return admin;
}
# 源码简单分析
RabbitAdmin实现了InitializingBean接口
当所有的bean被注入完成后,会执行此方法
RabbitAdmin重写此方法
private volatile boolean running = false;
private boolean autoStartup = true;
只有这两条件都满足才会自动声明和绑定,如果不希望spring帮我们操作,可以设置autoStartup为false
从spring容器中获取所有的Exchange、Queue、Binding
最后再执行
# RabbitTemplate
# 发送消息
注入RabbitTemplate的连接
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 消息属性
MessageProperties properties = new MessageProperties();
properties.setExpiration("15000");
Message message = new Message(messageToSend.getBytes(StandardCharsets.UTF_8), properties);
// 发送消息,格式为RabbitTemplate指定的Message,可以配置消息属性
rabbitTemplate.send("exchange.order.restaurant","key.restaurant",message);
// 发送消息,格式为Object,RabbitTemplate会进行转换
rabbitTemplate.convertAndSend("exchange.order.restaurant","key.restaurant",messageToSend);
# 源码简单分析
底层还是调用channel的basicPublish
①send方法重载,execute()
方法,传入lambda表达式
②调用doExecute(action, connectionFactory)
方法,这个action就我们①的lambda表达式,即
channel->{doSend(......);return null;}
③doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory)
传入lambda表达式和connection连接,前面都是判断有没有连接,有连接怎么做,没连接怎么做;后面调用invokeAction(action, connectionFactory, channel);
④invokeAction
直接返回action.doInRabbit(channel)
⑤doInRabbit
即我们最开始①的lambda表达式
上面的execute()
只是包装了一层,具体的执行方法还是我们传入的lambda方法,他只是帮我判断需不需要重试和帮我们管理连接
# 改造发送端确认和消息返回
添加配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 开启发送端返回和确认类型
factory.setPublisherReturns(true);
// 使用CORRELATED类型可以携带消息属性
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
/*
* 由于RabbitAdmin是懒加载
* 第一次操作时,才会执行自动将容器中Exchange、Queue进行Bing
* 所以我们需要在启动时就操作一下,让spring自动帮我们声明和绑定
* */
factory.createConnection();
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 只有设置了returnCallback setMandatory才有用
rabbitTemplate.setMandatory(true);
/*没有被成功路由*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);
});
/*成功发送到中间件*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{}, ack:{}, cause:{}",correlationData, ack, cause);
});
return rabbitTemplate;
}
发送消息
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 消息属性
MessageProperties properties = new MessageProperties();
properties.setExpiration("15000");
Message message = new Message(messageToSend.getBytes(StandardCharsets.UTF_8), properties);
// 设置相关数据
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderMessageDTO.getOrderId().toString());
// 发送消息,格式为RabbitTemplate指定的Message,可以配置消息属性
rabbitTemplate.send("exchange.order.restaurant","key.restaurant",message,correlationData);
消息确认携带传入的id数据
2021-12-19 16:05:44.541 INFO 40672 --- [nectionFactory1] c.s.o.config.RabbitConfig : correlationData:CorrelationData [id=447], ack:true, cause:null
# SimpleMessageListenerContainer
@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("queue.order");
// 并发消费者数量,如果设置为多个就不能保证排序
container.setConcurrentConsumers(3);
// 最大并发消费数量
container.setMaxConcurrentConsumers(5);
// 设置确认模式 自动确认
// container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// container.setMessageListener(message -> {
// log.info("message:{}",message); // });
// 设置限流1个
container.setPrefetchCount(1);
// 手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 需要传入带有channel的listener container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("message:{}",message); });
return container;
}
# MessageListenerAdapter
修改业务代码方法为public void handleMessage(byte[] messageBody)
我们这个是配置类,里面直接写业务不太优雅,这里使用MessageListenerAdapter
的setDelegate
来设置具体的执行方法
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("queue.order");
// 并发消费者数量,如果设置为多个就不能保证排序 container.setConcurrentConsumers(3);
// 最大并发消费数量 container.setMaxConcurrentConsumers(5);
// 设置确认模式 自动确认 container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// container.setMessageListener(message -> {
// log.info("message:{}",message);
// });
// 设置限流1个
container.setPrefetchCount(1);
// 手动确认
// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//
// 需要传入带有channel的listener
// container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); // log.info("message:{}",message);
// });
MessageListenerAdapter adapter = new MessageListenerAdapter();
// 设置业务类,默认调用方法handleMessage
adapter.setDelegate(orderMessageService);
container.setMessageListener(adapter);
return container;
}
# 源码简单分析
① MessageListenerAdapter
间接实现了ChannelAwareMessageListener
接口,所以 container.setMessageListener(adapter)
可以直接传入
②我们之前使用的setMessageListener()
传入的是一个lambda表达式,为函数接口,实现onMessage()
方法
③查看MessageListenerAdapter
的onMessage
方法,他是如何知道我们业务代码的方法是哪个呢,就是通过getListenerMethodName
这方法
④我们没有设置this.queueOrTagToMethodName
所以走下面的getDefaultListenerMethod
方法
⑤获取默认的方法名,就是handleMessage
;所以如果我们不指定方法名,那么方法名就必须是handleMessage
⑥找到方法名后还要进行调用,就是通过反射invoke
,进而实现方法调用
如果我们不想使用handleMessage这个固定的方法名,那么我们可以传入QueueOrTagToMethodName
加入业务方法的方法名为handleMesage
,少写了一个s,并且这个方法被封装为jar包,不能修改,那么就要配置QueueOrTagToMethodName了
MessageListenerAdapter adapter = new MessageListenerAdapter();
// 设置业务类,默认调用方法handleMessage
adapter.setDelegate(orderMessageService);
Map<String, String> methodMap = new HashMap<>(8);
// 配置queue对应的method
methodMap.put("queue.order", "handleMesage");
adapter.setQueueOrTagToMethodName(methodMap);
container.setMessageListener(adapter);
return container;
此时配置了QueueOrTagToMethodName
再次执行到④时,就从map中get queue对应的method
# MessageConverter
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("queue.order");
// 并发消费者数量,如果设置为多个就不能保证排序
container.setConcurrentConsumers(3);
// 最大并发消费数量
container.setMaxConcurrentConsumers(5);
// 设置确认模式 自动确认
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setPrefetchCount(1);
MessageListenerAdapter adapter = new MessageListenerAdapter();
// 设置业务类,默认调用方法handleMessage
adapter.setDelegate(orderMessageService);
// json转换类,对消息进行转换,
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
/*
* 默认转换后是LinkedHashMap,如果不做处理,我们还要去get获取对象
* 我们需要进行设置,直接返回我们需要的对象类型
*/
messageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
/**
* byte[]消息转成java对象
* @param properties
* @return
*/
@Override
public Class<?> toClass(MessageProperties properties) {
return OrderMessageDTO.class;
}
});
adapter.setMessageConverter(messageConverter);
container.setMessageListener(adapter); return container;}
# 源码简单分析
①调用extractMessage
方法来实现Message的转换
②获取消息转换类getMessageConverter()
③返回this.messageConverter
,就是我们传入的Jackson2JsonMessageConverter即adapter.setMessageConverter(messageConverter);
如果没设置的话就用默认的
MessageConverter实现,可以处理字符串、Serializable 实例或字节数组
private MessageConverter messageConverter = new SimpleMessageConverter();
④调用converter.fromMessage(message)
方法
这是一个接口,具体调用的就是实现类的方法,这里就是我们传入的Jackson2JsonMessageConverter
⑤Jackson2JsonMessageConverter
自身并没有实现这个方法,而是他的父类AbstractJackson2MessageConverter
实现了这个方法
⑥执行doFromMessage
方法,注意这个conversionHint
是null,即⑤的第一个fromMessage
方法传入的null
Class<?> targetClass
获取到的目标类,即OrderMessageDTO.class
messageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
/**
* byte[]消息转成java对象
* @param properties
* @return
*/
@Override
public Class<?> toClass(MessageProperties properties) {
return OrderMessageDTO.class;
}
});
最后再执行convertBytesToObject
方法,最终还是objectMapper.readValue
方法
# RabbitListener
注册RabbitListenerContainerFactory
,SimpleMessageListenerContainer
就不需要了,直接使用注解@RabbitListener
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
在业务类上加注解
@RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues = "queue.order")
在消费方法上加注解
// 由于没有转换类,所以方法参数就为byte[]@RabbitHandler(isDefault = true)public void handleMessage( byte[] messageBody) {}
还可以直接在方法上使用@RabbitListener
表明此方法是消费方法,可以直接在方法上声明队列和绑定关系,这样就不用@Bean一个一个注入了
@RabbitListener(
// containerFactory = "rabbitListenerContainerFactory",
// admin = "rabbitAdmin",
bindings = {
@QueueBinding(
value = @Queue(name = "queue.order",
arguments = {
// @Argument(name =
// "x-message-ttl", value =
// "15000", type = "java.lang
// .Integer"),
// @Argument(name =
// "x-dead-letter-exchange",
// value = "exchange.dlx"),
// @Argument(name =
// "x-dead-letter-routing-key", value = "#")
}
),
exchange = @Exchange(name = "exchange.order.restaurant", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.deliveryman", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.settlement.order", type = ExchangeTypes.FANOUT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.reward", type = ExchangeTypes.TOPIC),
key = "key.order"
)
}
// queues = "queue.order"
)
/**
* 使用@Payload可以指定消息体,可以直接使用pojo对象,但是发送和接收端需要实现Serializable接口
* @Headers指定消息头
*/
public void handleMessage(@Payload byte[] messageBody) {}
简化配置类,只需在application.yml
写入rabbitmq的配置,connection、factory、admin、template都会自动帮我们创建。不用编写配置类了。
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: admin
password: admin
# RabbitMQ集群间通信原理
集群间通信方法
- Federation(联邦)
- Shovel(铲子)
# 集群配置文件
- tcp_listeners 设置rabbitmq的监听端口,默认为5672
- disk_free_limit 磁盘低水位线,若磁盘容量低于指定值则停止接收数据,默认值为{mem_relative, 1.0},即与内存相关联1:1,也可以定制为多少byte
- vm_memory_high_watermark 设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%
- force_fine_statistics 该参数属于rabbitmq_management,若为true则进行精细化的统计,但会影响性能
# Federation
- 通过AMQP协议,使用一个内部交换机(隐藏,默认看不到),让原本发送到一个集群的消息转发至另一个集群
- 消息可以从交换机转发到交换机,也可以由队列转发至队列(双端统一)
- 消息可以单向转发,也可以双向转发
# Federation设置
- 启用Federation插件
rabbitmq-plugins enable rabbitmq_federation_management
- 使用网页控制台配置Federation
# Shovel
- Shovel可以持续的从一个broker拉取消息转发至另一个broker
- Shovel的使用较为灵活,可以配置从队列至交换机,从队列至队列,从交换机至交换机(双端不必统一)
# Shovel设置
- 启用Shovel插件
rabbitmq-plugins enable rabbitmq_shovel_management
- 使用网页控制台配置Shovel
# 集群网络分区
当一个节点发生网络故障,发送/接受不到消息时,会认为其他节点挂了,为了保证可用性,会把当前节点升级为主节点,继续对外进行服务,但是真正主节点没有任何问题(只是集群间暂时不能通信)。
此时从节点升级为主节点,处理数据,但是不会进行数据同步,因为网络分区了(自己单独为一个分区)。
真正的主节点,会在自己的网络分区中进行数据同步。
所以集群间的数据就不一致了。
- 网络分区指的是系统网络被分割为了不互通的两个部分
- 相对与网络部分故障,彻底的分区有时是有意义的(相对于业务不可以)
# 如何发现网络分区
- 使用
rabbitmqctl cluster_status
命令 - 使用网页控制台查看
- 使用http api
http://localhost:15672/api/nodes
# 集群网络分区处理方法
# 手动处理
- 挂起客户端进程(springboot) 可以减少不必要的消息丢失,如果进程数过多,可跳过
- 删除镜像队列的配置 如果没有删除镜像队列配置,恢复过程中可能会出现队列”漂移“现象(即从变主)
- 挑选信任分区 挑选指标:是否有disk节点(持久化)/分区节点数/分区队列数/分区客户端连接数
- 关闭非信任区的节点 采用rabbitmqctl stop_app命令,只关闭RabbitMQ应用,不会关闭ErLang虚拟机
- 启动非信任区的节点(修复网络后) 采用rabbitmqctl start_app命令
- 检查网络分区是否恢复 若已恢复跳至步骤8 若还存在网络分区进行步骤7
- 重启信任分区中的节点(可能是信任分区的问题) 使用步骤4和5的命令
- 添加镜像队列的配置(问题已解决)
- 恢复生产者和消费者的进程 若步骤1并未挂起客户端进程,也应检查客户端连接,必要时重启客户端(防止springamqp长时间连接失败后,就不进行连接了)
# 自动处理
RabbitMQ中有三种网络分区自动处理模式
- pause-minority
- pause-if-all-down
- autoheal
默认是ignore模式,不自动处理
如果要开启,配置rabbitmq.config中的cluster_parititon_handing参数
pause-minority
发生网络分区时,节点自动检测自己是否处于少数派,若是则关闭自己
若是出现了节点数相同的两个分区,可能会导致两个分区全部关闭
pause-if-all-down
每个节点预先配置一个节点列表,当失去和列表中所有节点的通信时,关闭自己
此方法非常考验配置的合理性,配置不合理可能会导致集群节点全部宕机
autoheal
发生网络分区时,每个节点使用特定算法自动决定一个”获胜分区“,然后重启不在分区的其他节点
当节点中有关闭状态时,autoheal不会起作用
# 总结
手动处理方式比较考验运维操作水平,但比较常用
慎用自动处理方式,因为如果配置不合理,会导致更大的问题
# 集群恢复与故障转移
前提:假设节点A和节点B组成一个镜像队列
场景1:A先停,B后停
方案1:B是master,只要先启动B,再启动A即可。或者先启动A,在30秒内启动B即可恢复镜像队列
场景2:A、B同时停机
方案2:可能是机房断电等原因造成,只需在30秒内连续启动A和B即可恢复镜像队列
场景3:A先停、B后停,且A无法恢复
方案3:因为B是master,所以等B启动后,在B节点上调用控制台命令rabbitmqctl forget_cluster_node A
解除与A的cluster关系,再将新的节点加入B即可重新恢复镜像队列
场景4:A先停、B后停,且B无法恢复
方案4:因为B是主节点,所以直接启动A是不信的,当A无法启动时,也就没有办法在A节点上调用方案3的命令,但是3.4.2之后的版本可以在加上--offline
参数解决。
这就意味着允许rabbitmqctl在理想节点上执行该命令,迫使RabbitMQ在未启动slave节点中选择一个节点作为master。当在A节点执行rabbitmqctl forget_cluster_node --offline B
时,RabbitMQ会mock一个节点代表A,执行forget_cluster_node命令将B剔除cluster,然后A就可以正常启动了,最后将新的slave节点加入A即可重新恢复镜像队列
场景5:A先停、B后停,且A、B均无法恢复,但是能得到A或B的磁盘文件
方案5:尝试恢复数据,将A或B的数据库文件(默认在$RABBIT_HONE/var/lib目录),把它拷贝到新节点的对应目录下,再将新节点的hostname改成A或B的hostname,如果是A节点(slave)的磁盘文件,则按照场景4处理即可,如果是B节点(master)的磁盘文件,则按照场景3处理,最后将新的slave加入到新节点后完成恢复
场景6:A先停、B后停,且A、B均无法恢复,且得不到A、B的磁盘文件
无解~29
# 延迟插件
配置
- 官网 (opens new window)下载插件(rabbitmq_delayed_message_exchange) 直达地址 (opens new window)
- 放入
$RABBIT_HOME/plugins
目录下 - 启动插件,如果是集群需要每个节点都配置(可以运行时直接配置)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用
- 声明交换机时添加参数
"x-delayed-type", "direct"
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
- 发送消息时配置延迟时间
"x-delay", 5000
,如果没有配置就算正常发送,不延迟
计时器可以设置为 (2^32)-1
毫秒将来
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...
# RabbitMQ状态监控
- 通过Java API判断节点是否健康
- 通过HTTP REST API监控集群状态
- 通过监控中间件监控RabbitMQ
通过Java API判断节点是否健康
使用Java应用创建connection与channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
若能创建成功,则节点健康,若创建失败(抛异常)则节点挂机或节点的网络连接失败
通过HTTP REST API监控集群状态
- 使用
api/nodes/
接口获得节点信息 - 使用
api/exchanges/{vhost}/{name}/
接口获得exchange状态信息 - 使用
api/queues/{vhost}/{name}/
接口获得queue状态信息
通过监控中间件监控RabbitMQ
常见的监控中间件有Zabbix、Prometheus等
其底层原始还是调用HTTP REST API,再将数据处理、存储、展示
# 消息可靠性保证
生产端的可靠性投递
- 保障消息成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(broker)确认应答
- 完善的消息补偿机制
大厂解决方案
- 消息落库,对消息状态进行打标
- 消息的延迟投递,做二次投递,回调检查
# 使用定时任务
# 延迟投递
定时任务的话读写消息库IO比较频繁,不太适合高并发的场景。
使用消息的延迟投递,做二次确认,回调检查
生产端只用关注业务,不用关注消息是否发送成功,新建一个回调服务用来关注消息状态。异步解耦