zoukankan      html  css  js  c++  java
  • golang 典型并发任务

    仅执行一次

    场景

    懒汉式,线程安全

    适用于只执行一次的任务,比如加载配置文件。

    code

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    func init() {
    	rand.Seed(time.Now().UnixNano())
    }
    
    func main() {
    	var once sync.Once
    	for i := 0; i < 10; i++ {
    		once.Do(func() {
    			num := rand.Intn(10)
    			fmt.Println(num)
    		})
    	}
    }
    

    仅需任意任务完成

    场景

    这里所有任务都完成了,但是只用了最快的一个结果,所以是所有任务都完成了;

    当有一个任务完成时,取消其他任务,因为任务都是有开销的。

    code

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    func runTask(id int) string {
    	time.Sleep(10 * time.Millisecond)
    	return fmt.Sprintf("The result is from %d", id)
    }
    
    func firstResponse() string {
    	numOfRunner := 10
    	// 使用带缓存的channel,让goroutines不会堵塞
    	ch := make(chan string, numOfRunner)
    	for i := 0; i < numOfRunner; i++ {
    		go func(i int) {
    			ret := runTask(i)
    			ch <- ret
    		}(i)
    	}
    	return <-ch
    }
    
    func main() {
    	fmt.Println("Before:", runtime.NumGoroutine())
    	fmt.Println(firstResponse())
    	time.Sleep(time.Second * 1)
    	fmt.Println("After:", runtime.NumGoroutine())
    
    }
    

    所有任务都完成

    基于CSP实现

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	var mutex sync.Mutex
    	max := 10000
    	ch := make(chan int, max)
    	for i := 0; i < max; i++ {
    		go func() {
    			mutex.Lock()
    			ch <- 1
    			defer func() {
    				mutex.Unlock()
    			}()
    		}()
    	}
    	counter := 0
    	for i := 0; i < max; i++ {
    		counter += <-ch
    	}
    	fmt.Println("counter:", counter)
    }
    

    waitgroup实现

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	var mutex sync.Mutex
    	var wg sync.WaitGroup
    	counter := 0
    	for i := 0; i < 10000; i++ {
    		wg.Add(1) // 每启动一个协程都新增加一个等待
    		go func() {
    			mutex.Lock()
    			defer func() {
    				mutex.Unlock()
    				wg.Done()
    			}()
    			counter++
    		}(i)
    	}
    	wg.Wait()
    	fmt.Println("counter:", counter)
    }
    

    对象池

    适合通过复用降低复杂对象的创建和GC的代价

    协程安全,会有锁的开销

    生命周期受GC影响,不适合做连接池等需要自己管理生命周期的资源的池化。

    code

    基于buffered channel实现对象池,取用完后放回channel

    package object_pool
    
    import (
    	"errors"
    	"fmt"
    	"testing"
    	"time"
    )
    
    type ReusableObject struct {
    	token int
    }
    
    type ObjectPool struct {
    	bufChan chan *ReusableObject //用于缓冲可重用对象
    }
    
    func NewObjectPool(numOfObject int) *ObjectPool {
    	objectPool := ObjectPool{}
    	objectPool.bufChan = make(chan *ReusableObject, numOfObject)
    	for i := 0; i < numOfObject; i++ {
    		objectPool.bufChan <- &ReusableObject{
    			token: i,
    		}
    	}
    	return &objectPool
    }
    
    func (pool *ObjectPool) GetObject(timeout time.Duration) (*ReusableObject, error) {
    	select {
    	case ret := <-pool.bufChan:
    		return ret, nil
    	case <-time.After(timeout): //超时控制
    		return nil, errors.New("time out")
    	}
    
    }
    
    func (pool *ObjectPool) ReleaseObject(object *ReusableObject) error {
    	select {
    	case pool.bufChan <- object:
    		return nil
    	default:
    		return errors.New("overflow")
    	}
    }
    
    func TestObjPool(t *testing.T) {
    	pool := NewObjectPool(10)
    	// 创建对象池后,对象池是满的
    	if err := pool.ReleaseObject(&ReusableObject{}); err != nil { //尝试放置超出池大小的对象
    		t.Error(err)
    	}
    	for i := 0; i < 11; i++ {
    		if v, err := pool.GetObject(time.Second); err != nil {
    			t.Error(err)
    		} else {
    			fmt.Printf("%T %d
    ", v, v.token)
    			if err := pool.ReleaseObject(v); err != nil {
    				t.Error(err)
    			}
    		}
    
    	}
    
    	fmt.Println("Done")
    }
    

    sync.pool 对象生命周期

    • gc会清除sync.pool缓存的对象

    • 对象的有效期是下次gc前 --> gc 执行的时机是什么?

    带来的思考

    每次获取对象,可能会受锁的限制,所以是创建对象的开销大,还是锁带来的开销大需要根据实际情况权衡。

    code

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"sync"
    )
    
    func SyncPool() {
    	pool := &sync.Pool{
            // 当对象池为空时,调用get时会自动New创建一个新的对象,可以理解为默认对象
    		New: func() interface{} {
    			fmt.Println("Create a new object.")
    			return 100
    		},
    	}
    
    	v := pool.Get().(int)
    	fmt.Println(v)
    	pool.Put(3)
    	runtime.GC() //GC 会清除sync.pool中缓存的对象
    	v1, _ := pool.Get().(int)
    	fmt.Println(v1)
    }
    
    func SyncPoolInMultiGoroutine() {
    	pool := &sync.Pool{
    		New: func() interface{} {
    			fmt.Println("Create a new object.")
    			return 10
    		},
    	}
    
    	pool.Put(1)
    	pool.Put(2)
    	pool.Put(3)
    	pool.Put(4)
    
    	var wg sync.WaitGroup
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func(id int) {
    			fmt.Println(pool.Get())
    			wg.Done()
    		}(i)
    	}
    	wg.Wait()
    }
    
    func main() {
    	//SyncPool()
    	SyncPoolInMultiGoroutine()
    }
    
  • 相关阅读:
    mac github访问不稳定解决办法
    前后端项目部署-2, flask+Gunicorn+gevent,docker部署,
    前后端项目部署-1, flask,只有flask,一个最简单的flask项目部署到Docker的过程,
    mac,VMware,安装centos7,虚拟机,无界面
    深网简介
    mitmproxy抓包工具使用 for mac
    安卓抓取的另外一种思路
    安卓手机设置权限,adb pull permission denied解决办法
    爬虫工程师到底为什么要学习安卓逆向?
    爬虫工程师日常都是做什么。
  • 原文地址:https://www.cnblogs.com/hiyang/p/13055137.html
Copyright © 2011-2022 走看看