RabbitMq高级之如何保证消息发送可靠性

1.RabbitMq的发送机制

学过RabbitMq的同学们大概都知道了RabbitMq发送机制引入了Exchange(交换机的概念),消息发送方,首先把消息发送到交换机这是第一个步骤,然后交换机在把消息路由到不同的队列中(Queue)这是第二个步骤,在有不同的消费者去消费。

注意:大致,知道消息发送的整个过程后,大概知道了要想保证消息发送成功主要是从两个方面出发

1.消息成功发送到交换机(Exchange)
2.交换机成功路由到队列(Queue)

2.针对两种情况RabbitMq解决方案

为了解决这个两个方案RabbitMq给出了俩种方案,事务机制发送方确认机制,由于事务机制过于耗费性能,效率比较有点低.这里着重讲解发送方确认机制
发送方确认机制,就是消息发送到MQ那端后,MQ会回一个确认收到的消息给我们.

2.1发送方确认机制

在讲解这个,首先介绍两个接口ConfirmCallback和ReturnsCallback.
ConfirmCallback接口的作用:重写次接口的confirm方法,可以确认消息是否发送到Exchange.
ReturnsCallback接口的作用:重写次接口的returnedMessage方法,可以确认消息从EXchange路由到Queue失败,此方法的回调是个失败的回调,消息从Exchange路由到Queue失败才会回调这个方法。

配置:

spring:
  application:
    name: mq-test
  rabbitmq:
    host: ####
    port: #####
    #打开消息返回 消息到达队列的回调
    publisher-returns: true
    #打开消息确认机制 消息到达交换器的确认回调
    publisher-confirm-type: correlated

publisher-confirm-type配置主要有三个值:

  1. none:表示禁用发布确认模式,默认即此。
  2. correlated:表示成功发布消息到交换器后会触发的回调方法。
  3. simple:类似 correlated

实现上面俩个接口具体配置如下:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback
        rabbitTemplate.setReturnsCallback(this); // 指定 ReturnCallback
    }
    //这个主要是把消息路由到交换机 没有推到交换机会回调   在这里可以设置消息发送成功后的过程
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("{}:消息成功到达交换器",correlationData.getId()); // correlationData作用是给消息增加一个唯一性ID
        }else{
            log.error("{}:消息发送失败", correlationData.getId());
        }
    }
    // 交换机路由消息到队列 没有路由成功会报错
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
    }

}

接下来测试没有发送到交换机的情况:这是一个不存在的交换机

  @Test
    public void testNotExchange(){
        rabbitTemplate.convertAndSend("test_not_exchange","test_rouking","测试没有发送到交换机的回调".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
    }

日志:
测试有交换机但是没有队列的情况:

  @Test
    public void testNotExchange(){
        rabbitTemplate.convertAndSend("test_exchange","test_rouking__not","测试没有路由到队列的回调".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
    }

在这里插入图片描述
至此,就是RabbitMQ保证消息可靠性的解决方案。

3.RabbitMQ重试机制

RabbitMQ的重试机制这里主要也是个俩个原因:一是没找到MQ的情况,二是找到MQ了,但是消息发送失败了。

1.SpringBoot自带的重试机制

配置如下:

spring:
  application:
    name: mq-test
  rabbitmq:
    host: #######
    port: 5672
    publisher-returns: true
#    none:表示禁用发布确认模式,默认即此。
#    correlated:表示成功发布消息到交换器后会触发的回调方法。
#    simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
    publisher-confirm-type: correlated
#    此处配置重试机制 跟mq没关系,基于springboot的重试机制
    template:
      retry:
        enabled: true  #开启重试机制
        initial-interval: 1000ms  #重试起始间隔时间
        max-attempts: 10  #最大重试次数
        max-interval: 10000ms  #最大重试次数
        multiplier: 2  #间隔时间乘数
2.业务重试机制

针对消息没有成功到交换机,针对这种情况,回调做相关的业务逻辑即可.
具体思路:
可以使用数据库在每次发送消息的时候写入库里给此条消息增加一个状态,在交换机回调处,做相应的处理逻辑.
1.收到失败回调对此消息做数据库的修改,是重试几次后,还是发送失败那么就不在重新发送此条消息,改变此条消息的状态即可。
2.如果消息收到此条消息发送成功就改变次条消息的状态。
3.当然了,发送失败的消息,重试次数超过后,定时器去找失败的消息,做业务处理即可。
注意:这里会有消息重复发送的情况,在消费方做好消息的幂等性即可。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>