zoukankan      html  css  js  c++  java
  • NIO 服务端TCP连接管理的方案

    最近做的一个项目需要在服务端对连接端进行管理,故将方案记录于此。

    方案实现的结果与背景

       因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。

       完成类似于心跳的接收以及处理

        即:

          当连接过长事件(keep-alive Time)没有发送新的消息时,则在服务端切断其客户端的连接。

    具体细节

        在处理连接(Accpet事件)时:

          将SocketChannel存入HashSet;

             以SocketChannel的HashCode作为Key来存储连接时间(以服务器时间为准)

          (开辟一个HashMap或者利用Redis进行缓存)

       在处理读取(Readable)事件时:

           以SocketChannel的HashCode作为Key来存储读取事件发生的时间(以服务器时间为准);

           处理读取事件


        开启一个定时反复运行的管理线程,每次运行对HashSet中的SocketChannel进行轮询,并以SocketChannel的HashCode去取对应的时间(LastSendTime)

        获取当前时间(CurrentTime),进行计算,如果大于Keep-Alive Time,则删除HashMap(/Redis)中的键值对,以及HashSet中的SocketChannel对象,并关闭SocketChannel。

         连接端

           ServerSocketChannel serverChannel = ServerSocketChannel.open();
                serverChannel.bind(new InetSocketAddress("127.0.0.2",1234));
                serverChannel.configureBlocking(false);
                AnalyisUtil util=new AnalyisUtil();
                RedisConnectionPool connectionPool=new RedisConnectionPool();
                Selector selector = Selector.open();
                SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    int select = selector.select();
                    if (select > 0) {
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectedKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            // 接收连接请求
                            if (selectionKey.isAcceptable()) {
                                ServerSocketChannel channel = (ServerSocketChannel) selectionKey
                                        .channel();
                                SocketChannel socketChannel = channel.accept();
                                logger.info("接收到一个新的连接请求"+ socketChannel.getRemoteAddress().toString());
                                socketChannel.configureBlocking(false);
                                //每接收请求,注册到同一个selector中处理
                                socketChannel.register(selector, SelectionKey.OP_READ);
                     //在Redis中存储连接的时间,以SocketChannel的HashCode作为Key connectionPool.getJedis().set(
    "LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                     //将SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); }
    else if (selectionKey.isReadable()) { //执行读事件,在读事件的处理函数中,重新以SocketChannel的HashCode再次存储事件,以刷新时间 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }

        连接处理线程

        

    package ConnectionSystem;
    
    import Util.RedisConnectionPool;
    import org.apache.log4j.Logger;
    import redis.clients.jedis.Jedis;
    import java.io.IOException;
    import java.net.SocketAddress;
    import java.nio.channels.SocketChannel;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Iterator;
    
    public class ConnectionManagerTask implements Runnable {
        private HashSet<SocketChannel> connectedSokectList;
        private long keepalive_Time=5000;
        private Logger logger=Logger.getLogger(ConnectionManagerTask.class);
    
        ConnectionManagerTask(HashSet<SocketChannel> list){
            logger.info("TCP监听已经启动... ...");
            this.connectedSokectList=list;
        }
    
        private long cucalateIsAlive(Date lastSendTime) throws ParseException {
            Date currentTime=new Date();
            return currentTime.getTime()-lastSendTime.getTime();
        }
    
        private boolean handleSocket(SocketChannel channel){
            int channel_code= channel.hashCode();
            RedisConnectionPool connectionPool=new RedisConnectionPool();
            Jedis jedisCilent;
            SocketAddress ipLocation;
            try{
                ipLocation=channel.getRemoteAddress();
                jedisCilent=connectionPool.getJedis();
                String SendTime=jedisCilent.get("LST_"+channel_code);
                if(SendTime!=null) {
                    SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date lastSendTime = dfs.parse(SendTime);
                    if (cucalateIsAlive(lastSendTime) > keepalive_Time) {
                        //超过时间
                        try {
                            if(channel.isConnected()){
                                channel.close();
                                jedisCilent.del("LST_"+channel_code);
                                logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ",上次回应时间:" + lastSendTime);
                            }else {
                                logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."+ ",上次回应时间:" + lastSendTime);
                            }
                            return true;
                        } catch (IOException e) {
                            logger.error("通道,ip:" + ipLocation + "关闭时发生了异常",e);
                        }
                    }else {
                        return false;
                    }
                }
                if(channel.isConnected()){
                    channel.close();
                    logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ":未检测到登陆时间... ...");
                }else {
                    logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ...");
                }
    
            }catch (Exception e){
                logger.error("通道关闭时发生了异常",e);
            }
            return true;
        }
    
        @Override
        public void run() {
            logger.info("当前连接数"+connectedSokectList.size());
            if(connectedSokectList.isEmpty()){
                return;
            }
            Iterator<SocketChannel> iterator = connectedSokectList.iterator();
            while (iterator.hasNext()){
                SocketChannel socketChannel=iterator.next();
                Boolean removeFlag=handleSocket(socketChannel);
                if(removeFlag){
                    iterator.remove();
                }
            }
        }
    }

        

  • 相关阅读:
    把git项目放到个人服务器上
    关于fcitx无法切换输入法的问题解决
    博客变迁通知
    (欧拉回路 并查集 别犯傻逼的错了) 7:欧拉回路 OpenJudge 数据结构与算法MOOC / 第七章 图 练习题(Excercise for chapter7 graphs)
    (并查集) HDU 1856 More is better
    (并查集 不太会) HDU 1272 小希的迷宫
    (并查集 注意别再犯傻逼的错了) HDU 1213 How Many Tables
    (最小生成树 Kruskal算法) 51nod 1212 无向图最小生成树
    (并查集) HDU 1232 畅通工程
    (最小生成树 Prim) HDU 1233 还是畅通工程
  • 原文地址:https://www.cnblogs.com/rekent/p/8968194.html
Copyright © 2011-2022 走看看