zoukankan      html  css  js  c++  java
  • netty: 以默认的ByteBuf作为传输数据

    client部分代码:

    // Event loop group that drives the client's I/O
    		EventLoopGroup worker = new NioEventLoopGroup();
    		// Pipeline setup for each new connection: only ClientHandler is installed
    		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

    			@Override
    			protected void initChannel(SocketChannel sc) throws Exception {
    				sc.pipeline().addLast(new ClientHandler());
    			}
    		};
    		// Bootstrap helper that wires the group, channel type and handler together
    		Bootstrap b = new Bootstrap();
    		b.group(worker);
    		b.channel(NioSocketChannel.class);
    		b.handler(initializer);
    

      

    clientHandler部分代码:

    /**
     * Prints the bytes received from the peer as a UTF-8 string, then releases the message.
     *
     * @param ctx the handler context (not used here)
     * @param msg the inbound message; cast to {@code ByteBuf}
     * @throws Exception if decoding fails or the message is not a {@code ByteBuf}
     */
    @Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try {
    			ByteBuf in = (ByteBuf) msg;
    			// Copy every readable byte out of the buffer before decoding
    			int length = in.readableBytes();
    			byte[] payload = new byte[length];
    			in.readBytes(payload);
    			System.out.println("Server: " + new String(payload, "utf-8"));
    		} finally {
    			// Always drop our reference to the message, even if decoding failed
    			ReferenceCountUtil.release(msg);
    		}
    	}
    

      

    下面查看完整代码 :

    client:

    /**
     * Client entry point: connects to 127.0.0.1:8765, sends one greeting as a raw
     * {@code ByteBuf}, closes the channel once that write has completed, and shuts
     * the event loop group down.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if interrupted while waiting for the connect or close future
     */
    public static void main(String[] args) throws InterruptedException {

    		// Event loop group that drives the client's I/O
    		EventLoopGroup worker = new NioEventLoopGroup();
    		try {
    			// Bootstrap helper that wires the group, channel type and handler together
    			Bootstrap b = new Bootstrap();
    			b.group(worker)
    			.channel(NioSocketChannel.class)
    			.handler(new ChannelInitializer<SocketChannel>() {

    				@Override
    				protected void initChannel(SocketChannel sc) throws Exception {
    					// No codec is installed: data travels as plain ByteBuf
    					sc.pipeline().addLast(new ClientHandler());
    				}
    			});

    			ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

    			// Encode explicitly as UTF-8 because the handlers decode with "utf-8";
    			// the no-arg getBytes() would use the platform default charset.
    			ChannelFuture writeFuture = cf.channel().writeAndFlush(
    					Unpooled.copiedBuffer("hello netty!!".getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    			// Close once the send has completed. The listener belongs on the write
    			// future; the connect future is already done after sync().
    			// NOTE(review): closing right after the write means the channel may be
    			// closed before a reply from the server is read — confirm this is intended.
    			writeFuture.addListener(ChannelFutureListener.CLOSE);

    			cf.channel().closeFuture().sync();
    		} finally {
    			// Runs even when connect/sync throws, so the event loop threads are always stopped
    			worker.shutdownGracefully();
    		}

    	}
    	
    

      

    clientHandler代码:

    需要继承 ChannelHandlerAdapter 这个类(此处使用的是 Netty 5.x 的 API;在 Netty 4.x 中 channelRead 定义在 ChannelInboundHandlerAdapter 上,应改为继承该类)。

    /**
     * Client-side handler that prints whatever the server sends and closes the
     * channel when an exception is reported.
     */
    public class ClientHandler extends ChannelHandlerAdapter {

    	/**
    	 * Prints the received bytes as a UTF-8 string prefixed with "Server: ",
    	 * then releases the message.
    	 *
    	 * @param ctx the handler context (not used here)
    	 * @param msg the inbound message; cast to {@code ByteBuf}
    	 * @throws Exception if decoding fails or the message is not a {@code ByteBuf}
    	 */
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try {
    			// Handle the data in its raw ByteBuf form
    			ByteBuf in = (ByteBuf) msg;
    			int length = in.readableBytes();
    			byte[] payload = new byte[length];
    			in.readBytes(payload);
    			System.out.println("Server: " + new String(payload, "utf-8"));
    		} finally {
    			// Done with the message: release it
    			ReferenceCountUtil.release(msg);
    		}
    	}

    	/**
    	 * Prints the stack trace of the reported error and closes the channel.
    	 *
    	 * @param ctx   the handler context whose channel is closed
    	 * @param cause the error that was reported
    	 */
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}

    }
    

      

    Server代码:

    /**
     * Server entry point: binds to port 8765, installs {@code ServerHandler} on every
     * accepted connection, and blocks until the server channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if interrupted while waiting for the bind or close future
     */
    public static void main(String[] args) throws InterruptedException {

    		// First group: accepts incoming client connections
    		EventLoopGroup boss = new NioEventLoopGroup();
    		// Second group: runs the handler logic for accepted connections
    		EventLoopGroup worker = new NioEventLoopGroup();
    		try {
    			// Bootstrap helper that registers the server
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(boss, worker)
    			.channel(NioServerSocketChannel.class)
    			.childHandler(new ChannelInitializer<SocketChannel>() {

    				@Override
    				protected void initChannel(SocketChannel sc) throws Exception {
    					sc.pipeline().addLast(new ServerHandler());
    				}
    			});

    			// Bind the listening port and wait until the server channel closes
    			ChannelFuture cf = b.bind(8765).sync();
    			cf.channel().closeFuture().sync();
    		} finally {
    			// Runs even when bind/sync throws (e.g. the port is already in use),
    			// so the event loop threads are always stopped.
    			boss.shutdownGracefully();
    			worker.shutdownGracefully();
    		}

    	}
    

      

    serverHandler代码:

    需要继承 ChannelHandlerAdapter 类(此处使用的是 Netty 5.x 的 API;在 Netty 4.x 中 channelRead 定义在 ChannelInboundHandlerAdapter 上,应改为继承该类)。

    /**
     * Server-side handler that prints each message received from a client and
     * answers with the fixed reply "888888".
     */
    public class ServerHandler extends ChannelHandlerAdapter {

    	/**
    	 * Prints the received bytes as a UTF-8 string prefixed with "Client: ",
    	 * writes the reply, and releases the inbound buffer.
    	 *
    	 * @param ctx the handler context used to write the reply
    	 * @param msg the inbound message; cast to {@code ByteBuf}
    	 * @throws Exception if decoding fails or the message is not a {@code ByteBuf}
    	 */
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		ByteBuf buf = (ByteBuf)msg;
    		try {
    			byte[] bs = new byte[buf.readableBytes()];
    			buf.readBytes(bs);
    			String result = new String(bs, "utf-8");
    			System.out.println("Client: " + result);

    			String response = "888888";
    			// Encode explicitly as UTF-8 to match the "utf-8" decoding used on receive;
    			// the no-arg getBytes() would use the platform default charset.
    			// To drop the connection after replying, append
    			// .addListener(ChannelFutureListener.CLOSE) to this call.
    			ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    		} finally {
    			// The reply is a separate buffer, so the inbound one is not passed on
    			// to anything that would release it; release it here to avoid a leak.
    			buf.release();
    		}

    	}

    	/**
    	 * Prints the stack trace of the reported error and closes the channel.
    	 *
    	 * @param ctx   the handler context whose channel is closed
    	 * @param cause the error that was reported
    	 */
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}

    }
    

      

  • 相关阅读:
    Linux中断管理 (2)软中断和tasklet
    Linux中断管理 (1)Linux中断管理机制
    Linux中断管理
    Linux内核访问用户空间文件:get_fs()/set_fs()的使用
    Linux进程管理 (1)进程的诞生
    Linux进程管理专题
    Linux内存管理 (23)一个内存Oops解析
    Linux内存管理 (22)内存检测技术(slub_debug/kmemleak/kasan)
    Linux内存管理 (21)OOM
    Linux内存管理 (20)最新更新和展望
  • 原文地址:https://www.cnblogs.com/achengmu/p/10944630.html
Copyright © 2011-2022 走看看