1,客户端启动类
package test3; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; public final class WebSocketClient { static final String URL = System.getProperty("url", "ws://127.0.0.1:5688/ws"); public void connect(String URL, ChannelGroup clients, ChannelHandlerContext ctx) throws Exception { URI uri = new URI(URL); String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); final int port; if (uri.getPort() == -1) { if ("ws".equalsIgnoreCase(scheme)) { port = 80; } else if ("wss".equalsIgnoreCase(scheme)) { port = 443; } else { port = -1; } } else { port = uri.getPort(); } if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { System.err.println("Only WS(S) is supported."); return; } final boolean ssl = "wss".equalsIgnoreCase(scheme); final SslContext sslCtx; if (ssl) { sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { final WebSocketClientHandler handler = new WebSocketClientHandler(ctx, WebSocketClientHandshakerFactory .newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler); } }); Channel ch = b.connect(uri.getHost(), port).sync().channel(); handler.handshakeFuture().sync(); /* * String msg = "222"; WebSocketFrame frame = new * TextWebSocketFrame(msg); ch.writeAndFlush(frame); */ // ch.writeAndFlush("222"); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String msg = console.readLine(); if (msg == null) { break; } else if ("bye".equals(msg.toLowerCase())) { ch.writeAndFlush(new CloseWebSocketFrame()); ch.closeFuture().sync(); break; } else if ("ping".equals(msg.toLowerCase())) { WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 })); ch.writeAndFlush(frame); } else { WebSocketFrame frame = new TextWebSocketFrame(msg); ch.writeAndFlush(frame); } } } finally { group.shutdownGracefully(); } } }
(2)
package test3; import java.time.LocalDateTime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.GlobalEventExecutor; import test.SocketHandlerInitializer; public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private Channel outboundChannel; private ChannelHandlerContext channel; private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; public WebSocketClientHandler(ChannelHandlerContext channel, WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; this.channel = channel; } public ChannelFuture handshakeFuture() { return handshakeFuture; } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); handshakeFuture = ctx.newPromise(); } @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); handshaker.handshake(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("WebSocket Client 链接失败!"); } @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead0"); Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { try { handshaker.finishHandshake(ch, (FullHttpResponse) msg); System.out.println("WebSocket Client connected!"); handshakeFuture.setSuccess(); } catch (WebSocketHandshakeException e) { System.out.println("WebSocket Client failed to connect"); handshakeFuture.setFailure(e); } return; } if (msg instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) msg; throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; // resposnse(ctx, frame); channel.writeAndFlush(textFrame.text()); System.out.println("WebSocket Client received message: " + textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { System.out.println("WebSocket Client received pong"); } else if (frame instanceof CloseWebSocketFrame) { System.out.println("WebSocket Client received closing"); ch.close(); } } private void response(ChannelHandlerContext ctx, final WebSocketFrame msg) { // 获取客户端传输过来的消息 String content = msg.toString(); clients.writeAndFlush(new TextWebSocketFrame("[服务器收到相应]" + LocalDateTime.now() + "接受萨达到消息, 消息为:" + content)); final Channel inboundChannel = ctx.channel(); Bootstrap b = new Bootstrap(); b.group(inboundChannel.eventLoop()).channel(ctx.channel().getClass()) .handler(new SocketHandlerInitializer(inboundChannel)); ChannelFuture f = b.connect("127.0.0.1", 5688); outboundChannel = f.channel(); msg.retain(); ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("isSuccess:true"); outboundChannel.writeAndFlush("2222222222"); } else { System.out.println("isSuccess:false"); inboundChannel.close(); } } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } ctx.close(); } }
(3)测试用的服务端代码
package com.googosoft.websocket; import java.io.IOException; import javax.websocket.DecodeException; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; @ServerEndpoint("/echo") public class EchoServer { @OnOpen public void initSession(Session session) { } @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { System.out.println("Received: " + message); session.getBasicRemote().sendText("This is the first server message"); } @OnError public void handleError(Throwable thw) { thw.printStackTrace(); if (thw instanceof DecodeException) { System.out.println("Error decoding incoming message: " + ((DecodeException)thw).getText()); } else { System.out.println("Server WebSocket error: " + thw.getMessage()); } } @OnClose public void processClose(Session session){ } }
在测试的时候,服务端的代码我把它放在了一个web项目里面充当服务端
客户端就用的普通的Java项目,在main方法里面建立链接,实现通信