zoukankan      html  css  js  c++  java
  • gorutine管理(context)

    前言

    如有父gorutine在后台启动了1个gorutine(日志采集模块一直taill日志文件的内容是否新增然后发送到kafka),父gorutine突然得知这个日志路径变了。
    由于开启的这个日志采集子gorutine是在后台一直执行的......总不能重启线上服务/重新加载配置更不能os.Exit(),那么父gorutine如何让这个一直忙着干活的子gorutine退出呢
    我们就可以让子gorutine携带1个Context,子gorutine携带了这个Context,父gorutine就可以通过这个Context达到跟踪、退出子gorutines的目的。
     
    Go语言官方为我们提供了这个Context,中文可以称之为“上下文”。 
    context官方提供用于帮助我们以一种优雅的方式通过父gorutine追踪、退出无法自行退出的子gorutines内置包

     

    问题

    当1个子gorutine已经被开启的时候,我们如何在不结束自己(父gorutine)的前提下,结束这个衍生的子gorutine呢?
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    import (
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    
    //toSchool 子gorutine
    func toSchool() {
    	defer wg.Done()
    	for {
    		fmt.Println("walking~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	//当1个子gorutine开启的时候....
    	go toSchool()
    	fmt.Println("台风来了!")
    	//我们如何在不结束自己的前提下结束这些衍生的子gorutine呢?
    	wg.Wait()
    }
    

      

    自由解决方案

    既然是context是golang官方提出的标准方案,相对而言也会有自由解决方案。

    1.通过全局变量控制子gorutine退出

    package main
    
    import (
    	"fmt"
    	"time"
    	"sync"
    )
    
    var wg sync.WaitGroup
    var canToSignel=true
    func toSchool() {
    	defer wg.Done()
    	//不断地监测这个信号
    	for canToSignel {
    		fmt.Println("I'm on the way walking to scholl~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go toSchool()
    	time.Sleep(time.Second*10)
    	fmt.Println("The typhoon is coming!")
    	//修改全局变量(信号)
    	canToSignel=false
    	wg.Wait()
    }
    

      

      

    2.通过全局channel控制gorutine退出

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    //全局channel
    var signalChan = make(chan bool, 1)
    
    func child() {
    	defer wg.Done()
    	for {
    		fmt.Println("我是child gorutine")
    		time.Sleep(time.Second * 2)
    		//检测全局channel中是否有消息推送
    		select {
    		case <-signalChan:
    			return
    		default:
    		}
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go child()
    	time.Sleep(time.Second * 10)
    	signalChan <- true
    	wg.Wait()
    
    }

    or

    //通过往channel中发送信号的方式
    var canToChannel = make(chan bool, 1)
    
    func toSchool() {
    	defer wg.Done()
    	//不断地监测这个信号
    	select {
    	case <-canToChannel:
    		break
    	default:
    		fmt.Println("I'm on the way walking to scholl~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go toSchool()
    	time.Sleep(time.Second * 10)
    	fmt.Println("The typhoon is coming!")
    	//提交退出信号
    	canToChannel<-true
    	wg.Wait()
    }
    

      

    退出和追踪衍生gorutines的方式有很多,我们不使用go内置的context也能完全解决这一问题,但是每个程序员使用的解决方案不同的话,就会增加代码的阅读难度。

     

    官方解决方案(context)

    Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.

    Incoming requests to a server should create a Context, and outgoing calls to servers should accept a Context.

    The chain of function calls between them must propagate the Context,

    optionally replacing it with a derived Context

    created using WithCancel, WithDeadline, WithTimeout, or WithValue.

    When a Context is canceled, all Contexts derived from it are also canceled.

    什么是context?

    context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:

    type Context interface {
        Deadline() (deadline time.Time, ok bool)
        Done() <-chan struct{}
        Err() error
        Value(key interface{}) interface{}
    }
    

    context可以定义Context类型,专门用来简化 对于处理1个请求的N个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

    创建使用context

    Background()和TODO() (根节点)

    Go内置两个函数:Background()TODO(),这两个函数分别返回一个实现了Context接口的backgroundtodo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。

    Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context

    TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。(必须要传递1个context类型的参数)

    todo本质上也是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context

    WithCancel(取消/退出)
    主要用在父gorutine,控制子gorutine退出。
    cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
    NOTE: acquiring the child's lock while holding parent's lock.
    特性:一旦 根节点的gorutine执行cancel() 关闭的时候,它的所有后代都将被关闭。
     
    ps:
    可以把WithCancel这种context和gorutine一起封装到同1个struct里面。your kown for canceling.
    //1个具体的日志收集任务(TaillTask)
    type TaillTask struct {
    	path     string
    	topic    string
    	instance *tail.Tail
    	//exit task
    	ctx  context.Context
    	exit context.CancelFunc
    }
    

    more

    package taillog
    
    import (
    	"context"
    	"github.com/hpcloud/tail"
    	"fmt"
    	"jd.com/logagent/kafka"
    )
    
    //1个具体的日志收集任务(TaillTask)
    type TaillTask struct {
    	path     string
    	topic    string
    	instance *tail.Tail
    	//exit task
    	ctx  context.Context
    	exit context.CancelFunc
    }
    
    //实例化1个具体的日志收集任务(TaillTask)
    func (T *TaillTask) NewTaillTask(path, topic string)(task *TaillTask,err error){
    	ctx, cancel := context.WithCancel(context.Background())
    	task=&TaillTask{path: path, topic: topic, ctx: ctx, exit: cancel,}
    	//taill 文件配置
    	config := tail.Config{
    		ReOpen:    true,                                 //重新打开文件
    		Follow:    true,                                 //跟随文件
    		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读
    		MustExist: false,                                //文件不存在不报错
    		Poll:      true,
    	}
    	//给task任务填充taill(1个具体打开文件的taillobj)
    	task.instance, err = tail.TailFile(task.path, config)
    	if err != nil {
    		fmt.Println("文件打开失败", err)
    
    	}
    	//直接去采集日志
    	go task.run()
    	return
    }
    
    //从tailobj中读取日志内容---->kafka topic方法
    func (T *TaillTask)run() {
    	fmt.Printf("开始收集%s日志
    ",T.path)
    	for {
    		select {
    		//父进程调用了cancel
    		case <-T.ctx.Done():
    			fmt.Printf("taill任务%s%s退出了...
    ",T.topic,T.path)
    			return
    		case line := <-T.instance.Lines:
    			fmt.Printf("从%s文件中获取到内容%s",T.path,line.Text)
    			//taill采集到数据-----channel------>kafka 异步
    			kafka.SendToChan(T.topic, line.Text)
    		}
    
    	}
    
    }
    

      

     
    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func grandChild(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second*1)
    		fmt.Println("grandchild function ")
    		select{
    		//<-chan struct{}		
    		case <-ctx.Done():
    			return
    		default:		
    		}
    	}
    
    }
    
    func child(ctx context.Context) {
    	defer wg.Done()
    	go grandChild(ctx)
    	for {
    		time.Sleep(time.Second*5)
    		fmt.Println("child function ")
    		select{
    		//<-chan struct{}		
    		case <-ctx.Done():
    			return
    		default:		
    		}
    	}
    }
    
    func main() {
    	//定义1个全局的context类型的变量
    	ctx, cancel := context.WithCancel(context.Background())
    	wg.Add(2)
    	go child(ctx)
    	time.Sleep(time.Second * 10)
    	//退出
    	/*
       cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
    	*/
    	cancel()
    	wg.Wait()
    }
    

      

    案例2

    package main
    
    import (
    	"context"
    	"fmt"
    )
    
    func gen(ctx context.Context) <-chan int {
    	//定义1个destnation channel
    	dest := make(chan int)
    	n := 1
    	//匿名函数协程不断得给这个dest channel中输入数字
    	go func() {
    		for {
    			select {
    			//context结束该匿名函数协程结束
    			case <-ctx.Done():
    				return
    			case dest <- n:
    				n++
    			}
    
    		}
    	}()
    	return dest
    }
    
    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    	//调用
    	for n := range gen(ctx) {
    		fmt.Println(n)
    		if n == 5 {
    			//main函数结束之后,调用了context取消
    			return
    		}
    	}
    
    }
    
    WithDeadline(绝对超时时间)

    当context的截止日过期时, ctx.Done()返回后context deadline exceeded。

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func connectMyql(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second * 1)
    		fmt.Println("我连我连...我连莲莲....")
    		select {
    		case <-ctx.Done():
    			fmt.Println(ctx.Err())
    			return
    		default:
    		}
    
    	}
    }
    
    func main() {
    	//设置context 10秒钟之后过期
    	d := time.Now().Add(time.Second * 10)
    	ctx, cancel := context.WithDeadline(context.Background(), d)
    	/*
    	尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
    	如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    	*/
    	defer cancel()
    	wg.Add(1)
    	go connectMyql(ctx)
    	wg.Wait()
    
    }
    
    WithTimeout(相对超时时间)
    和WithDeadline 是一对,过期之后context超时context deadline exceeded
     
    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func connectMyql(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second * 1)
    		fmt.Println("我连我连...我连莲莲....")
    		select {
    		case <-ctx.Done():
    			fmt.Println(ctx.Err())
    			return
    		default:
    		}
    
    	}
    }
    
    func main() {
    	//设置context 从当前时间开始10秒钟之后过期(决对时间)
    	// d := time.Now().Add(time.Second * 10)
    	// ctx, cancel := context.WithDeadline(context.Background(), d)
    	//设置相对时间 5秒钟后过期(相对时间)
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    	/*
    		尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
    		如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    	*/
    	defer cancel()
    	wg.Add(1)
    	go connectMyql(ctx)
    	wg.Wait()
    
    } 

    WithValue(传递值)

    WithCancel、WithDeadline、WithTimeout,With 这个verb 就是context可以追溯和退出其衍生子gorutines 的关键所在! 在子gorutine开启时就与生俱来一些元数据!

    WithValue可以1个gorutin繁衍了N代子gorutines之后,它的后代gorutines都能with(携带)1个固定值,这样我就可以自上而下追溯这个繁衍链了!

    这也是微服务链路追踪的核心思想。

    func WithValue(parent Context, key, val interface{}) Context

    WithValue returns a copy of parent in which the value associated with key is val.

    WithValue返回父节点的副本,其中与key关联的值为val。

    Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

    仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。我把它当成session来看!

    The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context.

    所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。

    Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.

    WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口

    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    
    	"time"
    )
    
    // context.WithValue
    
    //TraceCode 自定义类型
    type TraceCode string
    
    var wg sync.WaitGroup
    
    func worker(ctx context.Context) {
    	defer wg.Done()
    
    	key := TraceCode("TRACE_CODE")
    	// 在子goroutine中获取trace code,(string)是类型断言!
    	traceCode, ok := ctx.Value(key).(string)
    	if !ok {
    		fmt.Println("invalid trace code")
    	}
    
    	for {
    		fmt.Printf("worker, trace code:%s
    ", traceCode)
    		// 假设正常连接数据库耗时1秒
    		time.Sleep(time.Second * 1)
    		// 10秒后自动调用
    		select {
    		case <-ctx.Done():
    			fmt.Println("worker done!")
    			return
    		default:
    		}
    	}
    
    }
    
    func main() {
    	// 设置1个10秒的超时
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    	//在系统的入口中设置trace code传递给后续启动的goroutine实现微服务日志数据聚合
    	ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "666")
    	wg.Add(1)
    	go worker(ctx)
    	//主线程等待10秒后
    	time.Sleep(time.Second * 10)
    	//通知子goroutine结束
    	cancel()
    	wg.Wait()
    	fmt.Println("over")
    }
    

    context应用场景(微服务链路追踪)

    作为1个微服务架构,微服务之间session不共享的服务端,如何追踪客户端1次request都调用了哪些微服务组件?并且聚合日志。

    参考

  • 相关阅读:
    【Language】 TIOBE Programming Community Index for February 2013
    【diary】good health, good code
    【web】a little bug of cnblog
    【Git】git bush 常用命令
    【web】Baidu zone ,let the world know you
    【diary】help others ,help yourself ,coding is happiness
    【Git】Chinese messy code in widows git log
    【windows】add some font into computer
    SqlServer启动参数配置
    关于sqlserver中xml数据的操作
  • 原文地址:https://www.cnblogs.com/sss4/p/12834302.html
Copyright © 2011-2022 走看看