zoukankan      html  css  js  c++  java
  • Go语言进阶学习笔记

    协程

    1. 创建时默认的 stack 的⼤⼩
    • JDK5 以后 Java Thread stack 默认为1M
    • Groutine 的 Stack 初始化⼤⼩为2K
    1. 和 KSE (Kernel Space Entity 系统线程) 的对应关系
    • Java Thread 是 1:1
    • Groutine 是 M:N

    Thread 切换代价大

    image

    image

    先说,协程的本质是用户态的线程,用户对其有控制权限,内存占用少,切换代价低。

    再来解释一下MPG是什么意思。

    M代表内核线程,所有的G都要放在M上才能运行。

    P代表控制器,调度G到M上,其维护了一个队列,存储了所有需要它来调度的G。

    G代表一个go routine单元。

    补充几点常见的调度策略:

    1,如果某个M陷入阻塞呢?
    当一个OS线程M由于io操作而陷入阻塞,假设此时G0正跑在了M上,那么M上绑定的P就会带着余下的所有G去寻找新的M。当M恢复过来时,一般情况下,会从别的M上拿过来一个P,并把原先跑在其上的G0放到P的队列中,从而运行G0。如果,没有拿到可用的P的话,就把G0放入到全局global runqueue队列中,使G0等待被调度,然后M进入线程缓存。所有的P也会周期性的检查global runqueue并运行其中的goroutine,否则global runqueue上的goroutine永远无法执行。

    2,如果有的M较忙,有的M较闲呢?
    此时P所分配的任务G很快就执行完了(分配不均),这就导致了这个处理器P很忙,但是其他的P还有任务。此时,P首先会去global runqueue取G。但是,如果global runqueue没有任务G了,那么P不得不从其他的P里拿一些G来执行。一般来说,如果P从其他的P那里要拿任务的话,一般就拿run queue的一半,这就确保了每个OS线程都能充分的使用。

    3,如果一个G运行时间过长,导致队列中后续G都无法运行呢?
    启动的时候,会专门创建一个线程sysmon,用来监控和管理,在内部是一个循环。首先,记录所有P的G任务计数schedtick,schedtick会在每执行一个G任务后递增。如果检查到 schedtick一直没有递增,说明这个P一直在执行同一个G任务,如果超过一定的时间(10ms),就在这个G任务的栈信息里面加一个标记。然后这个G任务在执行的时候,如果遇到非内联函数调用,就会检查一次这个标记,然后中断自己,把自己加到队列末尾,执行下一个G。如果没有遇到非内联函数(有时候正常的小函数会被优化成内联函数)调用的话,那就惨了,会一直执行这个G任务,直到它自己结束;如果是个死循环,并且GOMAXPROCS=1的话,恭喜你,夯住了!亲测,的确如此。

    4,一个G由于调度被中断,此后如何恢复?
    中断的时候将寄存器里的栈信息,保存到自己的G对象里面。当再次轮到自己执行时,将自己保存的栈信息复制到寄存器里面,这样就接着上次之后运行了。 (≧▽≦)/

    func TestGroutine(t *testing.T) {
    	for i := 0; i < 10; i++ {
    		go func(i int) {
    			//time.Sleep(time.Second * 1)
    			fmt.Println(i) //值传递,协程间没有竞争关系
    		}(i)
    	}
    	time.Sleep(time.Millisecond * 50)
    }
    
    //每次输出顺序不一致
    

    共享内存并发机制

    • 包package sync
      Mutex
      RWLock

    • 加锁操作

    func TestCounterThreadSafe(t *testing.T) {
    	var mut sync.Mutex
    	counter := 0
    	for i := 0; i < 5000; i++ {
    		go func() {
    			defer func() {
    				mut.Unlock()
    			}()
    			mut.Lock()
    			counter++
    		}()
    	}
    	time.Sleep(1 * time.Second) //等所有协程执行完
    	t.Logf("counter = %d", counter)
    }
    
    • WaitGroup 只有我wait的所有东西完成后才能往下执行
    func TestCounterWaitGroup(t *testing.T) {
    	var mut sync.Mutex
    	var wg sync.WaitGroup
    	counter := 0
    	for i := 0; i < 5000; i++ {
    		wg.Add(1) //新增一个我们要等待的任务
    		go func() {
    			defer func() {
    				mut.Unlock()
    			}()
    			mut.Lock()
    			counter++
    			wg.Done() //有一个等待的任务已经完成
    		}()
    	}
    	wg.Wait() //所有要等待的任务都已经完成
    	t.Logf("counter = %d", counter)
    
    }
    

    CSP并发模型

    • CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。 CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。
    Golang CSP
    • Golang 就是借用CSP模型的一些概念为之实现并发进行理论支持,其实从实际上出发,go语言并没有,完全实现了CSP模型的所有理论,仅仅是借用了 process和channel这两个概念。process是在go语言上的表现就是 goroutine 是实际并发执行的实体,每个实体之间是通过channel通讯来实现数据共享。
    Channel
    • Golang中使用 CSP中 channel 这个概念。channel 是被单独创建并且可以在进程之间传递,它的通信模式类似于 boss-worker 模式的,一个实体通过将消息发送到channel 中,然后又监听这个 channel 的实体处理,两个实体之间是匿名的,这个就实现实体中间的解耦,其中 channel 是同步的一个消息被发送到 channel 中,最终是一定要被另外的实体消费掉的,在实现原理上其实是一个阻塞的消息队列。
    // 做一个任务service(50ms)
    func service() string {
    	time.Sleep(time.Millisecond * 50)
    	return "Done"
    }
    
    // 不相关的任务(100ms)
    func otherTask() {
    	fmt.Println("working on something else")
    	time.Sleep(time.Millisecond * 100)
    	fmt.Println("Task is done.")
    }
    
    // 这样调用两个任务是串行的
    func TestService(t *testing.T) {
    	fmt.Println(service())
    	otherTask()
    	//串行输出任务1,任务2结果
    }
    
    /*************** CSP 改进版 ********************/
    
    // 返回结果的时候把channel返回回去,当外面需要的时候可以去channel等待
    func AsyncService() chan string {
    	retCh := make(chan string) //阻塞性channel
    	//retCh := make(chan string, 1) // 非阻塞性
    	//启用一个协程去运行
    	go func() {
    		ret := service()
    		fmt.Println("returned result.")
    		retCh <- ret //运行完把结果返回到channel,如果其他地方没有取这个消息,整个协程被阻塞,如果用buffer channel则不会被阻塞
    		fmt.Println("service exited.")
    	}()
    	return retCh
    }
    
    //
    func TestAsynService(t *testing.T) {
    	retCh := AsyncService() 
    	otherTask() //(结果是这行先被执行好)
    	fmt.Println(<-retCh) //需要结果的时候从channel取出数据
    	time.Sleep(time.Second * 1)
    }
    
    select 多渠道选择

    select 只有任意有一个case有事件,都会往下执行

    select {
        case ret := <-retCh1:
            t.Logf("result %s", ret)
        case ret :=<-retCh2:
            t.Logf("result %s", ret)
        default:
            t.Error(“No one returned”) //如果都是阻塞,会执行default(可选),否则会等待事件
    }
    
    超时控制
    select {
        case ret := <-retCh:
            t.Logf("result %s", ret)
        case <-time.After(time.Second * 1): //没有达到超时时间会被阻塞
            t.Error("time out")
        }
    
    • 示例
    func service() string {
    	time.Sleep(time.Millisecond * 500)
    	return "Done"
    }
    
    func AsyncService() chan string {
    	retCh := make(chan string, 1)
    	//retCh := make(chan string, 1)
    	go func() {
    		ret := service()
    		fmt.Println("returned result.")
    		retCh <- ret
    		fmt.Println("service exited.")
    	}()
    	return retCh
    }
    
    func TestSelect(t *testing.T) {
    	select {
    	case ret := <-AsyncService():
    		t.Log(ret)
    	case <-time.After(time.Millisecond * 100): //100ms发生超时
    		t.Error("time out")
    	}
    }
    
    channel 的关闭和⼴播
    • 向关闭的 channel 发送数据,会导致 panic
    • v, ok <-ch; ok 为 bool 值, true 表示正常接受, false 表示通道关闭
    • 所有的 channel 接收者都会在 channel 关闭时,⽴刻从阻塞等待中返回且上
      述 ok 值为 false。这个⼴播机制常被利⽤,进⾏向多个订阅者同时发送信号。
      如:退出信号。
    func dataProducer(ch chan int, wg *sync.WaitGroup) {
    	go func() {
    		for i := 0; i < 10; i++ {
    			ch <- i
    		}
    		close(ch) //发送完后就把channel 关闭
    
    		wg.Done()
    	}()
    
    }
    
    func dataReceiver(ch chan int, wg *sync.WaitGroup) {
    	go func() {
    		for {
    		    //返回两个值(true正常,false关闭)
    			if data, ok := <-ch; ok {
    				fmt.Println(data)
    			} else {
    				break
    			}
    		}
    		wg.Done()
    	}()
    
    }
    
    func TestCloseChannel(t *testing.T) {
    	var wg sync.WaitGroup
    	ch := make(chan int)
    	wg.Add(1)
    	dataProducer(ch, &wg)
    	wg.Add(1)
    	dataReceiver(ch, &wg)
    	// wg.Add(1)
    	// dataReceiver(ch, &wg)
    	wg.Wait()
    
    }
    
    任务取消
    //获取通知,如果没有收到广播,返回false,收到广播后返回true
    func isCancelled(cancelChan chan struct{}) bool {
    	select {
    	case <-cancelChan:
    		return true
    	default:
    		return false
    	}
    }
    
    func cancel(cancelChan chan struct{}) {
        //关闭信息会被广播接收,其他信号只能被单播
    	close(cancelChan)
    }
    
    func TestCancel(t *testing.T) {
    	cancelChan := make(chan struct{}, 0)
    	for i := 0; i < 5; i++ {
    	    //起了5个协程,每个协程循环等待,一旦有消息,取消任务
    		go func(i int, cancelCh chan struct{}) {
    			for {
    				if isCancelled(cancelCh) {
    					break
    				}
    				time.Sleep(time.Millisecond * 5)
    			}
    			fmt.Println(i, "Cancelled")
    		}(i, cancelChan)
    	}
    	//关闭信号
    	cancel(cancelChan)
    	time.Sleep(time.Second * 1)
    }
    
    context(上下文)任务取消

    功能:取消任务的时候,把子任务也取消了

    • 根 Context:通过 context.Background () 创建
    • ⼦ Context: context.WithCancel(parentContext) 创建
    • ctx, cancel := context.WithCancel(context.Background())
    • 当前 Context 被取消时,基于他的⼦ context 都会被取消
    • 接收取消通知 <-ctx.Done()
    func isCancelled(ctx context.Context) bool {
    	select {
    	case <-ctx.Done(): //任务已经完成
    		return true
    	default:
    		return false
    	}
    }
    
    func TestCancel(t *testing.T) {
    	ctx, cancel := context.WithCancel(context.Background())//创建子Context
    	for i := 0; i < 5; i++ {
    		go func(i int, ctx context.Context) {
    			for {
    				if isCancelled(ctx) {
    					break
    				}
    				time.Sleep(time.Millisecond * 5)
    			}
    			fmt.Println(i, "Cancelled")
    		}(i, ctx)
    	}
    	cancel()
    	time.Sleep(time.Second * 1)
    }
    
    仅运行一次(如线程安全的单例)
    type Singleton struct {
    	data string
    }
    
    var singleInstance *Singleton
    var once sync.Once
    
    //单例,多线程下也只会被运行一次
    func GetSingletonObj() *Singleton {
    	once.Do(func() {
    		fmt.Println("Create Obj")
    		singleInstance = new(Singleton)
    	})
    	return singleInstance
    }
    
    func TestGetSingletonObj(t *testing.T) {
    	var wg sync.WaitGroup
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() {
    			obj := GetSingletonObj()
    			fmt.Printf("%X
    ", unsafe.Pointer(obj))
    			wg.Done()
    		}()
    	}
    	wg.Wait()
    }
    
    仅需任意任务完成(任何一个有结果就返回)
    func runTask(id int) string {
    	time.Sleep(10 * time.Millisecond)
    	return fmt.Sprintf("The result is from %d", id)
    }
    
    func FirstResponse() string {
    	numOfRunner := 10
    	ch := make(chan string, numOfRunner) //一定要用bufferChannel,防止后面的协程被阻塞
    	for i := 0; i < numOfRunner; i++ {
    		go func(i int) {
    			ret := runTask(i)
    			ch <- ret
    		}(i)
    	}
    	return <-ch //一旦有数据就能直接return
    }
    
    func TestFirstResponse(t *testing.T) {
    	t.Log("Before:", runtime.NumGoroutine()) //2
    	t.Log(FirstResponse())
    	time.Sleep(time.Second * 1)
    	t.Log("After:", runtime.NumGoroutine()) //11 chanel被阻塞没被释放
    
    }
    
    必需所有任务完成(可以实现类似wg)
    func runTask(id int) string {
    	time.Sleep(10 * time.Millisecond)
    	return fmt.Sprintf("The result is from %d", id)
    }
    
    func FirstResponse() string {
    	numOfRunner := 10
    	ch := make(chan string, numOfRunner)
    	for i := 0; i < numOfRunner; i++ {
    		go func(i int) {
    			ret := runTask(i)
    			ch <- ret
    		}(i)
    	}
    	return <-ch
    }
    
    func AllResponse() string {
    	numOfRunner := 10
    	ch := make(chan string, numOfRunner)
    	for i := 0; i < numOfRunner; i++ {
    		go func(i int) {
    			ret := runTask(i)
    			ch <- ret
    		}(i)
    	}
    	finalRet := ""
    	for j := 0; j < numOfRunner; j++ {
    		finalRet += <-ch + "
    "
    	}
    	return finalRet
    }
    
    func TestFirstResponse(t *testing.T) {
    	t.Log(AllResponse())
    }
    
  • 相关阅读:
    [RxJS] Combination operators: concat, startWith
    [RxJS] Filtering operators: skipWhile and skipUntil
    [RxJS] Filtering operators: takeUntil, takeWhile
    [RxJS] Filtering operators: takeLast, last
    强连通tarjan模版
    HDU 4576 Robot (概率DP)
    Inside GDALAllRegister之二: 自动加载驱动
    [置顶] Java中字符串为什么不以结尾
    rcp(插件开发) The 'Eclipse-LazyStart' header is deprecated, use 'Bundle-ActivationPolicy'
    如何在模板中引用参数类中的一个特定member
  • 原文地址:https://www.cnblogs.com/jaychan/p/12609100.html
Copyright © 2011-2022 走看看