zoukankan      html  css  js  c++  java
  • 实现RPC就是这么简单 之 Netty 实现(二)心跳检测和粘包处理

        TCP 粘包和拆包

            TCP 是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上,一个完整的数据包可能会被TCP拆分为多个数据包进行发送,也有可能把多个小的数据包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

        粘包问题的解决策略

             由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组,这个问题只能够通过上层的应用协议栈设计来解决;大致可以分为:

    • 定长消息
    • 在包尾增加分隔符进行分割
    • 将消息分为消息头和消息体,消息头中包含消息体长度
    • 更复杂的应用层协议

           在这里使用消息头和消息体的方式解决粘包问题。Netty提供了LengthFieldPrepender和LengthFieldBasedFrameDecoder进行消息头部大编码和解码

         本次实在 实现RPC就是这么简单 之 Netty 实现 基础上优化,仅仅贴出了修改代码。

    /**
     *
     * @author zhangwei
     * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37 $
     */
    public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
    
    	/** 服务端口号 **/
    	private int port = 12000;
    
    	private RpcServerHandler rpcServerHandler;
    
    	private void publishedService() throws Exception {
    
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup(5);
    		try {
    			ServerBootstrap serverBootstrap = new ServerBootstrap();
    			serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    					.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel channel) throws Exception {
    							channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(2048, 0, 2, 0, 2))
    									.addLast(new LengthFieldPrepender(2))
    									.addLast(new RpcDecoder(SrpcRequest.class))
    									.addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler)
    									.addLast("idleStateHandler", new IdleStateHandler(10, 5, 0))
    									.addLast(new ChannelInboundHandlerAdapter() {
    
    								/**
    								 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
    								 *      java.lang.Object)
    								 */
    								public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
    									if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
    										IdleStateEvent event = (IdleStateEvent) evt;
    										if (event.state() == IdleState.READER_IDLE) {
    											System.out.println("read 空闲");
    											ctx.disconnect();
    										} else if (event.state() == IdleState.WRITER_IDLE) {
    											System.out.println("write 空闲");
    										} else if (event.state() == IdleState.ALL_IDLE) {
    											System.out.println("读写都空闲");
    										}
    									}
    
    								}
    
    							});
    						}
    					});
    
    			// 绑定主机+端口
    			ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync();
    
    			// 等待服务监听端口关闭
    			future.channel().closeFuture().sync();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    	/**
    	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
    	 */
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		publishedService();
    	}
    
    	/**
    	 * @see org.springframework.context.Lifecycle#start()
    	 */
    	@Override
    	public void start() {
    	}
    
    	/**
    	 * @see org.springframework.context.Lifecycle#stop()
    	 */
    	@Override
    	public void stop() {
    
    	}
    
    	/**
    	 * @see org.springframework.context.Lifecycle#isRunning()
    	 */
    	@Override
    	public boolean isRunning() {
    		return false;
    	}
    
    	/**
    	 * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
    	 */
    	@Override
    	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    		Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
    		Map<String, Object> handlerMap = new HashMap<String, Object>();
    		if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
    			for (Object serviceBean : serviceBeanMap.values()) {
    				String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf().getName();
    				handlerMap.put(interfaceName, serviceBean);
    			}
    		}
    		System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet());
    		rpcServerHandler = new RpcServerHandler(handlerMap);
    	}
    }
     
    
    /**
     *
     * @author zhangwei_PF
     * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31 $
     */
    @Sharable
    public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> {
    
    	// final CountDownLatch latch = new CountDownLatch(1);
    
    	private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1);
    
    	// private SrpcResponse response;
    
    	@Override
    	public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception {
    		// this.response = response;
    		// latch.countDown();
    		responseHodler.put(response);
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		ctx.close();
    	}
    
    	public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception {
    
    		EventLoopGroup group = new NioEventLoopGroup();
    		try {
    			Bootstrap bootstrap = new Bootstrap();
    			bootstrap.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG))
    					.handler(new ChannelInitializer<SocketChannel>() {
    
    						@Override
    						protected void initChannel(SocketChannel ch) throws Exception {
    							ch.pipeline().addLast(new LengthFieldPrepender(2))
    									.addLast(new LengthFieldBasedFrameDecoder(2048, 0, 2, 0, 2))
    									.addLast(new RpcEncoder(SrpcRequest.class))
    									.addLast(new RpcDecoder(SrpcResponse.class))
    									.addLast(SrpcRequestSender.this)
    									.addLast("idleStateHandler", new IdleStateHandler(10, 10, 0))
    									.addLast(new ChannelInboundHandlerAdapter() {
    
    								/**
    								 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
    								 *      java.lang.Object)
    								 */
    								public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
    									if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
    										IdleStateEvent event = (IdleStateEvent) evt;
    										if (event.state() == IdleState.READER_IDLE) {
    											System.out.println("read idle");
    
    										} else if (event.state() == IdleState.WRITER_IDLE) {
    											System.out.println("write idle");
    											ctx.channel().writeAndFlush("ping");
    										} else if (event.state() == IdleState.ALL_IDLE) {
    											System.out.println("all idle");
    											ctx.channel().writeAndFlush("ping");
    										}
    									}
    
    								}
    
    							});
    						}
    
    					});
    			ChannelFuture future = bootstrap.connect(host, port).sync();
    			Channel channel = future.channel();
    
    			channel.writeAndFlush(request).sync();
    			/**
    			 *
    			 * 使用闭锁实现等待
    			 */
    			// latch.await();
    			SrpcResponse response = responseHodler.take();
    			channel.closeFuture();
    			return response;
    		} finally {
    			group.shutdownGracefully();
    		}
    
    	}
    
    }
    

      

  • 相关阅读:
    [NOI2001]炮兵阵地
    POJ 2411 Mondriaan's Dream
    【模板】割点(割顶)
    [Noip2007]Core树网的核
    2018.09.09模拟总结
    2018.09.08模拟总结
    [USACO11JAN]Roads and Planes
    最优贸易
    [USACO08JAN]Telephone Lines
    Calabash(葫芦娃)
  • 原文地址:https://www.cnblogs.com/wei-zw/p/8797746.html
Copyright © 2011-2022 走看看