zoukankan      html  css  js  c++  java
  • go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器

    一、使用场景

    大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。

    所以需要对多个协程进行控制。

    二、使用知识

    1. 从一个未初始化的管道读会阻塞

    2.从一个关闭的管道读不会阻塞

    利用两个管道和select 进行控制

    三、上代码

    控制器代码

    package util
    
    import (
    	"errors"
    	"sync"
    )
    
    const (
    	//STOP 停止
    	STOP = iota
    	//START 开始
    	START
    	//PAUSE 暂停
    	PAUSE
    )
    
    //Control 控制器
    type Control struct {
    	ch1  chan struct{}
    	ch2  chan struct{}
    	stat int64
    	lock sync.RWMutex
    }
    
    var (
    	//ErrStat 错误状态
    	ErrStat = errors.New("stat error")
    )
    
    //NewControl 获得一个新Control
    func NewControl() *Control {
    	return &Control{
    		ch1:  make(chan struct{}),
    		ch2:  nil,
    		stat: START,
    		lock: sync.RWMutex{},
    	}
    }
    
    //Stop 停止
    func (c *Control) Stop() error {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	if c.stat == START {
    		c.ch2 = nil
    		close(c.ch1)
    		c.stat = STOP
    	} else if c.stat == PAUSE {
    		ch2 := c.ch2
    		c.ch2 = nil
    		close(c.ch1)
    		close(ch2)
    		c.stat = STOP
    	} else {
    		return ErrStat
    	}
    	return nil
    }
    
    //Pause 暂停
    func (c *Control) Pause() error {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	if c.stat == START {
    		c.ch2 = make(chan struct{})
    		close(c.ch1)
    		c.stat = PAUSE
    	} else {
    		return ErrStat
    	}
    	return nil
    }
    
    //Start 开始
    func (c *Control) Start() error {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	if c.stat == PAUSE {
    		c.ch1 = make(chan struct{})
    		close(c.ch2)
    		c.stat = START
    	} else {
    		return ErrStat
    	}
    	return nil
    }
    
    //C 控制管道
    func (c *Control) C() <-chan struct{} {
    	c.lock.RLock()
    	defer c.lock.RUnlock()
    	return c.ch1
    }
    
    //Wait 等待
    func (c *Control) Wait() bool {
    	c.lock.RLock()
    	ch2 := c.ch2
    	c.lock.RUnlock()
    	if ch2 == nil {  //通过赋值nil 发送停止推出命令
    		return false
    	}
    	<-ch2  //会进行阻塞
    	return true
    }
    

    使用代码

    	for {
    		select {
    		case part, ok := <-c.Partitions():
    			if !ok {
    				conf.Logger.Error("get kafka Partitions not ok", regular.Name)
    				return
    			}
    			go readFromPart(c, part, regular, respChan)
    		case <-regular.C():   //regular 为Control 类
    			if !regular.Wait() {
    				conf.Logger.Debug("Stop! ")
    				return
    			}
    			conf.Logger.Debug("Start! ")
    		}
    	}

    这样就可以随时随地的控制工程中的协程

    regular  := util.NewControl()
    regular.Pause()
    regular.Start()
    regular.Stop()
    

      

  • 相关阅读:
    数据仓库
    数据库事务隔离级别与锁
    并发包之Future:代码级控制超时时间
    axis2 webservice 发布、调用与项目集成
    配置远程控制
    解决局部刷新的问题
    Sharepoint 2013 搜索高级配置(Search Scope)
    重启IIS报错:IIS 服务或万维网发布服务,或者依赖这 服务可能在启动期间发生错误或者已禁用
    错误提示:此产品的试用期已经结束
    Sharepoint 2013 启用搜做服务
  • 原文地址:https://www.cnblogs.com/zhaosc-haha/p/11966215.html
Copyright © 2011-2022 走看看