zoukankan      html  css  js  c++  java
  • NIO 服务端TCP连接管理的方案

    最近做的一个项目需要在服务端对连接端进行管理,故将方案记录于此。

    方案实现的结果与背景

       因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。

       完成类似于心跳的接收以及处理

        即:

          当连接过长事件(keep-alive Time)没有发送新的消息时,则在服务端切断其客户端的连接。

    具体细节

        在处理连接(Accpet事件)时:

          将SocketChannel存入HashSet;

             以SocketChannel的HashCode作为Key来存储连接时间(以服务器时间为准)

          (开辟一个HashMap或者利用Redis进行缓存)

       在处理读取(Readable)事件时:

           以SocketChannel的HashCode作为Key来存储读取事件发生的时间(以服务器时间为准);

           处理读取事件


        开启一个定时反复运行的管理线程,每次运行对HashSet中的SocketChannel进行轮询,并以SocketChannel的HashCode去取对应的时间(LastSendTime)

        获取当前时间(CurrentTime),进行计算,如果大于Keep-Alive Time,则删除HashMap(/Redis)中的键值对,以及HashSet中的SocketChannel对象,并关闭SocketChannel。

         连接端

           ServerSocketChannel serverChannel = ServerSocketChannel.open();
                serverChannel.bind(new InetSocketAddress("127.0.0.2",1234));
                serverChannel.configureBlocking(false);
                AnalyisUtil util=new AnalyisUtil();
                RedisConnectionPool connectionPool=new RedisConnectionPool();
                Selector selector = Selector.open();
                SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    int select = selector.select();
                    if (select > 0) {
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectedKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            // 接收连接请求
                            if (selectionKey.isAcceptable()) {
                                ServerSocketChannel channel = (ServerSocketChannel) selectionKey
                                        .channel();
                                SocketChannel socketChannel = channel.accept();
                                logger.info("接收到一个新的连接请求"+ socketChannel.getRemoteAddress().toString());
                                socketChannel.configureBlocking(false);
                                //每接收请求,注册到同一个selector中处理
                                socketChannel.register(selector, SelectionKey.OP_READ);
                     //在Redis中存储连接的时间,以SocketChannel的HashCode作为Key connectionPool.getJedis().set(
    "LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                     //将SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); }
    else if (selectionKey.isReadable()) { //执行读事件,在读事件的处理函数中,重新以SocketChannel的HashCode再次存储事件,以刷新时间 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }

        连接处理线程

        

    package ConnectionSystem;
    
    import Util.RedisConnectionPool;
    import org.apache.log4j.Logger;
    import redis.clients.jedis.Jedis;
    import java.io.IOException;
    import java.net.SocketAddress;
    import java.nio.channels.SocketChannel;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Iterator;
    
    public class ConnectionManagerTask implements Runnable {
        private HashSet<SocketChannel> connectedSokectList;
        private long keepalive_Time=5000;
        private Logger logger=Logger.getLogger(ConnectionManagerTask.class);
    
        ConnectionManagerTask(HashSet<SocketChannel> list){
            logger.info("TCP监听已经启动... ...");
            this.connectedSokectList=list;
        }
    
        private long cucalateIsAlive(Date lastSendTime) throws ParseException {
            Date currentTime=new Date();
            return currentTime.getTime()-lastSendTime.getTime();
        }
    
        private boolean handleSocket(SocketChannel channel){
            int channel_code= channel.hashCode();
            RedisConnectionPool connectionPool=new RedisConnectionPool();
            Jedis jedisCilent;
            SocketAddress ipLocation;
            try{
                ipLocation=channel.getRemoteAddress();
                jedisCilent=connectionPool.getJedis();
                String SendTime=jedisCilent.get("LST_"+channel_code);
                if(SendTime!=null) {
                    SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date lastSendTime = dfs.parse(SendTime);
                    if (cucalateIsAlive(lastSendTime) > keepalive_Time) {
                        //超过时间
                        try {
                            if(channel.isConnected()){
                                channel.close();
                                jedisCilent.del("LST_"+channel_code);
                                logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ",上次回应时间:" + lastSendTime);
                            }else {
                                logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."+ ",上次回应时间:" + lastSendTime);
                            }
                            return true;
                        } catch (IOException e) {
                            logger.error("通道,ip:" + ipLocation + "关闭时发生了异常",e);
                        }
                    }else {
                        return false;
                    }
                }
                if(channel.isConnected()){
                    channel.close();
                    logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ":未检测到登陆时间... ...");
                }else {
                    logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ...");
                }
    
            }catch (Exception e){
                logger.error("通道关闭时发生了异常",e);
            }
            return true;
        }
    
        @Override
        public void run() {
            logger.info("当前连接数"+connectedSokectList.size());
            if(connectedSokectList.isEmpty()){
                return;
            }
            Iterator<SocketChannel> iterator = connectedSokectList.iterator();
            while (iterator.hasNext()){
                SocketChannel socketChannel=iterator.next();
                Boolean removeFlag=handleSocket(socketChannel);
                if(removeFlag){
                    iterator.remove();
                }
            }
        }
    }

        

  • 相关阅读:
    使用 ASP.NET Core MVC 创建 Web API(五)
    使用 ASP.NET Core MVC 创建 Web API(四)
    使用 ASP.NET Core MVC 创建 Web API(三)
    使用 ASP.NET Core MVC 创建 Web API(二)
    使用 ASP.NET Core MVC 创建 Web API(一)
    学习ASP.NET Core Razor 编程系列十九——分页
    学习ASP.NET Core Razor 编程系列十八——并发解决方案
    一个屌丝程序猿的人生(九十八)
    一个屌丝程序猿的人生(九十七)
    一个屌丝程序猿的人生(九十五)
  • 原文地址:https://www.cnblogs.com/rekent/p/8968194.html
Copyright © 2011-2022 走看看