zoukankan      html  css  js  c++  java
  • go 使用线程池做请求限流

    描述:说到请求限流,一般都会用到MQ,无论何种MQ,都需要生产者和消费者才能发挥MQ的强大作用。但在对接项目,可能就会出现对接方不能够配合使用MQ的情况。此时,使用线程池做限流也是一种可行的思路。

    流程:

    1.需手动实现一个线程池。说到线程池,要考虑的因素有:核心线程数,任务队列,最大线程数,线程空闲时间,保留策略。

    ①开启线程池,接受任务,每接受一个任务创建一条线程。

    ②当线程数达到核心线程数时,之后的任务放入任务队列中。建议使用阻塞队列,防止内存溢出。

    ③当任务队列饱和,会在线程池中创建额外的线程来处理任务,直至达到最大线程数。

    ④当在线程池中的这部分额外线程处于空闲状态,并且达到线程空闲时间的要求,这部分线程会被销毁。

    ⑤当达到最大线程数,依然有后续的任务要处理,此时就要对这部分任务的去留做出决策。提供三种保留策略:

    Ⅰ.直接丢弃,不予处理。

    Ⅱ.开辟脱离线程池的线程来处理。

    Ⅲ.将任务队列中等待时间久的任务丢弃,加入后续任务。

    2.请求限流,先要了解server的运行原理

    ①服务端需要有一个监听器用来监听请求连接。当客户端发送来一个请求,服务端会先和客户端建立tcp连接。

    ②开辟一条线程用来单独处理这条tcp连接中发送来的http请求,直至http请求读取完毕,返回响应。我们要执行请求限流的操作便在此处进行,详细操作看代码。

    //线程池
    package myroutine
    
    import (
    	"fmt"
    	"strconv"
    )
    
    /**
     * @ Author      : jgbb
     * @ Date        : Created in 2019/9/4 13:19
     * @ Description : TODO 线程池
     * @ Modified by :
     * @ Version     : 1.0
     */
    
    func Init(poolSize int,name string) *RoutinePool{
    	pool := &RoutinePool{
    		Queue:make(chan func()),
    		PoolSize:poolSize,
    		Name:name,
    	}
    	defer pool.ExeTask()
    	return pool
    }
    
    type RoutinePool struct {
    	//缓存任务
    	Queue chan func()
    	PoolSize int
    	Name string
    }
    
    
    // 添加任务到线程池
    func (pool *RoutinePool) AddTask(task func()){
    	pool.Queue <- task
    }
    
    //执行任务
    func (pool *RoutinePool) ExeTask(){
    	counter := make(chan int)
    	for i:=0;i<pool.PoolSize;i++ {
    		go func() {
    			j := <- counter//哪条线程
    			var count int64= 0//计数(线程跑了多少次)
    			var stdout =pool.Name+"	线程"+strconv.Itoa(j)+"	"
    			for task := range pool.Queue{
    				count++
    				fmt.Printf("%p	%s
    ",pool,stdout+strconv.FormatInt(count,10))
    
    				task()
    			}
    		}()
    
    		counter <- i
    	}
    }
    

      

    //对server源码修改
    const(
    DefaultPoolSize = 10
    )
    //每个请求对应的线程池
    var PoolMap  = make(map[string]*myroutine.RoutinePool)
    
    //golang源码
    func (srv *Server) Serve(l net.Listener) error {
    	defer l.Close()
    	if fn := testHookServerServe; fn != nil {
    		fn(srv, l)
    	}
    	var tempDelay time.Duration // how long to sleep on accept failure
    
    	if err := srv.setupHTTP2_Serve(); err != nil {
    		return err
    	}
    
    	srv.trackListener(l, true)
    	defer srv.trackListener(l, false)
    
    	baseCtx := context.Background() // base is always background, per Issue 16220
    	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    	for {
    		rw, e := l.Accept()
    		if e != nil {
    			select {
    			case <-srv.getDoneChan():
    				return ErrServerClosed
    			default:
    			}
    			if ne, ok := e.(net.Error); ok && ne.Temporary() {
    				if tempDelay == 0 {
    					tempDelay = 5 * time.Millisecond
    				} else {
    					tempDelay *= 2
    				}
    				if max := 1 * time.Second; tempDelay > max {
    					tempDelay = max
    				}
    				srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
    				time.Sleep(tempDelay)
    				continue
    			}
    			return e
    		}
    		tempDelay = 0
    		c := srv.newConn(rw)
                    
                    /***********************修改开始*******************************/
    		//将c.server(ctx)的处理过程放入线程池中
    		//首先需要请求path,根据path获取对应的线程池
    		c.r = &connReader{conn: c}
    		c.r.setReadLimit(c.server.initialReadLimitSize()) //若不setReadLimit,无法读取到缓冲流中的数据
    		c.bufr = newBufioReader(c.r)//用来读取流
    		s,err := c.bufr.Peek(100)//缓冲流使用peek(),游标不会进行计数,这样才能流中的数据在后面的处理中复用。否则后续读取流会从游标开始
    		news := make([]byte,0)
    		for i:=0;i<100;i++ {
    			news = append(news,s[i])
    			if s[i] == 10 {
                                    //10表示换行符,到此获取到所需信息
    				break
    			}
    		}
    		if err != nil {
    			fmt.Errorf("my err:%v",err)
    		}
    		newss := string(news)
                    //请求path当作线程池名称
    		poolName := newss[strings.Index(newss,"/"):strings.LastIndex(newss," ")]
    
    
    		c.setState(c.rwc, StateNew) // before Serve can return
    		//go c.serve(ctx) //源码
    
    		//放入线程池处理请求
    		putPoolMap(poolName).AddTask(func() {
    			c.serve(ctx)
    		})
                    /***********************修改结束*******************************/
    	}
    }
    
    //生成线程池
    //-参数1:线程池大小
    //-参数2:线程池名称
    func PutPoolMap(poolSize int,name string) *myroutine.RoutinePool{
    	if _,ok := PoolMap[name]; !ok {
                    //如果不存在对应的线程池,则生成一个
    		PoolMap[name] = myroutine.Init(poolSize,name)
    	}
            //返回对应的线程池
    	return PoolMap[name]
    }
    
    //默认使用此方法生成线程池
    //-参数1:线程池名称
    func putPoolMap(name string) *myroutine.RoutinePool{
    	return PutPoolMap(DefaultPoolSize,name)
    }        
    

      

  • 相关阅读:
    Java连接操作redis
    redis 6.0.x简介和安装
    设计模式之代理模式(proxy)
    设计模式之组合模式(composize)
    Linux Shell脚本调试方法
    linuxcfg.sh
    反向代理和正向代理区别
    WAF与IPS的区别总结
    【LemonCK】jQuery的基本使用
    【LemonCK】CSS盒子塌陷问题
  • 原文地址:https://www.cnblogs.com/asceticmonks/p/13266310.html
Copyright © 2011-2022 走看看