RabbitMq基础

MQ相关概念

什么是mq

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。

为什么要用mq
  • 流量削峰
  • 应用解耦
  • 异步处理

RabbitMq

RabbitMq的概念

RabbitMQ 是一个消息中间件:它接受并转发消息。

四大核心概念

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

安装并开启rabbitmq的web界面
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

消息应答

概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了部分突然就会挂掉,会发生什么情况。rabbitmq一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的信息,因为他无法接收到

为了保证消息在发送过程中不丢失,rabbitmq引入了消息应答机制,消息应答就是:

消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接受到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者由于接收太多来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

手动应答
  • Channel.basicAck(勇于肯定确认) rabbitmq已经知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • channel.basicReject(用于否定确认) 与chanel.basicnack相比较少了一个参数 不处理该消息了 直接拒绝,可以将其丢弃了
multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

channel.basicAck(deliveryTag,true)后边的参数即为multiple

multiple的true和false代表不同的含义

true代表批量应答channel上未应答的消息

比如说channel上有传送tag的消息,5,6,7,8 当前tag是8 那么此时5-8这些还未应答的消息都会被确认收到消息应答

false同上边相比

只会应答tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答

尽量使用非全应答的消息,能够最大程度避免消息丢失

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已经关闭,连接已经关闭或者tcp连接丢失),导致消息未发送ack确认,rabbitmq将了解到消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
在这里插入图片描述
c1处理消息一,但由于c1断开连接,c1并没有向mq发送ack,此时消息一不能够被mq删除,会重新入队,c2消费者可以处理消息一,重新交由c2处理

消息手动应答代码

默认消息采用的是自动应答,所以我们想要实现消息消费过程中不丢失,需要把自动应答改为手动应答

        Channel getchannel = utils.getchannel();
        getchannel.queueDeclare("hello",false,false,false,null);
        boolean autoAck=false;
        getchannel.basicConsume("hello", autoAck, new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                String string = new String(delivery.getBody(),"utf-8");
                System.err.println(string);
                getchannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        }, s -> { });

消息生产者

        Channel getchannel = utils.getchannel();
        getchannel.queueDeclare("hello",false,false,false,null);
        getchannel.basicPublish("","hello",false,null,new String("肥肥是只猪").getBytes());
        System.out.println("success");

rabbitmq 持久化

概念

通过手动应答可以保证处理任务时任务不丢失,但是如何保证当mq服务停掉之后消息生产者发送过来的消息不丢失,默认情况下mq由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这么做,确保消息不会丢失需要做两件事:

我们需要将队列和消息都标记为持久化

队列如何实现持久化

如果要实现队列持久化,需要在声明队列的时候把durable参数设置为持久化

bolean durable=true
//消息队列持久化
channel.queueDeclare(queue_name,durable,false,false,null);

但是需要注意的是如果之前声明的队列不是持久化的,需要把原先的队列先删除,或者重新创建一个持久化的队列,不然就会出错

消息实现持久化

只需在消息进行发布的时候,添加参数即可

channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,new String("肥肥是只猪").getBytes());

将消息标记为持久化并不能完全保证不会丢失消息,尽管它告诉mq将消息保存到磁盘,但是这里依然存在当消息准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时并没有完全的写入磁盘,持久性保证不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

不公平分发

最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。 为了避免这种情况,我们可以设置参数

channel.basicQos(1);

在这里插入图片描述

预取值

​ 本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有 未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何 消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高 向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理 的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理 的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的 内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这 将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。

发布确认

发布确认原理

在这里插入图片描述
在设置了队列持久化和消息持久化之后,并不能完全保证消息的不丢失,因为消息持久化时存在一个消息缓存的间隔点,此时就需要发布确认来保证生产者在发布消息到相应队列时不会出现消息丢失

单一发布

简单的同步确认发布的方式,也就是发布一个消息之后只有他被确认发布,后续的消息才能继续发布

这种确认方式有一个最大的缺点就是:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过百条消息的吞吐量,当然对于某些应用程序来说这可能已经足够了

        Channel channel = utils.getchannel();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        long start=System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_ACCOUNT; i++) {
            channel.basicPublish("",QUEUE_NAME,null,(i+"").getBytes());
            boolean b = channel.waitForConfirms();
            if(b){
                System.out.println("消息发布成功");
            }
        }
        long end=System.currentTimeMillis();
        System.out.println("总共花费时间"+(end-start));//31362
批量发布

与单个确认消息相比,先发布一批消息然后一起确认可以极大的提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题的时,不知道那个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息

package com.example.mqfeifei.test;
import com.rabbitmq.client.Channel;
public class MultPartConfirm {
    public static void main(String[] args) throws Exception {
        Channel channel = utils.getchannel();
        channel.confirmSelect();
        channel.queueDeclare("cxf",false,false,false,null);
        long start=System.currentTimeMillis();
        for (int i = 0; i < 1000 ; i++) {
            channel.basicPublish("","cxf",null,(i+"").getBytes());
            if(i%100==0) channel.waitForConfirms();
        }
        long end=System.currentTimeMillis();
        System.out.println("总共耗费的时间为"+(end-start));//5827
    }
}
异步发布

异步确认逻辑虽然比上边两个都要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
在这里插入图片描述

Channel channel = utils.getchannel();
        channel.confirmSelect();
        channel.queueDeclare("aysnc",false,false,false,null);
        //添加确认的回调函数
        /**
         * 第一个参数为成功的回调函数
         * 第二个参数为失败的回调函数
         * 当然这里也可以进行确认失败的消息的处理
         * 1.将所有发送的消息存到一个线程安全的map中 key为在channel中的id  value为消息的message value
         * 2.在成功的回调函数中,将该成功的回调函数移除
         * 3. 最后线程安全的map 剩余的即为所有的未能被broker确认的消息,统一进行处理
         */
        channel.addConfirmListener((deliveryTag, multiple) -> System.out.println(deliveryTag+"已经被broker确认"), (deliveryTag, multiple) -> { });
        long start=System.currentTimeMillis();
        for (int i = 0; i < 1000 ; i++) {
            channel.basicPublish("","aysnc",null,(i+"").getBytes());
        }
        long end=System.currentTimeMillis();
        System.err.println("总共消耗的时间为"+(end-start)+"feifeifeifeifeifeifeifeifeifeifeifeifeifeifei");//31ms     
三种发布确认速度对比

单独发布消息

同步等待确认,简单,但吞吐量非常有限

批量发布消息

批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题

异步处理

最佳性能和资源使用,在出现错误的情况下可以很好的控制,但是实现起来稍微难些

交换机

rabbitmq消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至不知道这些消息传递到了哪些队列中

相反生产者只能将消息发送到交换机,交换机工作的内容非常简单,一方面他接受来自生产者的消息,另一方面将他们推入队列,交换机必须确切知道如何处理收到的消息,是应该吧这些消息放到特定队列还是说应该丢弃他们,这就由交换机的类型类决定
在这里插入图片描述

fanout

类似于广播的模式,它是将收到的所有消息广播到它知道的所有队列中。当然系统中也是有默认的exchange类型的
在这里插入图片描述

Direct exchange

什么是bindings,绑定是交换机和队列之间的桥梁关系,也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定参数:rouingkey 来表示也可称为binding key,创建绑定可以使用代码:

channel.queueBind(queueName,exchangename,"routingKey")

绑定之后的意义尤其交换类型决定

Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingkey队列中去
在这里插入图片描述

消费者代码如下:

    public static void main(String[] args) throws IOException {
        Channel channel = utils.getchannel();
        channel.exchangeDeclare("feifei_direct","direct");
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"feifei_direct","fei");
        channel.basicConsume(queue, true, (consumerTag, message) -> {
            String msg=new String(message.getBody());
            System.out.println(msg);
        }, consumerTag -> {});
    }

生产者代码如下

    public static void main(String[] args) throws Exception{
        Channel channel = utils.getchannel();
        channel.exchangeDeclare("feifei_direct","direct");
        channel.basicPublish("feifei_direct","xiang",null,"菲菲啊啊啊啊".getBytes("utf-8"));
    }
多重绑定

在这里插入图片描述
如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,这种情况下虽然绑定类型是direct但是它表现得就和fanout就有点类似了,就跟广播差不多

Topic exchange

发送到类型是topic交换机的消息routing_key不能随意写,必须满足一定的条件,它必须是一个单词列表,以点号分割开,这些单词可以是任意单词。当然这个单词列表最多不能超过255个字节

*:可以替代一个单词

#:可以替代零个或者多个单词

示例代码如下:

消费者端

public static void main(String[] args) throws Exception {
        Channel channel = utils.getchannel();
        String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("feifei_topic","topic");
        channel.queueBind(queue,"feifei_topic","cxf.*");
        channel.basicConsume(queue, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                String msg=new String(message.getBody());
                System.out.println(msg);
            }
        }, consumerTag -> {});
    }

生产者端

    public static void main(String[] args) throws Exception{
        Channel channel = utils.getchannel();
        channel.exchangeDeclare("feifei_topic","topic");
        channel.basicPublish("feifei_topic","cxf.name",null,"cxf".getBytes());
    }

死信队列

概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

死信的来源

消息 TTL 过期

队列达到最大长度(队列满了,无法再添加数据到 mq 中)

消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
在这里插入图片描述
下边示例下消息消息ttl过期的代码

c1的代码

public static void main(String[] args) throws Exception{
        /**
         * 正常队列的绑定和死信队列的绑定都在这里创建好
         * 同时,正常队列和死信队列的关系也在这里定义好了
         */
        Channel channel = utils.getchannel();
        channel.exchangeDeclare("normal","direct");
        channel.exchangeDeclare("sixin","direct");
        Map<String,Object> arg=new HashMap<>();
        //设置消息在队列中过期时间,但不建议在消费者定义,生产者定义更为灵活
//        arg.put("x-message-ttl",10000);
        arg.put("x-dead-letter-exchange","sixin");
        arg.put("x-dead-letter-routing-key","fei");
        channel.queueDeclare("normal_queue",false,false,false,arg);
        channel.queueDeclare("sixin_queue",false,false,false,null);
        channel.queueBind("normal_queue","normal","xiang");
        channel.queueBind("sixin_queue","sixin","fei");
    }

producer的代码

    public static void main(String[] args) throws Exception{
        Channel channel = utils.getchannel();
        channel.exchangeDeclare("normal","direct");
        AMQP.BasicProperties pro=new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i <10 ; i++) {
            channel.basicPublish("normal","xiang",pro,"肥肥".getBytes("utf-8"));
        }
    }

延迟队列

先来回顾下上边提到的死信队列,它的形成原因可能是

  • ttl过期
  • 超过最大队列长度
  • 消息被拒绝

此时如果对应的消费者一直不存在,生产者对消息设置了过期时间,在不考虑,超过最大队列长度和消息被拒绝,那么每条消息被消费的最小时间即为设置的过期时间,即随后所有的消息在过期后都会被,死信队列对应的消费者消费

这即为延迟队列

延迟队列概念

延时队列,队列内都是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或者之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

使用场景

以下图为例子
在这里插入图片描述
拿购票为例子,先进性座位的选取,在最后在进行付款,但付款肯定是有时效的,比如说要在30分钟内进行付款

  • 生成订单后,将其记录到延迟队列中,生产者端将ttl设置为30分钟
  • 延迟队列的消费者检查数据库中,对应订单的付款字段是否已经付款,未付款的话将座位的状态再次更改为可售
  • 若用户在30分钟内已经付款则更新该订单的数据库表
  • 若在30分钟内已经付款,延迟队列的消息在消费的时候也会在查表时,判断出已经进行了付款,即不用再进行操作

时延队列的优化,时延队列如果只是在和交换机绑定的队列中进行设置,此时使用时只需要将对应的时延的消息按照routingkey转发到不同的时延队列也同样可以实现,时延队列,但是同样的问题也就来了,如果我们需要的时延改变了,就得创建一个新的队列,设置我们需要的时延,此时即可以设置一个没有过期时间的队列,在消息的生产者端进行消息的生产
在这里插入图片描述

/*
按照上图架构图构建的mq
 */
@Configuration
public class MqConfig {
    @Bean("queue1")
    Queue queue1(){
        return QueueBuilder.nonDurable("10sde")
                .ttl(10000)
                .deadLetterExchange("direct_low")
                .deadLetterRoutingKey("sila")
                .build();
    }
    @Bean("queue2")
    Queue queue2(){
        return QueueBuilder.nonDurable("30sde")
                .ttl(30000)
                .deadLetterExchange("direct_low")
                .deadLetterRoutingKey("sila")
                .build();
    }
    @Bean("queue3")
    Queue queue3(){
        return QueueBuilder.nonDurable("sixin")
                .build();
    }
    @Bean("queue4")
    Queue queue4(){
        return QueueBuilder.nonDurable("0sde")
                .deadLetterRoutingKey("sila")
                .deadLetterExchange("direct_low")
                .build();
    }
    @Bean("direct_top")
    DirectExchange directExchange(){
        return new DirectExchange("direct_top");
    }
    @Bean("direct_low")
    DirectExchange directExchange1(){
        return new DirectExchange("direct_low");
    }
    @Bean
    Binding binding0(DirectExchange direct_top,Queue queue4){
        return BindingBuilder.bind(queue4).to(direct_top).with("0");
    }
    @Bean
    Binding binding1(Queue queue1,DirectExchange direct_top){
        return BindingBuilder.bind(queue1).to(direct_top).with("10");
    }
    @Bean
    Binding binding2(Queue queue2,DirectExchange direct_top){
        return BindingBuilder.bind(queue2).to(direct_top).with("30");
    }
    @Bean
    Binding binding3(Queue queue3,DirectExchange direct_low){
        return  BindingBuilder.bind(queue3).to(direct_low).with("sila");
    }

}

消息的生产者端进行消息过期时间的设置

@RestController
public class test {
    @Autowired
    AmqpTemplate amqpTemplate;
    @RequestMapping("/test")
    public void test(){
        amqpTemplate.convertAndSend("direct_top","0","菲菲菲", CorrelationData->{
            CorrelationData.getMessageProperties().setExpiration("20000");
            return CorrelationData;
        });
    }
}

看起来似乎没有什么问题,但是 最开始的时候,就介绍过如果使用在消息属性上设置ttl的方式,消息可能不会按时“死亡”,因为rabbbitmq只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时 时长很短,第二个消息并不会优先得到执行

这里就需要基于插件的延迟队列

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>