springboot 整合rabbitMq保证消息一致性方案

rabbitMq介绍

RabbitMQ是一种开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可用于在应用程序之间传递消息。RabbitMQ最初由LShift开发,现在由Pivotal Software维护。
RabbitMQ可以在多个平台上运行,包括Windows、Mac OS X和各种Linux发行版。它提供了多种编程语言的客户端库,如Java、Python、Ruby、.NET等等。RabbitMQ的主要特点包括:

  1. 可靠性:RabbitMQ使用多种机制确保消息传递的可靠性,例如消息确认、持久化和备份机制等。
  2. 灵活性:RabbitMQ支持多种消息传递模式,如点对点、发布/订阅、RPC等,可根据具体应用场景选择适合的模式。
  3. 可扩展性:RabbitMQ可以通过集群方式实现水平扩展,从而提高系统的吞吐量和可用性。
  4. 可插拔性:RabbitMQ支持多种插件,如消息传递追踪、消息转换、限流等,可以根据需要选择使用。
  5. 易用性:RabbitMQ提供了简单易用的管理界面,可用于监控和管理消息队列。同时,它还提供了丰富的文档和社区支持,方便用户学习和使用。

rabbitMq工作原理

在这里插入图片描述
RabbitMQ的工作原理主要包括生产者(Producer)、消息队列(Queue)和消费者(Consumer)三个部分。

  1. 生产者将消息发送到消息队列:生产者将消息发送到RabbitMQ的消息队列中,消息队列会暂时存储这些消息。
  2. 消费者从消息队列中获取消息:消费者可以从消息队列中获取消息,并对这些消息进行处理。
  3. 消费者处理完消息后发送确认:消费者在处理完消息后,向RabbitMQ发送确认信息,表示已经处理完该消息。
  4. RabbitMQ将已确认的消息从队列中移除:当RabbitMQ接收到消费者的确认信息后,会将已经确认的消息从队列中移除,这样其他消费者就不会再次获取到该消息。
    RabbitMQ使用交换机(Exchange)将消息发送到相应的消息队列中。交换机根据特定的路由规则(Routing Key)将消息发送到一个或多个消息队列中

工作模式

  1. 简单模式(Simple Mode):也称为点对点(Point-to-Point)模式,消息只能被一个消费者消费。生产者将消息发送到队列中,消费者从队列中获取消息并处理,消息处理完后从队列中删除。这种模式下的队列只能有一个消费者。
  2. 发布/订阅模式(Publish/Subscribe Mode):也称为广播模式(Broadcasting),消息可以被多个消费者消费。生产者将消息发送到交换机中,交换机将消息复制到多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。
  3. 路由模式(Routing Mode):消息可以根据路由规则(Routing Key)被不同的消费者消费。生产者将消息发送到交换机中,交换机根据消息的路由规则将消息发送到一个或多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。
  4. 主题模式(Topic Mode):也称为通配符模式(Wildcards),消息可以根据模糊匹配的路由规则被不同的消费者消费。生产者将消息发送到交换机中,交换机根据消息的路由规则和通配符规则将消息发送到一个或多个队列中,每个队列对应一个消费者。消费者从队列中获取消息并处理。

rabbitMq安装

安装步骤如下

  1. 下载 Erlang:RabbitMQ 是基于 Erlang 语言开发的,因此需要先安装 Erlang。Erlang 的下载地址为 https://www.erlang.org/downloads,根据系统类型和版本下载对应的安装程序,然后按照提示进行安装。
  2. 下载 RabbitMQ:RabbitMQ 的下载地址为 https://www.rabbitmq.com/download.html,根据系统类型和版本下载对应的安装程序,然后按照提示进行安装。
  3. 启动 RabbitMQ:在安装完成后,可以通过命令行或者图形化界面启动 RabbitMQ。在 Windows 系统中,可以在开始菜单中找到 RabbitMQ Server,然后选择启动 RabbitMQ Server。在 Linux 或者 Mac 系统中,可以在命令行中输入 rabbitmq-server start 命令来启动 RabbitMQ。
  4. 配置 RabbitMQ:启动 RabbitMQ 后,默认会监听 5672 端口,如果需要更改监听端口、配置虚拟主机、添加用户等操作,可以通过 RabbitMQ 的管理控制台或者命令行来进行配置。
  5. 使用 RabbitMQ:安装和配置完成后,就可以开始使用 RabbitMQ 进行消息传递了。可以选择使用 RabbitMQ 的客户端库或者 AMQP 协议来进行消息传递。

需要注意的是,安装 RabbitMQ 之前需要先安装 Erlang,而且版本要匹配。另外,如果在安装过程中出现问题,可以参考 RabbitMQ 的官方文档或者社区论坛来解决。
安装成功后 访问127.0.0.1:15672 出现登录页面安装成功。

springboot整合RabbitMQ

首先通过idea准备springboot的项目,添加rabbitMQ的依赖

<!-- SpringBoot web启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- SpringBoot amqp启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- SpringBoot 测试启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--  数据库连接-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--  mybatis 连接-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

配置rabbitMq如下

spring:
  rabbitmq:   #配置文件
    host: 39.104.177.194  #ip
    port: 5672
    username: admin
    password: daizhihua1996
    virtual-host: /
    connection-timeout: 15000
    publisher-confirm-type: correlated  #开启 confirms 回调  P →  Exchange
    publisher-returns: true  # 开启 returnedMessage 回调 Exchange →  Queue
    template:
      mandatory: true   # 抵达队列异步发送有效回调
    listener:
      simple:
        acknowledge-mode: manual  # 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
        concurrency: 5   #当前线线程数
        max-concurrency: 10  # 最大线程数
        prefetch: 10
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000ms   # 重试最大间隔时间10s
          initial-interval: 2000ms  # 重试初始间隔时间2s
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间,重试时间依次是2s,4s,8s,10s
   
       #消息message
com.acrdpm.smart_topic_queue=smart.queue
com.acrdpm.smart_topic_exchange=smart.exchange
com.acrdpm.smart_topic_routingKey=smart.routing.key

#延迟队列
com.acrdpm.delayed_queue=delayed.queue
com.acrdpm.delayed_exchange=delayed.exchange
com.acrdpm.delayed_routingKey=delayed.routing.key

配置rabbitMq配置文件

@Configuration
@Slf4j
//启用rabbitmQ
@EnableRabbit
@Getter
public class RabbitConfig {

	
    private final RabbitTemplate rabbitTemplate;
	// 将配置文件封装成工具类
    private final RabbitPropertiesConfig rabbitPropertiesConfig;
    // 消息备份类
    private final MsgLogService msgLogService;



    public RabbitConfig(RabbitTemplate rabbitTemplate, RabbitPropertiesConfig rabbitPropertiesConfig, MsgLogService msgLogService) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitPropertiesConfig = rabbitPropertiesConfig;
        this.msgLogService = msgLogService;
    }




    /**
     * 定义硬件需要的topic
     * @return
     */
    @Bean
    public Queue smartQueue() {
        return new Queue(rabbitPropertiesConfig.getSmart_topic_queue(), true);
    }

    @Bean
    public TopicExchange smartExchange() {
        return new TopicExchange(rabbitPropertiesConfig.getSmart_topic_exchange(), true, false);
    }

    @Bean
    public Binding smartBinding() {
        return BindingBuilder.bind( smartQueue()).to(smartExchange()).with(rabbitPropertiesConfig.getSmart_topic_routingKey());
    }


    /**
     * 定义延迟队列
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(rabbitPropertiesConfig.getDelayed_queue());
    }

    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(rabbitPropertiesConfig.getDelayed_exchange(), "x-delayed-message", true, false,
                args);
    }

    @Bean
    public Binding bindingDelayedQueue(){
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(rabbitPropertiesConfig.getDelayed_routingKey()).noargs();
    }




    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     * 1、spring.rabbitmq.publisher-returns: true
     * spring.rabbitmq.template.mandatory: true
     * 2、设置确认回调ReturnCallback
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     * MyRabbitConfig对象创建完成以后,执行这个方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm...correlationData[,{}",correlationData);
            log.info("ack是:{}",ack);
            log.info("case是:{}",cause);
            System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]");
            if (ack) {
                log.info("消息成功发送到Exchange");
                String msgId = correlationData.getId();
                msgLogService.updateStatus(msgId, MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         * 修改数据库状态
         */
        rabbitTemplate.setReturnsCallback((returnCallback) -> {
            Message message = returnCallback.getMessage();
            String exchange = returnCallback.getExchange();
            int replyCode = returnCallback.getReplyCode();
            String routingKey = returnCallback.getRoutingKey();
            String replyText = returnCallback.getReplyText();
            if(rabbitPropertiesConfig.getDelayed_exchange().equals(exchange)){
                /**
                 * 使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法
                 * 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
                 * 并非是BUG,而是有原因的,所以使用利用if去拦截这个异常,判断延迟队列交换机名称,然后break;
                 */
                log.info("如果是延迟队列那么break");
                return;
            }
            log.info("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
                    "==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");

            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange,
                    routingKey,replyCode,replyText,message);
            //todo 没有发送到指定的队列 数据暂存到数据库认定消费失败 再次重新上传
        });
    }

}


注意rabbitMq延迟队列需要安装插件,可参考官网

配置日志消息类

@Service
@Slf4j
public class MsgLogService {

    private final MsgLogMapper msgLogMapper;

    public MsgLogService(MsgLogMapper msgLogMapper) {
        this.msgLogMapper = msgLogMapper;
    }

    public void saveMsg(MsgLog msgLog){
        msgLogMapper.insert(msgLog);
    }

    public void updateStatus(String msgId, Integer status) {
        log.info("执行");
        msgLogMapper.updateStatus(msgId,status);
    }


    public MsgLog selectByMsgId(String msgId) {
        if (!ObjectUtils.isEmpty(msgId)){
            return msgLogMapper.seletMsgFormsgId(msgId);
        }
        return null;
    }


    public List<MsgLog> selectTimeoutMsg() {

        return msgLogMapper.selectTimeOutMsg();
    }


    public void updateTryCount(String msgId, Integer tryCount) {
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setTryCount(tryCount);
        msgLogMapper.updateByMsgId(msgLog);
    }

}

@Data
@NoArgsConstructor
public class MsgLog {

    private static final long serialVersionUID = 4990197789742500403L;
    private String msgId;

    private JSONObject msg;

    private String exchange;

    private String routingKey;

    private Integer status;

    private Integer tryCount;

    private String nextTryTime;

    private String createTime;

    private String updateTime;

    private String msgCase;


}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cvdmp.dao.MsgLogMapper">

  <insert id="insert" parameterType="com.cvdmp.domain.entity.MsgLog">

    insert into msg_log
    <trim prefix="(" suffix=")" suffixOverrides=",">
      <if test="msgId != null">
        msg_id,
      </if>
      <if test="exchange != null">
        exchange,
      </if>
      <if test="routingKey != null">
        routing_key,
      </if>
      <if test="status != null">
        status,
      </if>
      <if test="tryCount != null">
        try_count,
      </if>
      <if test="nextTryTime != null">
        next_try_time,
      </if>
      <if test="createTime != null">
        create_time,
      </if>
      <if test="updateTime != null">
        update_time,
      </if>
      <if test="msg != null">
        msg,
      </if>
        <if test="msgCase!=null">
          msg_case,
        </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="msgId != null">
        #{msgId,jdbcType=VARCHAR},
      </if>
      <if test="exchange != null">
        #{exchange,jdbcType=VARCHAR},
      </if>
      <if test="routingKey != null">
        #{routingKey,jdbcType=VARCHAR},
      </if>
      <if test="status != null">
        #{status,jdbcType=INTEGER},
      </if>
      <if test="tryCount != null">
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="nextTryTime != null">
        #{nextTryTime},
      </if>
      <if test="createTime != null">
        #{createTime},
      </if>
      <if test="updateTime != null">
        #{updateTime},
      </if>
      <if test="msg != null">
        #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
      </if>
        <if test="msgCase !=null">
          #{msgCase},
        </if>
    </trim>
  </insert>

  <update id="updateStatus" parameterType="map">
    update msg_log set status = #{status}, update_time = now()
    where msg_id = #{msgId}
  </update>

  <select id="selectTimeOutMsg" resultType="com.cvdmp.domain.entity.MsgLog">
    select
    *
    from msg_log
    where status = 0
    and next_try_time &lt;= now()
  </select>
  <select id="seletMsgFormsgId" parameterType="string" resultType="com.cvdmp.domain.entity.MsgLog">
    select
      *
    from msg_log
    where msg_id = #{msgId}
  </select>

  <update id="updateByMsgId" parameterType="com.cvdmp.domain.entity.MsgLog">

    update msg_log
    <set>
      <if test="exchange != null">
        exchange = #{exchange,jdbcType=VARCHAR},
      </if>
      <if test="routingKey != null">
        routing_key = #{routingKey,jdbcType=VARCHAR},
      </if>
      <if test="status != null">
        status = #{status,jdbcType=INTEGER},
      </if>
      <if test="tryCount != null">
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="nextTryTime != null">
        next_try_time = #{nextTryTime},
      </if>
      <if test="createTime != null">
        create_time = #{createTime},
      </if>
      <if test="updateTime != null">
        update_time = #{updateTime},
      </if>
      <if test="msg != null">
        msg = #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
      </if>
    </set>
    where msg_id = #{msgId,jdbcType=VARCHAR}
  </update>
</mapper>

最后我们定义生产者和消费者


/**
 * mq消息推送策略
 * 1、通过rabbitmq完成消息的推送保证消息推送成功
 * @author daizhihua
 * @time 2023/4/25
 */
@Component(value = "mqStrategy")
public class MqStrategyService {

    private final RabbitConfig rabbitConfig;

    private final MsgLogService msgLogService;

    public MqStrategyService(RabbitConfig rabbitConfig, MsgLogService msgLogService) {
        this.rabbitConfig = rabbitConfig;
        this.msgLogService = msgLogService;
   }


    public void sendMessage(JSONObject map, HttpServletRequest request) {
        RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
        String msgId = RandomUtil.getRandomNumber(32);
        //设置消息id
        map.put("msgId",msgId);
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setMsg(map);
        msgLog.setExchange(rabbitPropertiesConfig.getSmart_topic_exchange());
        msgLog.setRoutingKey(rabbitPropertiesConfig.getSmart_topic_routingKey());
        msgLog.setNextTryTime(DateUtil.getNow());
        msgLogService.saveMsg(msgLog);
        //生成消息的唯一id
        CorrelationData correlationData = new CorrelationData(msgId);
        RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
        // 发送消息
        rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getSmart_topic_exchange(),
                rabbitPropertiesConfig.getSmart_topic_routingKey(), map, correlationData);



    }

    /**
     * 发送延迟队列消息
     * @param map
     * @param delayTime
     */
    public void sendMessageDelay(JSONObject map,int delayTime){
        RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig();
        String msgId = RandomUtil.getRandomNumber(32);
        //设置消息id
        map.put("msgId",msgId);
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setMsg(map);
        msgLog.setExchange(rabbitPropertiesConfig.getDelayed_exchange());
        msgLog.setRoutingKey(rabbitPropertiesConfig.getDelayed_routingKey());
        msgLog.setNextTryTime(DateUtil.getNow());
        //生成消息的唯一id
        CorrelationData correlationData = new CorrelationData(msgId);
        RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate();
        rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getDelayed_exchange(),
                rabbitPropertiesConfig.getDelayed_routingKey(), map, message -> {
                    message.getMessageProperties().setDelay(delayTime);
                    return message;},correlationData);
    }

}

延迟队列的消费

@Slf4j
@Component
@RabbitListener(queues = "${com.acrdpm.delayed_queue}")
public class MessageConsumer {

    private final MqStrategyService mqStrategyService;

    public MessageConsumer(MqStrategyService mqStrategyService) {
        this.mqStrategyService = mqStrategyService;
    }

    @RabbitHandler
    public void consume(Message message, JSONObject map, Channel channel) throws IOException {
        System.out.println("First Queue received msg : " );
        log.info("数据是:{}",map);
        System.out.println(message);
        System.out.println(channel);
        long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);

    }

}

订阅消息的消费者

@Slf4j
@Component
@RabbitListener(queues = {"${com.acrdpm.smart_topic_queue}"})
public class SmartConsumer {

    private MsgLogService msgLogService;


    @RabbitHandler
    public void consume(Message message, JSONObject mail, Channel channel) throws IOException {
        log.info("接收到消息了");
        log.info("消息 {}",message);
        log.info("收到的消息是:{}",mail);
        long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);

        String msgId = mail.getMsgId();
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        if (null == msgLog || msgLog.getStatus().equals(MsgLogStatus.CONSUMED_SUCCESS)) {
            // 消费幂等性:确定不是重复的消息:及消费完成的消息
            log.info("重复消费, msgId: {}", msgId);
            return;
        }
         //获取投送标签
        long tag = message.getMessageProperties().getDeliveryTag();
//        boolean success = false;
//        if (success) {
//            log.info("成功发送消息");
//            msgLogService.updateStatus(msgId, MsgLogStatus.CONSUMED_SUCCESS);
//            // 消费确认手动ack
//            channel.basicAck(tag, false);
//        } else {
//            channel.basicAck(tag, false);
//        }
//        try {
            boolean success = EmailUtil.sendEmail(mail);
//
//        } catch (EmailException e) {
//            log.error("email 发送异常" , e);
//        } catch (IOException e) {
//            log.error("消息处理异常" , e);
//        }

    }
}

在发送消息的过程中,肯定会出现网络异常等情况所以我们定义了发送消息的持久化,为了保证一致性,可参考如下时序图

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>