zoukankan      html  css  js  c++  java
  • JavaNetty(一)

    Netty 是由 JBOSS 提供的一个 java 开源框架。Netty 提供异步的、事件驱动的网络应用 程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用 Netty 可以确保你 快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty 相当 简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 socket 服务开发。 “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实 现经验,这些协议包括 FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项 目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能, 稳定性和伸缩性

    在设计上:针对多种传输类型的统一接口 - 阻塞和非阻塞;简单但更强大的线程模型; 真正的无连接的数据报套接字支持;链接逻辑支持复用;

    在性能上:比核心 Java API 更好的吞吐量,较低的延时;资源消耗更少,这个得益于 共享池和重用;减少内存拷贝 在健壮性上:消除由于慢,快,或重载连接产生的 OutOfMemoryError;消除经常发现 在 NIO 在高速网络中的应用中的不公平的读/写比 在安全上:完整的 SSL / TLS 和 StartTLS 的支持 

    Netty 的官网是:http://netty.io

    有 三 方 提 供 的 中 文 翻 译 Netty 用 户 手 册 ( 官 网 提 供 源 信 息 ): http://ifeve.com/netty5-user-guide/ 

    Netty 架构 

     代码:

    /**
     * 1. 双线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. 绑定服务监听端口并启动服务
     */
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server4HelloWorld {
    
    	// 监听线程组,监听客户端请求
    	private EventLoopGroup acceptorGroup =null;
    	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    	private EventLoopGroup clientGroup =null;
    	// 服务启动相关配置信息
    	private ServerBootstrap  bootstrap =null;
    	
    	public Server4HelloWorld(){
    		init();
    	}
    	private void init(){
    		// 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
    		acceptorGroup=new NioEventLoopGroup();
    		clientGroup=new  NioEventLoopGroup();
    		// 初始化服务的配置
    		bootstrap=new ServerBootstrap();
    		// 绑定线程组
    		bootstrap.group(acceptorGroup,clientGroup);
    		// 设定通讯模式为NIO, 同步非阻塞
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 设定缓冲区大小, 缓存区的单位是字节。
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    		.option(ChannelOption.SO_RCVBUF, 16*1024)
    		.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	/**
    	 * 监听处理逻辑。
    	 * @param port 监听端口。
    	 * @param acceptorHandlers 处理器, 如何处理客户端请求。
    	 * @return
    	 * @throws InterruptedException
    	 */
    	public ChannelFuture doAccept(int port,final ChannelHandler... acceptorHandlers)throws InterruptedException{
    		/*
    		 * childHandler是服务的Bootstrap独有的方法。是用于提供处理对象的。
    		 * 可以一次性增加若干个处理逻辑。是类似责任链模式的处理方式。
    		 * 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A-》B顺序依次处理。
    		 * 
    		 * ChannelInitializer - 用于提供处理器的一个模型对象。
    		 *  其中定义了一个方法,initChannel方法。
    		 *   方法是用于初始化处理逻辑责任链条的。
    		 *   可以保证服务端的Bootstrap只初始化一次处理器,尽量提供处理逻辑的重用。
    		 *   避免反复的创建处理器对象。节约资源开销。
    		 */
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		        // bind方法 - 绑定监听端口的。ServerBootstrap可以绑定多个监听端口。 多次调用bind方法即可
    				// sync - 开始监听逻辑。 返回一个ChannelFuture。 返回结果代表的是监听成功后的一个对应的未来结果
    				// 可以使用ChannelFuture实现后续的服务器和客户端的交互。
    				ChannelFuture future = bootstrap.bind(port).sync();
    				return future;
    	}
    	
    	/**
    	 * shutdownGracefully - 方法是一个安全关闭的方法。可以保证不放弃任何一个已接收的客户端请求。
    	 */
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	public static void main(String[] args) {
    		ChannelFuture future = null;
    		Server4HelloWorld server = null;
    		try{
    			server = new Server4HelloWorld();
    			future = server.doAccept(9999,new Server4HelloWorldHandler());
    			System.out.println("server started.");
    			
    			// 关闭连接的。
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    }
    

      

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @Sharable注解 - 
     *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
     *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
     *  如果handler是一个Sharable的,一定避免定义可写的实例变量。
     *  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(new XxxHandler());
    			}
    		});
     */
    
    @Sharable
    public class Server4HelloWorldHandler extends ChannelHandlerAdapter{
    	/**
    	 * 业务处理逻辑
    	 * 用于处理读取数据请求的逻辑。
    	 * ctx - 上下文对象。其中包含于客户端建立连接的所有资源。 如: 对应的Channel
    	 * msg - 读取到的数据。 默认类型是ByteBuf,是Netty自定义的。是对ByteBuffer的封装。 不需要考虑复位问题。
    	 */
    	@Override
    	public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
    		// 获取读取的数据, 是一个缓冲。
    		ByteBuf readBuffer = (ByteBuf) msg;
    		// 创建一个字节数组,用于保存缓存中的数据。
    		byte[] tempDatas = new byte[readBuffer.readableBytes()];
    		// 将缓存中的数据读取到字节数组中。
    		readBuffer.readBytes(tempDatas);
    		String message = new String(tempDatas, "UTF-8");
    		System.out.println("from client : " + message);
    		if("exit".equals(message)){
    			ctx.close();
    			return;
    		}
    		String line = "server message to client!";
    		// 写操作自动释放缓存,避免内存溢出问题。
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		// 注意,如果调用的是write方法。不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行。
    		// ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		// ctx.flush();
    	}
    	/**
    	 * 异常处理逻辑, 当客户端异常退出的时候,也会运行。
    	 * ChannelHandlerContext关闭,也代表当前与客户端连接的资源关闭。
    	 */
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    }
    

      

    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * 1. 单线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. connect连接服务,并发起请求
     */
    /**
     * 因为客户端是请求的发起者,不需要监听。
     * 只需要定义唯一的一个线程组即可。
     */
    public class Client4HelloWorld {
    	// 处理请求和处理服务端响应的线程组
    	private  EventLoopGroup group=null;
    	// 客户端启动相关配置信息
    	private Bootstrap  bootstrap =null;
    	
    	public Client4HelloWorld(){
    		init();
    	}
    	private void init(){
    		group=new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 绑定线程组
    		bootstrap.group(group);
    		// 设定通讯模式为NIO
    		bootstrap.channel(NioSocketChannel.class);
    		
    	}
    	
    	public ChannelFuture doRequest(String host,int port,final ChannelHandler... handlers) throws InterruptedException{
    		/*
    		 * 客户端的Bootstrap没有childHandler方法。只有handler方法。
    		 * 方法含义等同ServerBootstrap中的childHandler
    		 * 在客户端必须绑定处理器,也就是必须调用handler方法。
    		 * 服务器必须绑定处理器,必须调用childHandler方法。
    		 */
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception{
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		// 建立连接。
    		ChannelFuture  future =this.bootstrap.connect(host,port).sync();
    		return future;
    	}
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4HelloWorld client = null;
    		ChannelFuture future = null;
    		try {
    			client = new Client4HelloWorld();
    			future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server (enter 'exit' for close client) > ");
    				String line = s.nextLine();
    				if("exit".equals(line)){
    					// addListener - 增加监听,当某条件满足的时候,触发监听器。
    					// ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接。
    					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
    						.addListener(ChannelFutureListener.CLOSE);
    					break;
    				}
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		} catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    
    }
    

      

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4HelloWorldHandler extends ChannelHandlerAdapter {
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			ByteBuf readBuffer = (ByteBuf) msg;
    			byte[] tempDatas = new byte[readBuffer.readableBytes()];
    			readBuffer.readBytes(tempDatas);
    			System.out.println("from server : " + new String(tempDatas, "UTF-8"));
    		}finally{
    			// 用于释放缓存。避免内存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    	
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    	
    	/*@Override // 断开连接时执行
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelInactive method run...");
    	}
    
    	@Override // 连接通道建立成功时执行
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelActive method run...");
    	}
    
    	@Override // 每次读取完成时执行
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("channelReadComplete method run...");
    	}*/
    
    }
    

      线程模型

     Netty 中支持单线程模型,多线程模型,主从多线程模型。 

     单线程模型 

    在 ServerBootstrap 调用方法 group 的时候,传递的参数是同一个线程组,且在构造线程 组的时候,构造参数为 1,这种开发方式,就是一个单线程模型。 

    个人机开发测试使用。不推荐

     多线程模型 

    在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听 的 acceptor 线程组,线程数为 1,也就是构造参数为 1。负责处理客户端任务的线程组,线 程数大于 1,也就是构造参数大于 1。这种开发方式,就是多线程模型。 长连接,且客户端数量较少,连接持续时间较长情况下使用。如:企业内部交流应用。

    主从多线程模型
    在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听 的 acceptor 线程组,线程数大于 1,也就是构造参数大于 1。负责处理客户端任务的线程组, 线程数大于 1,也就是构造参数大于 1。这种开发方式,就是主从多线程模型。 长连接,客户端数量相对较多,连接持续时间比较长的情况下使用。如:对外提供服务 的相册服务器。

  • 相关阅读:
    C# 设计模式
    FutureTask、Fork/Join、 BlockingQueue
    线程的几种创建方式
    行锁、表锁、乐观锁、悲观锁
    J.U.C之AQS
    同步容器并发容器
    线程不安全类
    线程封闭
    不可变对象
    安全发布对象—单例模式
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/12250827.html
Copyright © 2011-2022 走看看