zoukankan      html  css  js  c++  java
  • Netty服务器连接池管理设计思路

    应用场景:

    在RPC框架中,使用Netty作为高性能的网络通信框架时,每一次服务调用,都需要与Netty服务端建立连接的话,很容易导致Netty服务器资源耗尽。所以,想到连接池技术,将与同一个Netty服务器地址建立的连接放入池中维护,同一个地址的连接确保只建立一次。这样,凡是连接同一个Netty服务器的客户端,拿到的都是同一个连接,不需要新建连接,就可以大大减少连接的个数,从而大幅度提升服务器性能。

    用一张图说明一下设计思路:

    解释一下,途中主要定义了两个类,ConnectClient是负责管理连接的,是一个抽象类,init和send是抽象方法,而NettyClient是负责与Netty服务器通信的,它继承了ConnectClient,并实现了它的几个抽象方法,init方法中会与Netty服务器建立连接,send方法会向Netty服务器发送消息。

    值得注意的是,ConnectClient类中有一个非抽象方法,就是asyncSend(),它里面调用了自己的send()抽象方法。抽象方法不能直接调用,必须拿到NettyClient这个具体实现类的实例对象才能调。这个asyncSend()方法有一个关键的入参,即Class<? extends ConnectClient> netImpl,它是一个Class类型,是从外部传进来的,所以这就比较有灵活性了,好处就是这个ConnectClient类不需要依赖任何具体的实现,只要传进一个自己的子类的Class即可,它就可以用这个Class通过反射的方式创建出具体的实现类的实例对象,然后调用其send方法。可以理解成ConnectClient用asyncSend方法包装了NettyClient的send方法,目的是让外部不要直接调用NettyClient中的send方法,而是调用自己的asyncSend方法,然后在这个asyncSend方法中,会先获取连接,再调用NettyClient中的send方法发送消息。

    模拟代码:

    下面就通过几段代码来模拟Tcp客户端和Tcp服务器端建立连接并发送消息的场景。
    这里并没有真的使用Netty框架,因为本文不是讲怎么使用Netty框架,而是分享如何管理连接。

    首先,模拟一个Tcp服务端程序(就当做是Netty的服务器):

     1 /**
     2  * 模拟TCP服务端
     3  *
     4  * @author syj
     5  */
     6 public class NetChannel {
     7 
     8     /**
     9      * 建立连接
    10      *
    11      * @param host
    12      * @param port
    13      */
    14     public void connect(String host, int port) {
    15         System.out.println("模拟连接TCP服务器成功: host=" + host + ",port=" + port);
    16     }
    17 
    18     /**
    19      * 发送消息
    20      *
    21      * @param msg
    22      */
    23     public void send(String msg) {
    24         System.out.println("模拟向TCP服务器发送消息成功:" + msg);
    25     }
    26 }

    定义一个Tcp客户端,负责与Netty服务器通信:

     1 /**
     2  * 模拟TCP客户端
     3  *
     4  * @author syj
     5  */
     6 public class NetClient extends ConnectClient {
     7 
     8     // 模拟TCP服务器
     9     private NetChannel channel;
    10 
    11     /**
    12      * 建立连接
    13      *
    14      * @param address 格式 host:port, 例如 192.168.1.103:9999
    15      * @throws Exception
    16      */
    17     @Override
    18     public void init(String address) throws Exception {
    19         if (address == null || address.trim().length() == 0) {
    20             throw new RuntimeException(">>>> address error");
    21         }
    22         String[] split = address.split(":");
    23         if (split.length != 2) {
    24             throw new RuntimeException(">>>> address error");
    25         }
    26         String host = split[0];
    27         int port = Integer.valueOf(split[1]);
    28         channel = new NetChannel();
    29         channel.connect(host, port);
    30     }
    31 
    32     /**
    33      * 发送消息
    34      *
    35      * @param msg
    36      * @throws Exception
    37      */
    38     @Override
    39     public void send(String msg) throws Exception {
    40         channel.send(msg);
    41     }
    42 }

    连接管理类:

    该类使用一个ConcurrentHashMap作为连接池,来保存与TCP服务器建立的连接,key是TCP服务器的地址,value是连接对象。

    由于是多线程环境,为保证线程安全问题,使用synchronized加锁,避免一个连接被创建多次。

    由于可能会有很多针对同一个TCP服务器的连接请求,使用lockClientMap来管理锁,同一个TCP服务器的请求使用同一把锁,保证同一个TCP服务器的连接只创建一次。

    这样既保证了线程安全,又能降低性能消耗。

     1 import java.util.concurrent.ConcurrentHashMap;
     2 
     3 /**
     4  * TCP连接管理
     5  *
     6  * @author syj
     7  */
     8 public abstract class ConnectClient {
     9 
    10     /**
    11      * 建立连接
    12      *
    13      * @param address
    14      * @throws Exception
    15      */
    16     public abstract void init(String address) throws Exception;
    17 
    18     /**
    19      * 发送消息
    20      *
    21      * @param msg
    22      * @throws Exception
    23      */
    24     public abstract void send(String msg) throws Exception;
    25 
    26     /**
    27      * 发送消息
    28      *
    29      * @param address
    30      * @param msg
    31      * @param netImpl
    32      * @throws Exception
    33      */
    34     public static void asyncSend(String address, String msg, Class<? extends ConnectClient> netImpl) throws Exception {
    35         ConnectClient connect = ConnectClient.getConnect(address, netImpl);
    36         connect.send(msg);
    37     }
    38 
    39     // 连接池
    40     private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap;
    41     //
    42     private static volatile ConcurrentHashMap<String, Object> lockClientMap = new ConcurrentHashMap<>();
    43 
    44     /**
    45      * 获取连接
    46      * 确保同一个TCP服务器地址对应的连接只建立一次
    47      *
    48      * @param netImpl
    49      * @return
    50      * @throws Exception
    51      */
    52     public static ConnectClient getConnect(String address, Class<? extends ConnectClient> netImpl) throws Exception {
    53         // 创建连接池
    54         if (connectClientMap == null) {
    55             synchronized (ConnectClient.class) {
    56                 if (connectClientMap == null) {
    57                     connectClientMap = new ConcurrentHashMap<>();
    58                 }
    59             }
    60         }
    61 
    62         // 获取连接
    63         ConnectClient connectClient = connectClientMap.get(address);
    64         if (connectClient != null) {
    65             return connectClient;
    66         }
    67 
    68         // 获取锁,同一个地址使用同一把锁
    69         Object lock = lockClientMap.get(address);
    70         if (lock == null) {
    71             lockClientMap.putIfAbsent(address, new Object());
    72             lock = lockClientMap.get(address);
    73         }
    74         synchronized (lock) {
    75             connectClient = connectClientMap.get(address);
    76             if (connectClient != null) {
    77                 return connectClient;
    78             }
    79 
    80             // 新建连接
    81             ConnectClient client = netImpl.newInstance();
    82             client.init(address);
    83             // 放入连接池
    84             connectClientMap.put(address, client);
    85         }
    86         connectClient = connectClientMap.get(address);
    87         return connectClient;
    88     }
    89 }

    任务类用于并发连接测试:

     1 import java.util.UUID;
     2 
     3 /**
     4  * 任务
     5  *
     6  * @author syj
     7  */
     8 public class Task implements Runnable {
     9 
    10     private Class<? extends ConnectClient> netType;// 客户端类型
    11     private String address;
    12     private long count;
    13 
    14     public Task(String address, long count, Class<? extends ConnectClient> netType) {
    15         this.address = address;
    16         this.count = count;
    17         this.netType = netType;
    18     }
    19 
    20     @Override
    21     public void run() {
    22         try {
    23             String uuid = UUID.randomUUID().toString().replace("-", "");
    24             String msg = String.format("%s 	 %s 	 %s 	 %s", Thread.currentThread().getName(), count, address, uuid);
    25             ConnectClient.asyncSend(address, msg, netType);
    26             // 模拟业务耗时
    27             Thread.sleep((long) (Math.random() * 1000));
    28         } catch (Exception e) {
    29             e.printStackTrace();
    30         }
    31     }
    32 }

    测试类(模拟了10个TCP服务器的地址和端口):

    通过一个死循环来模拟测试高并发场景下,连接的线程安全和性能表现。

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3 
     4 /**
     5  * 模拟TCP客户端并发获取连接发送消息
     6  *
     7  * @author syj
     8  */
     9 public class App {
    10 
    11     // TCP服务器通信地址和端口
    12     public static final String[] NET_ADDRESS_ARR = {
    13             "192.168.1.101:9999",
    14             "192.168.1.102:9999",
    15             "192.168.1.103:9999",
    16             "192.168.1.104:9999",
    17             "192.168.1.105:9999",
    18             "192.168.1.106:9999",
    19             "192.168.1.107:9999",
    20             "192.168.1.108:9999",
    21             "192.168.1.109:9999",
    22             "192.168.1.110:9999"
    23     };
    24 
    25     public static ExecutorService executorService = Executors.newCachedThreadPool();
    26     public static volatile long count;// 统计任务执行总数
    27 
    28     public static void main(String[] args) {
    29         while (true) {
    30             try {
    31                 Thread.sleep(5);// 防止 CPU 100%
    32             } catch (InterruptedException e) {
    33                 e.printStackTrace();
    34             }
    35             executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));
    36             executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));
    37         }
    38     }
    39 }

    测试结果:

     1 模拟连接TCP服务器成功: host=192.168.1.107,port=9999
     2 模拟向TCP服务器发送消息成功:pool-1-thread-14      14      192.168.1.107:9999      3f31022b959e4962b00b0719fa206416
     3 模拟连接TCP服务器成功: host=192.168.1.108,port=9999
     4 模拟向TCP服务器发送消息成功:pool-1-thread-37      37      192.168.1.108:9999      2e4e4c6db63145f190f76d1dbe59f1c4
     5 模拟连接TCP服务器成功: host=192.168.1.106,port=9999
     6 模拟向TCP服务器发送消息成功:pool-1-thread-49      49      192.168.1.106:9999      e50ea4937c1c4425b647e4606ced7a1f
     7 模拟连接TCP服务器成功: host=192.168.1.103,port=9999
     8 模拟向TCP服务器发送消息成功:pool-1-thread-17      17      192.168.1.103:9999      21cfcd3665aa4688aea0ac90b68e5a22
     9 模拟连接TCP服务器成功: host=192.168.1.102,port=9999
    10 模拟向TCP服务器发送消息成功:pool-1-thread-25      25      192.168.1.102:9999      bbdbde3e28ab4ac0901c1447ac3ddd3f
    11 模拟连接TCP服务器成功: host=192.168.1.101,port=9999
    12 模拟向TCP服务器发送消息成功:pool-1-thread-10      10      192.168.1.101:9999      08cc445cc06a44f5823a8487d05e3e30
    13 模拟连接TCP服务器成功: host=192.168.1.105,port=9999
    14 模拟向TCP服务器发送消息成功:pool-1-thread-45      45      192.168.1.105:9999      3e925cf96b874ba09c59e63613e60662
    15 模拟连接TCP服务器成功: host=192.168.1.104,port=9999
    16 模拟向TCP服务器发送消息成功:pool-1-thread-53      53      192.168.1.104:9999      2408dab5c0ca480b8c2593311f3ec7d5
    17 模拟向TCP服务器发送消息成功:pool-1-thread-13      13      192.168.1.105:9999      5a3c0f86046f4cb99986d0281e567e31
    18 模拟向TCP服务器发送消息成功:pool-1-thread-36      36      192.168.1.107:9999      b85d9d79461d4345a2da8f8dd00a572a
    19 模拟向TCP服务器发送消息成功:pool-1-thread-9      9      192.168.1.102:9999      c2895f68a33745d7a4370034b6474461
    20 模拟向TCP服务器发送消息成功:pool-1-thread-41      41      192.168.1.102:9999      a303193a58204e7fadaf64cec8eaa86d
    21 模拟向TCP服务器发送消息成功:pool-1-thread-59      59      192.168.1.101:9999      08785c0acfc14c618cf3762d35055e9b
    22 模拟向TCP服务器发送消息成功:pool-1-thread-54      54      192.168.1.107:9999      6fa8e3939a904271b03b78204d4a146a
    23 模拟向TCP服务器发送消息成功:pool-1-thread-15      15      192.168.1.102:9999      229989d1405b49cdb31052b081a33869
    24 模拟向TCP服务器发送消息成功:pool-1-thread-7      7      192.168.1.107:9999      8e3c8d1007a34a01b166101fae30449c
    25 模拟连接TCP服务器成功: host=192.168.1.109,port=9999
    26 模拟向TCP服务器发送消息成功:pool-1-thread-8      8      192.168.1.109:9999      ca63dd93685641d19c875e4809e9a8dc
    27 模拟向TCP服务器发送消息成功:pool-1-thread-1      1      192.168.1.106:9999      cd9f473797de46ef8361f3b8b0a6d575
    28 模拟向TCP服务器发送消息成功:pool-1-thread-27      27      192.168.1.102:9999      872d825fd64e409d8b992e12e0372daa
    29 模拟向TCP服务器发送消息成功:pool-1-thread-3      3      192.168.1.103:9999      baace7f8f06242f68cac0c43337e49cf
    30 模拟向TCP服务器发送消息成功:pool-1-thread-39      39      192.168.1.108:9999      bc0d70348f574cbba449496b3142e518
    31 模拟向TCP服务器发送消息成功:pool-1-thread-55      55      192.168.1.106:9999      95ba7c57a1d84c18a6ab328eb01e85f1
    32 模拟向TCP服务器发送消息成功:pool-1-thread-38      38      192.168.1.108:9999      a571001c573c4851a4bb1e0dcb9a204a
    33 模拟向TCP服务器发送消息成功:pool-1-thread-4      4      192.168.1.104:9999      dcdd6093afc345e39453883cf049fa21
    34 模拟向TCP服务器发送消息成功:pool-1-thread-28      28      192.168.1.106:9999      0ba4362898f84335bb336d17780855fc
    35 模拟向TCP服务器发送消息成功:pool-1-thread-47      47      192.168.1.108:9999      db993121a9934558942a09a9d9a8e03f
    36 模拟向TCP服务器发送消息成功:pool-1-thread-30      30      192.168.1.102:9999      a0e50592deca471b9c5982c83d00f303
    37 模拟连接TCP服务器成功: host=192.168.1.110,port=9999
    38 模拟向TCP服务器发送消息成功:pool-1-thread-51      51      192.168.1.110:9999      41703aba37ca47148d23d6826264a05a
    39 模拟向TCP服务器发送消息成功:pool-1-thread-5      5      192.168.1.102:9999      15f453cc0a7743f79dc105963f39f946
    40 模拟向TCP服务器发送消息成功:pool-1-thread-52      52      192.168.1.105:9999      9ca521963bf84c418335e7702e471fa9
    41 模拟向TCP服务器发送消息成功:pool-1-thread-40      40      192.168.1.101:9999      bec1d265b7dc46f5afebc42fea10a313
    42 模拟向TCP服务器发送消息成功:pool-1-thread-26      26      192.168.1.104:9999      a44662dc498045e78eb531b6ee6fc27b
    43 模拟向TCP服务器发送消息成功:pool-1-thread-11      11      192.168.1.109:9999      6104c4fd2dab4d44af86f0cd1e3e272d
    44 模拟向TCP服务器发送消息成功:pool-1-thread-24      24      192.168.1.105:9999      344025da2a6c4190a87403c5d96b321e
    45 模拟向TCP服务器发送消息成功:pool-1-thread-22      22      192.168.1.110:9999      aa0b4c48527446738d28e99eef4957f5
    46 模拟向TCP服务器发送消息成功:pool-1-thread-23      23      192.168.1.107:9999      79f5fc4278164cd68ac1a260322e6f68
    47 模拟向TCP服务器发送消息成功:pool-1-thread-56      56      192.168.1.109:9999      39c38939ced140058f25fe903a3b1f4f
    48 模拟向TCP服务器发送消息成功:pool-1-thread-18      18      192.168.1.109:9999      c29ea09b5f264b488f3e15e91c5f2bd5

    可见,与每个TCP服务器的连接只会建立一次,连接得到复用。

  • 相关阅读:
    HDNOIP普及+提高整合
    [BZOJ4016][FJOI2014]最短路径树问题
    [BZOJ3697]采药人的路径
    [COJ0985]WZJ的数据结构(负十五)
    [KOJ6024]合并果子·改(强化版)
    [KOJ6023]合并果子·改
    [KOJ0574NOIP200406合并果子]
    Atomic operations on the x86 processors
    Javascript 严格模式详解
    const C语言(转)
  • 原文地址:https://www.cnblogs.com/jun1019/p/10963627.html
Copyright © 2011-2022 走看看