写一篇RocketMQ卷文让自己冷静一下

不吃不喝看了好久才概括出这么一点点东西,希望大佬们能够有耐心看一看,遇到说的不对的地方,也欢迎在评论区或者私信与我交流

另外完整版的代码注释,我在我的github上也添加了,感兴趣的小伙伴也可以点击这个链接去看一波 github地址

觉得我讲的有那么一点点道理,对你有那么一丢丢的帮助的,也可以给我一波点赞关注666哟~

废话不多说,下面开始我的表演~

RocketMQ全局流程图

RocketMQ主要流程图.png

上来就是这么一大张图片,相信大家肯定完全不想看下去。(那么我为什么还要放在一开始呢?主要是为了能够让大家有一个全局的印象,然后后续复习的时候也可以根据这个流程图去具体复习)

那么,下面我们就针对一些问题来具体描述RocketMQ的工作流程 此处内容会不断补充,也欢迎大家把遇到的问题在评论区留下来

消息消费逻辑

消息消费可以分为三大模块

  • Rebalance
  • 拉取消息
  • 消费消息

Rebalance

Rebalance逻辑

// RebalanceImpl
public void doRebalance(final boolean isOrder) {
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    // 遍历每个主题的队列
    // subTable 会在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 时修改
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 对队列进行重新负载
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }

  this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
  switch (messageModel) {
    case BROADCASTING: {
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
          this.messageQueueChanged(topic, mqSet, mqSet);
          log.info("messageQueueChanged {} {} {} {}",
                   consumerGroup,
                   topic,
                   mqSet,
                   mqSet);
        }
      } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
      }
      break;
    }
    case CLUSTERING: {
      // topicSubscribeInfoTable topic订阅信息缓存表
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      // 发送请求到broker获取topic下该消费组内当前所有的消费者客户端id
      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
      }

      if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
      }

      if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // 排序保证了同一个消费组内消费者看到的视图保持一致,确保同一个消费队列不会被多个消费者分配
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // 分配算法 (尽量使用前两种)
        // 默认有5种 1)平均分配 2)平均轮询分配 3)一致性hash
        // 4)根据配置 为每一个消费者配置固定的消息队列 5)根据broker部署机房名,对每个消费者负责不同的broker上的队列
        // 但是如果消费者数目大于消息队列数量,则会有些消费者无法消费消息
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        // 当前消费者分配到的队列
        List<MessageQueue> allocateResult = null;
        try {
          allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        } catch (Throwable e) {
          log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
          return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
          allocateResultSet.addAll(allocateResult);
        }

        // 更新消息消费队列,如果是新增的消息消费队列,则会创建一个消息拉取请求并立即执行拉取
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
          log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
          this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
      }
      break;
    }
    default:
      break;
  }
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                   final boolean isOrder) {
  boolean changed = false;

  Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
      // 当前分配到的队列中不包含原先的队列(说明当前队列被分配给了其他消费者)
      if (!mqSet.contains(mq)) {
        // 丢弃 processQueue
        pq.setDropped(true);
        // 移除当前消息队列
        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
          it.remove();
          changed = true;
          log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
        }
      } else if (pq.isPullExpired()) {
        switch (this.consumeType()) {
          case CONSUME_ACTIVELY:
            break;
          case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
              it.remove();
              changed = true;
              log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
          default:
            break;
        }
      }
    }
  }

  List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  for (MessageQueue mq : mqSet) {
    // 消息消费队列缓存中不存在当前队列 本次分配新增的队列
    if (!this.processQueueTable.containsKey(mq)) {
      // 向broker发起锁定队列请求 (向broker端请求锁定MessageQueue,同时在本地锁定对应的ProcessQueue)
      if (isOrder && !this.lock(mq)) {
        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
        // 加锁失败,跳过,等待下一次队列重新负载时再尝试加锁
        continue;
      }

      // 从内存中移除该消息队列的消费进度
      this.removeDirtyOffset(mq);
      ProcessQueue pq = new ProcessQueue();

      long nextOffset = -1L;
      try {
        nextOffset = this.computePullFromWhereWithException(mq);
      } catch (Exception e) {
        log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
        continue;
      }

      if (nextOffset >= 0) {
        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
          log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
        } else {
          // 首次添加,构建拉取消息的请求
          log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
          PullRequest pullRequest = new PullRequest();
          pullRequest.setConsumerGroup(consumerGroup);
          pullRequest.setNextOffset(nextOffset);
          pullRequest.setMessageQueue(mq);
          pullRequest.setProcessQueue(pq);
          pullRequestList.add(pullRequest);
          changed = true;
        }
      } else {
        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
      }
    }
  }

  // 立即拉取消息(对新增的队列)
  this.dispatchPullRequest(pullRequestList);

  return changed;
}

由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:

  1. 从Broker获取订阅当前Topic的消费者列表
  2. 根据具体的策略进行负载均衡
  3. 对当前消费者分配到的队列进行处理
    1. 原来有,现在没有:丢弃对应的消息处理队列(ProcessQueue)
    2. 原来没有,现在有:添加消息处理队列(ProcessQueue),如果是第一次新增,还会创建一个消息拉取请求

拉取消息

拉取消息逻辑

拉取消息的代码太多了,我就不再这里贴出来了。

我在这里说一下大致流程,然后有几个需要注意的地方

流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作

这里说一个RocketMQ消费者这边设计的一个亮点

它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)

另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因

既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式

  • 表达式过滤
    • tag
    • SQL92
  • 类过滤

消费消息

消息消费逻辑

这边也先说几个注意点吧,后面再单独出篇文章。

(一)顺序消费和非顺序消费消费失败的处理

(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量

(三)广播消息失败不会重试,仅打印失败日志

补充:为什么同一个消费组下消费者的订阅信息要相同

首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同

即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费

参考文档:阿里云:订阅关系一致

Rebalance细节

我们在看待这个问题的时候,可以把它分为两类情况考虑

  • topic不一致
  • tag不一致

(一)topic不一致的问题

首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest

在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。

(二)tag不一致的问题

随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉

延时队列是如何工作的

RocketMQ延时队列工作流程图.png

由流程图中我们不难看出,RocketMQ对延时消息的处理,是交由Timer去完成的(相关类ScheduleMessageService)。在Timer的任务队列中读取需要处理的延迟任务,将消息从延迟队列转发到具体的业务队列中

此处补充一点:此处提到的Timer为java工具类包(java.util.Timer)下的一个定时任务工具。它主要由两个部分:TaskQueue queue(任务队列)和TimerThread thread(工作线程)。这边我把它简单的类比为一个单线程的工作线程池

另外在ScheduleMessageService中使用到了Timer的两个方法,我在这里先单独列出来下

  • this.timer.schedule :在任务执行成功后,再加上对应的周期,然后再执行
  • this.timer.scheduleAtFixedRate :每隔指定时间就执行一次,与任务执行时间无关

话不多少,贴上源码*(源码虽然枯燥,但希望可以耐心的看完)*

// ScheduleMessageService
public void start() {
  if (started.compareAndSet(false, true)) {
    super.load();
    this.timer = new Timer("ScheduleMessageTimerThread", true);
    // 根据延时队列创建对应的定时任务
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
      Integer level = entry.getKey();
      Long timeDelay = entry.getValue();
      Long offset = this.offsetTable.get(level);
      if (null == offset) {
        offset = 0L;
      }

      if (timeDelay != null) {
        // 第一次,延迟一秒执行任务,后续根据对应延时时间来执行
        // 延时级别和消息队列id对应关系 : 消息队列id = 延时级别 - 1
        // shedule 在任务执行成功后,再加上对应的周期,然后再执行
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
      }
    }

    // scheduleAtFixedRate 每隔指定时间就执行一次,与任务执行时间无关
    this.timer.scheduleAtFixedRate(new TimerTask() {

      @Override
      public void run() {
        try {
          if (started.get()) {
            // 每个十秒持久化一次延迟队列的处理进度
            ScheduleMessageService.this.persist();
          }
        } catch (Throwable e) {
          log.error("scheduleAtFixedRate flush exception", e);
        }
      }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  }
}
// DeliverDelayedMessageTimerTask
@Override
public void run() {
  try {
    if (isStarted()) {
      this.executeOnTimeup();
    }
  } catch (Exception e) {
    // XXX: warn and notify me
    log.error("ScheduleMessageService, executeOnTimeup exception", e);
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
      this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
  }
}

public void executeOnTimeup() {
  // 根据 延时队列topic 和 延时队列id 查找消费队列
  ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                                                                     delayLevel2QueueId(delayLevel));

  long failScheduleOffset = offset;

  if (cq != null) {
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
      try {
        long nextOffset = offset;
        int i = 0;
        // 遍历ConsumeQueue,每一个标准的ConsumeQueue条目为20字节
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
          long offsetPy = bufferCQ.getByteBuffer().getLong();
          int sizePy = bufferCQ.getByteBuffer().getInt();
          long tagsCode = bufferCQ.getByteBuffer().getLong();

          if (cq.isExtAddr(tagsCode)) {
            if (cq.getExt(tagsCode, cqExtUnit)) {
              tagsCode = cqExtUnit.getTagsCode();
            } else {
              //can't find ext content.So re compute tags code.
              log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
              long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
              tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
            }
          }

          long now = System.currentTimeMillis();
          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

          // > 0 未到消息消费时间
          long countdown = deliverTimestamp - now;

          if (countdown <= 0) {
            MessageExt msgExt =
              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
              offsetPy, sizePy);

            if (msgExt != null) {
              try {
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                  log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                  continue;
                }
                // 放到对应的 %RETRY%+gid 重试topic下进行消费(转发消息)
                PutMessageResult putMessageResult =
                  ScheduleMessageService.this.writeMessageStore
                  .putMessage(msgInner);

                if (putMessageResult != null
                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                  if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                                                                                            putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
                  }
                  continue;
                } else {
                  // XXX: warn and notify me
                  log.error(
                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                    msgExt.getTopic(), msgExt.getMsgId());
                  ScheduleMessageService.this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                       nextOffset), DELAY_FOR_A_PERIOD);
                  ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                           nextOffset);
                  return;
                }
              } catch (Exception e) {
                /*
                                         * XXX: warn and notify me
                                         */
                log.error(
                  "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
              }
            }
          } else {
            // 会将下次任务执行时间设置为countdown 即 消息的延时转发时间-当前时间
            ScheduleMessageService.this.timer.schedule(
              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
              countdown);
            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
            return;
          }
        } // end of for

        // 更新延时队列拉取任务进度
        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
      } finally {

        bufferCQ.release();
      }
    } // end of if (bufferCQ != null)
    else {
      // 消费队列不存在,默认为没有需要消费的任务,跳过本次消费

      long cqMinOffset = cq.getMinOffsetInQueue();
      long cqMaxOffset = cq.getMaxOffsetInQueue();
      if (offset < cqMinOffset) {
        // 下次拉取任务进度更新
        failScheduleOffset = cqMinOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                  offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }

      if (offset > cqMaxOffset) {
        failScheduleOffset = cqMaxOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                  offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }
    }
  } // end of if (cq != null)

  // 根据延时等级创建一个任务
  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>