zoukankan      html  css  js  c++  java
  • WebSocket和kafka实现数据实时推送到前端

    一. 需求背景
         最近新接触一个需求,需要将kafka中的数据实时推送到前端展示。最开始想到的是前端轮询接口数据,但是无法保证轮询的频率和消费的频率完全一致,或造成数据缺失等问题。最终确定用利用WebSocket实现数据的实时推送。
     
    二. websocket简介
         网上已经有好多介绍WebSocket的文章了,就不详细介绍了,这里只做简单介绍。 WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
     
    三. 服务端实现
      1. pom文件
      这里需要引用三个依赖。第一个为WebSocket需要的依赖,另外两个为kafka的依赖
     
     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <groupId>person</groupId>
     8     <artifactId>wbSocketkafka</artifactId>
     9     <version>1.0-SNAPSHOT</version>
    10 
    11     <dependencies>
    12         <!-- webSocket所需依赖 -->
    13         <dependency>
    14             <groupId>javax</groupId>
    15             <artifactId>javaee-api</artifactId>
    16             <version>7.0</version>
    17         </dependency>
    18         <!-- kafka 所需依赖 -->
    19         <dependency>
    20             <groupId>org.apache.kafka</groupId>
    21             <artifactId>kafka_2.9.2</artifactId>
    22             <version>0.8.1.1</version>
    23         </dependency>
    24         <dependency>
    25             <groupId>org.apache.kafka</groupId>
    26             <artifactId>kafka-clients</artifactId>
    27             <version>RELEASE</version>
    28         </dependency> 
    29     </dependencies>
    30 </project>

      2. webSocket服务端实现

     1 //此处定义接口的uri
     2 @ServerEndpoint("/wbSocket")
     3 public class WebSocket {
     4     private Session session;
     5     public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此处定义静态变量,以在其他方法中获取到所有连接
     6     
     7     /**
     8      * 建立连接。
     9      * 建立连接时入参为session
    10      */
    11     @OnOpen
    12     public void onOpen(Session session){
    13         this.session = session;
    14         wbSockets.add(this); //将此对象存入集合中以在之后广播用,如果要实现一对一订阅,则类型对应为Map。由于这里广播就可以了随意用Set
    15         System.out.println("New session insert,sessionId is "+ session.getId());
    16     }
    17     /**
    18      * 关闭连接
    19      */
    20     @OnClose
    21     public void onClose(){
    22         wbSockets.remove(this);//将socket对象从集合中移除,以便广播时不发送次连接。如果不移除会报错(需要测试)
    23         System.out.println("A session insert,sessionId is "+ session.getId());
    24     }
    25     /**
    26      * 接收前端传过来的数据。
    27      * 虽然在实现推送逻辑中并不需要接收前端数据,但是作为一个webSocket的教程或叫备忘,还是将接收数据的逻辑加上了。
    28      */
    29     @OnMessage
    30     public void onMessage(String message ,Session session){
    31         System.out.println(message + "from " + session.getId());
    32     }
    33 
    34     public void sendMessage(String message) throws IOException {
    35         this.session.getBasicRemote().sendText(message);
    36     }
    37 }

      3. kafka消费者实现

     1 public class ConsumerKafka extends Thread {
     2 
     3     private KafkaConsumer<String,String> consumer;
     4     private String topic = "kafkaTopic";
     5 
     6     public ConsumerKafka(){
     7 
     8     }
     9 
    10     @Override
    11     public void run(){
    12         //加载kafka消费者参数
    13         Properties props = new Properties();
    14         props.put("bootstrap.servers", "localhost:9092");
    15         props.put("group.id", "ytna");
    16         props.put("enable.auto.commit", "true");
    17         props.put("auto.commit.interval.ms", "1000");
    18         props.put("session.timeout.ms", "15000");
    19         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    20         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    21         //创建消费者对象
    22         consumer = new KafkaConsumer<String,String>(props);
    23         consumer.subscribe(Arrays.asList(this.topic));
    24         //死循环,持续消费kafka
    25         while (true){
    26             try {
    27                //消费数据,并设置超时时间
    28                 ConsumerRecords<String, String> records = consumer.poll(100);
    29                 //Consumer message
    30                 for (ConsumerRecord<String, String> record : records) {
    31                     //Send message to every client
    32                     for (WebSocket webSocket :wbSockets){
    33                         webSocket.sendMessage(record.value());
    34                     }
    35                 }
    36             }catch (IOException e){
    37                 System.out.println(e.getMessage());
    38                 continue;
    39             }
    40         }
    41     }
    42 
    43     public void close() {
    44         try {
    45             consumer.close();
    46         } catch (Exception e) {
    47             System.out.println(e.getMessage());
    48         }
    49     }
    50 
    51     //供测试用,若通过tomcat启动需通过其他方法启动线程
    52     public static void main(String[] args){
    53         ConsumerKafka consumerKafka = new ConsumerKafka();
    54         consumerKafka.start();
    55     }
    56 }
     
    P.S. 需要注意的是WebSocket对tomcat版本是有要求的,笔者使用的是7.0.7.8。
     
    四. 前端简单实现
     1 <!DOCTYPE html>
     2 <html lang="en">
     3 <head>
     4     <meta charset="UTF-8">
     5     <title>WebSocket client</title>
     6     <script type="text/javascript">
     7         var socket;
     8         if (typeof (WebSocket) == "undefined"){
     9             alert("This explorer don't support WebSocket")
    10         }
    11 
    12         function connect() {
    13             //Connect WebSocket server
    14             socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
    15             //open
    16             socket.onopen = function () {
    17                 alert("WebSocket is open");
    18             }
    19             //Get message
    20             socket.onmessage = function (msg) {
    21                 alert("Message is " + msg);
    22             }
    23             //close
    24             socket.onclose = function () {
    25                 alert("WebSocket is closed");
    26             }
    27             //error
    28             socket.onerror = function (e) {
    29                 alert("Error is " + e);
    30             }
    31         }
    32 
    33         function close() {
    34             socket.close();
    35         }
    36 
    37         function sendMsg() {
    38             socket.send("This is a client message ");
    39         }
    40     </script>
    41 </head>
    42 <body>
    43     <button onclick="connect()">connect</button>
    44     <button onclick="close()">close</button>
    45     <button onclick="sendMsg()">sendMsg</button>
    46 </body>
    47 </html>
     
    五. 结语
         以上基本可以实现将kafka数据实时推送到前端。这是笔者第一篇笔记,不足之处请指出、谅解。
         源码:https://github.com/youtNa/webSocketkafka
       引用:1. webSocket百度百科
  • 相关阅读:
    爬虫之JSON
    爬虫bs4案例
    爬虫bs4
    爬虫之Xpath案例
    爬虫之xpath
    监控 Kubernetes 集群应用
    手动部署k8s-prometheus
    ingress之tls和path使用
    ingress安装配置
    kube-dns和coreDNS的使用
  • 原文地址:https://www.cnblogs.com/nayt/p/6931499.html
Copyright © 2011-2022 走看看