zoukankan      html  css  js  c++  java
  • golang 典型并发任务

    仅执行一次

    场景

    懒汉式,线程安全

    适用于只执行一次的任务,比如加载配置文件。

    code

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    func init() {
    	rand.Seed(time.Now().UnixNano())
    }
    
    func main() {
    	var once sync.Once
    	for i := 0; i < 10; i++ {
    		once.Do(func() {
    			num := rand.Intn(10)
    			fmt.Println(num)
    		})
    	}
    }
    

    仅需任意任务完成

    场景

    这里所有任务都完成了,但是只用了最快的一个结果,所以是所有任务都完成了;

    当有一个任务完成时,取消其他任务,因为任务都是有开销的。

    code

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    func runTask(id int) string {
    	time.Sleep(10 * time.Millisecond)
    	return fmt.Sprintf("The result is from %d", id)
    }
    
    func firstResponse() string {
    	numOfRunner := 10
    	// 使用带缓存的channel,让goroutines不会堵塞
    	ch := make(chan string, numOfRunner)
    	for i := 0; i < numOfRunner; i++ {
    		go func(i int) {
    			ret := runTask(i)
    			ch <- ret
    		}(i)
    	}
    	return <-ch
    }
    
    func main() {
    	fmt.Println("Before:", runtime.NumGoroutine())
    	fmt.Println(firstResponse())
    	time.Sleep(time.Second * 1)
    	fmt.Println("After:", runtime.NumGoroutine())
    
    }
    

    所有任务都完成

    基于CSP实现

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	var mutex sync.Mutex
    	max := 10000
    	ch := make(chan int, max)
    	for i := 0; i < max; i++ {
    		go func() {
    			mutex.Lock()
    			ch <- 1
    			defer func() {
    				mutex.Unlock()
    			}()
    		}()
    	}
    	counter := 0
    	for i := 0; i < max; i++ {
    		counter += <-ch
    	}
    	fmt.Println("counter:", counter)
    }
    

    waitgroup实现

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	var mutex sync.Mutex
    	var wg sync.WaitGroup
    	counter := 0
    	for i := 0; i < 10000; i++ {
    		wg.Add(1) // 每启动一个协程都新增加一个等待
    		go func() {
    			mutex.Lock()
    			defer func() {
    				mutex.Unlock()
    				wg.Done()
    			}()
    			counter++
    		}(i)
    	}
    	wg.Wait()
    	fmt.Println("counter:", counter)
    }
    

    对象池

    适合通过复用降低复杂对象的创建和GC的代价

    协程安全,会有锁的开销

    生命周期受GC影响,不适合做连接池等需要自己管理生命周期的资源的池化。

    code

    基于buffered channel实现对象池,取用完后放回channel

    package object_pool
    
    import (
    	"errors"
    	"fmt"
    	"testing"
    	"time"
    )
    
    type ReusableObject struct {
    	token int
    }
    
    type ObjectPool struct {
    	bufChan chan *ReusableObject //用于缓冲可重用对象
    }
    
    func NewObjectPool(numOfObject int) *ObjectPool {
    	objectPool := ObjectPool{}
    	objectPool.bufChan = make(chan *ReusableObject, numOfObject)
    	for i := 0; i < numOfObject; i++ {
    		objectPool.bufChan <- &ReusableObject{
    			token: i,
    		}
    	}
    	return &objectPool
    }
    
    func (pool *ObjectPool) GetObject(timeout time.Duration) (*ReusableObject, error) {
    	select {
    	case ret := <-pool.bufChan:
    		return ret, nil
    	case <-time.After(timeout): //超时控制
    		return nil, errors.New("time out")
    	}
    
    }
    
    func (pool *ObjectPool) ReleaseObject(object *ReusableObject) error {
    	select {
    	case pool.bufChan <- object:
    		return nil
    	default:
    		return errors.New("overflow")
    	}
    }
    
    func TestObjPool(t *testing.T) {
    	pool := NewObjectPool(10)
    	// 创建对象池后,对象池是满的
    	if err := pool.ReleaseObject(&ReusableObject{}); err != nil { //尝试放置超出池大小的对象
    		t.Error(err)
    	}
    	for i := 0; i < 11; i++ {
    		if v, err := pool.GetObject(time.Second); err != nil {
    			t.Error(err)
    		} else {
    			fmt.Printf("%T %d
    ", v, v.token)
    			if err := pool.ReleaseObject(v); err != nil {
    				t.Error(err)
    			}
    		}
    
    	}
    
    	fmt.Println("Done")
    }
    

    sync.pool 对象生命周期

    • gc会清除sync.pool缓存的对象

    • 对象的有效期是下次gc前 --> gc 执行的时机是什么?

    带来的思考

    每次获取对象,可能会受锁的限制,所以是创建对象的开销大,还是锁带来的开销大需要根据实际情况权衡。

    code

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"sync"
    )
    
    func SyncPool() {
    	pool := &sync.Pool{
            // 当对象池为空时,调用get时会自动New创建一个新的对象,可以理解为默认对象
    		New: func() interface{} {
    			fmt.Println("Create a new object.")
    			return 100
    		},
    	}
    
    	v := pool.Get().(int)
    	fmt.Println(v)
    	pool.Put(3)
    	runtime.GC() //GC 会清除sync.pool中缓存的对象
    	v1, _ := pool.Get().(int)
    	fmt.Println(v1)
    }
    
    func SyncPoolInMultiGoroutine() {
    	pool := &sync.Pool{
    		New: func() interface{} {
    			fmt.Println("Create a new object.")
    			return 10
    		},
    	}
    
    	pool.Put(1)
    	pool.Put(2)
    	pool.Put(3)
    	pool.Put(4)
    
    	var wg sync.WaitGroup
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func(id int) {
    			fmt.Println(pool.Get())
    			wg.Done()
    		}(i)
    	}
    	wg.Wait()
    }
    
    func main() {
    	//SyncPool()
    	SyncPoolInMultiGoroutine()
    }
    
  • 相关阅读:
    RecyclerView 数据刷新的几种方式 局部刷新 notify MD
    【图片】批量获取几万张图片
    RV BaseRecyclerViewAdapterHelper 总结 MD
    RecyclerView.ItemDecoration 间隔线
    Kotlin【简介】Android开发 配置 扩展
    Kotlin 特性 语法糖 优势 扩展 高阶 MD
    一个十分简洁实用的MD风格的UI主框架
    折叠伸缩工具栏 CollapsingToolbarLayout
    FloatingActionButton FAB 悬浮按钮
    Glide Picasso Fresco UIL 图片框架 缓存 MD
  • 原文地址:https://www.cnblogs.com/hiyang/p/13055137.html
Copyright © 2011-2022 走看看