zoukankan      html  css  js  c++  java
  • WebSocket 的使用

    Java 控制台程序实现类似广播功能

    服务器端代码

    添加 maven 依赖

    <dependency>
        <groupId>javax.websocket</groupId>
        <artifactId>javax.websocket-api</artifactId>
        <version>1.1</version>
        <scope>provided</scope>
    </dependency>
    

    服务器端代码

    package com.seliote.web.http;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 每次有 WebSocket 连接请求都会创建一个该类的实例
     */
    @ServerEndpoint(value = "/broadcast")
    public class SocketServer {
        private static final List<Session> onlinePeople = new ArrayList<>();
    
        @OnOpen
        public void onOpen(Session aSession) {
            System.out.println(System.currentTimeMillis() + ": OnOpen:::" + onlinePeople.size() + 1);
            if (!onlinePeople.contains(aSession)) {
                onlinePeople.add(aSession);
            }
        }
    
        // 该方法是用于被动接收信息的 
        @OnMessage
        public void onMessage(Session aSession, String aS) throws IOException {
            System.out.println(System.currentTimeMillis() + ": OnMessage:::" + aS);
            for (Session session : onlinePeople) {
                session.getBasicRemote().sendText(aS);
            }
        }
    
        // OnMessage 可以有多个不同签名的
        @OnMessage
        public void onMessage(Session aSession, InputStream aInputStream) {
            System.out.println(System.currentTimeMillis() + ": OnMessage");
            // TODO
        }
    
        /**
         * 每次有客户端异常关闭该方法也会调用
         * @param aSession
         * @param aCloseReason
         */
        @OnClose
        public void onClose(Session aSession, CloseReason aCloseReason) {
            System.out.println(System.currentTimeMillis() + ": OnClose:::" + aCloseReason.getReasonPhrase());
            if (onlinePeople.contains(aSession)) {
                onlinePeople.remove(aSession);
            }
        }
    
        @OnError
        public void onError(Session aSession, Throwable aThrowable) {
            System.out.println(System.currentTimeMillis() + ": OnError");
            aThrowable.printStackTrace();
        }
    }
    

    如果连接时需要携带客户端信息,那么可以在路径中加入参数,如客户端路径加入用户 Token 变为 127.0.0.1/broadcast/123456,服务器端的标注就可改为 @ServerEndpoint(value = "/broadcast/{token}") ,之后的 @OnOpen 方法中就可以有一个 @PathParam("token") String aToken 代表客户端传入的 Token

    客户端代码

    添加 maven 依赖,注意这里使用的是 tyrus-standalone-client 而非 javax.websocket-client-api 后者会报错

    <dependency>
        <groupId>org.glassfish.tyrus.bundles</groupId>
        <artifactId>tyrus-standalone-client</artifactId>
        <version>1.3.3</version>
        <scope>compile</scope>
    </dependency>
    

    客户端代码

    package com.seliote;
    
    import javax.websocket.*;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Scanner;
    
    @ClientEndpoint
    public class Demo {
        private static Session sSession;
    
        public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
            // https 协议对应使用 wss 
            URI uri = new URI("ws", "127.0.0.1:8080", "/broadcast", null, null);
            // 通过 ContainerProvider 的 static 方法 getWebSocketContainer() 获得 WebSocketContainer
            sSession = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
            try (Scanner scanner = new Scanner(System.in)) {
                String broadcastMsg = "";
                while (true) {
                    broadcastMsg = scanner.nextLine();
                    // 通过 Session 对象主动发送信息
                    sSession.getBasicRemote().sendText(broadcastMsg);
                    //sSession.getBasicRemote().getSendStream().write(....);
                }
            }
        }
    
        @OnOpen
        public void onOpen() {
            System.out.println(System.currentTimeMillis() + ": OnOpen ");
        }
    
        // 该方法是用于被动接收信息的 
        @OnMessage
        public void onMessage(String aS) {
            System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
        }
    }
    

    在一个客户端输入信息后服务器会及时收到信息并广播给所有在线的客户端

    ------------------------------------------2019.01.09 更新

    如果需要支持相应的实体类型,WebSocket 服务器端大概长 这样,而客户端配置如下

    Maven 依赖(这里用了 JSONObject 而不是服务器端的 Jackson)

    <dependency>
        <groupId>org.glassfish.tyrus.bundles</groupId>
        <artifactId>tyrus-standalone-client</artifactId>
        <version>1.3.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
        <scope>compile</scope>
    </dependency>
    
    package com.seliote.demo;
    
    /**
     * @author seliote
     * @date 2019-01-09
     * @description WebSocket 信息实体
     */
    @SuppressWarnings({"unused", "WeakerAccess"})
    public class BroadcastMsg {
        private String mSessionId;
        private String mTimestamp;
        private String mMsg;
    
        public BroadcastMsg() {}
    
        public BroadcastMsg(String aSessionId, String aTimestamp, String aMsg) {
            mSessionId = aSessionId;
            mTimestamp = aTimestamp;
            mMsg = aMsg;
        }
    
        public String getSessionId() {
            return mSessionId;
        }
    
        public void setSessionId(String aSessionId) {
            mSessionId = aSessionId;
        }
    
        public String getTimestamp() {
            return mTimestamp;
        }
    
        public void setTimestamp(String aTimestamp) {
            mTimestamp = aTimestamp;
        }
    
        public String getMsg() {
            return mMsg;
        }
    
        public void setMsg(String aMsg) {
            mMsg = aMsg;
        }
    
        @Override
        public String toString() {
            return mSessionId + " - " + mTimestamp + " - " + mMsg;
        }
    }
    
    package com.seliote.demo;
    
    import org.json.JSONObject;
    
    import javax.websocket.Decoder;
    import javax.websocket.Encoder;
    import javax.websocket.EndpointConfig;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    
    /**
     * @author seliote
     * @date 2019-01-09
     * @description BroadcastMsg 用于 WebSocket 的编码与解码器
     */
    public class BroadcastMsgCoder implements Encoder.BinaryStream<BroadcastMsg>, Decoder.BinaryStream<BroadcastMsg> {
        
        @Override
        public void init(EndpointConfig aEndpointConfig) {
    
        }
    
        @Override
        public void destroy() {
    
        }
        
        @Override
        public void encode(BroadcastMsg aBroadcastMsg, OutputStream aOutputStream) throws IOException {
            aOutputStream.write(new JSONObject(aBroadcastMsg).toString().getBytes(StandardCharsets.UTF_8));
        }
    
        @Override
        public BroadcastMsg decode(InputStream aInputStream) throws IOException {
            byte[] buffer = new byte[1024];
            int length;
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            while ((length = aInputStream.read(buffer)) != -1) {
                byteArrayOutputStream.write(buffer, 0, length);
            }
            String json = new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
            JSONObject jsonObject = new JSONObject(json);
            return new BroadcastMsg(
                    jsonObject.getString("sessionId"),
                    jsonObject.getString("timestamp"),
                    jsonObject.getString("msg")
                    );
        }
    }
    
    package com.seliote.demo;
    
    import javax.websocket.ClientEndpoint;
    import javax.websocket.ContainerProvider;
    import javax.websocket.DeploymentException;
    import javax.websocket.EncodeException;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Scanner;
    
    @ClientEndpoint(
            encoders = BroadcastMsgCoder.class,
            decoders = BroadcastMsgCoder.class
    )
    public class Demo {
    
        public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
            // https 协议对应使用 wss
            URI uri = new URI("ws", "127.0.0.1:8080", "/time/1", null, null);
            // 通过 ContainerProvider 的 static 方法 getWebSocketContainer() 获得 WebSocketContainer
            Session session = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
            try (Scanner scanner = new Scanner(System.in)) {
                //noinspection InfiniteLoopStatement
                while (true) {
                    // 通过 Session 对象主动发送信息
                    try {
                        String msg = scanner.nextLine();
                        BroadcastMsg broadcastMsg = new BroadcastMsg(
                                session.getId(),
                                System.currentTimeMillis() + "",
                                msg
                        );
                        session.getBasicRemote().sendObject(broadcastMsg);
                    } catch (EncodeException exp) {
                        exp.printStackTrace();
                    }
                    //sSession.getBasicRemote().getSendStream().write(....);
                }
            }
        }
    
        @OnOpen
        public void onOpen() {
            System.out.println(System.currentTimeMillis() + ": OnOpen ");
        }
    
        @OnMessage
        public void onMessage(String aS) {
            System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
        }
    
        @OnMessage
        public void onMessage(BroadcastMsg aBroadcastMsg) {
            System.out.println(aBroadcastMsg);
        }
    }
    
  • 相关阅读:
    程序员的成长阶梯和级别[转]
    【转】教你如何迅速秒杀99%的海量数据处理面试题
    【转】探索C#之布隆过滤器(Bloom filter)
    基于.NET平台常用的框架整理 [转]
    使用 Async 和 Await 的异步编程(C# 和 Visual Basic)[msdn.microsoft.com]
    使用异步编程
    Node.js Web框架收集
    js闭包的定义与应用
    null 与 undefined 区别
    git 基本操作—笔记
  • 原文地址:https://www.cnblogs.com/seliote/p/9761571.html
Copyright © 2011-2022 走看看