1、场景描述
生产者:多个生产者,生产速度高于消费者消费速度
消费者:多个消费者
数据同步:程序中止信号发出,生产者暂停生产并退出线程,消费者继续消费,直到缓存数据被消费完。
2、模型代码
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
//暂停标志
var bStop = false
//模拟异常/超时等使程序停止
func makeStop(){
time.Sleep(time.Second*4)
bStop = true
}
//生产者
func producer(threadId int, wg *sync.WaitGroup, ch chan string){
count := 0
//标志位为false,不断写入数据
for !bStop {
//模拟生产数据的耗时
time.Sleep(time.Second * 2)
count++
data := strconv.Itoa(threadId) + "+++++++++" + strconv.Itoa(count)
fmt.Println("producer:", data)
ch <- data
}
wg.Done()
}
//消费者
func consumer(wg *sync.WaitGroup, ch chan string){
//不断读取,直到通道关闭
for data:= range ch {
time.Sleep(time.Second * 2)
fmt.Println("consumer:", data)
}
wg.Done()
}
func work(){
//缓存:模拟生产者完成生产,消费者未完成消费
chanStream := make(chan string,30)
//生产者和消费者计数器
wgPd := new(sync.WaitGroup)
wgCs := new(sync.WaitGroup)
//producer
for i := 0; i < 3; i++ {
wgPd.Add(1)
go producer(i, wgPd, chanStream)
}
//consumer
for j := 0; j < 2; j++ {
wgCs.Add(1)
go consumer(wgCs, chanStream)
}
go makeStop()
wgPd.Wait()
//生产完成,关闭通道
close(chanStream)
wgCs.Wait()
}
执行结果:
producer: 1+++++++++1 producer: 2+++++++++1 producer: 0+++++++++1 producer: 0+++++++++2 consumer: 1+++++++++1 producer: 1+++++++++2 consumer: 2+++++++++1 producer: 2+++++++++2 consumer: 0+++++++++2 consumer: 0+++++++++1 consumer: 2+++++++++2 consumer: 1+++++++++2 Process finished with exit code 0
该程序可以保证生产者数据全部被消费者消费。
重点解析:
消费者通过range读取数据
golang 通道读取方式有select和range,本场景更适合range读取数据,因为for循环中,如果通道未被关闭,线程会堵塞读取,即使数据为空,除非通道被关闭,才会退出循环。
生产者完成之后,需要关闭通道,消费者才能正常退出
wgPd.Wait() //生产完成,关闭通道 close(chanStream) wgCs.Wait()
当生产完成之后,关闭通道,消费者发现通道关闭,对应线程才会退出,即生产者完成生产,通过关闭通道告诉消费者,我已完成生产,你消费完剩余数据就退出吧。
makeStop的作用
模拟程序收到暂停信号,或者超时信号,一般情况是通过通道来进行标志,本文通过bool来进行标志,因为通道适用于单线程select模型,本文中生产者为多线程,如果通过放多个数据(数据个数=生产者个数)到通道,也可解决,但是多维护一份数据的一致性没有必要。
生产者数量>消费者数量以及通道加缓存
生产者数量>消费者数量:模拟生产速度大于消费速度的情况,同时生产者和消费者加休眠时间模拟数据处理的耗时;通道加缓存是为了模拟,生产者生产了一定数量数据到通道,并停止生产,验证消费者是否将缓存数据消费。