zoukankan      html  css  js  c++  java
  • netty2 案例:数据通信

    在实际的项目中应该如何使用netty去通信呢?

    一般来说,会有以下三种情况,

    1长连接 也就是服务器和客户端的通道一直不关闭,如果服务器性能非常好,并且在客户端数量不是很多的情况下,可以选择使用这种方式。

    2短连接  一次性批量提交数据,我们可能会吧我们的数据保存在数据库中,比如1个小时提交提交一次。这种做法的弊端是不能够实时传输,实时性要求不高的情况可以推荐使用

    3一种特殊的长连接 在特定时间的内,如果服务器和客服端没有通讯 就断开连接 下次当客户端需要再次发送数据的时候,再次连接 。但是这种方式我们需要考虑?

     1连接超时以后我们如何断开连接? 而在需要发送数据的时候我们怎么再次连接?

     2服务器宕机了怎么办?

     1连接超时以后我们如何断开连接?(在netty中可以通过ReadTimeoutHandler类实现超过多长时间断开连接)

     而在需要发送数据的时候我们怎么再次连接?

     (我们可以封装一个方法 调用该方法就可以获得连接)

     2服务器宕机了怎么办?

     生产环境我们一般会部署netty集群,结合zookeeper完成。

     下面我用代码演示一下

      Server端口变化不大 ,主要是客户端

     Server端代码

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Server {
    	public static void main(String[] args) throws Exception {
    		//
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    
    		EventLoopGroup workGroup = new NioEventLoopGroup();
    
    		ServerBootstrap bootStrap = new ServerBootstrap();
    
    		bootStrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
    				.childHandler(new ChannelInitializer<SocketChannel>() {
    
    					@Override
    					protected void initChannel(SocketChannel sc) throws Exception {
    //						ByteBuf delimiters = Unpooled.copiedBuffer("$".getBytes()) ;
    //						sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiters ));
    //						sc.pipeline().addLast(new StringDecoder());
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());  
    						sc.pipeline().addLast(new ReadTimeoutHandler(5));//超时时间设置
    						sc.pipeline().addLast(new ServerHandler());
    					}
    				});
    		ChannelFuture f = bootStrap.bind(8888).sync();
    		ChannelFuture f1 = bootStrap.bind(9999).sync();
            //服务器一直不关闭 
    		f.channel().closeFuture().sync();
    		f1.channel().closeFuture().sync();
    		bossGroup.shutdownGracefully();
    		workGroup.shutdownGracefully();
    	}
    
    }
    

      

    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	
    			System.out.println("server " + msg);
    			Data data = new Data();
    			data.setId(1);
    			data.setName("server replay");
    			// 写操作完成以后就断开连接
    			ctx.writeAndFlush(data).addListener(ChannelFutureListener.CLOSE);
    
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

      序列化工具代码 这里使用jboss的Marshalling

      

    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    
    public final class MarshallingCodeCFactory {
    
    	public static MarshallingDecoder buildMarshallingDecoder() {
        	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		//创建了MarshallingConfiguration对象,配置了版本号为5 
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		//根据marshallerFactory和configuration创建provider
    		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
    		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
    		return decoder;
        }
    
      
        public static MarshallingEncoder buildMarshallingEncoder() {
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
    		MarshallingEncoder encoder = new MarshallingEncoder(provider);
    		return encoder;
        }
    }
    

      ================

          一个简单的实体类 (Data)

         

    import java.io.Serializable;
    
    public class Data implements Serializable{
    	/**
    	 * 
    	 */
    	private static final long serialVersionUID = -963211196207628767L;
    	private Integer id;
    	private String name;
    	
    	@Override
    	public String toString() {
    		return " [id=" + id + ", name=" + name + "]";
    	}
    
    	public Integer getId() {
    		return id;
    	}
    
    	public void setId(Integer id) {
    		this.id = id;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    }
    

      ==================================================================================================

       以下是客户端代码

       

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Client {
    	EventLoopGroup workGroup = null;
    	Bootstrap bootstrap = null;
    	ChannelFuture channelFuture = null;
    
    	private static class ClientFactory {
    		private static Client instance = new Client();
    	}
    
    	public static Client getClientInstance() {
    		return ClientFactory.instance;
    	}
    
    	private Client() {
    		// 数理化相关对象
    		workGroup = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		bootstrap.group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel sc) throws Exception {
    				// ByteBuf delimiters = Unpooled.copiedBuffer("$".getBytes()) ;
    				// sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
    				// delimiters ));
    				// sc.pipeline().addLast(new StringDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				sc.pipeline().addLast(new ReadTimeoutHandler(5));
    				sc.pipeline().addLast(new ClientHandler());
    			}
    		});
    	}
    
    	public void connect() {
    		try {
    			this.channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    
    	}
    
    	public ChannelFuture getChannelFuture() {
    		if (this.channelFuture == null) {
    			this.connect();
    		}
    		if (!this.channelFuture.channel().isActive()) {
    			this.connect();
    		}
    		return this.channelFuture;
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		final Client client = getClientInstance();
    		client.connect();
    		ChannelFuture cf = client.getChannelFuture();
    		Data data = new Data();
    		data.setId(1);
    		data.setName("first");
    		cf.channel().writeAndFlush(data);
    		Thread.sleep(6000);
    		//因为设置的5秒超时 这里通过一个子线程 模拟再次连接
    		new Thread(new Runnable() {
    			public void run() {
    				System.out.println("进入子线程...");
    				ChannelFuture cf = client.getChannelFuture();
    				Data data = new Data();
    				data.setId(2);
    				data.setName("second");
    				cf.channel().writeAndFlush(data);
    				try {
    					cf.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    				System.out.println("子线程结束!!");
    			}
    		}).start();
    		cf.channel().closeFuture().sync();
    
    		System.out.println("主线程结束!!");
    	}
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    	
    	/**
    	 * 通道刚刚建立的时候 发送认证消息 (认证通过 建立连接 失败 关闭连接)
    	 */
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		// TODO Auto-generated method stub
    	//	ctx.channel().writeAndFlush("发送认证消息");
    		super.channelActive(ctx);
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try {
    			//do something msg
    			System.out.println("Client: " + (Data)msg);
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}
    }
    

      一下是运行结果

       服务器端

        

      客户端 

       

       

  • 相关阅读:
    jQuery Validate 验证成功时的提示信息
    MySQL定时任务实现方法
    tp5获取器的用法。
    使用layui异步请求上传图片在tp5.1环境下出现“请对上传接口返回json”的错误的解决方法
    微信小程序底部菜单栏的使用方法
    接口测试中的接口到底是什么?
    【web自动化测试】requests-html 这个解析库,能让你更轻松的获取网页内容
    一个软件测试小白的进阶之路
    Python进阶:@property 动态属性
    百度网盘限速怎么办?
  • 原文地址:https://www.cnblogs.com/javabigdata/p/7454578.html
Copyright © 2011-2022 走看看