zoukankan      html  css  js  c++  java
  • 物联网架构成长之路(58)-用户与应用服务器之WebSocket通信

    一、前言

      之前的物联网篇,有介绍过用户手机APP或在H5端,可以作为mqtt的客户端,通过tcp:1883或者Websocket:8083,连接到EMQ Broker上,订阅设备的主题,从而收到设备上报的数据。但是,在处理工业物联网的时候,会出现,物联网平台会对接客户自己的应用。设备上报与下发都是通过物联网平台与客户的应用服务器进行通信。客户应用服务器转发物联网平台,设备上报的数据到客户应用下自己的用户终端(手机)。这个时候,就需要建立起客户应用服务器与用户的通道了。传统上,可以使用http轮询。也可以使用mqtt类似的进行通信。但是这里就介绍通过SpringBoot 简单搭建一个WebSocket服务。建立起客户应用服务器与用户的通信。

    二、简单流程

      物联网平台只做设备接入、设备管理、客户接入、客户管理。客户自己的应用与终端客户,则又客户自行开发。从而进行解耦,这个流程方式与功能划分,与阿里的物联网平台类似。后续我们也会基于物联网平台开发一些标准化的应用。

    三、后端代码

      3.1 pom.xml

    1 <dependency>
    2     <groupId>org.springframework.boot</groupId>
    3     <artifactId>spring-boot-starter-websocket</artifactId>
    4 </dependency>

      3.2 config配置

      WebSocketConfig.java

     1 package com.wunaozai.demo.websocket.config;
     2 
     3 import org.springframework.context.annotation.Bean;
     4 import org.springframework.context.annotation.Configuration;
     5 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
     6 
     7 @Configuration
     8 public class WebSocketConfig {
     9 
    10     /**
    11      * 自动注入Websocket Bean
    12      * @return
    13      */
    14     @Bean
    15     public ServerEndpointExporter serverEndpointExporter() {
    16         return new ServerEndpointExporter();
    17     }
    18 }

      GetHttpHeaderConfig.java

     1 package com.wunaozai.demo.websocket.config;
     2 
     3 import java.util.List;
     4 import java.util.Map;
     5 
     6 import javax.websocket.HandshakeResponse;
     7 import javax.websocket.server.HandshakeRequest;
     8 import javax.websocket.server.ServerEndpointConfig;
     9 import javax.websocket.server.ServerEndpointConfig.Configurator;
    10 
    11 import org.springframework.context.annotation.Bean;
    12 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    13 
    14 /**
    15  * 将请求中的Header设置到属性中
    16  * @author wunaozai
    17  * @Date 2020-10-27
    18  */
    19 public class GetHttpHeaderConfigurator extends Configurator {
    20 
    21     @Override
    22     public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
    23         System.out.println("modifyHandshake 协议升级101");
    24         Map<String, List<String>> headers = request.getHeaders();
    25         config.getUserProperties().put("headers", headers);
    26         super.modifyHandshake(config, request, response);
    27     }
    28     @Bean
    29     public ServerEndpointExporter serverEndpointExporter() {
    30         return new ServerEndpointExporter();
    31     }
    32 }

      3.3 service 服务层

      WebSocketService.java

     1 package com.wunaozai.demo.websocket.service;
     2 
     3 import java.io.IOException;
     4 import java.util.List;
     5 import java.util.Map;
     6 import java.util.concurrent.ConcurrentHashMap;
     7 
     8 import javax.websocket.EndpointConfig;
     9 import javax.websocket.OnClose;
    10 import javax.websocket.OnMessage;
    11 import javax.websocket.OnOpen;
    12 import javax.websocket.Session;
    13 import javax.websocket.server.PathParam;
    14 import javax.websocket.server.ServerEndpoint;
    15 
    16 import org.springframework.stereotype.Component;
    17 
    18 import com.google.gson.Gson;
    19 import com.wunaozai.demo.websocket.config.GetHttpHeaderConfigurator;
    20 
    21 /**
    22  * 服务层
    23  * 注意: 这里的ServerEndpoint不会进入拦截器
    24  * @author wunaozai
    25  * @Date 2020-10-27
    26  */
    27 @ServerEndpoint(value="/websocket/{token}", configurator=GetHttpHeaderConfigurator.class)
    28 @Component
    29 public class WebSocketService {
    30 
    31     //用来存放每个客户端对应的WebSocketServer对象
    32     private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    33 
    34     /**
    35      * 成功建立连接
    36      * @param session
    37      * @param token
    38      */
    39     @OnOpen
    40     public void onOpen(Session session, @PathParam(value="token") String token, EndpointConfig config) {
    41         try {
    42             Map<String, List<String>> headers = 
    43                     (Map<String, List<String>>) config.getUserProperties().get("headers");
    44             System.out.println(new Gson().toJson(headers)); //这里可以通过Headers或者token进行认证
    45             if(!checkToken(token)) {
    46                 //校验失败,禁止建立连接
    47                 session.close();    
    48             }
    49             System.out.println("建立连接: " + token);
    50             sessionPools.put(token, session);
    51         } catch (Exception e) {
    52             e.printStackTrace();
    53         }
    54     }
    55     
    56     @OnClose
    57     public void onClose(@PathParam(value="token") String token) {
    58         sessionPools.remove(token);
    59         System.out.println("断开连接: " + token);
    60     }
    61     
    62     @OnMessage
    63     public void onMessage(@PathParam(value="token")String token, String message) throws IOException {
    64         System.out.println("收到客户端发来信息: " + message);
    65     }
    66     
    67     //发送广播信息
    68     public void sendBroadCast(String message) throws IOException {
    69         //这里从第三方服务或调用该接口向所有客户端进行广播信息
    70         for(Session session: sessionPools.values()) {
    71             sendMessage(session, "{"msg":""+message+""}");
    72         }
    73     }
    74     //指定单个发送数据
    75     public void sendMessage(Session session, String message) throws IOException {
    76         if(session != null && session.isOpen()) {
    77             synchronized (session) {
    78                 session.getBasicRemote().sendText(message);
    79             }
    80         }
    81     }
    82     
    83     public String getToken() {
    84         //这里随机生成Token,并缓存到Redis,设置10分钟过期策略
    85         return "access_token_random_qcb0a6S";
    86     }
    87     public boolean checkToken(String token) {
    88         if("access_token_random_qcb0a6S".equals(token)) {
    89             return true;
    90         }
    91         return false;
    92     }
    93 }

      3.4 controller 控制器

      WebSocketController.java

     1 package com.wunaozai.demo.websocket.controller;
     2 
     3 import java.io.IOException;
     4 
     5 import org.springframework.beans.factory.annotation.Autowired;
     6 import org.springframework.stereotype.Controller;
     7 import org.springframework.web.bind.annotation.RequestMapping;
     8 import org.springframework.web.bind.annotation.ResponseBody;
     9 
    10 import com.wunaozai.demo.websocket.service.WebSocketService;
    11 
    12 @Controller
    13 @RequestMapping(value="/websocket")
    14 public class WebSocketController {
    15 
    16     @Autowired
    17     private WebSocketService websocketService;
    18     
    19     @RequestMapping(value="index")
    20     public String index() {
    21         return "websocket/index";
    22     }
    23     
    24     @ResponseBody
    25     @RequestMapping(value="send")
    26     public String sendBroadCast(String msg) throws IOException {
    27         websocketService.sendBroadCast(msg);
    28         return "ok.";
    29     }
    30     @ResponseBody
    31     @RequestMapping(value="token")
    32     public String getToken() {
    33         //这里可以判断请求是否合法,查询数据库
    34         //如果在拦截器已经做处理的,就不需要进行判断
    35         return websocketService.getToken();
    36     }
    37     
    38 }

      3.5 目录结构

      

    四、前端代码

     1 <!DOCTYPE html>
     2 <html lang="en">
     3   <head>
     4     <meta charset="utf-8">
     5     <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
     6     <meta name="description" content="">
     7     <meta name="author" content="">
     8     <title>Websocket 例子</title>
     9   </head>
    10   <body>
    11   <button id="btn-token">获取Token</button>
    12   <input id="token" placeholder="请填写Token"/>
    13   <br>
    14   <button id="btn-connect">连接WebSocket</button>
    15   <input id="message" value="Hello" placeholder="请填写要发送的信息"  />
    16   <button id="btn-send">发送信息</button>
    17   </body>
    18   
    19   <script type="text/javascript" src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
    20   <script type="text/javascript">
    21     jQuery(function(){
    22         initEvent();
    23     });
    24     
    25     function initEvent(){
    26         $("#btn-token").bind('click', function(){
    27             $.post('/websocket/token', function(ret){
    28                 console.log(ret);
    29                 $("#token").val(ret);
    30             });
    31         });
    32         $("#btn-connect").bind('click', function(){
    33             openSocket();
    34         });
    35         $("#btn-send").bind('click', function(){
    36             sendMessage();
    37         });
    38     }
    39     var socket;
    40     function openSocket() {
    41         if(typeof(WebSocket) == "undefined") {
    42             console.log("您的浏览器不支持WebSocket");
    43         }else{
    44             console.log("您的浏览器支持WebSocket");
    45             //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
    46             var token = $("#token").val();
    47             var socketUrl="ws://127.0.0.1:8080/websocket/"+token;
    48             console.log(socketUrl);
    49             if(socket!=null){
    50                 socket.close();
    51                 socket=null;
    52             }
    53             socket = new WebSocket(socketUrl);
    54             //打开事件
    55             socket.onopen = function() {
    56                 console.log("websocket已打开");
    57                 //socket.send("这是来自客户端的消息" + location.href + new Date());
    58             };
    59             //获得消息事件
    60             socket.onmessage = function(msg) {
    61                 var serverMsg = "收到服务端信息:" + msg.data;
    62                 console.log(serverMsg);
    63                 //发现消息进入    开始处理前端触发逻辑
    64             };
    65             //关闭事件
    66             socket.onclose = function() {
    67                 console.log("websocket已关闭");
    68             };
    69             //发生了错误事件
    70             socket.onerror = function() {
    71                 console.log("websocket发生了错误");
    72             }
    73         }
    74     }
    75     function sendMessage() {
    76         if(typeof(WebSocket) == "undefined") {
    77             console.log("您的浏览器不支持WebSocket");
    78         }else {
    79             // console.log("您的浏览器支持WebSocket");
    80             var message = $("#message").val();
    81             var msg = '{"contentText":"'+message+'"}';
    82             console.log(msg);
    83             socket.send(msg);
    84         }
    85     }
    86   </script>
    87 </html>

    五、运行效果

    1. 访问页面,然后请求获取Token,获取到【access_token_random_qcb0a6S】,然后拼接EndPoint,ws://127.0.0.1:8080/websocket/access_token_random_qcb0a6S

    2. 服务器执行协议升级握手协议101,并将请求里面的Headers设置到当前请求的Session中

    3. 完成协议升级。由Http协议转WebSocket协议。进入WebSocketService的OnOpen方法。

    4. OnOpen方法,判断当前ws连接的token或者headers里面是否包含认证信息(比如jwt或oauth、自定义token等方式)

    5. 完成连接建立,将ws连接session放到session池里面,以供下一步广播信息给ws客户端

    6. ws客户发送Hello到服务器,服务器可以做数据处理应答等业务操作

    7. 通过请求 http://127.0.0.1:8080/websocket/send?msg=Hi 模拟【物联网平台】通过rpc或者amqp等远程调用协议,调用发送设备上报的信息到ws客户端

    8.1 假如广播设备上报信息给所有客户端,那么将从session池中遍历所有ws客户端,然后依次发送数据。

    8.2 假如是单个发送数据。那么需要根据设备的ID信息,查询设备与用户的绑定信息,再转发数据给ws客户端(用户)

    9. 客户端(手机APP、H5等)通过弹框等方式,提醒用户

    参考资料:

              https://www.cnblogs.com/JohanChan/p/12522001.html

    本文地址:https://www.cnblogs.com/wunaozai/p/13889216.html
    本系列目录: https://www.cnblogs.com/wunaozai/p/8067577.html
    个人主页:https://www.wunaozai.com/

  • 相关阅读:
    比较.NET程序集(DLL或EXE)是否相同
    [转] JavaScript数组去重(12种方法)
    [转] js网络请求跨域问题汇总(携带cookie)
    [转] JS中arr.forEach()如何跳出循环
    [转] vue前端异常监控sentry实践
    [转] vue父组件触发子组件事件
    [转] vue 自定义组件使用v-model
    [转] Nginx配置性能优化
    [转] linux 查找文本过滤grep正则表达式命令详解用法
    [转] Nginx配置中的location、root、alias
  • 原文地址:https://www.cnblogs.com/wunaozai/p/13889216.html
Copyright © 2011-2022 走看看