zoukankan      html  css  js  c++  java
  • golang多线程生产者消费者模型

    1、场景描述

      生产者:多个生产者,生产速度高于消费者消费速度

      消费者:多个消费者

      数据同步:程序中止信号发出,生产者暂停生产并退出线程,消费者继续消费,直到缓存数据被消费完。

    2、模型代码

    package main
    
    import (
    	"fmt"
    	"strconv"
    	"sync"
    	"time"
    )
    
    //暂停标志
    var bStop = false
    
    //模拟异常/超时等使程序停止
    func makeStop(){
    	time.Sleep(time.Second*4)
        bStop = true
    }
    
    //生产者
    func producer(threadId int, wg *sync.WaitGroup, ch chan string){
        count := 0
    
        //标志位为false,不断写入数据
        for !bStop {
        	//模拟生产数据的耗时
    		time.Sleep(time.Second * 2)
    		count++
    		data := strconv.Itoa(threadId) + "+++++++++" + strconv.Itoa(count)
    		fmt.Println("producer:", data)
    		ch <- data
    	}
    
    	wg.Done()
    }
    
    //消费者
    func consumer(wg *sync.WaitGroup, ch chan string){
    
    	//不断读取,直到通道关闭
    	for data:= range ch {
    		time.Sleep(time.Second * 2)
    		fmt.Println("consumer:", data)
    	}
    
        wg.Done()
    }
    
    func work(){
    	//缓存:模拟生产者完成生产,消费者未完成消费
    	chanStream := make(chan string,30)
    
    	//生产者和消费者计数器
    	wgPd := new(sync.WaitGroup)
    	wgCs := new(sync.WaitGroup)
    
    	//producer
    	for i := 0; i < 3; i++ {
    		wgPd.Add(1)
    		go producer(i, wgPd, chanStream)
    	}
    
    	//consumer
    	for j := 0; j < 2; j++ {
    		wgCs.Add(1)
    		go consumer(wgCs, chanStream)
    	}
    
    	go makeStop()
    
    	wgPd.Wait()
    
    	//生产完成,关闭通道
    	close(chanStream)
    	wgCs.Wait()
    }
    

      执行结果:

    producer: 1+++++++++1
    producer: 2+++++++++1
    producer: 0+++++++++1
    producer: 0+++++++++2
    consumer: 1+++++++++1
    producer: 1+++++++++2
    consumer: 2+++++++++1
    producer: 2+++++++++2
    consumer: 0+++++++++2
    consumer: 0+++++++++1
    consumer: 2+++++++++2
    consumer: 1+++++++++2
    
    Process finished with exit code 0
    

      该程序可以保证生产者数据全部被消费者消费。

      重点解析:

      消费者通过range读取数据

           golang 通道读取方式有select和range,本场景更适合range读取数据,因为for循环中,如果通道未被关闭,线程会堵塞读取,即使数据为空,除非通道被关闭,才会退出循环。

           生产者完成之后,需要关闭通道,消费者才能正常退出

    wgPd.Wait()
    //生产完成,关闭通道
    close(chanStream)
    wgCs.Wait()
    

      当生产完成之后,关闭通道,消费者发现通道关闭,对应线程才会退出,即生产者完成生产,通过关闭通道告诉消费者,我已完成生产,你消费完剩余数据就退出吧。

      makeStop的作用

      模拟程序收到暂停信号,或者超时信号,一般情况是通过通道来进行标志,本文通过bool来进行标志,因为通道适用于单线程select模型,本文中生产者为多线程,如果通过放多个数据(数据个数=生产者个数)到通道,也可解决,但是多维护一份数据的一致性没有必要。

            生产者数量>消费者数量以及通道加缓存

            生产者数量>消费者数量:模拟生产速度大于消费速度的情况,同时生产者和消费者加休眠时间模拟数据处理的耗时;通道加缓存是为了模拟,生产者生产了一定数量数据到通道,并停止生产,验证消费者是否将缓存数据消费。

  • 相关阅读:
    vue2 v-model/v-text 中使用过滤器的方法示例
    HTML5游戏开发案例教程合集
    Docker实战案例视频课程
    Java项目框架架构与优化教程
    Linux云计算-虚拟化技术视频教程
    udl
    Chloe官网及基于NFine的后台源码毫无保留开放
    抽象类存在的意义和作用
    Shell 脚本语法
    Github 高级搜索功能
  • 原文地址:https://www.cnblogs.com/ChinaHook/p/14699627.html
Copyright © 2011-2022 走看看