前言
问题
package main
import (
"fmt"
"time"
)
import (
"sync"
)
var wg sync.WaitGroup
//toSchool 子gorutine
func toSchool() {
defer wg.Done()
for {
fmt.Println("walking~~~~")
time.Sleep(time.Second * 1)
}
}
func main() {
wg.Add(1)
//当1个子gorutine开启的时候....
go toSchool()
fmt.Println("台风来了!")
//我们如何在不结束自己的前提下结束这些衍生的子gorutine呢?
wg.Wait()
}
自由解决方案
既然是context是golang官方提出的标准方案,相对而言也会有自由解决方案。
1.通过全局变量控制子gorutine退出
package main
import (
"fmt"
"time"
"sync"
)
var wg sync.WaitGroup
var canToSignel=true
func toSchool() {
defer wg.Done()
//不断地监测这个信号
for canToSignel {
fmt.Println("I'm on the way walking to scholl~~~~")
time.Sleep(time.Second * 1)
}
}
func main() {
wg.Add(1)
go toSchool()
time.Sleep(time.Second*10)
fmt.Println("The typhoon is coming!")
//修改全局变量(信号)
canToSignel=false
wg.Wait()
}
2.通过全局channel控制gorutine退出
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
//全局channel
var signalChan = make(chan bool, 1)
func child() {
defer wg.Done()
for {
fmt.Println("我是child gorutine")
time.Sleep(time.Second * 2)
//检测全局channel中是否有消息推送
select {
case <-signalChan:
return
default:
}
}
}
func main() {
wg.Add(1)
go child()
time.Sleep(time.Second * 10)
signalChan <- true
wg.Wait()
}
or
//通过往channel中发送信号的方式
var canToChannel = make(chan bool, 1)
func toSchool() {
defer wg.Done()
//不断地监测这个信号
select {
case <-canToChannel:
break
default:
fmt.Println("I'm on the way walking to scholl~~~~")
time.Sleep(time.Second * 1)
}
}
func main() {
wg.Add(1)
go toSchool()
time.Sleep(time.Second * 10)
fmt.Println("The typhoon is coming!")
//提交退出信号
canToChannel<-true
wg.Wait()
}
退出和追踪衍生gorutines的方式有很多,我们不使用go内置的context也能完全解决这一问题,但是每个程序员使用的解决方案不同的话,就会增加代码的阅读难度。
官方解决方案(context)
Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.
Incoming requests to a server should create a Context, and outgoing calls to servers should accept a Context.
The chain of function calls between them must propagate the Context,
optionally replacing it with a derived Context
created using WithCancel, WithDeadline, WithTimeout, or WithValue.
When a Context is canceled, all Contexts derived from it are also canceled.
什么是context?
context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
context可以定义Context类型,专门用来简化 对于处理1个请求的N个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。
创建使用context
Background()和TODO() (根节点)
Go内置两个函数:Background()和TODO(),这两个函数分别返回一个实现了Context接口的background和todo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。
Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。
TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。(必须要传递1个context类型的参数)
todo本质上也是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。
//1个具体的日志收集任务(TaillTask)
type TaillTask struct {
path string
topic string
instance *tail.Tail
//exit task
ctx context.Context
exit context.CancelFunc
}
more
package taillog
import (
"context"
"github.com/hpcloud/tail"
"fmt"
"jd.com/logagent/kafka"
)
//1个具体的日志收集任务(TaillTask)
type TaillTask struct {
path string
topic string
instance *tail.Tail
//exit task
ctx context.Context
exit context.CancelFunc
}
//实例化1个具体的日志收集任务(TaillTask)
func (T *TaillTask) NewTaillTask(path, topic string)(task *TaillTask,err error){
ctx, cancel := context.WithCancel(context.Background())
task=&TaillTask{path: path, topic: topic, ctx: ctx, exit: cancel,}
//taill 文件配置
config := tail.Config{
ReOpen: true, //重新打开文件
Follow: true, //跟随文件
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读
MustExist: false, //文件不存在不报错
Poll: true,
}
//给task任务填充taill(1个具体打开文件的taillobj)
task.instance, err = tail.TailFile(task.path, config)
if err != nil {
fmt.Println("文件打开失败", err)
}
//直接去采集日志
go task.run()
return
}
//从tailobj中读取日志内容---->kafka topic方法
func (T *TaillTask)run() {
fmt.Printf("开始收集%s日志
",T.path)
for {
select {
//父进程调用了cancel
case <-T.ctx.Done():
fmt.Printf("taill任务%s%s退出了...
",T.topic,T.path)
return
case line := <-T.instance.Lines:
fmt.Printf("从%s文件中获取到内容%s",T.path,line.Text)
//taill采集到数据-----channel------>kafka 异步
kafka.SendToChan(T.topic, line.Text)
}
}
}
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func grandChild(ctx context.Context) {
defer wg.Done()
for {
time.Sleep(time.Second*1)
fmt.Println("grandchild function ")
select{
//<-chan struct{}
case <-ctx.Done():
return
default:
}
}
}
func child(ctx context.Context) {
defer wg.Done()
go grandChild(ctx)
for {
time.Sleep(time.Second*5)
fmt.Println("child function ")
select{
//<-chan struct{}
case <-ctx.Done():
return
default:
}
}
}
func main() {
//定义1个全局的context类型的变量
ctx, cancel := context.WithCancel(context.Background())
wg.Add(2)
go child(ctx)
time.Sleep(time.Second * 10)
//退出
/*
cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
*/
cancel()
wg.Wait()
}
案例2
package main
import (
"context"
"fmt"
)
func gen(ctx context.Context) <-chan int {
//定义1个destnation channel
dest := make(chan int)
n := 1
//匿名函数协程不断得给这个dest channel中输入数字
go func() {
for {
select {
//context结束该匿名函数协程结束
case <-ctx.Done():
return
case dest <- n:
n++
}
}
}()
return dest
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//调用
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
//main函数结束之后,调用了context取消
return
}
}
}
当context的截止日过期时, ctx.Done()返回后context deadline exceeded。
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func connectMyql(ctx context.Context) {
defer wg.Done()
for {
time.Sleep(time.Second * 1)
fmt.Println("我连我连...我连莲莲....")
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
}
}
}
func main() {
//设置context 10秒钟之后过期
d := time.Now().Add(time.Second * 10)
ctx, cancel := context.WithDeadline(context.Background(), d)
/*
尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
*/
defer cancel()
wg.Add(1)
go connectMyql(ctx)
wg.Wait()
}
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func connectMyql(ctx context.Context) {
defer wg.Done()
for {
time.Sleep(time.Second * 1)
fmt.Println("我连我连...我连莲莲....")
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
}
}
}
func main() {
//设置context 从当前时间开始10秒钟之后过期(决对时间)
// d := time.Now().Add(time.Second * 10)
// ctx, cancel := context.WithDeadline(context.Background(), d)
//设置相对时间 5秒钟后过期(相对时间)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
/*
尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
*/
defer cancel()
wg.Add(1)
go connectMyql(ctx)
wg.Wait()
}
WithValue(传递值)
WithCancel、WithDeadline、WithTimeout,With 这个verb 就是context可以追溯和退出其衍生子gorutines 的关键所在! 在子gorutine开启时就与生俱来一些元数据!
WithValue可以保证1个gorutin繁衍了N代子gorutines之后,它的后代gorutines都能with(携带)1个固定值,这样我就可以自上而下追溯这个繁衍链了!
这也是微服务链路追踪的核心思想。
func WithValue(parent Context, key, val interface{}) Context
WithValue returns a copy of parent in which the value associated with key is val.
WithValue返回父节点的副本,其中与key关联的值为val。
Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.
仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。我把它当成session来看!
The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context.
所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。
Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.
WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口
package main
import (
"context"
"fmt"
"sync"
"time"
)
// context.WithValue
//TraceCode 自定义类型
type TraceCode string
var wg sync.WaitGroup
func worker(ctx context.Context) {
defer wg.Done()
key := TraceCode("TRACE_CODE")
// 在子goroutine中获取trace code,(string)是类型断言!
traceCode, ok := ctx.Value(key).(string)
if !ok {
fmt.Println("invalid trace code")
}
for {
fmt.Printf("worker, trace code:%s
", traceCode)
// 假设正常连接数据库耗时1秒
time.Sleep(time.Second * 1)
// 10秒后自动调用
select {
case <-ctx.Done():
fmt.Println("worker done!")
return
default:
}
}
}
func main() {
// 设置1个10秒的超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
//在系统的入口中设置trace code传递给后续启动的goroutine实现微服务日志数据聚合
ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "666")
wg.Add(1)
go worker(ctx)
//主线程等待10秒后
time.Sleep(time.Second * 10)
//通知子goroutine结束
cancel()
wg.Wait()
fmt.Println("over")
}
context应用场景(微服务链路追踪)
作为1个微服务架构,微服务之间session不共享的服务端,如何追踪客户端1次request都调用了哪些微服务组件?并且聚合日志。
