zoukankan      html  css  js  c++  java
  • 利用netty实现支持高并发的Tcp短连接接收服务

          如果你对Netty有所了解,我们利用Netty写Tcp服务时,通常会继承SimpleChannelUpstreamHandler类,重写messageReceived函数进行数据的接收,如下就是个简单的tcp短连接服务样例:

    public class TelnetServerHandler extends SimpleChannelUpstreamHandler {
    
        private static final Logger logger = Logger.getLogger(
                TelnetServerHandler.class.getName());
    
        @Override
        public void handleUpstream(
                ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            if (e instanceof ChannelStateEvent) {
                logger.info(e.toString());
            }
            super.handleUpstream(ctx, e);
        }
    
        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            // Send greeting for a new connection.
            e.getChannel().write(
                    "Welcome to " + InetAddress.getLocalHost().getHostName() + "!
    ");
            e.getChannel().write("It is " + new Date() + " now.
    ");
        }
    
        @Override
        public void messageReceived(
                ChannelHandlerContext ctx, MessageEvent e) {
    
            String request = (String) e.getMessage();
    
            // Generate and write a response.
            String response;
            boolean close = false;
            if (request.length() == 0) {
                response = "Please type something.
    ";
            } else if (request.toLowerCase().equals("bye")) {
                response = "Have a good day!
    ";
                close = true;
            } else {
                response = "Did you say '" + request + "'?
    ";
            }
    
            // We do not need to write a ChannelBuffer here.
            // We know the encoder inserted at TelnetPipelineFactory will do the conversion.
            ChannelFuture future = e.getChannel().write(response);
            if (close) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

       

          但是,如果有高并发用户,数据发送出现乱序。例如有A用户发送1--2--3--4--5--6,B用户发送a--b--c--d--e--f。而服务端接收的数据顺序可能是:1--2--a--b--3--4--c--d---e---f---5---6。
         所以为了支持高并发的用户,不合适采用单线程接收用户数据的机制,应该实现支持异步的数据接收;并且为每个连接通道建立状态来进行缓冲数据的维护,直至客户端关闭时,将接收的数据给业务层去处理,如下面 receiver.receive(receivedBytes)函数。

         下面是个简单的实现:

        public class ChannelHandler extends SimpleChannelHandler {
            private ConcurrentHashMap<SocketAddress,ByteArrayOutputStream> socket2ByteArrayMap = new ConcurrentHashMap<>();
            
            public ChannelHandler() {
            }
    
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                channels.add(e.getChannel());
                ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
                logger.info(e.getRemoteAddress().toString());
                
               try {
                    SocketAddress curSocketAddress = e.getRemoteAddress();
                    ByteArrayOutputStream  baos = socket2ByteArrayMap.get(curSocketAddress);
                    if(baos == null){
                        baos = new ByteArrayOutputStream(2000);
                        socket2ByteArrayMap.put(curSocketAddress, baos);
                    }
                    baos.write(buffer.array());
                    
                } catch (IOException ie) {
                    Thread.currentThread().interrupt();
                }
            }
    
            public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
                logger.error("Error", event.getCause());
                Channel c = context.getChannel();
                c.close().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess())
                            channels.remove(future.getChannel());
                        else
                            logger.error("FAILED to close channel");
                    }
                });
            }
    
            @Override
            public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                SocketAddress curSocketAddress = e.getChannel().getRemoteAddress();
                ByteArrayOutputStream  baos =socket2ByteArrayMap.remove(curSocketAddress);
                if(baos != null && baos.size() != 0){
                    byte[] receivedBytes = baos.toByteArray();
                    receiver.receive(receivedBytes);
                }
                super.channelClosed(ctx, e);
            }
    
        }

         以上用socket2ByteArrayMap容器维护每个客户端接收到的数据,直至对方关闭为止。 

         很简单吧。。

  • 相关阅读:
    Combine 框架,从0到1 —— 4.在 Combine 中使用计时器
    Combine 框架,从0到1 —— 4.在 Combine 中使用通知
    Combine 框架,从0到1 —— 3.使用 Subscriber 控制发布速度
    Combine 框架,从0到1 —— 2.通过 ConnectablePublisher 控制何时发布
    使用 Swift Package Manager 集成依赖库
    iOS 高效灵活地配置可复用视图组件的主题
    构建个人博客网站(基于Python Flask)
    Swift dynamic关键字
    Swift @objcMembers
    仅用递归函数操作逆序一个栈(Swift 4)
  • 原文地址:https://www.cnblogs.com/gisorange/p/3455436.html
Copyright © 2011-2022 走看看