zoukankan      html  css  js  c++  java
  • 原创 netty初探及业务具体实现

    1、实现ApplicationRunner,完成生命周期绑定
    /**
     * Boots the Netty server once the Spring application has started and closes the
     * listening channel when the application context is closed.
     *
     * <p>NOTE(review): {@link #run(ApplicationArguments)} blocks its calling thread until the
     * server channel is closed, so nothing that follows this runner on that thread executes
     * while the server is up.
     */
    @Component
    public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {

        private static final Logger LOGGER = LoggerFactory.getLogger(NettyBootsrapRunner.class);

        /** Port the server socket is bound to. */
        @Value("${netty.websocket.port}")
        private int port;
        /** NOTE(review): only used in the startup log line; the bind call uses the port alone. */
        @Value("${netty.websocket.ip}")
        private String ip;
        /** Injected but not read anywhere in this class. */
        @Value("${netty.websocket.path}")
        private String path;
        /** Injected but not read anywhere in this class. */
        @Value("${netty.websocket.max-frame-size}")
        private long maxFrameSize;

        /**
         * Bound server channel. Volatile because it is written by the thread executing
         * {@link #run(ApplicationArguments)} and read by whichever thread delivers the
         * {@link ContextClosedEvent}.
         */
        private volatile ServerSocketChannel serverChannel;

        /**
         * Configures and binds the server, then blocks until the server channel is closed.
         *
         * @param args application arguments (unused)
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        @Override
        public void run(ApplicationArguments args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Length of the queue for connection requests that cannot be handled yet.
                    .option(ChannelOption.SO_BACKLOG, 300)
                    // Send data immediately.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Keep connections alive.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new SocketChooseHandler());
                        }
                    });
            try {
                ChannelFuture future;
                try {
                    // sync() rethrows the bind failure, so a failed bind never reaches the
                    // code below; log it here and let it propagate to the caller.
                    future = serverBootstrap.bind(port).sync();
                } catch (Exception e) {
                    LOGGER.error("服务端启动失败:{}", e.getMessage(), e);
                    throw e;
                }
                serverChannel = (ServerSocketChannel) future.channel();
                LOGGER.info("netty 服务启动,ip={},port={}", this.ip, this.port);
                // Block until the listening channel is closed.
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        /**
         * Closes the server channel, which releases the thread blocked in
         * {@link #run(ApplicationArguments)}.
         *
         * @param event the context-closed event (unused)
         */
        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            ServerSocketChannel channel = this.serverChannel;
            if (channel != null) {
                channel.close();
            }
            LOGGER.info("websocket 服务停止");
        }

    }
    

    2、兼容socket,选择处理器
    /**
     * First handler of every new connection: inspects the first bytes to decide whether the
     * client speaks WebSocket (an HTTP request starting with {@code "GET /"}) or the plain
     * line-based TCP protocol, installs the matching handlers and then removes itself.
     *
     * <p>NOTE(review): {@code ByteToMessageDecoder} subclasses must not be shared between
     * channels; a fresh instance has to be created per connection rather than reusing the
     * Spring-managed bean.
     */
    @Component
    public class SocketChooseHandler extends ByteToMessageDecoder {
        // Prefix of the WebSocket handshake request.
        private static final String WEBSOCKET_PREFIX = "GET /";

        /**
         * Chooses the pipeline for this connection based on the leading bytes of {@code in}.
         * The bytes are only peeked at; they stay in the buffer for the handlers installed here.
         *
         * @param ctx context of this handler
         * @param in  bytes received so far
         * @param out unused; this decoder produces no messages itself
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int available = Math.min(in.readableBytes(), WEBSOCKET_PREFIX.length());
            boolean prefixMatches = matchesWebSocketPrefix(in, available);
            if (prefixMatches && available < WEBSOCKET_PREFIX.length()) {
                // Everything received so far is a proper prefix of "GET /": the handshake may
                // simply be fragmented, so wait for more bytes before deciding.
                return;
            }

            ChannelPipeline pipeline = ctx.pipeline();
            if (prefixMatches) {
                // WebSocket connection.
                // HttpServerCodec: decodes requests / encodes responses as HTTP messages.
                pipeline.addLast(new HttpServerCodec());
                // HttpObjectAggregator: merges the parts of an HTTP message into one full message.
                pipeline.addLast(new HttpObjectAggregator(65535));
                // ChunkedWriteHandler: writes large content in chunks instead of holding it in memory.
                pipeline.addLast(new ChunkedWriteHandler());
                pipeline.addLast(new WebSocketFrameAggregator(65535));
                // Fires a reader-idle event after 80 s without inbound data; actually closing
                // the channel is up to a downstream handler reacting to that event.
                pipeline.addLast(new IdleStateHandler(80, 0, 0));
                // Handles the WebSocket protocol; "/channel" is the URI clients connect to.
                pipeline.addLast(new WebSocketServerProtocolHandler("/channel"));
                pipeline.addLast(new WebsocketMessageHandler());
            } else {
                // Plain TCP connection.
                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                pipeline.addLast(new LineBasedFrameDecoder(1024));
                pipeline.addLast(new StringDecoder(Charset.forName("GBK")));
                pipeline.addLast(new StringEncoder(Charset.forName("GBK")));
                pipeline.addLast(new TcpMessageHandler());
            }
            // The decision is final for this connection, so this handler is no longer needed.
            pipeline.remove(this);
        }

        /**
         * Compares the first {@code length} readable bytes of {@code in} with the start of
         * {@link #WEBSOCKET_PREFIX} without moving the reader index.
         *
         * @param in     buffer to peek into
         * @param length number of bytes to compare; at most the prefix length and at most the
         *               number of readable bytes
         * @return {@code true} if all compared bytes equal the corresponding prefix characters
         */
        private static boolean matchesWebSocketPrefix(ByteBuf in, int length) {
            int start = in.readerIndex();
            for (int i = 0; i < length; i++) {
                if (in.getByte(start + i) != (byte) WEBSOCKET_PREFIX.charAt(i)) {
                    return false;
                }
            }
            return true;
        }

    }
    3、处理请求流
    socket处理器
    @Slf4j
    @Sharable
    public class TcpMessageHandler extends ChannelInboundHandlerAdapter {
    // 用于记录和管理所有客户端的channel
    public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    DeviceUseRecordService deviceUseRecordService = SpringUtil.getBean(DeviceUseRecordService.class);
    /**
     * @Description:客户端与服务端创建连接的时候调用
     * @author TangYan
     * @date 2020/5/12 10:45
     * @param ctx
     * @return void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("netty:TCP客户端与服务端连接开始");
    }
    
    /**
     * @Description:客户端与服务端断开连接时调用
     * @author TangYan
     * @date 2020/5/12 10:45
     * @param ctx
     * @return void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("netty:TCP客户端与服务端连接关闭");
    }
    
    /**
     * @Description: 服务端接收客户端发送过来的数据结束之后调用
     * @author TangYan
     * @date 2020/5/12 10:44
     * @param ctx
     * @return void
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        log.info("netty:TCP信息接收完毕");
    }
    
    /**
     * @Description:程出现异常的时候调用
     * @author TangYan
     * @date 2020/5/12 10:44
     * @param ctx
     * @param cause
     * @return void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("", cause);
        ctx.channel().writeAndFlush(0 + "
    ");
        ctx.close();
        users.remove(ctx.channel());
        UserChannelRel.removeByValue(ctx.channel().id());
        log.info("netty:当前连接出现异常");
    }
    
    /**
     * @Description:服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
     * @author TangYan
     * @date 2020/5/12 10:44
     * @param channelHandlerContext
     * @param info
     * @return void
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("来自TCP客户端" + ctx.channel().remoteAddress() + "的信息: " + msg);
        long k = deviceUseRecordService.uploadApp(ctx.channel().remoteAddress().toString().substring(1).split(":")[0], msg.toString());
        ctx.channel().writeAndFlush(k + "
    ");
    }
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
        users.add(ctx.channel());
        log.info("netty:TCP客户端与服务端连接开始");
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    	String channelId = ctx.channel().id().asLongText();
        // 当触发handlerRemoved,ChannelGroup会自动移除对应的客户端channel
        users.remove(ctx.channel());
    	// UserChannelRel.removeByValue(ctx.channel().id());
        log.info("客户端被移除,channelId为:" + channelId);
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲)
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt; // 强制类型转换
            if (event.state() == IdleState.READER_IDLE) {
                log.info("进入读空闲...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
            	log.info("进入写空闲...");
            } else if (event.state() == IdleState.ALL_IDLE) {
            	log.info("channel关闭前users数量为:"+ users.size());
            	log.info("进入读写空闲...");
                Channel channel = ctx.channel();
                //关闭无用的channel,以防资源浪费
                channel.close();
                log.info("channel关闭后users数量为:"+ users.size());
            }
        }
    }
    }
    

    websocket处理器
    @Sharable
    public class WebsocketMessageHandler extends SimpleChannelInboundHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);
    
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
    	if (msg instanceof TextWebSocketFrame) {
    		TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
    		// 业务层处理数据
    		try {
    			CommonUserService commonUserService = SpringUtil.getBean(CommonUserService.class);
    			commonUserService.loadUserByUsername(textWebSocketFrame.text());
    		} catch (Exception e) {
    			LOGGER.error(e.getMessage());
    		}
    		// 响应客户端
    		ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
    	} else {
    		// 不接受文本以外的数据帧类型
    		ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
    	}
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	super.channelInactive(ctx);
    	LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	super.channelActive(ctx);
    	LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
    }
    }
    

    至此,一个基于netty的简单业务处理功能就实现了。

    化繁为简,极致高效。 所有代码为本人原创,转载请联系本人。
  • 相关阅读:
    oracle之三 自动任务调度
    oracle之三资源管理
    oracle之三rman 维护
    oracle之三目录库和辅助库
    oracle之三rman 不完全恢复
    oracle之三rman 备份
    oracle之三rman 完全恢复
    oracle之三RMAN概述
    oracle之三闪回flashback
    Vimium
  • 原文地址:https://www.cnblogs.com/crissblog/p/14861462.html
Copyright © 2011-2022 走看看