zoukankan      html  css  js  c++  java
  • NIO 服务端TCP连接管理的方案

    最近做的一个项目需要在服务端对连接端进行管理,故将方案记录于此。

    方案实现的结果与背景

       因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。

       完成类似于心跳的接收以及处理

        即:

          当连接过长事件(keep-alive Time)没有发送新的消息时,则在服务端切断其客户端的连接。

    具体细节

        在处理连接(Accpet事件)时:

          将SocketChannel存入HashSet;

             以SocketChannel的HashCode作为Key来存储连接时间(以服务器时间为准)

          (开辟一个HashMap或者利用Redis进行缓存)

       在处理读取(Readable)事件时:

           以SocketChannel的HashCode作为Key来存储读取事件发生的时间(以服务器时间为准);

           处理读取事件


        开启一个定时反复运行的管理线程,每次运行对HashSet中的SocketChannel进行轮询,并以SocketChannel的HashCode去取对应的时间(LastSendTime)

        获取当前时间(CurrentTime),进行计算,如果大于Keep-Alive Time,则删除HashMap(/Redis)中的键值对,以及HashSet中的SocketChannel对象,并关闭SocketChannel。

         连接端

           ServerSocketChannel serverChannel = ServerSocketChannel.open();
                serverChannel.bind(new InetSocketAddress("127.0.0.2",1234));
                serverChannel.configureBlocking(false);
                AnalyisUtil util=new AnalyisUtil();
                RedisConnectionPool connectionPool=new RedisConnectionPool();
                Selector selector = Selector.open();
                SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    int select = selector.select();
                    if (select > 0) {
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectedKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            // 接收连接请求
                            if (selectionKey.isAcceptable()) {
                                ServerSocketChannel channel = (ServerSocketChannel) selectionKey
                                        .channel();
                                SocketChannel socketChannel = channel.accept();
                                logger.info("接收到一个新的连接请求"+ socketChannel.getRemoteAddress().toString());
                                socketChannel.configureBlocking(false);
                                //每接收请求,注册到同一个selector中处理
                                socketChannel.register(selector, SelectionKey.OP_READ);
                     //在Redis中存储连接的时间,以SocketChannel的HashCode作为Key connectionPool.getJedis().set(
    "LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                     //将SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); }
    else if (selectionKey.isReadable()) { //执行读事件,在读事件的处理函数中,重新以SocketChannel的HashCode再次存储事件,以刷新时间 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }

        连接处理线程

        

    package ConnectionSystem;
    
    import Util.RedisConnectionPool;
    import org.apache.log4j.Logger;
    import redis.clients.jedis.Jedis;
    import java.io.IOException;
    import java.net.SocketAddress;
    import java.nio.channels.SocketChannel;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Iterator;
    
    public class ConnectionManagerTask implements Runnable {
        private HashSet<SocketChannel> connectedSokectList;
        private long keepalive_Time=5000;
        private Logger logger=Logger.getLogger(ConnectionManagerTask.class);
    
        ConnectionManagerTask(HashSet<SocketChannel> list){
            logger.info("TCP监听已经启动... ...");
            this.connectedSokectList=list;
        }
    
        private long cucalateIsAlive(Date lastSendTime) throws ParseException {
            Date currentTime=new Date();
            return currentTime.getTime()-lastSendTime.getTime();
        }
    
        private boolean handleSocket(SocketChannel channel){
            int channel_code= channel.hashCode();
            RedisConnectionPool connectionPool=new RedisConnectionPool();
            Jedis jedisCilent;
            SocketAddress ipLocation;
            try{
                ipLocation=channel.getRemoteAddress();
                jedisCilent=connectionPool.getJedis();
                String SendTime=jedisCilent.get("LST_"+channel_code);
                if(SendTime!=null) {
                    SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date lastSendTime = dfs.parse(SendTime);
                    if (cucalateIsAlive(lastSendTime) > keepalive_Time) {
                        //超过时间
                        try {
                            if(channel.isConnected()){
                                channel.close();
                                jedisCilent.del("LST_"+channel_code);
                                logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ",上次回应时间:" + lastSendTime);
                            }else {
                                logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."+ ",上次回应时间:" + lastSendTime);
                            }
                            return true;
                        } catch (IOException e) {
                            logger.error("通道,ip:" + ipLocation + "关闭时发生了异常",e);
                        }
                    }else {
                        return false;
                    }
                }
                if(channel.isConnected()){
                    channel.close();
                    logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ":未检测到登陆时间... ...");
                }else {
                    logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ...");
                }
    
            }catch (Exception e){
                logger.error("通道关闭时发生了异常",e);
            }
            return true;
        }
    
        @Override
        public void run() {
            logger.info("当前连接数"+connectedSokectList.size());
            if(connectedSokectList.isEmpty()){
                return;
            }
            Iterator<SocketChannel> iterator = connectedSokectList.iterator();
            while (iterator.hasNext()){
                SocketChannel socketChannel=iterator.next();
                Boolean removeFlag=handleSocket(socketChannel);
                if(removeFlag){
                    iterator.remove();
                }
            }
        }
    }

        

  • 相关阅读:
    基于雪花算法的单机版
    Spring cloud gateway自定义filter以及负载均衡
    logback转义符与MDC
    录音地址文件保存
    maven加载本地jar
    ES Log4J配置信息
    java线程池
    openstreetmap的数据下载
    php更新版本后(路径更改后)要做的调整
    重启IIS
  • 原文地址:https://www.cnblogs.com/rekent/p/8968194.html
Copyright © 2011-2022 走看看