zoukankan      html  css  js  c++  java
  • gorutine管理(context)

    前言

    如有父gorutine在后台启动了1个gorutine(日志采集模块一直taill日志文件的内容是否新增然后发送到kafka),父gorutine突然得知这个日志路径变了。
    由于开启的这个日志采集子gorutine是在后台一直执行的......总不能重启线上服务/重新加载配置更不能os.Exit(),那么父gorutine如何让这个一直忙着干活的子gorutine退出呢
    我们就可以让子gorutine携带1个Context,子gorutine携带了这个Context,父gorutine就可以通过这个Context达到跟踪、退出子gorutines的目的。
     
    Go语言官方为我们提供了这个Context,中文可以称之为“上下文”。 
    context官方提供用于帮助我们以一种优雅的方式通过父gorutine追踪、退出无法自行退出的子gorutines内置包

     

    问题

    当1个子gorutine已经被开启的时候,我们如何在不结束自己(父gorutine)的前提下,结束这个衍生的子gorutine呢?
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    import (
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    
    //toSchool 子gorutine
    func toSchool() {
    	defer wg.Done()
    	for {
    		fmt.Println("walking~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	//当1个子gorutine开启的时候....
    	go toSchool()
    	fmt.Println("台风来了!")
    	//我们如何在不结束自己的前提下结束这些衍生的子gorutine呢?
    	wg.Wait()
    }
    

      

    自由解决方案

    既然是context是golang官方提出的标准方案,相对而言也会有自由解决方案。

    1.通过全局变量控制子gorutine退出

    package main
    
    import (
    	"fmt"
    	"time"
    	"sync"
    )
    
    var wg sync.WaitGroup
    var canToSignel=true
    func toSchool() {
    	defer wg.Done()
    	//不断地监测这个信号
    	for canToSignel {
    		fmt.Println("I'm on the way walking to scholl~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go toSchool()
    	time.Sleep(time.Second*10)
    	fmt.Println("The typhoon is coming!")
    	//修改全局变量(信号)
    	canToSignel=false
    	wg.Wait()
    }
    

      

      

    2.通过全局channel控制gorutine退出

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    //全局channel
    var signalChan = make(chan bool, 1)
    
    func child() {
    	defer wg.Done()
    	for {
    		fmt.Println("我是child gorutine")
    		time.Sleep(time.Second * 2)
    		//检测全局channel中是否有消息推送
    		select {
    		case <-signalChan:
    			return
    		default:
    		}
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go child()
    	time.Sleep(time.Second * 10)
    	signalChan <- true
    	wg.Wait()
    
    }

    or

    //通过往channel中发送信号的方式
    var canToChannel = make(chan bool, 1)
    
    func toSchool() {
    	defer wg.Done()
    	//不断地监测这个信号
    	select {
    	case <-canToChannel:
    		break
    	default:
    		fmt.Println("I'm on the way walking to scholl~~~~")
    		time.Sleep(time.Second * 1)
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go toSchool()
    	time.Sleep(time.Second * 10)
    	fmt.Println("The typhoon is coming!")
    	//提交退出信号
    	canToChannel<-true
    	wg.Wait()
    }
    

      

    退出和追踪衍生gorutines的方式有很多,我们不使用go内置的context也能完全解决这一问题,但是每个程序员使用的解决方案不同的话,就会增加代码的阅读难度。

     

    官方解决方案(context)

    Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.

    Incoming requests to a server should create a Context, and outgoing calls to servers should accept a Context.

    The chain of function calls between them must propagate the Context,

    optionally replacing it with a derived Context

    created using WithCancel, WithDeadline, WithTimeout, or WithValue.

    When a Context is canceled, all Contexts derived from it are also canceled.

    什么是context?

    context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:

    type Context interface {
        Deadline() (deadline time.Time, ok bool)
        Done() <-chan struct{}
        Err() error
        Value(key interface{}) interface{}
    }
    

    context可以定义Context类型,专门用来简化 对于处理1个请求的N个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

    创建使用context

    Background()和TODO() (根节点)

    Go内置两个函数:Background()TODO(),这两个函数分别返回一个实现了Context接口的backgroundtodo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。

    Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context

    TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。(必须要传递1个context类型的参数)

    todo本质上也是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context

    WithCancel(取消/退出)
    主要用在父gorutine,控制子gorutine退出。
    cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
    NOTE: acquiring the child's lock while holding parent's lock.
    特性:一旦 根节点的gorutine执行cancel() 关闭的时候,它的所有后代都将被关闭。
     
    ps:
    可以把WithCancel这种context和gorutine一起封装到同1个struct里面。your kown for canceling.
    //1个具体的日志收集任务(TaillTask)
    type TaillTask struct {
    	path     string
    	topic    string
    	instance *tail.Tail
    	//exit task
    	ctx  context.Context
    	exit context.CancelFunc
    }
    

    more

    package taillog
    
    import (
    	"context"
    	"github.com/hpcloud/tail"
    	"fmt"
    	"jd.com/logagent/kafka"
    )
    
    //1个具体的日志收集任务(TaillTask)
    type TaillTask struct {
    	path     string
    	topic    string
    	instance *tail.Tail
    	//exit task
    	ctx  context.Context
    	exit context.CancelFunc
    }
    
    //实例化1个具体的日志收集任务(TaillTask)
    func (T *TaillTask) NewTaillTask(path, topic string)(task *TaillTask,err error){
    	ctx, cancel := context.WithCancel(context.Background())
    	task=&TaillTask{path: path, topic: topic, ctx: ctx, exit: cancel,}
    	//taill 文件配置
    	config := tail.Config{
    		ReOpen:    true,                                 //重新打开文件
    		Follow:    true,                                 //跟随文件
    		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读
    		MustExist: false,                                //文件不存在不报错
    		Poll:      true,
    	}
    	//给task任务填充taill(1个具体打开文件的taillobj)
    	task.instance, err = tail.TailFile(task.path, config)
    	if err != nil {
    		fmt.Println("文件打开失败", err)
    
    	}
    	//直接去采集日志
    	go task.run()
    	return
    }
    
    //从tailobj中读取日志内容---->kafka topic方法
    func (T *TaillTask)run() {
    	fmt.Printf("开始收集%s日志
    ",T.path)
    	for {
    		select {
    		//父进程调用了cancel
    		case <-T.ctx.Done():
    			fmt.Printf("taill任务%s%s退出了...
    ",T.topic,T.path)
    			return
    		case line := <-T.instance.Lines:
    			fmt.Printf("从%s文件中获取到内容%s",T.path,line.Text)
    			//taill采集到数据-----channel------>kafka 异步
    			kafka.SendToChan(T.topic, line.Text)
    		}
    
    	}
    
    }
    

      

     
    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func grandChild(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second*1)
    		fmt.Println("grandchild function ")
    		select{
    		//<-chan struct{}		
    		case <-ctx.Done():
    			return
    		default:		
    		}
    	}
    
    }
    
    func child(ctx context.Context) {
    	defer wg.Done()
    	go grandChild(ctx)
    	for {
    		time.Sleep(time.Second*5)
    		fmt.Println("child function ")
    		select{
    		//<-chan struct{}		
    		case <-ctx.Done():
    			return
    		default:		
    		}
    	}
    }
    
    func main() {
    	//定义1个全局的context类型的变量
    	ctx, cancel := context.WithCancel(context.Background())
    	wg.Add(2)
    	go child(ctx)
    	time.Sleep(time.Second * 10)
    	//退出
    	/*
       cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
    	*/
    	cancel()
    	wg.Wait()
    }
    

      

    案例2

    package main
    
    import (
    	"context"
    	"fmt"
    )
    
    func gen(ctx context.Context) <-chan int {
    	//定义1个destnation channel
    	dest := make(chan int)
    	n := 1
    	//匿名函数协程不断得给这个dest channel中输入数字
    	go func() {
    		for {
    			select {
    			//context结束该匿名函数协程结束
    			case <-ctx.Done():
    				return
    			case dest <- n:
    				n++
    			}
    
    		}
    	}()
    	return dest
    }
    
    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    	//调用
    	for n := range gen(ctx) {
    		fmt.Println(n)
    		if n == 5 {
    			//main函数结束之后,调用了context取消
    			return
    		}
    	}
    
    }
    
    WithDeadline(绝对超时时间)

    当context的截止日过期时, ctx.Done()返回后context deadline exceeded。

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func connectMyql(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second * 1)
    		fmt.Println("我连我连...我连莲莲....")
    		select {
    		case <-ctx.Done():
    			fmt.Println(ctx.Err())
    			return
    		default:
    		}
    
    	}
    }
    
    func main() {
    	//设置context 10秒钟之后过期
    	d := time.Now().Add(time.Second * 10)
    	ctx, cancel := context.WithDeadline(context.Background(), d)
    	/*
    	尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
    	如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    	*/
    	defer cancel()
    	wg.Add(1)
    	go connectMyql(ctx)
    	wg.Wait()
    
    }
    
    WithTimeout(相对超时时间)
    和WithDeadline 是一对,过期之后context超时context deadline exceeded
     
    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )
    
    var wg sync.WaitGroup
    
    func connectMyql(ctx context.Context) {
    	defer wg.Done()
    	for {
    		time.Sleep(time.Second * 1)
    		fmt.Println("我连我连...我连莲莲....")
    		select {
    		case <-ctx.Done():
    			fmt.Println(ctx.Err())
    			return
    		default:
    		}
    
    	}
    }
    
    func main() {
    	//设置context 从当前时间开始10秒钟之后过期(决对时间)
    	// d := time.Now().Add(time.Second * 10)
    	// ctx, cancel := context.WithDeadline(context.Background(), d)
    	//设置相对时间 5秒钟后过期(相对时间)
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    	/*
    		尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
    		如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    	*/
    	defer cancel()
    	wg.Add(1)
    	go connectMyql(ctx)
    	wg.Wait()
    
    } 

    WithValue(传递值)

    WithCancel、WithDeadline、WithTimeout,With 这个verb 就是context可以追溯和退出其衍生子gorutines 的关键所在! 在子gorutine开启时就与生俱来一些元数据!

    WithValue可以1个gorutin繁衍了N代子gorutines之后,它的后代gorutines都能with(携带)1个固定值,这样我就可以自上而下追溯这个繁衍链了!

    这也是微服务链路追踪的核心思想。

    func WithValue(parent Context, key, val interface{}) Context

    WithValue returns a copy of parent in which the value associated with key is val.

    WithValue返回父节点的副本,其中与key关联的值为val。

    Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

    仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。我把它当成session来看!

    The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context.

    所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。

    Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.

    WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口

    package main
    
    import (
    	"context"
    	"fmt"
    	"sync"
    
    	"time"
    )
    
    // context.WithValue
    
    //TraceCode 自定义类型
    type TraceCode string
    
    var wg sync.WaitGroup
    
    func worker(ctx context.Context) {
    	defer wg.Done()
    
    	key := TraceCode("TRACE_CODE")
    	// 在子goroutine中获取trace code,(string)是类型断言!
    	traceCode, ok := ctx.Value(key).(string)
    	if !ok {
    		fmt.Println("invalid trace code")
    	}
    
    	for {
    		fmt.Printf("worker, trace code:%s
    ", traceCode)
    		// 假设正常连接数据库耗时1秒
    		time.Sleep(time.Second * 1)
    		// 10秒后自动调用
    		select {
    		case <-ctx.Done():
    			fmt.Println("worker done!")
    			return
    		default:
    		}
    	}
    
    }
    
    func main() {
    	// 设置1个10秒的超时
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    	//在系统的入口中设置trace code传递给后续启动的goroutine实现微服务日志数据聚合
    	ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "666")
    	wg.Add(1)
    	go worker(ctx)
    	//主线程等待10秒后
    	time.Sleep(time.Second * 10)
    	//通知子goroutine结束
    	cancel()
    	wg.Wait()
    	fmt.Println("over")
    }
    

    context应用场景(微服务链路追踪)

    作为1个微服务架构,微服务之间session不共享的服务端,如何追踪客户端1次request都调用了哪些微服务组件?并且聚合日志。

    参考

  • 相关阅读:
    lhgdialog的传值问题
    jquery中lhgdialog插件(一)
    有关eclipse的内存溢出问题
    oracle数据库cmd导出数据和导入数据
    java replace方法
    jaspersoft中分组打印
    Android开发小技巧之根据position判断ListView是否在显示
    Android开发中Chronometer的用法
    [转][darkbaby]任天堂传——失落的泰坦王朝(下)
    [转][darkbaby]任天堂传——失落的泰坦王朝(中)
  • 原文地址:https://www.cnblogs.com/sss4/p/12834302.html
Copyright © 2011-2022 走看看