zoukankan      html  css  js  c++  java
  • golang 创建一个简单的连接池,减少频繁的创建与关闭

    一、连接池的描述图片如下:

    二、连接池代码如下:

    package main;
    
    import (
    	"time"
    	"sync"
    	"errors"
    	"net"
    	"fmt"
    )
    
    //频繁的创建和关闭连接,对系统会造成很大负担
    //所以我们需要一个池子,里面事先创建好固定数量的连接资源,需要时就取,不需要就放回池中。
    //但是连接资源有一个特点,我们无法保证连接长时间会有效。
    //比如,网络原因,人为原因等都会导致连接失效。
    //所以我们设置一个超时时间,如果连接时间与当前时间相差超过超时时间,那么就关闭连接。
    
    //只要类型实现了ConnRes接口中的方法,就认为是一个连接资源类型
    type ConnRes interface {
    	Close() error;
    }
    
    //工厂方法,用于创建连接资源
    type Factory func() (ConnRes, error)
    
    //连接
    type Conn struct {
    	conn ConnRes;
    	//连接时间
    	time time.Time;
    }
    
    //连接池
    type ConnPool struct {
    	//互斥锁,保证资源安全
    	mu sync.Mutex;
    	//通道,保存所有连接资源
    	conns chan *Conn;
    	//工厂方法,创建连接资源
    	factory Factory;
    	//判断池是否关闭
    	closed bool;
    	//连接超时时间
    	connTimeOut time.Duration;
    }
    
    //创建一个连接资源池
    func NewConnPool(factory Factory, cap int, connTimeOut time.Duration) (*ConnPool, error) {
    	if cap <= 0 {
    		return nil, errors.New("cap不能小于0");
    	}
    	if connTimeOut <= 0 {
    		return nil, errors.New("connTimeOut不能小于0");
    	}
    
    	cp := &ConnPool{
    		mu:          sync.Mutex{},
    		conns:       make(chan *Conn, cap),
    		factory:     factory,
    		closed:      false,
    		connTimeOut: connTimeOut,
    	};
    	for i := 0; i < cap; i++ {
    		//通过工厂方法创建连接资源
    		connRes, err := cp.factory();
    		if err != nil {
    			cp.Close();
    			return nil, errors.New("factory出错");
    		}
    		//将连接资源插入通道中
    		cp.conns <- &Conn{conn: connRes, time: time.Now()};
    	}
    
    	return cp, nil;
    }
    
    //获取连接资源
    func (cp *ConnPool) Get() (ConnRes, error) {
    	if cp.closed {
    		return nil, errors.New("连接池已关闭");
    	}
    
    	for {
    		select {
    		//从通道中获取连接资源
    		case connRes, ok := <-cp.conns:
    			{
    				if !ok {
    					return nil, errors.New("连接池已关闭");
    				}
    				//判断连接中的时间,如果超时,则关闭
    				//继续获取
    				if time.Now().Sub(connRes.time) > cp.connTimeOut {
    					connRes.conn.Close();
    					continue;
    				}
    				return connRes.conn, nil;
    			}
    		default:
    			{
    				//如果无法从通道中获取资源,则重新创建一个资源返回
    				connRes, err := cp.factory();
    				if err != nil {
    					return nil, err;
    				}
    				return connRes, nil;
    			}
    		}
    	}
    }
    
    //连接资源放回池中
    func (cp *ConnPool) Put(conn ConnRes) error {
    	if cp.closed {
    		return errors.New("连接池已关闭");
    	}
    
    	select {
    	//向通道中加入连接资源
    	case cp.conns <- &Conn{conn: conn, time: time.Now()}:
    		{
    			return nil;
    		}
    	default:
    		{
    			//如果无法加入,则关闭连接
    			conn.Close();
    			return errors.New("连接池已满");
    		}
    	}
    }
    
    //关闭连接池
    func (cp *ConnPool) Close() {
    	if cp.closed {
    		return;
    	}
    	cp.mu.Lock();
    	cp.closed = true;
    	//关闭通道
    	close(cp.conns);
    	//循环关闭通道中的连接
    	for conn := range cp.conns {
    		conn.conn.Close();
    	}
    	cp.mu.Unlock();
    }
    
    //返回池中通道的长度
    func (cp *ConnPool) len() int {
    	return len(cp.conns);
    }
    
    func main() {
    
    	cp, _ := NewConnPool(func() (ConnRes, error) {
    		return net.Dial("tcp", ":8080");
    	}, 10, time.Second*10);
    
    	//获取资源
    	conn1, _ := cp.Get();
    	conn2, _ := cp.Get();
    
    	//这里连接池中资源大小为8
    	fmt.Println("cp len : ", cp.len());
    	conn1.(net.Conn).Write([]byte("hello"));
    	conn2.(net.Conn).Write([]byte("world"));
    	buf := make([]byte, 1024);
    	n, _ := conn1.(net.Conn).Read(buf);
    	fmt.Println("conn1 read : ", string(buf[:n]));
    	n, _ = conn2.(net.Conn).Read(buf);
    	fmt.Println("conn2 read : ", string(buf[:n]));
    
    	//等待15秒
    	time.Sleep(time.Second * 15);
    	//我们再从池中获取资源
    	conn3, _ := cp.Get();
    	//这里显示为0,因为池中的连接资源都超时了
    	fmt.Println("cp len : ", cp.len());
    	conn3.(net.Conn).Write([]byte("test"));
    	n, _ = conn3.(net.Conn).Read(buf);
    	fmt.Println("conn3 read : ", string(buf[:n]));
    
    	//把三个连接资源放回池中
    	cp.Put(conn1);
    	cp.Put(conn2);
    	cp.Put(conn3);
    	//这里显示为3
    	fmt.Println("cp len : ", cp.len());
    	cp.Close();
    }

    三、8080服务端代码如下:

    package main;
    
    import (
    	"net"
    	"io"
    	"log"
    )
    
    func handler(conn net.Conn) {
    	for {
    		io.Copy(conn, conn);
    	}
    }
    
    func main() {
    	lis, err := net.Listen("tcp", ":8080");
    	if err != nil {
    		log.Fatal(err);
    	}
    
    	for {
    		conn, err := lis.Accept();
    		if err != nil {
    			continue;
    		}
    		go handler(conn);
    	}
    }

    测试结果如下:

  • 相关阅读:
    大数据-storm理论
    大数据-hadoop理论
    大数据-spark理论(3)sparkSql,sparkStreaming,spark调优
    大数据-spark理论(2)算子,shuffle优化
    大数据-spark理论(1)初识,原理,搭建
    Linux Bash 文件加载顺序
    Git Flow 自动化发布
    IntelliJ 报错 Error occurred during initialization of VM
    Jenkins Pipeline Build 不下载最新的包
    【从零单排】Java 8 实战演练
  • 原文地址:https://www.cnblogs.com/jkko123/p/7235257.html
Copyright © 2011-2022 走看看