通过Redis监听机制集成WebSocket实现主动数据推送(Mqtt物联网)

通过Redis监听机制集成WebSocket实现主动数据推送(附代码)

需求

后台实时获取Redis里写入的数据,前端实时展示。我这里应用场景是终端向mqtt推送消息,mqtt将消息存入Redis。后端将消息实时推送前端页面进行展示。

前端获取数据的方式

主动获取:

这种方式有很多,axios,jq,dwr,等等。这种方式有一个特点,都是前端主动去请求后端接口,后端进行响应,平时情况很好使,但在需要实时获取数据的场景则不好用。ajax虽然也可以实时获取数据,但也是需要不断轮询向后端发送请求来获取数据,对后台来说是一种很大的开销,尤其是数据量大,并发访问时。

被动获取:

这篇文章讲的就是前端如何被动获取数据,主要通过WebSocket来实现,特别适合数据推送,极大的提升性能。跟普通的http、https协议有很大的不同,它采用ws、wss的协议;有兴趣可以自己了解,废话少说,上代码。

1、定义配置类

package com.alinket.aidms.device.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebScoket支持
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

2、添加WebSocketServer类,暴漏端口给前端

package com.alinket.aidms.device.websocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class WebSocketServer {

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    /**
     * 收到客户端消息后调用的方法
     * 在此处已预留可以添加需要推送的硬件信息 目前来看项目不需要选择推送温湿度,浓度,压强等数据
     *
     * @param message 客户端发送过来的消息
     * @functionname onMassage
     * @author zhp
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        //群发消息
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        synchronized (session) {
            this.session.getBasicRemote().sendText(message);
        }
    }


    /**
     * 群发自定义消息
     */
    public void sendInfo(String message) {
        log.info("推送消息到窗口");
        for (WebSocketServer item : webSocketSet) {
            try {
                //这里可以设定全部推送
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket发生错误" + error);
        error.printStackTrace();
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        log.info("websocket:有新连接开始监听!");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        log.info("连接关闭!");
    }

}

前端通过ws://IP:端口号/websocket(这里我自己路径是websocket,自行修改)来进行连接

3、Redis配置

package com.alinket.aidms.device.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfigWebSocket extends CachingConfigurerSupport {
    /**
     * retemplate相关配置
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(om);

        // 值采用json序列化
        template.setValueSerializer(jacksonSeial);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());

        // 设置hash key 和value序列化模式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSeial);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * 对hash类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    /**
     * 对redis字符串类型数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    /**
     * 对链表类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    /**
     * 对无序集合类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    /**
     * 对有序集合类型的数据操作
     *
     * @param redisTemplate
     * @return
     */
    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }
}

4、Redis配置文件(yml格式)

  redis:
    host: 127.0.0.1
    port: 6379
    database: 1
    timeout: 60s
    lettuce:
      pool:
        max-active: 8
        max-wait: 10000
        max-idle: 30
        min-idle: 10

5、通过监听从Redis消息队列中读取消息

package com.alinket.aidms.device.listener;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;


@Configuration
public class RedisMessageListener{

    //不同的频道名
    private static final  String channel = "aidms*";

    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean//相当于xml中的bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter,MessageListenerAdapter listenerAdapterTimeOut) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //listenerAdapter的通道
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisMessageListener.channel));
        return container;
    }

    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    /**redis 读取内容的template */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    /**
     * 监听redis超时
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapterTimeOut(MessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessageTimeOut");
    }
}

6、消息监听器配置完成,写发送消息的类

package com.alinket.aidms.device.listener;

import com.alibaba.fastjson.JSONObject;
import com.alinket.aidms.device.websocket.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;


@Component
@Slf4j
public class MessageReceiver {

    WebSocketServer webSocketServer = new WebSocketServer();
    /**Battery*/
    public void receiveMessage(String message){
        System.out.println("data:{}"+message);
        try {
            webSocketServer.sendInfo(message);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }

    /**CH4*/
    public void receiveMessage2(String message){
        System.out.println("CH4:"+message);
        //CH4.add(message);
        List list = new ArrayList();
        list.add("CH4");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("CH4 Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }
    /**Humiture*/
    public void receiveMessage3(String message){
        System.out.println("Humiture:"+message);
        //Humiture.add(message);
        List list = new ArrayList();
        list.add("Humiture");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("Humiture Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }
    /**Location*/
    public void receiveMessage4(String message){
        System.out.println("Location:"+message);
        //Location.add(message);
        List list = new ArrayList();
        list.add("Location");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("Location Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }
    /**Pressure*/
    public void receiveMessage5(String message){
        System.out.println("Pressure:"+message);
        //Pressure.add(message);
        List list = new ArrayList();
        list.add("Pressure");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("Pressure Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }
    /**SignalIntensity*/
    public void receiveMessage6(String message){
        System.out.println("SignalIntensity:"+message);
        //SignalIntensity.add(message);
        List list = new ArrayList();
        list.add("SignalIntensity");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("SignalIntensity Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }
    /**Warn*/
    public void receiveMessage7(String message){
        System.out.println("Warn:"+message);
        //Warn.add(message);
        List list = new ArrayList();
        list.add("Warn");
        list.add(message.replaceAll("\\",""));
        String json = JSONObject.toJSONString(list);
        log.info("Warn Data is loading out");
        try {
            webSocketServer.sendInfo(json);
        } catch (Exception e) {
            log.info("消息发送失败!IO异常"+e);
            e.printStackTrace();
        }
        //return message;
    }

    public void receiveMessageTimeOut(String message){
        if(message.equals("accessToken")){
            // 1小时超时
            // 调用刷新accessToken接口
            log.info("accessToken超时");
            try {
//                appAuthorization.getRefreshToken();
            } catch (Exception e) {
                log.info("accessToken超时:刷新accessToken失败!");
                e.printStackTrace();
            }
        }
        if( message.equals("refreshToken")){
            // 1天超时
            // 调用刷新鉴权接口获取 accessToken 和 refreshToken
            log.info("refreshToken超时");
            try {
//                appAuthorization.getAppAuthorizationInfo();
            } catch (Exception e) {
                log.info("refreshToken超时:重新获取accessToken失败!");
                e.printStackTrace();
            }
        }
    }
}

至此所有配置完成,最后一步测试需要连接WebCocket,使用EMQ X主动发送数据

1、连接WebSocket,使用网上任意一款WebSocket在线测试工具即可

 

 输入地址端口(上边提到过),点击WebSocket连接,这是通过输入框可以实现客户端向服务器发送

2、使用EMQ X工具通过Mqtt向Redis里推送数据,实现客户端实时接收

 接受成功

 至此,全部完成

注意:这篇文章主要针对物联网,终端拿到的消息实时获取推送。终端给Mqtt,Mqtt给Redis,后端从Redis实时拿给前端。按需所取,码字不易,不喜勿喷。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>