zoukankan      html  css  js  c++  java
  • JavaNetty

     

    Netty的简单使用:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * Minimal Netty client demo: connects to 127.0.0.1:8765, sends three short
     * messages and then waits until the channel is closed.
     */
    public class Client {
    
    	/**
    	 * Runs the demo client.
    	 *
    	 * @param args unused
    	 * @throws Exception if connecting or waiting on the channel fails or is interrupted
    	 */
    	public static void main(String[] args) throws Exception{
    		// Encode explicitly instead of relying on the platform default charset.
    		final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
    		
    		EventLoopGroup group = new NioEventLoopGroup();
    		try {
    			Bootstrap b = new Bootstrap();
    			b.group(group)
    			.channel(NioSocketChannel.class)
    			.handler(new ChannelInitializer<SocketChannel>() {
    				@Override
    				protected void initChannel(SocketChannel sc) throws Exception {
    					sc.pipeline().addLast(new ClientHandler());
    				}
    			});
    			
    			ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
    			
    			// Send messages; the pauses only pace the demo output.
    			Thread.sleep(1000);
    			cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes(utf8)));
    			cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes(utf8)));
    			Thread.sleep(2000);
    			cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes(utf8)));
    			
    			// Block until the connection is closed.
    			cf1.channel().closeFuture().sync();
    		} finally {
    			// Always release the event loop threads, even when connect()/sync() throws.
    			group.shutdownGracefully();
    		}
    	}
    }
    

      

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Minimal Netty server demo: listens on port 8765 and hands every accepted
     * connection to a {@code ServerHandler}.
     */
    public class Server {
    
    	/**
    	 * Starts the server and blocks until the listening channel is closed.
    	 *
    	 * @param args unused
    	 * @throws Exception if binding fails or the wait is interrupted
    	 */
    	public static void main(String[] args) throws Exception {
    		// 1. Two event loop groups: pGroup is passed as the parent group
    		//    (accepting connections), cGroup as the child group (per-connection I/O).
    		EventLoopGroup pGroup = new NioEventLoopGroup();
    		EventLoopGroup cGroup = new NioEventLoopGroup();
    		
    		try {
    			// 2. Bootstrap helper that configures the server channel.
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(pGroup, cGroup)		// parent group, child group
    			.channel(NioServerSocketChannel.class)		// NIO transport
    			.option(ChannelOption.SO_BACKLOG, 1024)		// backlog of the listening socket (pending connections)
    			.option(ChannelOption.SO_RCVBUF, 32*1024)	// receive buffer size, set on the listening socket
    			// SO_SNDBUF and SO_KEEPALIVE are per-connection options; the listening
    			// channel does not support them, so they go on the accepted (child) channels.
    			.childOption(ChannelOption.SO_SNDBUF, 32*1024)	// send buffer size
    			.childOption(ChannelOption.SO_KEEPALIVE, true)	// TCP keep-alive
    			.childHandler(new ChannelInitializer<SocketChannel>() {
    				@Override
    				protected void initChannel(SocketChannel sc) throws Exception {
    					// 3. Install the handler that processes received data.
    					sc.pipeline().addLast(new ServerHandler());
    				}
    			});
    			
    			// 4. Bind the port.
    			ChannelFuture cf1 = b.bind(8765).sync();
    			// 5. Wait until the server channel is closed.
    			cf1.channel().closeFuture().sync();
    		} finally {
    			// Release both groups even when bind()/sync() throws.
    			pGroup.shutdownGracefully();
    			cGroup.shutdownGracefully();
    		}
    	}
    }
    

      

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Server-side handler: prints every received message and echoes it back to
     * the client with a fixed prefix.
     */
    public class ServerHandler extends ChannelHandlerAdapter {
    
    	/** Logs that a channel became active. */
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("server channel active... ");
    	}
    
    	/**
    	 * Decodes the inbound buffer as UTF-8, prints it and writes a response.
    	 *
    	 * @param ctx handler context used to write the response
    	 * @param msg inbound message; cast to {@link ByteBuf}
    	 */
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg)
    			throws Exception {
    		ByteBuf buf = (ByteBuf) msg;
    		try {
    			byte[] req = new byte[buf.readableBytes()];
    			buf.readBytes(req);
    			String body = new String(req, "utf-8");
    			System.out.println("Server :" + body );
    			String response = "进行返回给客户端的响应:" + body ;
    			// Encode with the same charset used for decoding above.
    			ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes("utf-8")));
    			//.addListener(ChannelFutureListener.CLOSE);
    		} finally {
    			// This handler consumes the message and does not pass it on, so it
    			// must release the reference-counted buffer itself.
    			buf.release();
    		}
    	}
    
    	/** Logs the end of a read batch and flushes pending writes. */
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx)
    			throws Exception {
    		System.out.println("读完了");
    		ctx.flush();
    	}
    
    	/** Reports the failure and closes the channel. */
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
    			throws Exception {
    		// Surface the cause instead of closing silently.
    		t.printStackTrace();
    		ctx.close();
    	}
    }
    

      

     aio代码:

    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * AIO (NIO.2) server demo: opens an asynchronous server channel on the given
     * port and dispatches accepted connections to a {@code ServerCompletionHandler}.
     */
    public class Server {
    	// Thread pool backing the channel group.
    	private ExecutorService executorService;
    	// Channel group that runs the completion handlers.
    	private AsynchronousChannelGroup threadGroup;
    	// Server channel; public because the completion handler re-arms accept() on it.
    	public AsynchronousServerSocketChannel assc;
    	
    	/**
    	 * Starts the server and blocks the calling thread until the channel group
    	 * terminates or the thread is interrupted.
    	 *
    	 * @param port TCP port to listen on
    	 */
    	public Server(int port){
    		try {
    			executorService = Executors.newCachedThreadPool();
    			threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
    			assc = AsynchronousServerSocketChannel.open(threadGroup);
    			assc.bind(new InetSocketAddress(port));
    			
    			System.out.println("server start , port : " + port);
    			// accept() is asynchronous: it returns immediately and the handler is
    			// invoked once a client connects.
    			assc.accept(this, new ServerCompletionHandler());
    			// Keep the calling thread parked for as long as the group is alive
    			// (a fixed sleep would eventually expire).
    			threadGroup.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.SECONDS);
    			
    		} catch (InterruptedException e) {
    			// Stop waiting but keep the interrupt visible to the caller.
    			Thread.currentThread().interrupt();
    		} catch (Exception e) {
    			e.printStackTrace();
    			// Startup failed (e.g. port already in use): release the pool threads
    			// so they do not keep the JVM alive.
    			releaseAfterFailure();
    		}
    	}
    	
    	/** Shuts down the channel group, or the bare executor if no group was created. */
    	private void releaseAfterFailure() {
    		try {
    			if (threadGroup != null) {
    				threadGroup.shutdownNow();
    			} else if (executorService != null) {
    				executorService.shutdownNow();
    			}
    		} catch (java.io.IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String[] args) {
    		new Server(8765);
    	}
    	
    }
    

      

    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.ExecutionException;
    
    /**
     * AIO (NIO.2) client demo: connects to 127.0.0.1:8765, sends one request and
     * prints the server's response.
     */
    public class Client implements Runnable{
    
    	private AsynchronousSocketChannel asc ;
    	
    	/**
    	 * Opens an unconnected asynchronous socket channel.
    	 *
    	 * @throws Exception if the channel cannot be opened
    	 */
    	public Client() throws Exception {
    		asc = AsynchronousSocketChannel.open();
    	}
    	
    	/**
    	 * Connects to the server and waits until the connection is established,
    	 * so that a following {@link #write(String)} does not hit an unconnected channel.
    	 */
    	public void connect(){
    		try {
    			asc.connect(new InetSocketAddress("127.0.0.1", 8765)).get();
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Sends the request as UTF-8 and then reads and prints one response.
    	 *
    	 * @param request text to send
    	 */
    	public void write(String request){
    		try {
    			// Same charset as read() uses for decoding.
    			ByteBuffer out = ByteBuffer.wrap(request.getBytes("utf-8"));
    			// A single write may be partial; drain the buffer.
    			while (out.hasRemaining()) {
    				asc.write(out).get();
    			}
    			read();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	/** Blocks for one read of up to 1024 bytes and prints it, trimmed, as UTF-8. */
    	private void read() {
    		ByteBuffer buf = ByteBuffer.allocate(1024);
    		try {
    			asc.read(buf).get();
    			buf.flip();
    			byte[] respByte = new byte[buf.remaining()];
    			buf.get(respByte);
    			System.out.println(new String(respByte,"utf-8").trim());
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			e.printStackTrace();
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Keeps the thread (and with it the JVM) alive without burning CPU;
    	 * returns when the thread is interrupted.
    	 */
    	@Override
    	public void run() {
    		try {
    			while(true){
    				Thread.sleep(Long.MAX_VALUE);
    			}
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    		}
    	}
    	
    	public static void main(String[] args) throws Exception {
    		Client c1 = new Client();
    		c1.connect();
    		
    		Client c2 = new Client();
    		c2.connect();
    		
    		Client c3 = new Client();
    		c3.connect();
    		
    		new Thread(c1, "c1").start();
    		new Thread(c2, "c2").start();
    		new Thread(c3, "c3").start();
    		
    		// connect() has already waited for each connection, so no settling sleep is needed.
    		c1.write("c1 aaa");
    		c2.write("c2 bbbb");
    		c3.write("c3 ccccc");
    	}
    }
    

      

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    
    /**
     * Accept-completion handler for the AIO server: re-arms accept for the next
     * client, then reads requests from the accepted channel and answers each one.
     */
    public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
    
    	/** Charset used for both decoding requests and encoding responses. */
    	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
    
    	/**
    	 * Called when a client connection has been accepted.
    	 *
    	 * @param asc        the accepted client channel
    	 * @param attachment the server whose channel accepted the connection
    	 */
    	@Override
    	public void completed(AsynchronousSocketChannel asc, Server attachment) {
    		// accept() delivers only one connection per call; register again so the
    		// next client is accepted too.
    		attachment.assc.accept(attachment, this);
    		read(asc);
    	}
    
    	/** Starts one asynchronous read of up to 1024 bytes on the given channel. */
    	private void read(final AsynchronousSocketChannel asc) {
    		ByteBuffer buf = ByteBuffer.allocate(1024);
    		asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
    			@Override
    			public void completed(Integer resultSize, ByteBuffer attachment) {
    				// -1 means the peer closed the connection: nothing to answer.
    				if (resultSize < 0) {
    					closeQuietly(asc);
    					return;
    				}
    				// Switch the buffer from filling to draining.
    				attachment.flip();
    				System.out.println("Server -> " + "收到客户端的数据长度为:" + resultSize);
    				// Decode only the bytes actually read, not the whole backing array.
    				byte[] data = new byte[attachment.remaining()];
    				attachment.get(data);
    				String resultData = new String(data, UTF_8).trim();
    				System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
    				String response = "服务器响应, 收到了客户端发来的数据: " + resultData;
    				if (write(asc, response)) {
    					// Wait for the next request; this also lets us notice when
    					// the client disconnects.
    					read(asc);
    				} else {
    					closeQuietly(asc);
    				}
    			}
    			@Override
    			public void failed(Throwable exc, ByteBuffer attachment) {
    				exc.printStackTrace();
    				closeQuietly(asc);
    			}
    		});
    	}
    	
    	/**
    	 * Writes the response as UTF-8, blocking until every byte is sent.
    	 *
    	 * @return {@code true} if the whole response was written
    	 */
    	private boolean write(AsynchronousSocketChannel asc, String response) {
    		try {
    			// wrap() sizes the buffer to the payload, so long responses cannot overflow it.
    			ByteBuffer buf = ByteBuffer.wrap(response.getBytes(UTF_8));
    			while (buf.hasRemaining()) {
    				asc.write(buf).get();
    			}
    			return true;
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			e.printStackTrace();
    		}
    		return false;
    	}
    	
    	/** Closes the channel, reporting but not propagating a close failure. */
    	private void closeQuietly(AsynchronousSocketChannel asc) {
    		try {
    			asc.close();
    		} catch (java.io.IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/** Called when accepting a connection failed; reports the cause. */
    	@Override
    	public void failed(Throwable exc, Server attachment) {
    		exc.printStackTrace();
    	}
    
    }
    

      bio:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    /**
     * Blocking-I/O client demo: sends one line to the server and prints the
     * one-line response.
     */
    public class Client {
    
    	final static String ADDRESS = "127.0.0.1";
    	final static int PORT = 8765;
    	
    	/**
    	 * Connects, performs one request/response exchange and closes everything.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		// try-with-resources closes the writer, reader and socket (in that order)
    		// on every exit path.
    		// NOTE(review): reader and writer use the platform default charset.
    		try (Socket socket = new Socket(ADDRESS, PORT);
    				BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    				PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
    			
    			// Send the request; the PrintWriter auto-flushes on println.
    			out.println("接收到客户端的请求数据...");
    			String response = in.readLine();
    			System.out.println("Client: " + response);
    			
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

     

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    
    /**
     * Blocking-I/O server demo: accepts connections on port 8765 and serves each
     * one on its own thread.
     */
    public class Server {
    
    	final static int PROT = 8765;
    	
    	/**
    	 * Listens until accepting fails, starting one {@code ServerHandler} thread
    	 * per accepted connection.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		// try-with-resources closes the listening socket on every exit path.
    		try (ServerSocket server = new ServerSocket(PROT)) {
    			System.out.println(" server start .. ");
    			// Keep accepting: a single accept() would serve only the first client.
    			while (true) {
    				// Blocks until a client connects.
    				Socket socket = server.accept();
    				// Serve the client on a new thread so accept() can continue.
    				// NOTE(review): one unbounded thread per connection; fine for a demo only.
    				new Thread(new ServerHandler(socket)).start();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

      

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    /**
     * Serves one blocking-I/O client connection: prints each received line and
     * answers it with a fixed response line.
     */
    public class ServerHandler implements Runnable{
    
    	private final Socket socket ;
    	
    	/**
    	 * @param socket the accepted client connection; closed when {@link #run()} ends
    	 */
    	public ServerHandler(Socket socket){
    		this.socket = socket;
    	}
    	
    	/** Reads lines until the client closes the stream, then releases all resources. */
    	@Override
    	public void run() {
    		// try-with-resources closes the writer, reader and socket on every exit path.
    		try (Socket client = this.socket;
    				BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
    				PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
    			String body;
    			// readLine() returns null once the peer has closed its side.
    			while((body = in.readLine()) != null){
    				System.out.println("Server :" + body);
    				out.println("服务器端回送响的应数据.");
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

      nio:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    /**
     * NIO client demo: forwards whatever is typed on standard input to the
     * server at 127.0.0.1:8765 over a blocking {@link SocketChannel}.
     */
    public class Client {
    
    	/**
    	 * Connects and relays standard input until end of input or an I/O error.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		
    		// Address to connect to.
    		InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
    		
    		// Buffer handed to the channel; same size as the stdin chunk below.
    		ByteBuffer buf = ByteBuffer.allocate(1024);
    		
    		// try-with-resources closes the channel on every exit path.
    		try (SocketChannel sc = SocketChannel.open()) {
    			sc.connect(address);
    			
    			byte[] bytes = new byte[1024];
    			int length;
    			// read() returns -1 at end of input; stop instead of looping forever.
    			while((length = System.in.read(bytes)) != -1){
    				buf.clear();
    				// Copy only the bytes actually read, not the zero padding of the array.
    				buf.put(bytes, 0, length);
    				// Switch the buffer from filling to draining.
    				buf.flip();
    				// A single write may be partial; drain the buffer.
    				while (buf.hasRemaining()) {
    					sc.write(buf);
    				}
    			}
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    		
    	}
    }
    

      

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO server demo: a single thread multiplexes the listening channel and all
     * client channels through one {@link Selector} and prints whatever clients send.
     */
    public class Server implements Runnable{
    	// 1. Multiplexer managing all registered channels; null if startup failed.
    	private Selector seletor;
    	// 2. Buffer reused for every read.
    	private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    	// 3. Reserved for writing responses; not used yet.
    	private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    
    	/**
    	 * Opens the selector and a non-blocking server channel bound to the port.
    	 * On failure the error is printed and {@link #run()} becomes a no-op.
    	 *
    	 * @param port TCP port to listen on
    	 */
    	public Server(int port){
    		ServerSocketChannel ssc = null;
    		try {
    			// 1. Open the multiplexer.
    			this.seletor = Selector.open();
    			// 2. Open the server channel.
    			ssc = ServerSocketChannel.open();
    			// 3. Non-blocking mode is required for selector registration.
    			ssc.configureBlocking(false);
    			// 4. Bind the address.
    			ssc.bind(new InetSocketAddress(port));
    			// 5. Register for accept events.
    			ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
    			
    			System.out.println("Server start, port :" + port);
    			
    		} catch (IOException e) {
    			e.printStackTrace();
    			// Do not leave a half-initialised server behind.
    			closeQuietly(ssc);
    			closeQuietly(this.seletor);
    			this.seletor = null;
    		}
    	}
    
    	/** Event loop: waits for ready channels and dispatches accept/read events. */
    	@Override
    	public void run() {
    		if (this.seletor == null) {
    			// Startup failed in the constructor; nothing to serve.
    			return;
    		}
    		while(true){
    			try {
    				// 1. Block until at least one channel is ready.
    				this.seletor.select();
    				// 2. Iterate over the selected keys.
    				Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
    				while(keys.hasNext()){
    					SelectionKey key = keys.next();
    					// 3. Remove the key from the selected set so it is not seen again.
    					keys.remove();
    					if(!key.isValid()){
    						continue;
    					}
    					// A key is either the listening channel (accept) or a client
    					// channel (read). Handling exactly one event per key also avoids
    					// querying a key that read() has just cancelled, which would
    					// throw CancelledKeyException and end this thread.
    					if(key.isAcceptable()){
    						this.accept(key);
    					} else if(key.isReadable()){
    						this.read(key);
    					}
    					// Write readiness is never registered; see write(SelectionKey).
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	/** Placeholder for sending data back to a client; intentionally empty. */
    	private void write(SelectionKey key){
    		//ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
    		//ssc.register(this.seletor, SelectionKey.OP_WRITE);
    	}
    
    	/** Reads the available bytes from a client channel and prints them, trimmed. */
    	private void read(SelectionKey key) {
    		try {
    			// 1. Discard data from the previous read.
    			this.readBuf.clear();
    			// 2. The channel registered in accept().
    			SocketChannel sc = (SocketChannel) key.channel();
    			// 3. Read what is available.
    			int count = sc.read(this.readBuf);
    			// 4. -1 means the client closed the connection.
    			if(count == -1){
    				this.disconnect(key);
    				return;
    			}
    			// 5. Switch the buffer from filling to draining.
    			this.readBuf.flip();
    			// 6. Copy out exactly the bytes that were read.
    			byte[] bytes = new byte[this.readBuf.remaining()];
    			this.readBuf.get(bytes);
    			// 7. Print the result.
    			String body = new String(bytes).trim();
    			System.out.println("Server : " + body);
    			
    			// 8. A response could be written back to the client here.
    			
    		} catch (IOException e) {
    			e.printStackTrace();
    			// A failed connection (e.g. reset by peer) must be dropped, otherwise
    			// it stays registered with the selector.
    			this.disconnect(key);
    		}
    		
    	}
    
    	/** Accepts a pending connection and registers it for read events. */
    	private void accept(SelectionKey key) {
    		try {
    			// 1. The listening channel.
    			ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
    			// 2. Non-blocking accept: returns null when no connection is pending.
    			SocketChannel sc = ssc.accept();
    			if(sc == null){
    				return;
    			}
    			// 3. Non-blocking mode is required for selector registration.
    			sc.configureBlocking(false);
    			// 4. Register for read events.
    			sc.register(this.seletor, SelectionKey.OP_READ);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/** Cancels the key and closes its channel. */
    	private void disconnect(SelectionKey key) {
    		key.cancel();
    		closeQuietly(key.channel());
    	}
    	
    	/** Closes the resource if non-null, reporting but not propagating a failure. */
    	private static void closeQuietly(java.io.Closeable resource) {
    		if(resource == null){
    			return;
    		}
    		try {
    			resource.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String[] args) {
    		
    		new Thread(new Server(8765)).start();
    	}
    }
    

      

    import java.nio.IntBuffer;
    
    /**
     * Walk-through of {@link IntBuffer} basics: position/limit/capacity,
     * absolute vs. relative get/put, {@code wrap}, {@code duplicate} and bulk get.
     */
    public class TestBuffer {
    	
    	/**
    	 * Runs the three demo sections in order and prints their results.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		// Each section lives in its own method so that the sections can reuse
    		// variable names without clashing in one scope.
    		basicOperations();
    		wrapUsage();
    		otherMethods();
    	}
    	
    	/** Section 1: put, flip, absolute get/put and relative get. */
    	private static void basicOperations() {
    		// Buffer with a fixed capacity of 10.
    		IntBuffer buf = IntBuffer.allocate(10);
    		buf.put(13);// position: 0 -> 1
    		buf.put(21);// position: 1 -> 2
    		buf.put(35);// position: 2 -> 3
    		// flip sets limit to the current position (3) and position back to 0.
    		buf.flip();
    		System.out.println("使用flip复位:" + buf);
    		System.out.println("容量为: " + buf.capacity());	// capacity is fixed at creation
    		System.out.println("限制为: " + buf.limit());		// three elements were put, so limit = 3
    		
    		System.out.println("获取下标为1的元素:" + buf.get(1));
    		System.out.println("get(index)方法,position位置不改变:" + buf);
    		buf.put(1, 4);
    		System.out.println("put(index, change)方法,position位置不变:" + buf);
    		
    		for (int i = 0; i < buf.limit(); i++) {
    			// Relative get() advances position by one on every call.
    			System.out.print(buf.get() + "\t");
    		}
    		System.out.println("buf对象遍历之后为: " + buf);
    	}
    	
    	/** Section 2: wrapping an existing array. */
    	private static void wrapUsage() {
    		// wrap() uses the given array as backing store: no separate allocation is
    		// needed, and changes made through the buffer are visible in the array.
    		int[] arr = new int[]{1,2,5};
    		IntBuffer buf1 = IntBuffer.wrap(arr);
    		System.out.println(buf1);
    		
    		// Capacity is still arr.length, but only the range [0, 2) is accessible
    		// (position = 0, limit = 2).
    		IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2);
    		System.out.println(buf2);
    	}
    	
    	/** Section 3: bulk put, duplicate, remaining and bulk get. */
    	private static void otherMethods() {
    		IntBuffer buf1 = IntBuffer.allocate(10);
    		int[] arr = new int[]{1,2,5};
    		buf1.put(arr);
    		System.out.println(buf1);
    		// duplicate() creates a second view with its own position and limit.
    		IntBuffer buf3 = buf1.duplicate();
    		System.out.println(buf3);
    		
    		// Prepare buf1 for reading what was just written.
    		//buf1.position(0);
    		buf1.flip();
    		System.out.println(buf1);
    		
    		System.out.println("可读数据为:" + buf1.remaining());
    		
    		int[] arr2 = new int[buf1.remaining()];
    		// Bulk-copy the readable elements into arr2.
    		buf1.get(arr2);
    		for(int i : arr2){
    			System.out.print(Integer.toString(i) + ",");
    		}
    	}
    }
    

      

     

  • 相关阅读:
    Hiho----无间道之并查集
    Linux之压缩与解压
    Linux之用户和用户组
    Linux之关机/重启命令及一些符号
    Linux之基本操作命令
    Linux之vi/vim
    解决eclipse中maven报错Failed to read artifact descriptor for xxxx:jar
    CentOS 7下 solr 单机版安装与配置
    CentOS 7下 Tomcat7 安装与配置
    CentOS 7下 JDK 1.7 安装与配置
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/10905272.html
Copyright © 2011-2022 走看看