zoukankan      html  css  js  c++  java
  • java-websocket客户端 断线重连 注入Service问题

    java版客户端:

    使用开源项目java-websocket, github地址: https://github.com/TooTallNate/Java-WebSocket

    github上有很多示例,具体可以去查看

    此处主要是记录java-websocket实现客户端,并解决无法使用Service层方法(service为null)的问题,以及断线重连

    引用包

    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-WebSocket</artifactId>
        <version>1.3.9</version>
    </dependency>

    第一版,使用getBean获取Service层方法,并且实现断线重连

    使用的是GitHub上的demo示例

    import com.alibaba.fastjson.JSONArray;import com.sensor.vibration.utils.ApplicationContextRegister;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.java_websocket.client.WebSocketClient;
    import org.java_websocket.drafts.Draft;
    import org.java_websocket.handshake.ServerHandshake;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    
    import java.net.URI;
    import java.util.Map;
    
    /** This example demonstrates how to create a websocket connection to a server. Only the most important callbacks are overloaded. */
    
    @Slf4j
    public class SensorWebSocketClient extends WebSocketClient {
        @Autowired
        private UserService userService;
    
        public SensorWebSocketClient(URI serverUri , Draft draft ) {
            super( serverUri, draft );
        }
    
        public SensorWebSocketClient(URI serverURI ) {
            super( serverURI );
        }
    
        public SensorWebSocketClient(URI serverUri, Map<String, String> httpHeaders ) {
            super(serverUri, httpHeaders);
        }
    
        @Override
        public void onOpen( ServerHandshake handshakedata ) {
            System.out.println( "opened connection" );
            // if you plan to refuse connection based on ip or httpfields overload: onWebsocketHandshakeReceivedAsClient
        }
    
        @Override
        public void onMessage( String msg ) {
            log.info("[websocket] 收到消息={}",msg);
        }
    
        @Override
        public void onClose( int code, String reason, boolean remote ) {
            // The codecodes are documented in class org.java_websocket.framing.CloseFrame
            System.out.println( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason );
        }
    
        @Override
        public void onError( Exception ex ) {
            ex.printStackTrace();
            // if the error is fatal then onClose will be called additionally
        }
    
    }

    新建一个类,创建一个方法,启动websocket

    import java.net.URI;
    import java.net.URISyntaxException;
    
    
    /**
     * Opens the sensor WebSocket connection and keeps it alive from a background thread.
     */
    public class ReconnectClient {
        /** Pause between two keep-alive probes, in milliseconds. */
        private static final long HEARTBEAT_INTERVAL_MS = 3000L;

        /**
         * Connects to the WebSocket server (blocking until the attempt finishes) and then starts
         * a background thread that sends an empty text frame every three seconds. When a send
         * fails, the thread asks the client to reconnect and carries on probing.
         *
         * @throws URISyntaxException   if the hard-coded server address is not a valid URI
         * @throws InterruptedException if the calling thread is interrupted while connecting
         */
        public static void reconnect() throws URISyntaxException, InterruptedException {
            final SensorWebSocketClient c = new SensorWebSocketClient(new URI("ws://localhost:5005/websocket"));
            c.connectBlocking();

            Thread heartbeat = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(HEARTBEAT_INTERVAL_MS);
                        c.send("");
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the thread instead of
                        // treating an interrupt as a lost connection.
                        Thread.currentThread().interrupt();
                    } catch (RuntimeException e) {
                        // A failed send is taken as "connection lost": start a new attempt.
                        c.reconnect();
                    }
                }
            }, "sensor-websocket-heartbeat");
            heartbeat.start();
        }
    }

    再新建一个类,程序启动的时候,调用上面的方法

    import com.sensor.vibration.utils.Common;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.net.URISyntaxException;
    import java.util.Map;
    
    
    /**
     * Startup hook that brings the WebSocket client up together with the application.
     */
    @Slf4j
    @Component
    public class InitStart implements CommandLineRunner {

        /**
         * Delegates to {@code ReconnectClient.reconnect()}; the command-line arguments are not used.
         *
         * @param args command-line arguments passed to the application
         * @throws URISyntaxException   propagated from {@code ReconnectClient.reconnect()}
         * @throws InterruptedException propagated from {@code ReconnectClient.reconnect()}
         */
        @Override
        public void run(String... args) throws URISyntaxException, InterruptedException {
            ReconnectClient.reconnect();
        }
    }

    中间的启动类的方法可以省去,直接写在InitStart的run方法里面

    现在还不能使用Service层的方法,会报service为null异常,百度后,参考别人使用getBean方法,写一个工具类

    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Keeps the Spring {@link ApplicationContext} in a static field so that it can be reached
     * through a static accessor.
     */
    @Component
    @Lazy(false)
    public class ApplicationContextRegister implements ApplicationContextAware {
        /** The stored context; {@code null} until {@link #setApplicationContext} has been called. */
        private static ApplicationContext context;

        /**
         * Stores the Spring application context.
         *
         * @param applicationContext the Spring application context
         * @throws BeansException declared by the {@link ApplicationContextAware} contract
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            ApplicationContextRegister.context = applicationContext;
        }

        /**
         * Returns the stored application context.
         *
         * @return the stored context, or {@code null} if none has been set yet
         */
        public static ApplicationContext getApplicationContext() {
            return ApplicationContextRegister.context;
        }
    }

    在 SensorWebSocketClient.java 中使用Service

    // Fragment meant to be placed inside SensorWebSocketClient.
    // NOTE(review): the field is assigned by hand below, so @Autowired looks redundant here — confirm.
    @Autowired
    private UserService userService;
    
    // Fetch the context held by ApplicationContextRegister and look the service bean up by type.
    ApplicationContext act = ApplicationContextRegister.getApplicationContext();
    userService=act.getBean(UserService.class);

    但是 领导不让用getBean这种方法,放弃

    第二版,使用Service层方法版本 + 断线重连

    实现:

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.java_websocket.client.WebSocketClient;
    import org.java_websocket.handshake.ServerHandshake;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import java.net.URI;
    
    /**
     * Created by Chow on 2019/8/22
     */
    
    @Slf4j
    @Component
    public class WebSocketClientStart {
        @Autowired
        private UserService userService;
    
    
        @Bean
        public WebSocketClient webSocketClient() {
            try {
                WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:5005/websocket")) {
                    @Override
                    public void onOpen(ServerHandshake handshakedata) {
                        log.info("[websocket] 连接成功");
                    }
    
                    @Override
                    public void onMessage(String msg) {
                        try{
                            log.info("[websocket] 收到消息={}",msg);
    
                            if (msg == null || StringUtils.isBlank(msg)){
                                log.error("the msg message of websocket received is null");
                                this.send("");
                                return;
                            }
                            JSONArray jsonArray = JSONArray.parseArray(msg);
                            if (jsonArray == null || jsonArray.size() == 0){
                                log.info("log: the message of websocket received is empty");
                            }
    
                            vibrationAlarmService.alarmAnalysis(jsonArray);
                            this.send("");
                        }catch (Exception e){
                            log.error(e.getMessage(), e);
                            this.send("");
                        }
    
                    }
    
                    @Override
                    public void onClose(int code, String reason, boolean remote) {
                        log.info("[websocket] 退出连接");
                    }
    
                    @Override
                    public void onError(Exception ex) {
                        log.info("[websocket] 连接错误={}",ex.getMessage());
                    }
                };
                webSocketClient.connect();
                new Thread(new Runnable() {
    
                    public void run() {
                        System.out.println("Runnable running..");
                    }
    
                }) {
    
                    public void run() {
                        while (true){
                            try{
                                Thread.sleep(3000);
                                webSocketClient.send("");
                            }catch (Exception e){
                                webSocketClient.reconnect();
                            }
                        }
    
                    }
    
                }.start();
                return webSocketClient;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
    
    }

     测试服务端代码

    需要引入包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
        <version>2.1.3.RELEASE</version>
    </dependency>

    server:

    import java.io.IOException;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.ServerEndpoint;
    
    import org.springframework.stereotype.Component;
    
    /**
     * Test WebSocket endpoint at {@code /websocket}: tracks the connected clients and relays
     * every received text message to all of them.
     */
    @ServerEndpoint(value = "/websocket")
    @Component
    public class MyWebSocket {
        // Number of currently open connections; accessed through the synchronized static
        // helpers at the bottom of the class.
        private static int onlineCount = 0;

        // Thread-safe set holding the MyWebSocket object that belongs to each client.
        private static final CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>();

        // Session of this client's connection; needed to send data to the client.
        private Session session;

        // Serializes writes to this connection: RemoteEndpoint.Basic may throw
        // IllegalStateException when two threads send on the same session at once, which
        // broadcasts triggered by different clients can otherwise cause.
        private final Object sendLock = new Object();

        /**
         * Called when a connection has been established: registers the client, bumps the
         * online counter and tells the new client the current count.
         *
         * @param session session of the new connection
         */
        @OnOpen
        public void onOpen(Session session) {
            this.session = session;
            webSocketSet.add(this);
            addOnlineCount();
            System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
            try {
                sendMessage("当前在线人数为" + getOnlineCount());
            } catch (IOException e) {
                System.out.println("IO异常");
            }
        }

        /**
         * Called when the connection is closed: unregisters the client and decrements the
         * online counter.
         */
        @OnClose
        public void onClose() {
            webSocketSet.remove(this);
            subOnlineCount();
            System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
        }

        /**
         * Called when a client message arrives; relays it to every registered client.
         *
         * @param message text sent by the client
         * @param session session the message arrived on
         */
        @OnMessage
        public void onMessage(String message, Session session) {
            System.out.println("来自客户端的消息:" + message);
            broadcast(message);
        }

        /**
         * Called when an error is reported; prints the error.
         *
         * @param session session the error belongs to
         * @param error   the reported error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            System.out.println("发生错误");
            error.printStackTrace();
        }

        /**
         * Sends a text message to this client, blocking until the send completes.
         *
         * @param message text to send
         * @throws IOException if the underlying send fails
         */
        public void sendMessage(String message) throws IOException {
            synchronized (sendLock) {
                this.session.getBasicRemote().sendText(message);
            }
        }

        /**
         * Sends a custom message to every registered client.
         *
         * @param message text to send
         * @throws IOException never thrown by this implementation; kept in the signature so
         *                     existing callers continue to compile
         */
        public static void sendInfo(String message) throws IOException {
            broadcast(message);
        }

        // Delivers the message to each registered client; a failure on one connection is
        // reported and does not stop delivery to the remaining clients.
        private static void broadcast(String message) {
            for (MyWebSocket client : webSocketSet) {
                try {
                    client.sendMessage(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Returns the current number of open connections. */
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }

        /** Increments the number of open connections by one. */
        public static synchronized void addOnlineCount() {
            MyWebSocket.onlineCount++;
        }

        /** Decrements the number of open connections by one. */
        public static synchronized void subOnlineCount() {
            MyWebSocket.onlineCount--;
        }
    }
  • 相关阅读:
    The Python Standard Library
    Python 中的round函数
    Python文件类型
    Python中import的用法
    Python Symbols 各种符号
    python 一行写多个语句
    免费SSL证书(https网站)申请,便宜SSL https证书申请
    元宇宙游戏Axie龙头axs分析
    OLE DB provider "SQLNCLI10" for linked server "x.x.x.x" returned message "No transaction is active.".
    The operation could not be performed because OLE DB provider "SQLNCLI10" for linked server "xxx.xxx.xxx.xxx" was unable to begin a distributed transaction.
  • 原文地址:https://www.cnblogs.com/zhzhlong/p/11351458.html
Copyright © 2011-2022 走看看