notes notes
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
首页
读书笔记
系统设计
项目实战
学习笔记
源码
运维
其它
极客时间 (opens new window)
GitHub (opens new window)
  • 并发编程

    • 并发编程
    • 多线程
    • 高级篇
  • 设计模式

    • 设计模式
  • 网络编程

    • Netty

      • NIO基础
      • Netty入门
      • Netty进阶
      • 优化与源码
  • 源码篇

    • 环境搭建
    • Spring
  • 云原生

    • Kubernetes
    • Helm
  • ElasticSearch

    • ElasticSearch
  • Java 虚拟机

    • 深入拆解 Java 虚拟机
    • JVM与GC调优
  • MQ

    • RabbitMQ

      • RabbitMQ笔记
        • AMQP协议
          • Exchange的作用
          • Direct Exchange
          • Fanout Exchange
          • Topic Exchange
          • RabbitMQ命令行
          • RabbitMQ高性能原因
        • RabbitMQ特性
          • 如何保证消息的可靠性
          • 发送端确认机制
          • 消息返回机制
          • 消费端确认机制
          • 消费端限流机制
          • 消息过期机制
          • 死信队列
        • RabbitAdmin
          • 源码简单分析
        • RabbitTemplate
          • 发送消息
          • 源码简单分析
          • 改造发送端确认和消息返回
        • SimpleMessageListenerContainer
        • MessageListenerAdapter
          • 源码简单分析
        • MessageConverter
          • 源码简单分析
        • RabbitListener
        • RabbitMQ集群间通信原理
          • 集群配置文件
          • Federation
          • Federation设置
          • Shovel
          • Shovel设置
          • 集群网络分区
          • 如何发现网络分区
          • 集群网络分区处理方法
          • 手动处理
          • 自动处理
          • 总结
          • 集群恢复与故障转移
        • 延迟插件
        • RabbitMQ状态监控
        • 消息可靠性保证
          • 使用定时任务
          • 延迟投递
      • RabbitMQ集群搭建文档
  • Redis

    • Redis进阶
  • ShardingSphere

    • Sharding-JDBC
  • SpringCloud

    • SpringCloud
  • ZooKeeper

    • ZooKeeper
  • 学习笔记
  • MQ
  • RabbitMQ
starry
2023-08-03
目录

RabbitMQ笔记

# AMQP协议

  • Broker:接受和分发消息的应用,RabbitMQ就算Message Broker
  • Virtual Host:虚拟Broker,将多个单元隔离开
  • Connection:publisher/consumer和broker之间的TCP连接(物理连接)
  • Channel:connection内部建立的逻辑连接,通常每个线程创建单独的channel
  • Routing Key:路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue:消息队列,消息最终被送到这里等待consumer取走
  • Binding:exchange和queue之间的虚拟连接,用于message的分发依据

# Exchange的作用

  • Exchange是AMQP协议和RabbitMQ的核心组件
  • Exchange的功能是根据绑定关系和路由键为消息提供路由,将消息转发至相应的队列
  • Exchange有4中类型:Direct/Topic/Fanout/Headers,其中header很少使用,以前三种为主

http://tryrabbitmq.com/ (opens new window)

# Direct Exchange

Message中的Routing Key如果和Binding Key一致,Direct Exchange就将message发送到对应的queue中

# Fanout Exchange

每个发到Fanout Exchange的message都会被分发到所有绑定的queue上(和Binding Key无关,只要有Queue就发送)

# Topic Exchange

根据Routing Key及统配规则,Topic Exchange将消息分发到目标Queue中

  • 全匹配:和Direct类似
  • Binding Key中的#:匹配任意个数的word
  • Binding Key中的*:匹配任意一个word

比如:咖啡、奶茶、果汁,三种饮料

  • 咖啡含有咖啡因,冷热均可,苦甜均可,Binding Key:caffeine.#
  • 奶茶不含有咖啡因,热饮,甜味,Binding Key:nocaffeine.hot.sweet
  • 果汁不含有咖啡因,冷热均可,甜味,Binding Key:nocaffeine.*.sweet

若条件是:无咖啡因、热饮、甜味

milktea队列和juice队列可以收到消息

若条件是:有咖啡因、冷饮、苦味

只有coffer队列可以收到消息

若条件是:无咖啡因、冷饮、甜味

只有juice队列可以收到消息

# RabbitMQ命令行

sbin目录下

rabbitmqclt --help
  • 想看什么就list什么
  • 想清空什么就purge什么
  • 想删除什么就delete什么
  • 有问题使用--help

rabbitmqctl stop_app:关闭应用

rabbitmqctl start_app:启动应用

rabbitmqctl status:节点状态

rabbitmqctl add_user username password:添加用户

rabbitmqctl list_users:列出所有用户

rabbitmqctl delete_user username:删除用户

rabbitmqctl clear_permissions -p vhostpath username:清除用户权限

rabbitmqctl list_user_permissions username:列出用户权限

rabbitmqctl change_password username password:修改密码

rabbitmqctl set_permissions -p vhostpath username "." "." ".*":设置用户权限

rabbitmqctl add_vhostpath vhostpath:创建虚拟主机

rabbitmqctl list_vhosts:列出所有虚拟主机

rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限

rabbitmqctl list_queues:查看所有队列信息

rabbitmqctl -p vhostpath purge_queue queue:清除队列里的消息

rabbitmqctl reset:移除所有数据

rabbitmqctl join_cluster <clusternode> [--ram]:组成集群命令

rabbitmqctl change_cluster_node_type disc | ram:修改集群节点的存储形式

rabbitmqctl forget_cluster_node [--offline]:忘记节点(摘除节点)

rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]:修改节点名称

# RabbitMQ高性能原因

  • Erlang进行间上下文切换效率远高于C和Java,进一步提高了RabbitMQ的并发性能
  • Erlang的网络性能有着和原生Socker一样的延迟,使得RabbitMQ的网络IO性能极高

# RabbitMQ特性

  • 发送端确认机制
  • 消息返回直接
  • 消费端限流机制
  • 消费端确认机制
  • 消息过期机制
  • 死信队列

# 如何保证消息的可靠性

发送方

  • 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
  • 需要使用RabbitMQ消息返回机制,若发现没目标队列,中间件会通知发送方

消费方

  • 需要使用RabbitMQ消费端确认机制,确认处理消息没有发送异常
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

RabbitMQ自身

  • 大量堆积的消息会给RabbitMQ产生很大压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
  • 过期后会被直接丢弃,无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期信息,以供分析

# 发送端确认机制

即:消息发送后,若中间件收到消息,会给发送端一个应答,生产者接受应答,用来确认这条消息是否正常发送到中间件

三种确认机制

  • 单条同步确认 配置channel,开启确认模式:channel.confirmSelect() 每发送一条消息,立即调用channel.waitForConfirms()方法,等待确认
channel.confirmSelect();

String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant","key.restaurant",null,messageToSend.getBytes(StandardCharsets.UTF_8));

if (channel.waitForConfirms()) {
    log.info("RabbitMQ confirm success");
} else {
    log.info("RabbitMQ confirm failed");
}
  • 多条同步确认 配置channel,开启确认模式:channel.confirmSelect() 发送多条消息后,调用channel.waitForConfirms()方法,等待确认
channel.confirmSelect();

String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
for (int i = 0; i < 3; i++) {
    channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes(StandardCharsets.UTF_8));
}
// 如果其中有一条失败就返回false,但是我们无法知道是哪条消息失败
if (channel.waitForConfirms()) {
    log.info("RabbitMQ confirm success");
} else {
    log.info("RabbitMQ confirm failed");
}
  • 异步确认 配置channel,开启确认模式:channel.confirmSelect() 在channel上添加监听:addConfirmListener(),发送消息后,会回调此方法,通知是否发送成功 异步确认有可能是单挑,也有可能 注意这里的deliveryTag不是唯一的,可能会重复,不建议使用异步确认
channel.confirmSelect();
ConfirmListener confirmListener = new ConfirmListener() {
    /** 确认成功
     * @param deliveryTag 同一线程发送的消息tag 依次递增,不同线程间tag可能相同
     * @param multiple 是否确认多条
     * @throws IOException
     */
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        log.info("Ack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        log.info("Nack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
    }
};
// 添加确认监听
channel.addConfirmListener(confirmListener);

String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
for (int i = 0; i < 10; i++) {
    channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes(StandardCharsets.UTF_8));
}
// 防止发送完,直接退出
Thread.sleep(10000);
2021-12-18 18:48:30.857  INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl      : Ack,deliveryTag:2,multiple:true
2021-12-18 18:48:30.857  INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl      : Ack,deliveryTag:3,multiple:false
2021-12-18 18:48:30.858  INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl      : Ack,deliveryTag:4,multiple:false
2021-12-18 18:48:30.858  INFO 54796 --- [ 127.0.0.1:5672] c.s.o.service.impl.OrderServiceImpl      : Ack,deliveryTag:10,multiple:true

# 消息返回机制

即:消息发送后,中间件会对消息进行路由,若没有发现目标队列,中间件会通知发送方,ReturnListener会被调用

开启消息返回机制

在RabbitMQ基础配置中有一个关键配置项:Mandatory

  • false,RabbitMQ直接丢弃无法路由的消息
  • true,RabbitMQ才会处理无法路由的消息
channel.addReturnListener(returnMessage -> {
    log.error(String.valueOf(returnMessage.getReplyCode()));
    log.error(returnMessage.getReplyText());
    log.error(returnMessage.getExchange());
    log.error(returnMessage.getRoutingKey());
    log.error(returnMessage.getProperties().toString());
    log.error(new String(returnMessage.getBody()));
});

String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 发送到订单微服务 订单微服务收到数据解析后进行后续操作
channel.basicPublish("exchange.order.restaurant", "key.order",true, null, messageToSend.getBytes());
Thread.sleep(10000);
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : 312
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : NO_ROUTE
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : exchange.order.restaurant
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : key.order
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
2021-12-18 19:23:33.427 ERROR 63004 --- [ 127.0.0.1:5672] c.s.r.service.impl.OrderMessageService   : {"productId":2,"orderId":433,"confirmed":true,"accountId":2,"price":23.25}

# 消费端确认机制

//声明消费端 autoAck设置为false
channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
            });

// 具体消费逻辑
DeliverCallback deliverCallback = (consumerTag, message) -> {
    /*
    * 注意这个channel是声明时的channel(获取消息的channel)
    * 传入要确认的tag
    * 是否为确认多条
    * */
    // 如果声明时autoAck为false,且不进行手动确认,会在服务结束时,重新回到ready状态
    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    /*
    * 确认失败,最后一个参数为是否重回队列
    * 如果为true,RabbitMQ回重新将消息进行投递,然后消费端由于确认失败,一致这样,死循环,不建议使用
    * */
    //channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
}

# 消费端限流机制

RabbitMQ开发了QoS(服务质量保证)功能,QoS功能保证了在一定数量的消息未被确认前,不消费新的消息。

Qos的功能前提是不使用自动确认

参数设置

  • prefetchCount:针对一个消费端最多推送多少未确认消息
  • global:true针对整个消费端限流;false针对当前channel
  • prefetchSize:0(单个消息大小限制,一般为0)

global和prefetchSize,RabbitMQ暂时未实现

// 在队列声明时就进行设置
channel.basicQos(2);
channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());

每次只能处理2个消息,其他消息未ready状态,此时可以横向扩展channel,其他channel可以处理ready的消息;但是如果不设置Qos,消息就是Unacked状态,就是消息送到channel了,但是没有进行确认,只有收到消息的channel可以处理,其他channel不能帮忙处理。

# 消息过期机制

RabbitMQ的过期时间分为消息TTL和队列TTL

  • 消息TTL设置了单条消息的过期时间
  • 队列TTL设置了队列中所有消息的过期时间

TTL应明显长于服务的平均重启时间,防止服务重启消息丢失

建议TTL长于业务高峰期时间

单条消息设置过期时间

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes(StandardCharsets.UTF_8));

队列中的消息设置过期时间

Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
channel.queueDeclare(
        "queue.restaurant",
        true,
        false,
        false,
        args);

# 死信队列

即:队列被配置了DLX属性(Dead-Letter-Exchange),当一个消息变成死信(dead message)后,能重新被发布到另外一个Exchange,这个Exchange也是一个普通交换机,死信被死信交换机路由后,一般进入一个固定队列

怎么才能变成死信

  • 消息被拒绝(reject/nack),并且requeue=false
  • 消息过期(TTL到期)
  • 队列达到最大长度

死信队列设置方法

设置转发、接受死信的交换机和队列

  • Exchange:dlx.exchange
  • Queue:dlx.queue
  • RoutingKey:#

在需要设置死信的队列加入参数

  • x-dead-letter-exchange = dlx.exchange
// 设置转发、接受死信的交换机和队列
channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null);

channel.queueDeclare("queue.dlx", true, false, false, null);

channel.queueBind("queue.dlx", "exchange.dlx", "#");

// 在需要设置死信的队列加入参数
Map<String, Object> args = new HashMap<>(16);
// 队列中消息过期时间
args.put("x-message-ttl", 15000);
// 队列最大长度
args.put("x-max-length",5)
// 死信投递的地方
args.put("x-dead-letter-exchange", "exchange.dlx");
channel.queueDeclare(
    "queue.restaurant",
    true,
    false,
    false,
    args);



// 消息消费,消息被拒绝(reject/nack),并且requeue=false
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);

# RabbitAdmin

手动创建和绑定

@Autowired
public void initRabbitMQ() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");

    RabbitAdmin admin = new RabbitAdmin(factory);


    /*-----------restaurant--------------*/
    Exchange exchange = new DirectExchange("exchange.order.restaurant");
    admin.declareExchange(exchange);

    Queue queue = new Queue("queue.order");
    admin.declareQueue(queue);

    Binding binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.restaurant", "key.order", null);
    admin.declareBinding(binding);

    /*-----------deliveryman--------------*/
    exchange = new DirectExchange("exchange.order.deliveryman");
    admin.declareExchange(exchange);

    binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.deliveryman", "key.order", null);
    admin.declareBinding(binding);

    /*-----------settlement--------------*/
    exchange = new FanoutExchange("exchange.settlement.order");
    admin.declareExchange(exchange);

    binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.settlement.order", "key.order", null);
    admin.declareBinding(binding);

    /*-----------reward--------------*/
    exchange = new TopicExchange("exchange.order.reward");
    admin.declareExchange(exchange);

    binding = new Binding("queue.order", Binding.DestinationType.QUEUE, "exchange.order.reward", "key.order", null);
    admin.declareBinding(binding);

}

Spring自动创建和绑定

/*---------------------restaurant---------------------*/
@Bean
public Exchange exchange1() {
    return new DirectExchange("exchange.order.restaurant");
}

@Bean
public Queue queue1() {
    return new Queue("queue.order");
}

@Bean
public Binding binding1() {
    return new Binding(
            "queue.order",
            Binding.DestinationType.QUEUE,
            "exchange.order.restaurant",
            "key.order",
            null);
}

/*---------------------deliveryman---------------------*/
@Bean
public Exchange exchange2() {
    return new DirectExchange("exchange.order.deliveryman");
}

@Bean
public Binding binding2() {
    return new Binding(
            "queue.order",
            Binding.DestinationType.QUEUE,
            "exchange.order.deliveryman",
            "key.order",
            null);
}


/*---------settlement---------*/
@Bean
public Exchange exchange4() {
    return new FanoutExchange("exchange.settlement.order");
}

@Bean
public Binding binding3() {
    return new Binding(
            "queue.order",
            Binding.DestinationType.QUEUE,
            "exchange.settlement.order",
            "key.order",
            null);
}

/*--------------reward----------------*/
@Bean
public Exchange exchange5() {
    return new TopicExchange("exchange.order.reward");
}

@Bean
public Binding binding4() {
    return new Binding(
            "queue.order",
            Binding.DestinationType.QUEUE,
            "exchange.order.reward",
            "key.order",
            null);
}

/**
* 这里connectionFactory spring会自动在容器中寻找并注入
* @param connectionFactory 
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    /*
    * 由于RabbitAdmin是懒加载
    * 第一次操作时,才会执行自动将容器中Exchange、Queue进行Bing
    * 所以我们需要在启动时就操作一下,让spring自动帮我们声明和绑定
    * */
    factory.createConnection();
    return factory;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    // 是否自动将容器中Exchange、Queue进行Bing,默认为true
    // admin.setAutoStartup(true);
    return admin;
}

# 源码简单分析

RabbitAdmin实现了InitializingBean接口

当所有的bean被注入完成后,会执行此方法

RabbitAdmin重写此方法

private volatile boolean running = false;
private boolean autoStartup = true;

只有这两条件都满足才会自动声明和绑定,如果不希望spring帮我们操作,可以设置autoStartup为false

从spring容器中获取所有的Exchange、Queue、Binding

最后再执行

# RabbitTemplate

# 发送消息

注入RabbitTemplate的连接

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}

发送消息

@Autowired
private RabbitTemplate rabbitTemplate;
String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 消息属性
MessageProperties properties = new MessageProperties();
properties.setExpiration("15000");
Message message = new Message(messageToSend.getBytes(StandardCharsets.UTF_8), properties);
// 发送消息,格式为RabbitTemplate指定的Message,可以配置消息属性
rabbitTemplate.send("exchange.order.restaurant","key.restaurant",message);

// 发送消息,格式为Object,RabbitTemplate会进行转换
rabbitTemplate.convertAndSend("exchange.order.restaurant","key.restaurant",messageToSend);

# 源码简单分析

底层还是调用channel的basicPublish

①send方法重载,execute()方法,传入lambda表达式

②调用doExecute(action, connectionFactory)方法,这个action就我们①的lambda表达式,即

channel->{doSend(......);return null;}

③doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory)传入lambda表达式和connection连接,前面都是判断有没有连接,有连接怎么做,没连接怎么做;后面调用invokeAction(action, connectionFactory, channel);

④invokeAction直接返回action.doInRabbit(channel)

⑤doInRabbit即我们最开始①的lambda表达式

上面的execute()只是包装了一层,具体的执行方法还是我们传入的lambda方法,他只是帮我判断需不需要重试和帮我们管理连接

# 改造发送端确认和消息返回

添加配置

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    // 开启发送端返回和确认类型
    factory.setPublisherReturns(true);
    // 使用CORRELATED类型可以携带消息属性
    factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    /*
    * 由于RabbitAdmin是懒加载
    * 第一次操作时,才会执行自动将容器中Exchange、Queue进行Bing
    * 所以我们需要在启动时就操作一下,让spring自动帮我们声明和绑定
    * */
    factory.createConnection();
    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    // 只有设置了returnCallback setMandatory才有用
    rabbitTemplate.setMandatory(true);

    /*没有被成功路由*/
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);

    });
    /*成功发送到中间件*/
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        log.info("correlationData:{}, ack:{}, cause:{}",correlationData, ack, cause);
    });

    return rabbitTemplate;
}

发送消息

String messageToSend = JSONUtil.toJsonStr(orderMessageDTO);
// 消息属性
MessageProperties properties = new MessageProperties();
properties.setExpiration("15000");
Message message = new Message(messageToSend.getBytes(StandardCharsets.UTF_8), properties);
// 设置相关数据
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderMessageDTO.getOrderId().toString());
// 发送消息,格式为RabbitTemplate指定的Message,可以配置消息属性
rabbitTemplate.send("exchange.order.restaurant","key.restaurant",message,correlationData);

消息确认携带传入的id数据

2021-12-19 16:05:44.541  INFO 40672 --- [nectionFactory1] c.s.o.config.RabbitConfig                : correlationData:CorrelationData [id=447], ack:true, cause:null

# SimpleMessageListenerContainer

@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);    container.setQueueNames("queue.order");    
// 并发消费者数量,如果设置为多个就不能保证排序    
container.setConcurrentConsumers(3);    
// 最大并发消费数量   
container.setMaxConcurrentConsumers(5);    
// 设置确认模式 自动确认    
//        container.setAcknowledgeMode(AcknowledgeMode.AUTO);   
//        container.setMessageListener(message -> {    
//            log.info("message:{}",message);    //        });    
// 设置限流1个    
container.setPrefetchCount(1);    
// 手动确认    
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);   
// 需要传入带有channel的listener    container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        log.info("message:{}",message);    });    
return container;
}

# MessageListenerAdapter

修改业务代码方法为public void handleMessage(byte[] messageBody)

我们这个是配置类,里面直接写业务不太优雅,这里使用MessageListenerAdapter的setDelegate来设置具体的执行方法

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);    container.setQueueNames("queue.order");   
// 并发消费者数量,如果设置为多个就不能保证排序    container.setConcurrentConsumers(3);    
// 最大并发消费数量    container.setMaxConcurrentConsumers(5);    
// 设置确认模式 自动确认    container.setAcknowledgeMode(AcknowledgeMode.AUTO);   
//        container.setMessageListener(message -> {    
//            log.info("message:{}",message);    
//        });    
// 设置限流1个    
container.setPrefetchCount(1);    
// 手动确认    
//        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);   
//        
// 需要传入带有channel的listener    
//        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {    
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    //            log.info("message:{}",message);   
//        });    
MessageListenerAdapter adapter = new MessageListenerAdapter();    
// 设置业务类,默认调用方法handleMessage    
adapter.setDelegate(orderMessageService);    
container.setMessageListener(adapter);    
return container;
}

# 源码简单分析

① MessageListenerAdapter间接实现了ChannelAwareMessageListener接口,所以 container.setMessageListener(adapter)可以直接传入

②我们之前使用的setMessageListener()传入的是一个lambda表达式,为函数接口,实现onMessage()方法

③查看MessageListenerAdapter的onMessage方法,他是如何知道我们业务代码的方法是哪个呢,就是通过getListenerMethodName这方法

④我们没有设置this.queueOrTagToMethodName所以走下面的getDefaultListenerMethod方法

⑤获取默认的方法名,就是handleMessage;所以如果我们不指定方法名,那么方法名就必须是handleMessage

⑥找到方法名后还要进行调用,就是通过反射invoke,进而实现方法调用

如果我们不想使用handleMessage这个固定的方法名,那么我们可以传入QueueOrTagToMethodName

加入业务方法的方法名为handleMesage,少写了一个s,并且这个方法被封装为jar包,不能修改,那么就要配置QueueOrTagToMethodName了

        MessageListenerAdapter adapter = new MessageListenerAdapter();
        // 设置业务类,默认调用方法handleMessage
        adapter.setDelegate(orderMessageService);
        Map<String, String> methodMap = new HashMap<>(8);
        // 配置queue对应的method
        methodMap.put("queue.order", "handleMesage");
        adapter.setQueueOrTagToMethodName(methodMap);

        container.setMessageListener(adapter);

        return container;

此时配置了QueueOrTagToMethodName

再次执行到④时,就从map中get queue对应的method

# MessageConverter

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
container.setQueueNames("queue.order");    
// 并发消费者数量,如果设置为多个就不能保证排序    
container.setConcurrentConsumers(3);   
// 最大并发消费数量    
container.setMaxConcurrentConsumers(5);    
// 设置确认模式 自动确认   
container.setAcknowledgeMode(AcknowledgeMode.AUTO);    
container.setPrefetchCount(1);    
MessageListenerAdapter adapter = new MessageListenerAdapter();    
// 设置业务类,默认调用方法handleMessage    
adapter.setDelegate(orderMessageService);    
// json转换类,对消息进行转换,    
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();   
/*     
* 默认转换后是LinkedHashMap,如果不做处理,我们还要去get获取对象     
* 我们需要进行设置,直接返回我们需要的对象类型     
*/   
messageConverter.setClassMapper(new ClassMapper() {        
    @Override        
    public void fromClass(Class<?> clazz, MessageProperties properties) {        
    }        
    /**         
    * byte[]消息转成java对象         
    * @param properties         
    * @return         
    */        
    @Override        
    public Class<?> toClass(MessageProperties properties) {    
        return OrderMessageDTO.class; 
    }  
});    
adapter.setMessageConverter(messageConverter);   
container.setMessageListener(adapter);    return container;}

# 源码简单分析

①调用extractMessage方法来实现Message的转换

②获取消息转换类getMessageConverter()

③返回this.messageConverter,就是我们传入的Jackson2JsonMessageConverter即adapter.setMessageConverter(messageConverter);

如果没设置的话就用默认的

MessageConverter实现,可以处理字符串、Serializable 实例或字节数组

private MessageConverter messageConverter = new SimpleMessageConverter();

④调用converter.fromMessage(message)方法

这是一个接口,具体调用的就是实现类的方法,这里就是我们传入的Jackson2JsonMessageConverter

⑤Jackson2JsonMessageConverter自身并没有实现这个方法,而是他的父类AbstractJackson2MessageConverter实现了这个方法

⑥执行doFromMessage方法,注意这个conversionHint是null,即⑤的第一个fromMessage方法传入的null

Class<?> targetClass获取到的目标类,即OrderMessageDTO.class

messageConverter.setClassMapper(new ClassMapper() {
    @Override
    public void fromClass(Class<?> clazz, MessageProperties properties) {

    }

    /**
     * byte[]消息转成java对象
     * @param properties
     * @return
     */
    @Override
    public Class<?> toClass(MessageProperties properties) {
        return OrderMessageDTO.class;
    }
});

最后再执行convertBytesToObject方法,最终还是objectMapper.readValue方法

# RabbitListener

注册RabbitListenerContainerFactory,SimpleMessageListenerContainer就不需要了,直接使用注解@RabbitListener

@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    return factory;
}

在业务类上加注解

@RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues = "queue.order")

在消费方法上加注解

// 由于没有转换类,所以方法参数就为byte[]@RabbitHandler(isDefault = true)public void handleMessage( byte[] messageBody) {}

还可以直接在方法上使用@RabbitListener表明此方法是消费方法,可以直接在方法上声明队列和绑定关系,这样就不用@Bean一个一个注入了

    @RabbitListener(
//            containerFactory = "rabbitListenerContainerFactory",
//            admin = "rabbitAdmin",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "queue.order",
                                    arguments = {
//                                            @Argument(name =
//                                            "x-message-ttl", value =
//                                            "15000", type = "java.lang
//                                            .Integer"),
//                                            @Argument(name =
//                                            "x-dead-letter-exchange",
//                                            value = "exchange.dlx"),
//                                            @Argument(name =
//                                            "x-dead-letter-routing-key", value = "#")
                                    }
                            ),
                            exchange = @Exchange(name = "exchange.order.restaurant", type = ExchangeTypes.DIRECT),
                            key = "key.order"
                    ),
                    @QueueBinding(
                            value = @Queue(name = "queue.order"),
                            exchange = @Exchange(name = "exchange.order.deliveryman", type = ExchangeTypes.DIRECT),
                            key = "key.order"
                    ),
                    @QueueBinding(
                            value = @Queue(name = "queue.order"),
                            exchange = @Exchange(name = "exchange.settlement.order", type = ExchangeTypes.FANOUT),
                            key = "key.order"
                    ),
                    @QueueBinding(
                            value = @Queue(name = "queue.order"),
                            exchange = @Exchange(name = "exchange.order.reward", type = ExchangeTypes.TOPIC),
                            key = "key.order"
                    )
            }
//            queues = "queue.order"
    )

/**
 * 使用@Payload可以指定消息体,可以直接使用pojo对象,但是发送和接收端需要实现Serializable接口
 * @Headers指定消息头
 */
    public void handleMessage(@Payload byte[] messageBody) {}

简化配置类,只需在application.yml写入rabbitmq的配置,connection、factory、admin、template都会自动帮我们创建。不用编写配置类了。

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: admin
    password: admin

# RabbitMQ集群间通信原理

集群间通信方法

  • Federation(联邦)
  • Shovel(铲子)

# 集群配置文件

  • tcp_listeners 设置rabbitmq的监听端口,默认为5672
  • disk_free_limit 磁盘低水位线,若磁盘容量低于指定值则停止接收数据,默认值为{mem_relative, 1.0},即与内存相关联1:1,也可以定制为多少byte
  • vm_memory_high_watermark 设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%
  • force_fine_statistics 该参数属于rabbitmq_management,若为true则进行精细化的统计,但会影响性能

# Federation

  • 通过AMQP协议,使用一个内部交换机(隐藏,默认看不到),让原本发送到一个集群的消息转发至另一个集群
  • 消息可以从交换机转发到交换机,也可以由队列转发至队列(双端统一)
  • 消息可以单向转发,也可以双向转发

# Federation设置

  • 启用Federation插件
rabbitmq-plugins enable rabbitmq_federation_management
  • 使用网页控制台配置Federation

# Shovel

  • Shovel可以持续的从一个broker拉取消息转发至另一个broker
  • Shovel的使用较为灵活,可以配置从队列至交换机,从队列至队列,从交换机至交换机(双端不必统一)

# Shovel设置

  • 启用Shovel插件
rabbitmq-plugins enable rabbitmq_shovel_management
  • 使用网页控制台配置Shovel

# 集群网络分区

当一个节点发生网络故障,发送/接受不到消息时,会认为其他节点挂了,为了保证可用性,会把当前节点升级为主节点,继续对外进行服务,但是真正主节点没有任何问题(只是集群间暂时不能通信)。

此时从节点升级为主节点,处理数据,但是不会进行数据同步,因为网络分区了(自己单独为一个分区)。

真正的主节点,会在自己的网络分区中进行数据同步。

所以集群间的数据就不一致了。

  • 网络分区指的是系统网络被分割为了不互通的两个部分
  • 相对与网络部分故障,彻底的分区有时是有意义的(相对于业务不可以)

# 如何发现网络分区

  • 使用rabbitmqctl cluster_status命令
  • 使用网页控制台查看
  • 使用http api http://localhost:15672/api/nodes

# 集群网络分区处理方法

# 手动处理
  1. 挂起客户端进程(springboot) 可以减少不必要的消息丢失,如果进程数过多,可跳过
  2. 删除镜像队列的配置 如果没有删除镜像队列配置,恢复过程中可能会出现队列”漂移“现象(即从变主)
  3. 挑选信任分区 挑选指标:是否有disk节点(持久化)/分区节点数/分区队列数/分区客户端连接数
  4. 关闭非信任区的节点 采用rabbitmqctl stop_app命令,只关闭RabbitMQ应用,不会关闭ErLang虚拟机
  5. 启动非信任区的节点(修复网络后) 采用rabbitmqctl start_app命令
  6. 检查网络分区是否恢复 若已恢复跳至步骤8 若还存在网络分区进行步骤7
  7. 重启信任分区中的节点(可能是信任分区的问题) 使用步骤4和5的命令
  8. 添加镜像队列的配置(问题已解决)
  9. 恢复生产者和消费者的进程 若步骤1并未挂起客户端进程,也应检查客户端连接,必要时重启客户端(防止springamqp长时间连接失败后,就不进行连接了)
# 自动处理

RabbitMQ中有三种网络分区自动处理模式

  • pause-minority
  • pause-if-all-down
  • autoheal

默认是ignore模式,不自动处理

如果要开启,配置rabbitmq.config中的cluster_parititon_handing参数

pause-minority

发生网络分区时,节点自动检测自己是否处于少数派,若是则关闭自己

若是出现了节点数相同的两个分区,可能会导致两个分区全部关闭

pause-if-all-down

每个节点预先配置一个节点列表,当失去和列表中所有节点的通信时,关闭自己

此方法非常考验配置的合理性,配置不合理可能会导致集群节点全部宕机

autoheal

发生网络分区时,每个节点使用特定算法自动决定一个”获胜分区“,然后重启不在分区的其他节点

当节点中有关闭状态时,autoheal不会起作用

# 总结

手动处理方式比较考验运维操作水平,但比较常用

慎用自动处理方式,因为如果配置不合理,会导致更大的问题

# 集群恢复与故障转移

前提:假设节点A和节点B组成一个镜像队列

场景1:A先停,B后停

方案1:B是master,只要先启动B,再启动A即可。或者先启动A,在30秒内启动B即可恢复镜像队列


场景2:A、B同时停机

方案2:可能是机房断电等原因造成,只需在30秒内连续启动A和B即可恢复镜像队列


场景3:A先停、B后停,且A无法恢复

方案3:因为B是master,所以等B启动后,在B节点上调用控制台命令rabbitmqctl forget_cluster_node A解除与A的cluster关系,再将新的节点加入B即可重新恢复镜像队列


场景4:A先停、B后停,且B无法恢复

方案4:因为B是主节点,所以直接启动A是不信的,当A无法启动时,也就没有办法在A节点上调用方案3的命令,但是3.4.2之后的版本可以在加上--offline参数解决。

这就意味着允许rabbitmqctl在理想节点上执行该命令,迫使RabbitMQ在未启动slave节点中选择一个节点作为master。当在A节点执行rabbitmqctl forget_cluster_node --offline B时,RabbitMQ会mock一个节点代表A,执行forget_cluster_node命令将B剔除cluster,然后A就可以正常启动了,最后将新的slave节点加入A即可重新恢复镜像队列


场景5:A先停、B后停,且A、B均无法恢复,但是能得到A或B的磁盘文件

方案5:尝试恢复数据,将A或B的数据库文件(默认在$RABBIT_HONE/var/lib目录),把它拷贝到新节点的对应目录下,再将新节点的hostname改成A或B的hostname,如果是A节点(slave)的磁盘文件,则按照场景4处理即可,如果是B节点(master)的磁盘文件,则按照场景3处理,最后将新的slave加入到新节点后完成恢复


场景6:A先停、B后停,且A、B均无法恢复,且得不到A、B的磁盘文件

无解~29

# 延迟插件

配置

  • 官网 (opens new window)下载插件(rabbitmq_delayed_message_exchange) 直达地址 (opens new window)
  • 放入$RABBIT_HOME/plugins目录下
  • 启动插件,如果是集群需要每个节点都配置(可以运行时直接配置)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用

  1. 声明交换机时添加参数"x-delayed-type", "direct"
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
  1. 发送消息时配置延迟时间"x-delay", 5000,如果没有配置就算正常发送,不延迟

计时器可以设置为 (2^32)-1 毫秒将来

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...

# RabbitMQ状态监控

  • 通过Java API判断节点是否健康
  • 通过HTTP REST API监控集群状态
  • 通过监控中间件监控RabbitMQ

通过Java API判断节点是否健康

使用Java应用创建connection与channel

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

若能创建成功,则节点健康,若创建失败(抛异常)则节点挂机或节点的网络连接失败

通过HTTP REST API监控集群状态

  • 使用api/nodes/接口获得节点信息
  • 使用api/exchanges/{vhost}/{name}/接口获得exchange状态信息
  • 使用api/queues/{vhost}/{name}/接口获得queue状态信息

通过监控中间件监控RabbitMQ

常见的监控中间件有Zabbix、Prometheus等

其底层原始还是调用HTTP REST API,再将数据处理、存储、展示

# 消息可靠性保证

生产端的可靠性投递

  • 保障消息成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(broker)确认应答
  • 完善的消息补偿机制

大厂解决方案

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次投递,回调检查

# 使用定时任务

# 延迟投递

定时任务的话读写消息库IO比较频繁,不太适合高并发的场景。

使用消息的延迟投递,做二次确认,回调检查

生产端只用关注业务,不用关注消息是否发送成功,新建一个回调服务用来关注消息状态。异步解耦

上次更新: 2024/03/03, 08:36:37
JVM与GC调优
RabbitMQ集群搭建文档

← JVM与GC调优 RabbitMQ集群搭建文档→

Theme by Vdoing | Copyright © 2023-2024 Starry | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式