rocketmq入门笔记

消息队列经典场景

优点

  1. 异步
    • 原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的 下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去拉取消息队列中的东西进行处理.大大减少时间
  2. 解耦:增加积分,发送短信这些可以单独拆分出来,需要使用直接发送到知道的消息队列就行,你只需要关注你当前的业务
  3. 削峰: 如果使用线程池来解决,一个服务一个线程在高峰期你的mysql或者redis可能撑不住,使用mq就可以限制主机每次只拉取多少条进行处理

缺点

  1. 可用性降低
    引入了mq,一旦mq宕机对业务有影响
  2. 复杂度提高
    数据链路变得复杂,如何保证顺序性,不重复消费
  3. 一致性问题
    用户支付了,增加积分出错该怎么处理

整体架构

  1. nameserver 相当于注册中心,连接从这里取ip
  2. broker 消息仓库,里面有topic与队列
  3. product,consumer生产者消费者

安装

  1. 基本的环境yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y
  2. 下载mq安装包wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
  3. 解压缩unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/
  4. 启动nameserver服务
    1. vim bin/runserver.sh
    2. 默认堆初始化最大都是4g,新生代2g,测试机没这么内存,不修改无法启动,改为256m,新生代128mJAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    3. 后台启动nohup bin/mqnamesrv > n1.out &
  5. 启动broker服务
    1. vim bin/runbroker.sh
    2. 默认堆初始化最大都是8g,新生代4g,测试机没这么内存,不修改无法启动,改为512m,新生代256mJAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
    3. 暴露namserver地址echo 'export NAMESRV_ADDR=localhost:9876' >> ~/.bash_profile
    4. 后台启动nohup bin/runbroker.sh >n2.out &
  6. 日志验证
    • n1.out The Name Server boot success. serializeType=JSON
    • n2.out The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876
  7. 发送接收测试
    1. 发送bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    2. 接收bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
  8. 关闭
    • 关闭nameserver服务bin/mqshutdown namesrv
    • 关闭broker服务bin/mqshutdown broker

集群

  1. 4种高可用集群
    • 多master模式
      • 优点:配置简单,性能最高
      • 缺点:单个宕机,这台机器上违背消费的消息不可订阅
    • 多master多salve 异步复制
      • 优点:消息丢失少(异步复制),消息实时性不受到影响,master宕机可以从slave上消费,性能与多master基本一致
      • 缺点:master宕机下会丢失少量消息
    • 多master多salve 同步双写
      • 优点:master宕机,消息无延迟,可用性高
      • 缺点:性能有所丢失
    • dledger模式:4.5版本之前采用master-slave架构部署但是master挂掉都slave无法自动晋升为master,这种模式可以将多个master-slave组成一个组,当组内master挂了将选举一个master继续服务

集群搭建

  1. 修改vim conf/2m-2s-async/broker-a.properties配置文件
#名字一样一个集群
brokerClusterName=DefaultCluster
#名字一样一个主从
brokerName=broker-a
# 0表示master >0标识slave
brokerId=0
# 删除文件时间
deleteWhen=04
# namesrv集群
namesrvAddr=work1:9876;work2:9876
# 默认创建队列数
defaultTopicQueueNums=4
# 自动创建队列
autoCreateTopicEnable=true
# 对外监听端口
listenPort=10911
#文件保留时间 默认48h
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#强制销毁文件间隔时间
#destroyMapedFileIntervalForcibly=120000
#重载文件时间
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio=88
#存储路径 
storePathRootDir=/usr/local/rocketmq-all-4.7.1-bin-release/store
#commitLog 存储路径 
storePathCommitLog=/usr/local/rocketmq-all-4.7.1-bin-release/store/commitlog
#消费队列存储路径存储路径 
storePathConsumeQueue=/usr/local/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径 
storePathIndex=/usr/local/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径 
storeCheckpoint=/usr/local/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-all-4.7.1-bin-release/store/abort
#限制的消息大小 
maxMessageSize=65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE 
brokerRole=ASYNC_MASTER 
#刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 
#checkTransactionMessageEnable=false 
#发消息线程池数量 
#sendMessageThreadPoolNums=128 
#拉消息线程池数量 
#pullMessageThreadPoolNums=128
  1. 将broker-a.properties写入到broker-b-s.properties修改brokerName,brokerId,brokerRole和几个文件存储路径,同一台虚拟机注意端口号也需要修改
  2. 克隆当前虚拟机,修改broker-a-s.properties,broker-b.properties文件
  3. 修改host文件vim /etc/hosts
192.168.147.134 work1
192.168.147.135 work2
  1. 启动两台nameservernohup bin/mqnamesrv >n1.out &
  2. 启动broker,使用-c指定配置文件nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties >nb.out &
  3. 关闭防火墙或者开放9876,两个broker服务的端口firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload
  4. 四个broker服务都启动后验证集群bin/mqadmin clusterList -n work1:9876

控制台搭建

  1. 项目地址rocketmq-dashboard
  2. 项目克隆git clone https://github.com/apache/rocketmq-dashboard.git
  3. 打开rocketmq-console导入idea,修改application.properties文件rocketmq.config.namesrvAddr=work1:9876;work2:9876以实际情况修改
  4. 打包项目上传jar包,启动nohup java -jar rocketmq-console-ng-2.0.0.jar &
  5. 打开浏览器访问当前服务器8080端口

dledger集群搭建

  1. 快速演示
    1. bin/dledger/fast-try.sh快速演示的脚本,但脚本给一个broker的内存是1g,虚拟机没有这么大修改一下
    2. 这里我修改为256m
      function startNameserver() {
          export JAVA_OPT_EXT=" -Xms256m -Xmx256m  "
          nohup bin/mqnamesrv &
      }
      
      function startBroker() {
          export JAVA_OPT_EXT=" -Xms256m -Xmx256m  "
          conf_name=$1
          nohup bin/mqbroker -c $conf_name &
      }
      
    3. 启动bin/dledger/fast-try.sh start
    4. 查看集群情况bin/mqadmin clusterList -n 127.0.0.1:9876
    5. 查询master节点进程号并把它kill,看slave是否能转为master
  2. 实际搭建
    1. 配置文件增加一下几条
    #是否启动dledger
    enableDLegerCommitLog=true
    #组名与brokerName保持一致
    dLegerGroup=broker-a
    #当前组所有主机-专门监听端口号
    dLegerPeers=n0-192.168.147.134:40911;n1-192.168.147.135:49011;n2-192.168.147.134:40912
    #当前主机id
    dLegerSelfId=n0
    
    1. 集群搭建成功
    2. 直接把135主机关机了
    3. 切换成功

基本概念

消息模型

producer生产消息,consumer消费消息,broker存储消息,每个broker对于一台服务器,每个broker可以存储多个opic消息,每个topic消息也可以分片存储于不同的broker上,message queue用于存储多个消息的物理地址,每个topic消息存储于多个message queue中

生产者

producer负责生产消费,将消费者消息发送到broker上,有多种发送方式:同步发送,异步发送,顺序发送,单向发送,同步与异步需要broker返回确认消息,单向发送不需要。同一类producer组成一个集合为生产组发送同一类消息且逻辑一致,如果有异常,broker服务器会联系同一生产者组提交或回滚

消费者

consumer消息者形式分为两种:

  1. 拉取式:主动式消费,消费者调用拉取的方法
  2. 推动式消费:broker有数据就会推给消费者
    消费者组必须订阅同一个topic,消息模式两种:
  3. 集群消费模式:平摊消费
  4. 广播消费模式:共享消费

主题

每个topic若干个消息,每个消息只能有一个主题,同一个topic下的数据分片保存到不同的broker,每个分片单位是messageQueue

代理服务器

  • 几个模块
    • remoting module:处理来自clients的请求
    • client manager:负责管理客户端和维护消费者的topic订阅信息
    • store service:处理消息的存储查询功能
    • ha service:高可用服务,负责master与slave的数据同步
    • index service:索引服务,以提高查询
  • 普通集群
    • 每个节点固定角色,master负责响应客户端请求并存储消息,slave负责同步数据并响应客户端部分读请求
  • dledger高可用集群
    • dledger
      1. 接管broker的commitlog消息存储
      2. 选举leader节点
      3. 完成消息同步
    • 多副本消息同步
      leader收到消息会将消息标记为uncommited状态,发给follower,follower收到消息后需要给leader返回一个ack,如果有超过半数的follower返回ack就会把消息改为commited状态,发给follower
    • leader选举机制
      • 每个节点有三个状态,leader,follower,candidate(候选人)
      • 每个时间点叫做term
      • 集群启动时,每个节点都是follower,集群内部发送一个timeout信号,follower转为候选人,发起投票后收到半数以上的投票晋升为leader,
      • 选举过程,集群启动,三个节点都是follower,三个节点都给自己投票,term都是1,三个节点随机休眠,a启动term加一为2,第二个节点醒来,发现a的term比自己大,承认a是leader,c同理

名字服务器

充当路由消息的提供者,broker会在启动时向nameserver注册自己的服务信息,后续通过心跳维护当前服务的可用性,生产者或消费者通过名字服务查找各主题消息相应的broker ip列表

消息

每个消息都必须拥有一个topic,每个消息拥有唯一的message id,且可以携带业务标识key, 可以为消息设置一个tag标签

消息存储

消息存储
  • 时间
    • mq收到消息标记为uncommit状态发给follower,follower收到消息,发给leader一个ack,超过半数follower返回ack,消息改为commit状态,存储,状态同步给follower
    • mqpush消息给消费者,等待消费者ack响应,标记为已消费,没有标记消息会重复推送
    • mq会定期删除一些过期的消息
  • 存储介质:磁盘文件(采用顺序读写,保证存储的速度,采用mmap的方式,省去上下文切换,提高速度)
消息存储结构
  1. commitlog:存储消息元数据,每个文件1个G
  2. consumerQueue:消息队列,保存commitlog的索引
  3. indexFile:提供通过key或时间来查询消息的方法
刷盘机制
  1. 同步刷盘:消息写入机器的内存时,通知刷盘线程刷盘,等待刷盘线程写入完成后唤醒线程,返回写入完成
    • 优点:稳定安全
    • 缺点:性能不如异步
  2. 异步刷盘:消息写入内存后,返回写入完成,当内存累计到一定程度是统一触发刷盘操作
    • 优点:吞吐量大
    • 缺点:一旦服务器断电丢失部分消息
主从复制
  1. 同步复制:生产者发送消息,只有master与slave(半数slave)写入成功才反馈生产者写入成功
  2. 异步复制:生产者发送消息,只要master写入消息成功,就反馈生产者写入成功,再异步将消息同步到slave
负载均衡
  1. 生产者负载均衡:
    • 生产者发送消息时,获取当前topic下所有broker集合,采用取模递增算法将消息往不同的broker上发送
  2. 消费者负载均衡
    • 集群模式:六种分配算法
      1. AllocateMachineRoomNearby:同机房的消费者与broker分配一起
      2. AllocateMessageQueueAveragely:平均分配,将所有消息队列平均分配给消费者,先算数后分配
      3. AllocateMessageQueueAveragelyByCircle:先轮流给消费者分配一个队列,后面再增加
      4. AllocateMessageQueueByConfig:直接指定所有队列
      5. AllocateMessageQueueByMachineRoom:按逻辑机房进行分配
      6. AllocateMessageQueueConsistentHash:
    • 广播模式:每个消费者分配所有的队列
消息重试

广播模式下不存在消息重试,会直接消费下一条

  1. 如何重试
    消息监听器中配置
    1. 返回Action.ReconsumeLater
    2. 返回null
    3. 抛出异常
      不重试返回Action.CommitMessage
  2. 重试处理
    重试的消息会进入“%RETRY%”+ConsumeGroup队列,最多16次,16次后会进入死信队列,可配置例如20次,16次后酶促间隔2h
    16次每次间隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
  3. messageId
    老版本中,无论重试多少次messageId是相同的,4.7.1中每次重试messageId会重建
  4. 配置覆盖
    最大重试次数对同一个消费组实例有效,最后启动的消费者会覆盖之前的配置
死信队列
  1. 一个死信队列对于一个消费组,而不是一个消费者
  2. 一个消费组不需要死信队列是不会创建死信队列的
  3. 一个死信队列包含这个消费组所有无法消费的消息,不区分主题
  4. 消息无法再被消费者正常消费
  5. 默认存储3天,不管是否消费被删除
  6. 默认死信队列中的消息无法读取,需要将权限配置为6
消息幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。支付时重复提交了多次但最后还是只支付了一次的钱

  • 三种实现语义
    • at most once:每条消息最多消费一次
    • at least once:每条消息至少消费一次
    • exactly one:确定消费一次
      rocketmq支持at least once语义
  • 消息重复情况
    • 发送重复:消息发送到服务端并且持久化了,网络断开或者宕机了,生产者判断发送失败了会造次发送
    • 投递重复:消费者收到消息并完成业务处理了,准备发送消息接收时宕机了,服务端在恢复后会再次发送一遍这个消息
    • 负载均衡时消息重复:broker服务重启,扩容,缩容会触发rebalance造成消费者收到重复的消息
  • 解决:
    • 业务唯一标识:例如订单号
    • 利用数据库唯一索引或主键索引
    • 利用redis判断

实操

dledger模式不支持批量发送/升级v4.8+

基础消息

发送者
package cn.jaminye.sample.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * @author Jamin
 * @date 2021/8/15 9:36
 */

public class Producer {
	public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

		DefaultMQProducer product = new DefaultMQProducer("product");
		product.setNamesrvAddr("192.168.147.134:9876");
		product.start();

		/**
		 * 同步发送
		 */
		Message message = new Message("java-topic", "hello-world".getBytes());
		SendResult sendResult = product.send(message);
		/**
		 * 批量发送 topic必须相同
		 */
		/*Message message1 = new Message("batch-topic2", "hello-world1".getBytes());
		Message message2 = new Message("batch-topic2", "hello-world2".getBytes());
		Message message3 = new Message("batch-topic2", "hello-world3".getBytes());
		List<Message> messages = new ArrayList<>(8);
		messages.add(message1);
		messages.add(message2);
		messages.add(message3);
		SendResult sendResult = product.send(messages);*/
		/**
		 *异步发送
		 */
		/*CountDownLatch countDownLatch = new CountDownLatch(1);
		Message message = new Message("async-topic", "async-topic".getBytes());
		product.send(message, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				countDownLatch.countDown();
			}

			@Override
			public void onException(Throwable throwable) {
				System.err.println(throwable.getMessage());
			}
		});
		countDownLatch.await();*/
		/**
		 * 单向发送
		 */
		//主题,标签,key,内容
		/*Message message = new Message("send-one-way", "tag1", "1", "send-one-way".getBytes());
		product.sendOneway(message);
		//这种方法无返回值,等待发送完成
		Thread.sleep(5000);*/

		// System.out.println("=================发送结果=============" + sendResult);
		product.shutdown();
	}
}

消费者
package cn.jaminye.sample.consumer;

/**
 * 拉
 *
 * @author Jamin
 * @date 2021/8/15 14:48
 */
public class Consumer {
	/**
	 * 拉模式
	 *
	 * @param args
	 * @author Jamin
	 * @date 2021/8/16 10:13
	 */
	// public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
	// 	/**
	// 	 * 拉模式,已弃用方式
	// 	 */
	// 	/*DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull-group");
	// 	consumer.setNamesrvAddr("192.168.147.134:9876");
	// 	consumer.start();
	// 	MessageQueue messageQueue = new MessageQueue();
	// 	messageQueue.setQueueId(2);
	// 	messageQueue.setBrokerName("broker-a");
	// 	messageQueue.setTopic("java-topic");
	// 	PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, 0, 2);
	// 	pullResult.getMsgFoundList().forEach(System.out::println);
	// 	consumer.shutdown();*/
	// 	/**
	// 	 * 现用
	// 	 */
	// 	DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull-group");
	// 	consumer.setNamesrvAddr("192.168.147.134:9876");
	// 	consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
	// 	consumer.subscribe("java-topic", "*");
	// 	consumer.start();
	// 	List<MessageExt> messageExtList = consumer.poll();
	// 	messageExtList.forEach(System.out::println);
	// 	consumer.shutdown();
	// }

	/**
	 * 推模式
	 *
	 * @param args
	 * @author Jamin
	 * @date 2021/8/16 10:13
	 */
	/*public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pull-group");
		consumer.setNamesrvAddr("192.168.147.134:9876");
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.subscribe("java-topic", "*");
		//负载
		consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();

	}*/


}

顺序消息

生产者
package cn.jaminye.order.product;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author Jamin
 * @date 2021/8/16 10:24
 */
public class Product {
	public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
		DefaultMQProducer producer = new DefaultMQProducer("order-group");
		producer.setNamesrvAddr("192.168.147.134:9876");
		producer.start();
		String[] strings = {"下单", "付款", "生成订单"};
		for (int i = 0; i < 100; i++) {
			for (int j = 0; j < 3; j++) {
				String s = "订单__" + i + "___" + strings[j];
				Message message = new Message("order-topic", s.getBytes(RemotingHelper.DEFAULT_CHARSET));
				//根据id取模入队列使分类消息进一个队列
				producer.send(message, new MessageQueueSelector() {
					@Override
					public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
						int index = ((Integer) o) % list.size();
						return list.get(index);
					}
				}, i);
			}
		}
		producer.shutdown();
	}
}
消费者
package cn.jaminye.order.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author Jamin
 * @date 2021/8/16 10:36
 */
public class Consumer {
	public static void main(String[] args) throws Exception {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
		//消费组订阅的消息未过期从头开始,已过期从当前开始
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.subscribe("order-topic", "*");
		consumer.setNamesrvAddr("192.168.147.134:9876");
		//顺序取
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
				list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		/*consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
				list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});*/
		consumer.start();
	}
}

广播消息

consumer.setMessageModel(MessageModel.BROADCASTING);

延迟消息

//1-18 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message1.setDelayTimeLevel(3);

批量消息

/**
 * 批量发送 topic必须相同  官方示例中小于1m 不能是延迟,事务消息
 */
Message message1 = new Message("batch-topic2", "hello-world1".getBytes());
Message message2 = new Message("batch-topic2", "hello-world2".getBytes());
Message message3 = new Message("batch-topic2", "hello-world3".getBytes());
List<Message> messages = new ArrayList<>(8);
messages.add(message1);
messages.add(message2);
messages.add(message3);
SendResult sendResult = product.send(messages);

过滤消息

  1. 表达式过滤
    consumer.subscribe(“filter-topic”, “TAG1 || TAG2”);
  2. sql过滤
  • 需要配置enablePropertyFilter=true
  • message1.putUserProperty("a", "1");
  • consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));
  • 基本语法>,<,>=,between,in,and,or,not

事务消息

  • 代码
    //组名不能与其他组名相同
    TransactionMQProducer transactionGroupProducer = new TransactionMQProducer("transactionGroup");
    ExecutorService executorService = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
    transactionGroupProducer.setExecutorService(executorService);
    transactionGroupProducer.setNamesrvAddr("192.168.147.134:9876");
    TransactionListenerImpl transactionListener = new TransactionListenerImpl();
    transactionGroupProducer.setTransactionListener(transactionListener);
    transactionGroupProducer.start();
    for (int i = 0; i < 10; i++) {
    	Message message = new Message("transaction-topic", String.valueOf(i).getBytes());
    	message.putUserProperty("name", String.valueOf(i));
    	TransactionSendResult transactionSendResult = transactionGroupProducer.sendMessageInTransaction(message, null);
    	System.out.println(transactionSendResult.getSendStatus());
    }
    }
    
    public class TransactionListenerImpl implements TransactionListener {
    	@Override
    	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
    		try {
    			//开启事务
    			//	do something
    			if ("1".equals(msg.getProperty("name"))) {
    				System.out.println("unknow");
    				return LocalTransactionState.UNKNOW;
    			}
    			System.out.println("success");
    			return LocalTransactionState.COMMIT_MESSAGE;
    		} catch (Exception ex) {
    			System.out.println("回滚事务");
    			return LocalTransactionState.ROLLBACK_MESSAGE;
    		}
    
    
    	}
    
    	@Override
    	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    		System.out.println("进入check");
    		//	do something query db
    		return true ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
    
    	}
    }
    
  • 流程
    1. 发送消息到服务端,这个消息暂存在服务端,不会被消费者读取到
    2. 持久化成功后会返回生产者一个ack,确认消息是否成功
    3. 成功回调执行executeLocalTransaction方法,执行本地事务,持久化到数据库类的操作,这块的回滚自行处理,最终返回本地事务的执行结果
    4. 根据返回结果进行操作,commit的话会将当前消息移动到实际的topic下,回滚就删除消息
    5. 如果本地事务返回unknown,服务端会定时调用checkLocalTransaction方法进行查询,最多15次
    6. 根据checkLocalTransaction方法进行执行回滚或者提交

acl 权限控制

  1. 开启权限控制
aclEnable=true
  1. 配置文件
#全局白名单
globalWhiteRemoteAddresses:
#- 192.168.147.*

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
 #白名单地址
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  #针对每个主题
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  - java-topic=DENY
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
  - product2=DENY
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true
  1. 代码
DefaultMQProducer product = new DefaultMQProducer("product2", new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678")));
product.setNamesrvAddr("192.168.147.134:9876");
product.start();
Message message = new Message("java-topic", "hello-world".getBytes());
SendResult sendResult = product.send(message);
System.out.println(sendResult);
product.shutdown();

springboot整合rocketmq

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

配置

rocketmq.name-server=192.168.147.134:9876
rocketmq.consumer.group=springboot-group

普通消息发送

@Component
public class SpringProducer {
	@Resource
	RocketMQTemplate mqTemplate;

	public void sendMessage() {
		Message<String> message = MessageBuilder.withPayload("12345").build();
		//topic:tag
		mqTemplate.syncSend("topic-1" + ":" + "TAG1", message, 100000);
		mqTemplate.syncSend("topic-1" + ":" + "TAG2", message, 100000);
	}

}
@Component
// selectorType 过滤使用tag还是sql selectorExpression tag或者sql consumeMode顺序还是正常的 messageModel广播还是集群
@RocketMQMessageListener(topic = "topic-1", consumerGroup = "springboot-group", selectorType = SelectorType.SQL92, selectorExpression = "TAGS='TAG1'", consumeMode = ConsumeMode.CONCURRENTLY,
		messageModel = MessageModel.CLUSTERING)
public class SpringConsumer implements RocketMQListener<String> {
	@Override
	public void onMessage(String s) {
		System.out.println(s);
	}
}
事务消息
@Component
public class Producer {
	@Resource
	RocketMQTemplate rocketMQTemplate;

	public void sendMessage() {
		Message<String> message1 =
				MessageBuilder.withPayload("123").setHeader(RocketMQHeaders.TRANSACTION_ID, "1").setHeader(RocketMQHeaders.TOPIC, "123")
						.setHeader(RocketMQHeaders.TAGS, "1231").setHeader("a", 1).build();
		TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("springboot-producer1:TAG1", message1, null);
		System.out.println(transactionSendResult);
	}
}
@RocketMQTransactionListener
public class Listener implements RocketMQLocalTransactionListener {
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
		System.out.println("message===============" + message);
		// 获取时添加前缀RocketMQHeaders.PREFIX
		String tags = message.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS, String.class);
		System.out.println("id==================" + tags);
		System.out.println("UNKNOWN==================");
		return RocketMQLocalTransactionState.UNKNOWN;
	}

	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
		System.out.println("message===============" + message.getPayload());
		return RocketMQLocalTransactionState.COMMIT;
	}
}
总结
  1. 使用RocketMQTemplate进行发送消息,相关属性都以rocketmq_开头
  2. topic:tags

源码阅读

环境搭建

  1. 源码地址 源码地址 使用4.7.1版本源码
  2. 在项目根目录下创建conf文件夹,复制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三个文件到conf下
  3. 在本机添加环境变量ROCKETMQ_HOME指向项目根目录
  4. 启动nameser
  5. 修改conf目录下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等参数具体可参考上方配置
  6. 启动broker 配置启动参数-c broker.conf文件地址

namesever

  1. 配置信息:创建nameseverconfig与nettyserverconfig
  2. 初始化,启动,监听9876端口,提供给客户端拉取路由信息
  3. 创建处理请求的线程与定时扫描的线程(10s扫描一次,判断最后最后更新时间+2分钟,超出会删除这个broker并关闭连接)

broker

  1. 启动了很多组件
  2. 注册到nameserver,每30s(可以配置修改但最长为60s)发送一次心跳

producer

  1. DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
  2. 判断组名是否符合规定
  3. 启动各种定时任务,缓存nameserver上所有的主题,与broker建立心跳
  4. 发送消息采用索引自增取模的方式进行

文件存储

  1. org.apache.rocketmq.store.DefaultMessageStore#putMessage
  2. 使用零拷贝追加到commitlog,同步或异步刷盘,主从同步
  3. 定时任务:每10s启动启动一次,
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>