zoukankan      html  css  js  c++  java
  • Netty client 多路复用 连接池

    有两种方式:

    1)使用netty自带的

    Netty自带连接池的使用

    /Users/joyce/work/jds/trade/trade-shenjinrong/jincePfyhServer com.jincetrade.pfyhserver.client.PfyhClientPool

    2)自己for循环

    教你正确地利用Netty建立连接池

    演变:

    a)

    for (

    bootstrap.group(eventLoopGroup).connect

    )

    问题很明显,如果每一个channel都对应一个NIOEventLoopGroup,那么我们实际上构建了一个connection:thread = 1:1的模型,随着连接数不断地扩大,线程膨胀的问题就会突显出来。

     而nio本身是一个线程处理所有连接

    b)

    bootstrap.group(eventLoopGroup)

    for(

    bootstrap.connect

    )

    本文在2.b方式上结合apache common pool 2 实现

    package com.jds.test.proto;
    
    /**
     * Created by sunyuming on 19/8/13.
     */
    
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     *
     * EWrapper连接池工厂
     * @author shane
     *
     */
    public class ClientPool extends BasePooledObjectFactory<Channel>{
    
        private String ip;
        private int port;
    
        public ClientPool() {
            ip = "127.0.0.1";
            port = 8866;
        }
    
        public static void main(String[] args) throws Exception {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxWaitMillis(2000);
            config.setMinIdle(3);
            config.setMaxIdle(5);
            config.setMaxTotal(5);
    
            /**
             * 销毁到minidle
             */
            config.setSoftMinEvictableIdleTimeMillis(10000);
            config.setTestOnBorrow(true);
    
            /**
             * 销毁所有
             */
            //   config.setMinEvictableIdleTimeMillis(3000);
    
            // 词句是必须的,否则(Soft)MinEvictableIdleTimeMillis不生效
            config.setTimeBetweenEvictionRunsMillis(100);
    
            GenericObjectPool<Channel> objectPool = new GenericObjectPool<Channel>(new ClientPool(), config);
    
            // 先睡5s让系统补足3个对象
            Thread.sleep(5000);
            List<Channel> list = new ArrayList<>();
            for(int i=0; i<4; ++i) {
                list.add(objectPool.borrowObject());
            }
    
            for(Channel channel : list) {
                objectPool.returnObject(channel);
            }
    
            /**
             * 休息8秒等待服务端断开连接
             */
            Thread.sleep(8000);
    
            /**
             * 触发activate,validate后失败,触发create
             */
            objectPool.borrowObject();
    
        }
    
        @Override
        public Channel create() throws Exception {
            ChannelFuture future = getBoot().connect(new InetSocketAddress("127.0.0.1",8866)).sync();
            return future.channel();
        }
    
        @Override
        public PooledObject<Channel> wrap(Channel channel) {
            System.out.println("创建" + new Date());
            return new DefaultPooledObject<Channel>(channel);
        }
    
        /**
         * 对象销毁
         * @param pooledObject
         */
        @Override
        public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
            System.out.println("销毁" + new Date());
            Channel channel = pooledObject.getObject();
            channel.close();
            super.destroyObject(pooledObject);
        }
    
        /**
         * 验证对象有效性
         * @param
         * @return
         */
        @Override
        public boolean validateObject(PooledObject<Channel> pooledObject) {
            System.out.println("validate " + pooledObject.getObject().isActive() + new Date());
            return pooledObject.getObject().isActive();
        }
    
        private static Bootstrap bootstrap;
        public static Bootstrap getBoot() {
            if(bootstrap == null) {
                synchronized (ClientPool.class) {
                    if (bootstrap == null) {
                        EventLoopGroup worker = new NioEventLoopGroup();
    
                        //辅助启动类
                        bootstrap = new Bootstrap();
    
                        //设置线程池
                        bootstrap.group(worker);
    
                        //设置socket工厂
                        bootstrap.channel(NioSocketChannel.class);
    
                        //设置管道
                        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                //获取管道
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4));
                                pipeline.addLast(new ProtobufDecoder(MyBaseProto.BaseProto.getDefaultInstance()));
                                pipeline.addLast(new LengthFieldPrepender(4));
                                pipeline.addLast(new ProtobufEncoder());
    
                                pipeline.addLast(new IdleStateHandler(61, 30, 0, TimeUnit.SECONDS));
                                pipeline.addLast(new ClientHeartbeatHandler());
    
                                //处理类
                                pipeline.addLast(new ClientHandler4Heart());
                            }
                        });
    
                        return bootstrap;
                    } else {
                        return bootstrap;
                    }
                }
    
            } else {
                return bootstrap;
            }
        }
    
    
    
    }
    
  • 相关阅读:
    《STL源码剖析》-- 序列式容器
    IPV6 组播学习理解
    C 语言指针 引用学习
    C++ list 源码学习
    汇编基础学习---浮点运算
    C++ 运算符重载
    C++ 迭代器学习
    Play框架连接Mysql遇到的一些问题
    Jython中文乱码问题
    多线程之生产者消费者模式
  • 原文地址:https://www.cnblogs.com/silyvin/p/11503650.html
Copyright © 2011-2022 走看看