Spring Boot + RabbitMQ: a problem receiving messages

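I ran into this while testing the fanout exchange messaging model. Sending messages worked fine; the problem was always on the receiving side. Oddly, the same code did not fail every time. When it did fail, the error pointed to the JSON conversion of the object.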

Error output:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134) ~[spring-rabbit-2.3.6.jar:2.3.6]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632) ~[spring-rabbit-2.3.6.jar:2.3.6]
	... 10 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `[B` out of START_OBJECT token
 at [Source: (String)"{"id":1,"module":"crud","name":"老狗","desc":"desc"}"; line: 1, column: 1]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1468) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1242) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1148) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:220) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$ByteDeser.deserialize(PrimitiveArrayDeserializers.java:478) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$ByteDeser.deserialize(PrimitiveArrayDeserializers.java:426) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526) ~[jackson-databind-2.11.4.jar:2.11.4]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468) ~[jackson-databind-2.11.4.jar:2.11.4]
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321) ~[spring-amqp-2.3.6.jar:2.3.6]
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291) ~[spring-amqp-2.3.6.jar:2.3.6]
	... 18 common frames omitted

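A reference I found online solved the problem. Here is the link:
https://www.cnblogs.com/mussessein/p/12106553.html
The problem lies in how the consumer receives the message. The last "Caused by" in the stack trace shows Jackson being asked to deserialize the JSON object into [B (the JVM name for byte[]), which is the declared type of the listener parameter.
The failing code is below. Note that it receives the message as @Payload byte[] msg, and that is what triggers the error: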

    /**
     * Listens to and consumes messages from the queue (fanoutExchange, one).
     * This is the consumer for the first queue.
     */
    @RabbitListener(queues = "local.huang.mq.fanout.one.queue",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(@Payload byte[] msg){
        try {
            // Parse and process the message consumed from the queue
            EventInfo info=objectMapper.readValue(msg, EventInfo.class);
            log.info("fanoutExchange-one consumer received message: {} ",info);
        }catch (Exception e){
            // Pass the exception itself so the original stack trace is kept
            log.error("Consumer failed to process message:",e);
        }
    }

The correct approach is below. Note that it receives the message as a Message object (org.springframework.amqp.core.Message):

    @RabbitListener(queues = "local.huang.mq.fanout.one.queue",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(Message message){
        try {
            // Take the raw bytes from the message and parse them ourselves
            byte[] body = message.getBody();
            EventInfo info=objectMapper.readValue(body, EventInfo.class);
            log.info("fanoutExchange-one consumer received message: {} ",info);
        }catch (Exception e){
            // Pass the exception itself so the original stack trace is kept
            log.error("Consumer failed to process message:",e);
        }
    }

With this change the problem was resolved.
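
Another option that may be worth trying is to let the converter build the object directly, rather than taking the raw Message. The stack trace shows that a Jackson-based converter (AbstractJackson2MessageConverter) is already running on this container factory, and Spring AMQP can infer the conversion target from the listener's parameter type. The sketch below is not from the original post. It assumes that EventInfo is the author's class in the same package with fields matching the JSON, that the producer sends a JSON content type, and that this method replaces the listener above rather than running alongside it. The class name FanoutTypedConsumer is made up for illustration.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

// Hypothetical class name; EventInfo is assumed to be the post's own class.
@Component
public class FanoutTypedConsumer {

    private static final Logger log = LoggerFactory.getLogger(FanoutTypedConsumer.class);

    // The parameter type tells the Jackson converter what to deserialize into,
    // so no manual ObjectMapper call is needed inside the method.
    @RabbitListener(queues = "local.huang.mq.fanout.one.queue",
                    containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(@Payload EventInfo info) {
        log.info("fanoutExchange-one consumer received message: {}", info);
    }
}
```

One trade-off: with this style the conversion happens before the method is called, so a malformed message fails inside the listener container and a try/catch in the method body will not see it. Receiving the Message and parsing the body by hand, as in the fix above, keeps that error handling in your own code.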