在实际的项目中应该如何使用netty去通信呢?
一般来说,会有以下三种情况,
1长连接 也就是服务器和客户端的通道一直不关闭,如果服务器性能非常好,并且在客户端数量不是很多的情况下,可以选择使用这种方式。
2短连接 一次性批量提交数据,我们可能会把数据先保存在数据库中,比如每1个小时提交一次。这种做法的弊端是不能够实时传输,在实时性要求不高的情况下可以推荐使用。
3一种特殊的长连接 在特定时间内,如果服务器和客户端没有通讯,就断开连接;下次当客户端需要再次发送数据的时候,再重新连接。但是这种方式我们需要考虑以下两个问题:
1连接超时以后我们如何断开连接? 而在需要发送数据的时候我们怎么再次连接?
2服务器宕机了怎么办?
1连接超时以后我们如何断开连接?(在netty中可以通过ReadTimeoutHandler类实现超过多长时间断开连接)
而在需要发送数据的时候我们怎么再次连接?
(我们可以封装一个方法 调用该方法就可以获得连接)
2服务器宕机了怎么办?
生产环境我们一般会部署netty集群,结合zookeeper完成。
下面我用代码演示一下
Server端变化不大,主要的改动在客户端
Server端代码
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception { // EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap bootStrap = new ServerBootstrap(); bootStrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf delimiters = Unpooled.copiedBuffer("$".getBytes()) ; // sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiters )); // sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5));//超时时间设置 sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture f = bootStrap.bind(8888).sync(); ChannelFuture f1 = bootStrap.bind(9999).sync(); //服务器一直不关闭 f.channel().closeFuture().sync(); f1.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server " + msg); Data data = new Data(); data.setId(1); data.setName("server replay"); // 写操作完成以后就断开连接 ctx.writeAndFlush(data).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
序列化工具代码 这里使用jboss的Marshalling
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; public final class MarshallingCodeCFactory { public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
================
一个简单的实体类 (Data)
import java.io.Serializable;

/**
 * Simple serializable message object exchanged between client and server.
 * Carries a numeric id and a name; both are {@code null} until set.
 */
public class Data implements Serializable {

    private static final long serialVersionUID = -963211196207628767L;

    private Integer id;
    private String name;

    /** @return the id, or {@code null} if none was set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the new id */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Renders the object as {@code " [id=<id>, name=<name>]"} (note the
     * leading space); unset fields are rendered as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(" [id=");
        text.append(this.id);
        text.append(", name=");
        text.append(this.name);
        text.append("]");
        return text.toString();
    }
}
==================================================================================================
以下是客户端代码
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; public class Client { EventLoopGroup workGroup = null; Bootstrap bootstrap = null; ChannelFuture channelFuture = null; private static class ClientFactory { private static Client instance = new Client(); } public static Client getClientInstance() { return ClientFactory.instance; } private Client() { // 数理化相关对象 workGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf delimiters = Unpooled.copiedBuffer("$".getBytes()) ; // sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, // delimiters )); // sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect() { try { this.channelFuture = bootstrap.connect("127.0.0.1", 9999).sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public ChannelFuture getChannelFuture() { if (this.channelFuture == null) { this.connect(); } if (!this.channelFuture.channel().isActive()) { this.connect(); } return this.channelFuture; } public static void main(String[] args) throws Exception { final Client client = getClientInstance(); client.connect(); ChannelFuture cf = client.getChannelFuture(); Data data = new Data(); data.setId(1); data.setName("first"); cf.channel().writeAndFlush(data); 
Thread.sleep(6000); //因为设置的5秒超时 这里通过一个子线程 模拟再次连接 new Thread(new Runnable() { public void run() { System.out.println("进入子线程..."); ChannelFuture cf = client.getChannelFuture(); Data data = new Data(); data.setId(2); data.setName("second"); cf.channel().writeAndFlush(data); try { cf.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("子线程结束!!"); } }).start(); cf.channel().closeFuture().sync(); System.out.println("主线程结束!!"); } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { /** * 通道刚刚建立的时候 发送认证消息 (认证通过 建立连接 失败 关闭连接) */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub // ctx.channel().writeAndFlush("发送认证消息"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { //do something msg System.out.println("Client: " + (Data)msg); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
以下是运行结果
服务器端
客户端