websocket&nginx

WebSocket和nginx使用

http&ws https&wss

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。

它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。

在这里插入图片描述
其他特点包括:

(1)建立在 TCP 协议之上,服务器端的实现比较容易。
(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
(3)数据格式比较轻量,性能开销小,通信高效。
(4)可以发送文本,也可以发送二进制数据。
(5)没有同源限制,客户端可以与任意服务器通信。
(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

ws://example.com:80/some/path # 与http协议并行
wss://example.com:80/some/path # 与https协议并行

在这里插入图片描述

前端后端示例

vue

window.webSocket = new WebSocket(`${wsUrl}${id}`);
        // window.webSocket = new Rwebsocket(`${wsUrl}${id}`, null, { debug: true, reconnectInterval: 3000,automaticOpen:false })
        
        
        /*建立连接*/
        webSocket.onopen = e => {
            console.log('建立连接')
            heartCheck.reset().start(); // 成功建立连接后,重置心跳检测
        };
        /*连接关闭*/
        webSocket.onclose = e => {
            console.log("webSocket连接关闭");
        };
        // /*接收服务器推送消息*/
        webSocket.onmessage = e => {
            heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
            console.log(e,'接收服务器推送消息')
            let res=JSON.parse(e.data)
            callBack(res)
        };
        // /*连接发生错误时*/
        webSocket.onerror = e => {
            console.log('webSocket连接失败');
        }
  
  
        // 心跳检测, 每隔一段时间检测连接状态,如果处于连接中,就向server端主动发送消息,来重置server端与客户端的最大连接时间,如果已经断开了,发起重连。
        let heartCheck = {
          timeout: 55000, // 发一次心跳,比server端设置的连接时间稍微小一点,在接近断开的情况下以通信的方式去重置连接时间。
          serverTimeoutObj: null,
          reset: function () {
            // clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
          },
          start: function () {
            this.serverTimeoutObj = setInterval(function () {
                if (window.webSocket&&window.webSocket.readyState == 1) {
                    console.log("连接状态,发送消息保持连接");
                    window.webSocket.send(`{"toUserId":"${id}"}`);
                    heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
                } else {
                    console.log("断开状态,尝试重连");
                    window.webSocket.close();
                    Socket();
                }
            }, this.timeout)
          }
        }
  

java

pom.xml依赖

	 <!--websocket-->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-websocket</artifactId>
     </dependency>

pojo消息实体

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SocketMessage {

    /** 类型 (1-预警消息、2-通知通告、3-系统消息)*/
    private Object type;

    /** 业务主键 */
    private String businessID;

    /** 消息标题 */
    private String title;

    /** 消息内容 */
    private String message;

    /** 提报数据 */
    private String commitType;

    /** 数据提报消息-组织ID */
    private String orgId;
}

WebSocketServer

/**
 * @description: WebSocket服务类
 * @autor: WJY
 * @create: 2021-10-27 11:15
 * @since: 1.8
 */
@ServerEndpoint("/ws/{userId}")
@Component
public class WebSocketServer {

    static Log log= LogFactory.get(WebSocketServer.class);
    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId="";

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            webSocketMap.put(userId,this);
            //加入set中
        }else{
            webSocketMap.put(userId,this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }

        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());

        try {
            webSocketMap.get(userId).
            sendMessage(JSONUtil.toJsonStr(new SocketMessage()));
            session.setMaxIdleTimeout(3600000);
        } catch (IOException e) {
            log.error("用户:"+userId+",网络异常!!!!!!");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                //从set中删除
                subOnlineCount();
            }
            log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
        }catch (Exception e){
            log.error("用户关闭连接!!!");
        }
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 发送自定义消息
     * */
    public void sendInfo(Object message, @PathParam("userId") String userId) throws IOException {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(JSONUtil.toJsonStr(message));
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId = this.userId;
                if (ObjectUtils.isNotEmpty(jsonObject) && ObjectUtils.isNotEmpty(jsonObject.getString("toUserId"))){
                    toUserId =jsonObject.getString("toUserId");
                }
                //传送给对应toUserId用户的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                }else{
                    log.error("请求的userId:"+toUserId+"不在该服务器上");
                    //否则不在这个服务器上,发送到mysql或者redis
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 错误消息处理
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

WebSocketConfig

/**
 * @description: webSocket对象配置类
 * @autor: WJY
 * @create: 2021-10-27 11:12
 * @since: 1.8
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

WebSocketController

/**
 * @description: WebSocket消息推送
 * @autor: WJY
 * @create: 2021-10-27 11:18
 * @since: 1.8
 */
@RestController
public class ForwardNewsController {

    @Resource
    private WebSocketUtils<SocketMessage> webSocketUtils;

    /**
     * 前端推送消息(测试消息推送接口)
     *
     * @param message
     * @param toUserId
     * @return
     */
    @GetMapping("/push/{toUserId}")
    public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) {
        // 模拟消息实时消息推送
        SocketMessage socketMessage = new SocketMessage("业务类型(1-预警消息、2-通知通告、3-系统消息)", "业务主键", "消息标题", "消息内容", null, null);
        webSocketUtils.sendCustomizeMessage(socketMessage, toUserId);
        return ResponseEntity.ok("MSG SEND SUCCESS");
    }
}

//    @GetMapping("index")
//    public ResponseEntity<String> index(){
//        return ResponseEntity.ok("请求成功");
//    }
//
//    /**
//     * 获取页面信息
//     * @return
//     */
//    @GetMapping("page")
//    public ModelAndView page(){
//        return new ModelAndView("websocket");
//    }

WebSocketUtils

/**
 * @description: webSocket工具类
 * @autor: WJY
 * @create: 2021-10-28 9:53
 * @since: 1.8
 */
@Component
@Slf4j
public class WebSocketUtils<T> {

    @Resource
    private WebSocketServer webSocketServer;

    /**
     * 发送自定义消息
     * @param msg
     */
    public void sendCustomizeMessage(T msg, String userId){
        try {
            webSocketServer.sendInfo(msg, userId);
        }catch (Exception i){
            log.error("WebSocketUtils.sendCustomizeMessage消息发送异常:{}", i.getMessage());
        }
    }
} 

权限路径过滤(springboog+springSecurity)

security:
  permit:
    list:
      - /ws/*

前端API

WebSocket 构造函数

WebSocket 对象作为一个构造函数,用于新建 WebSocket 实例。

var ws = new WebSocket('ws://localhost:8080');

执行上面语句之后,客户端就会与服务器进行连接。
实例对象的所有属性和方法清单,参见这里。

webSocket.readyState

readyState属性返回实例对象的当前状态,共有四种。

CONNECTING:值为0,表示正在连接。
OPEN:值为1,表示连接成功,可以通信了。
CLOSING:值为2,表示连接正在关闭。
CLOSED:值为3,表示连接已经关闭,或者打开连接失败。

下面是一个示例。

switch (ws.readyState) {
  case WebSocket.CONNECTING:
    // do something
    break;
  case WebSocket.OPEN:
    // do something
    break;
  case WebSocket.CLOSING:
    // do something
    break;
  case WebSocket.CLOSED:
    // do something
    break;
  default:
    // this never happens
    break;
}

webSocket.onopen

实例对象的onopen属性,用于指定连接成功后的回调函数。

ws.onopen = function () {
  ws.send('Hello Server!');
}

如果要指定多个回调函数,可以使用addEventListener方法。

ws.addEventListener('open', function (event) {
  ws.send('Hello Server!');
});

webSocket.onclose

实例对象的onclose属性,用于指定连接关闭后的回调函数。

ws.onclose = function(event) {
  var code = event.code;
  var reason = event.reason;
  var wasClean = event.wasClean;
  // handle close event
};

ws.addEventListener("close", function(event) {
  var code = event.code;
  var reason = event.reason;
  var wasClean = event.wasClean;
  // handle close event
});

webSocket.onmessage

实例对象的onmessage属性,用于指定收到服务器数据后的回调函数。

ws.onmessage = function(event) {
  var data = event.data;
  // 处理数据
};

ws.addEventListener("message", function(event) {
  var data = event.data;
  // 处理数据
});

注意,服务器数据可能是文本,也可能是二进制数据(blob对象或Arraybuffer对象)。

ws.onmessage = function(event){
  if(typeof event.data === String) {
    console.log("Received data string");
  }

  if(event.data instanceof ArrayBuffer){
    var buffer = event.data;
    console.log("Received arraybuffer");
  }
}

除了动态判断收到的数据类型,也可以使用binaryType属性,显式指定收到的二进制数据类型。

// 收到的是 blob 数据
ws.binaryType = "blob";
ws.onmessage = function(e) {
  console.log(e.data.size);
};

// 收到的是 ArrayBuffer 数据
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {
  console.log(e.data.byteLength);
};

webSocket.send()

实例对象的send()方法用于向服务器发送数据。

发送文本的例子。

ws.send('your message');

发送 Blob 对象的例子。

var file = document
  .querySelector('input[type="file"]')
  .files[0];
ws.send(file);

发送 ArrayBuffer 对象的例子。

// Sending canvas ImageData as ArrayBuffer
var img = canvas_context.getImageData(0, 0, 400, 320);
var binary = new Uint8Array(img.data.length);
for (var i = 0; i < img.data.length; i++) {
  binary[i] = img.data[i];
}
ws.send(binary.buffer);

webSocket.bufferedAmount

实例对象的bufferedAmount属性,表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。

var data = new ArrayBuffer(10000000);
socket.send(data);

if (socket.bufferedAmount === 0) {
  // 发送完毕
} else {
  // 发送还没结束
}

webSocket.onerror

实例对象的onerror属性,用于指定报错时的回调函数。

socket.onerror = function(event) {
  // handle error event
};

socket.addEventListener("error", function(event) {
  // handle error event
});

nginx相关配置

# 重点的两行配置
# 将nginx的请求头从http升级到websocket.
proxy_set_header Upgrade $http_upgrade;
# 进行nginx连接websocket
proxy_set_header Connection "upgrade";
user  root;
worker_processes  2;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;
    tcp_nodelay on;
    proxy_http_version 1.1;
    #keepalive_timeout  0; 0是禁止等待后台服务响应时间
    keepalive_timeout  65;
	
	server {
    	listen       80;
    	server_name  localhost;
    	client_max_body_size 2000M;
    	
    	# 开启gzip
        gzip on;

        # 启用gzip压缩的最小文件,小于设置值的文件将不会压缩
        gzip_min_length 1k;

        # gzip 压缩级别,1-9,数字越大压缩的越好,也越占用CPU时间,后面会有详细说明
        gzip_comp_level 9;

        # 进行压缩的文件类型。javascript有多种形式,后面的图片压缩不需要的可以自行删除
        gzip_types text/plain application/javascript application/x-javascript text/css application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png;

        # 是否在http header中添加Vary: Accept-Encoding,建议开启
        gzip_vary on;

        # 设置压缩所需要的缓冲区大小     
        gzip_buffers 4 16k;
    	
    	#获取用户真实ip
        proxy_set_header Host $host;
    	proxy_set_header X-Real-IP $remote_addr;
    	proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
		proxy_connect_timeout 1800;
        proxy_send_timeout 1800;
        proxy_read_timeout 1800;
   
		location / {
        	root   /tmp/production;
        	index  index.html index.htm;
		}
		
    	error_page   500 502 503 504  /50x.html;
    	location = /50x.html {
        	root   /usr/local/nginx/html;
    	}
    
   		# webSocket使用过必要的配置项项(单独配置的路径代理)
    	location /wsk/ {
            proxy_pass 代理的ip地址;
            # 重点的两行配置
            # 将nginx的请求头从http升级到websocket.
      		proxy_set_header Upgrade $http_upgrade;
      		#进行nginx连接websocket
    		proxy_set_header Connection "upgrade";
        }
    }

}

参考文献

https://www.ruanyifeng.com/blog/2017/05/websocket.html
https://www.tutorialspoint.com/websockets/websockets_communicating_server.htm
http://www.eclipse.org/jetty/documentation.php
https://www.cnblogs.com/mafly/p/websocket.html (nginx中502报错)
http://nginx.org/en/docs/http/websocket.html (nginx配置详解)

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>