最近做的一个项目需要在服务端对连接端进行管理,故将方案记录于此。
方案实现的结果与背景
因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。
完成类似于心跳的接收以及处理
即:
当连接过长事件(keep-alive Time)没有发送新的消息时,则在服务端切断其客户端的连接。
具体细节
在处理连接(Accpet事件)时:
将SocketChannel存入HashSet;
以SocketChannel的HashCode作为Key来存储连接时间(以服务器时间为准)
(开辟一个HashMap或者利用Redis进行缓存)
在处理读取(Readable)事件时:
以SocketChannel的HashCode作为Key来存储读取事件发生的时间(以服务器时间为准);
处理读取事件
开启一个定时反复运行的管理线程,每次运行对HashSet中的SocketChannel进行轮询,并以SocketChannel的HashCode去取对应的时间(LastSendTime)
获取当前时间(CurrentTime),进行计算,如果大于Keep-Alive Time,则删除HashMap(/Redis)中的键值对,以及HashSet中的SocketChannel对象,并关闭SocketChannel。
连接端
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress("127.0.0.2",1234)); serverChannel.configureBlocking(false); AnalyisUtil util=new AnalyisUtil(); RedisConnectionPool connectionPool=new RedisConnectionPool(); Selector selector = Selector.open(); SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int select = selector.select(); if (select > 0) { Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); // 接收连接请求 if (selectionKey.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) selectionKey .channel(); SocketChannel socketChannel = channel.accept(); logger.info("接收到一个新的连接请求"+ socketChannel.getRemoteAddress().toString()); socketChannel.configureBlocking(false); //每接收请求,注册到同一个selector中处理 socketChannel.register(selector, SelectionKey.OP_READ);
//在Redis中存储连接的时间,以SocketChannel的HashCode作为Key connectionPool.getJedis().set("LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//将SocketChannel放入HashSet中管理 connectedSokectList.add(socketChannel); } else if (selectionKey.isReadable()) { //执行读事件,在读事件的处理函数中,重新以SocketChannel的HashCode再次存储事件,以刷新时间 util.handleReadEvent(selectionKey,messageQueue,logger); } } } }
连接处理线程
package ConnectionSystem; import Util.RedisConnectionPool; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashSet; import java.util.Iterator; public class ConnectionManagerTask implements Runnable { private HashSet<SocketChannel> connectedSokectList; private long keepalive_Time=5000; private Logger logger=Logger.getLogger(ConnectionManagerTask.class); ConnectionManagerTask(HashSet<SocketChannel> list){ logger.info("TCP监听已经启动... ..."); this.connectedSokectList=list; } private long cucalateIsAlive(Date lastSendTime) throws ParseException { Date currentTime=new Date(); return currentTime.getTime()-lastSendTime.getTime(); } private boolean handleSocket(SocketChannel channel){ int channel_code= channel.hashCode(); RedisConnectionPool connectionPool=new RedisConnectionPool(); Jedis jedisCilent; SocketAddress ipLocation; try{ ipLocation=channel.getRemoteAddress(); jedisCilent=connectionPool.getJedis(); String SendTime=jedisCilent.get("LST_"+channel_code); if(SendTime!=null) { SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date lastSendTime = dfs.parse(SendTime); if (cucalateIsAlive(lastSendTime) > keepalive_Time) { //超过时间 try { if(channel.isConnected()){ channel.close(); jedisCilent.del("LST_"+channel_code); logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ",上次回应时间:" + lastSendTime); }else { logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."+ ",上次回应时间:" + lastSendTime); } return true; } catch (IOException e) { logger.error("通道,ip:" + ipLocation + "关闭时发生了异常",e); } }else { return false; } } if(channel.isConnected()){ channel.close(); logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ":未检测到登陆时间... ..."); }else { logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."); } }catch (Exception e){ logger.error("通道关闭时发生了异常",e); } return true; } @Override public void run() { logger.info("当前连接数"+connectedSokectList.size()); if(connectedSokectList.isEmpty()){ return; } Iterator<SocketChannel> iterator = connectedSokectList.iterator(); while (iterator.hasNext()){ SocketChannel socketChannel=iterator.next(); Boolean removeFlag=handleSocket(socketChannel); if(removeFlag){ iterator.remove(); } } } }