zoukankan      html  css  js  c++  java
  • netty实现websocket客户端(附:测试服务端代码)

    1,客户端启动类

    package test3;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    public final class WebSocketClient {
    
        static final String URL = System.getProperty("url", "ws://127.0.0.1:5688/ws");
    
        public void connect(String URL, ChannelGroup clients, ChannelHandlerContext ctx) throws Exception {
            URI uri = new URI(URL);
            String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
            final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
            final int port;
            if (uri.getPort() == -1) {
                if ("ws".equalsIgnoreCase(scheme)) {
                    port = 80;
                } else if ("wss".equalsIgnoreCase(scheme)) {
                    port = 443;
                } else {
                    port = -1;
                }
            } else {
                port = uri.getPort();
            }
    
            if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                System.err.println("Only WS(S) is supported.");
                return;
            }
    
            final boolean ssl = "wss".equalsIgnoreCase(scheme);
            final SslContext sslCtx;
            if (ssl) {
                sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                final WebSocketClientHandler handler = new WebSocketClientHandler(ctx, WebSocketClientHandshakerFactory
                        .newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));
    
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                    }
                });
    
                Channel ch = b.connect(uri.getHost(), port).sync().channel();
                handler.handshakeFuture().sync();
                /*
                 * String msg = "222"; WebSocketFrame frame = new
                 * TextWebSocketFrame(msg); ch.writeAndFlush(frame);
                 */
                // ch.writeAndFlush("222");
                BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    String msg = console.readLine();
                    if (msg == null) {
                        break;
                    } else if ("bye".equals(msg.toLowerCase())) {
                        ch.writeAndFlush(new CloseWebSocketFrame());
                        ch.closeFuture().sync();
                        break;
                    } else if ("ping".equals(msg.toLowerCase())) {
                        WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
                        ch.writeAndFlush(frame);
                    } else {
                        WebSocketFrame frame = new TextWebSocketFrame(msg);
                        ch.writeAndFlush(frame);
                    }
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    }

    (2)

    package test3;
    
    import java.time.LocalDateTime;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import test.SocketHandlerInitializer;
    
    public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    
        private Channel outboundChannel;
        private ChannelHandlerContext channel;
        private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        private final WebSocketClientHandshaker handshaker;
        private ChannelPromise handshakeFuture;
        
        
        public WebSocketClientHandler(ChannelHandlerContext channel, WebSocketClientHandshaker handshaker) {
            this.handshaker = handshaker;
            this.channel = channel;
        }
    
        public ChannelFuture handshakeFuture() {
            return handshakeFuture;
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("handlerAdded");
            handshakeFuture = ctx.newPromise();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channelActive");
            handshaker.handshake(ctx.channel());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("WebSocket Client 链接失败!");
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelRead0");
            Channel ch = ctx.channel();
            if (!handshaker.isHandshakeComplete()) {
                try {
                    handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                    System.out.println("WebSocket Client connected!");
                    handshakeFuture.setSuccess();
                } catch (WebSocketHandshakeException e) {
                    System.out.println("WebSocket Client failed to connect");
                    handshakeFuture.setFailure(e);
                    
                }
                return;
            }
    
            if (msg instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse) msg;
                throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus()
                        + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
            }
    
            WebSocketFrame frame = (WebSocketFrame) msg;
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
                // resposnse(ctx, frame);
                
                channel.writeAndFlush(textFrame.text());
                System.out.println("WebSocket Client received message: " + textFrame.text());
            } else if (frame instanceof PongWebSocketFrame) {
                System.out.println("WebSocket Client received pong");
            } else if (frame instanceof CloseWebSocketFrame) {
                System.out.println("WebSocket Client received closing");
                ch.close();
            }
        }
    
        private void response(ChannelHandlerContext ctx, final WebSocketFrame msg) {
            // 获取客户端传输过来的消息
            String content = msg.toString();
            clients.writeAndFlush(new TextWebSocketFrame("[服务器收到相应]" + LocalDateTime.now() + "接受萨达到消息, 消息为:" + content));
    
            final Channel inboundChannel = ctx.channel();
    
            Bootstrap b = new Bootstrap();
            b.group(inboundChannel.eventLoop()).channel(ctx.channel().getClass())
                    .handler(new SocketHandlerInitializer(inboundChannel));
    
            ChannelFuture f = b.connect("127.0.0.1", 5688);
            outboundChannel = f.channel();
            msg.retain();
    
            ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("isSuccess:true");
                        outboundChannel.writeAndFlush("2222222222");
                    } else {
                        System.out.println("isSuccess:false");
                        inboundChannel.close();
                    }
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            if (!handshakeFuture.isDone()) {
                handshakeFuture.setFailure(cause);
            }
            ctx.close();
        }
    
    }

    (3)测试用的服务端代码

    package com.googosoft.websocket;
    
    import java.io.IOException;
    import javax.websocket.DecodeException;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.ServerEndpoint;
    
    @ServerEndpoint("/echo")
    public class EchoServer {
        
        @OnOpen
        public void initSession(Session session) {
        }
        
        @OnMessage
        public void onMessage(String message, Session session) 
            throws IOException, InterruptedException {
            System.out.println("Received: " + message);
            session.getBasicRemote().sendText("This is the first server message");
        }
        
        @OnError  
        public void handleError(Throwable thw) {
            thw.printStackTrace();
            if (thw instanceof DecodeException) {
               System.out.println("Error decoding incoming message: " + ((DecodeException)thw).getText());
           } else {
               System.out.println("Server WebSocket error: " + thw.getMessage());
           }
        }
        
        @OnClose
        public void processClose(Session session){
            
        }
    }

    在测试的时候,服务端的代码我把它放在了一个web项目里面充当服务端 

    客户端就用的普通的Java项目,在main方法里面建立链接,实现通信

  • 相关阅读:
    js中return、return true、return false的区别
    flask,中间键
    flask,自定义中间件
    flask,闪现
    flask获取前端发送过来的数据
    flask中的如何将后端写前端的代码设置session
    flask中的正则匹配
    flask中的四剑客 及其他参数
    支付宝接口
    flasks框架简介
  • 原文地址:https://www.cnblogs.com/excellencesy/p/11246992.html
Copyright © 2011-2022 走看看