RabbitMQ整合springboot实现延迟消息
上一篇总结:
自动应答公平分发是不生效的,因为手动应答是分发的前提,生产者需要根据ack判断是否处理完消息
连接与信道知识起到连接和传输的作用,而服务者和消费者通过交换机、队列、RoutingKey来确定彼此
预取值时不是一次全取而是竞争关系取值
延迟消息
延时队列就是用来存放需要在指定时间被处理的元素的队列
场景:
1.订单在30分钟之内未支付则自动取消
2.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
TTL是RabbitMQ中最大存活时间,单位是毫秒
整合springboot
创建工程加入依赖
<dependencies>
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--RabbitMQ测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
添加配置文件
spring.rabbitmq.host=192.168.6.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
测试:先在MQ中添加交换机与对列,进行绑定
@RunWith(SpringRunner.class)
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
//简单模式
// rabbitTemplate.convertAndSend("队列名","消息");
//发布订阅 交换机扇出类型
// rabbitTemplate.convertAndSend("交换机","routingKey","消息");
rabbitTemplate.convertAndSend("exchange1","routingKey1","test");
}
}
测试成功后正式编写代码
配置类,进行交换机创建及绑定
package com.lzq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class DeadLetterConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明xExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列A ttl为10s并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列A绑定X交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列B ttl为40s并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列B绑定X交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列QD绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
发送者
package com.lzq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
public class SendMsg {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列: "+message);
}
}
消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(String message) throws IOException {
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), message);
}
}
此时http://localhost:8080/sendMsg/123
就可收到打印信息
自定义延时时间
在发送时候设置发送时间
与自定时间同时使用,以时间短的为准
编写配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
//声明队列C 死信交换机
@Bean("queueC")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
//没有声明TTL属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列B绑定X交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
控制类添加
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列C:{}", new Date(),ttlTime, message);
}
http://localhost:8080/sendExpirationMsg/你好1/20000
总结:当生产者连续发消息时,若后者延时消息短,后者也不会生效,先进先出原则
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
二维码