zoukankan      html  css  js  c++  java
  • go应用专题:context应用场景

    参考:

    https://www.jianshu.com/p/6def5063c1eb(context应用场景)

    https://zhuanlan.zhihu.com/p/110085652(深入理解context)

    context实现server调度

    net/http在实现http server时就用到了context, 下面简单分析一下

    1、首先Server在开启服务时会创建一个valueCtx,存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx。

    func (srv *Server) Serve(l net.Listener) error {
        ...
        var tempDelay time.Duration     // how long to sleep on accept failure
        baseCtx := context.Background() // base is always background, per Issue 16220
        ctx := context.WithValue(baseCtx, ServerContextKey, srv)
        for {
            rw, e := l.Accept()
            ...
            tempDelay = 0
            c := srv.newConn(rw)
            c.setState(c.rwc, StateNew) // before Serve can return
            go c.serve(ctx)
        }
    }

    2、建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

    func (c *conn) serve(ctx context.Context) {
        c.remoteAddr = c.rwc.RemoteAddr().String()
        ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
        ...
        ctx, cancelCtx := context.WithCancel(ctx)
        c.cancelCtx = cancelCtx
        defer cancelCtx()
        ...
        for {
            w, err := c.readRequest(ctx)
            ...
            serverHandler{c.server}.ServeHTTP(w, w.req)
            ...
        }
    }

    3、读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消方法。

    func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
        ...
        req, err := readRequest(c.bufr, keepHostHeader)
        ...
        ctx, cancelCtx := context.WithCancel(ctx)
        req.ctx = ctx
        ...
        w = &response{
            conn:          c,
            cancelCtx:     cancelCtx,
            req:           req,
            reqBody:       req.Body,
            handlerHeader: make(Header),
            contentLength: -1,
            closeNotifyCh: make(chan bool, 1),
    
            // We populate these ahead of time so we're not
            // reading from req.Header after their Handler starts
            // and maybe mutates it (Issue 14940)
            wants10KeepAlive: req.wantsHttp10KeepAlive(),
            wantsClose:       req.wantsClose(),
        }
        ...
        return w, nil
    }

    这样处理的目的主要有以下几点:

    一旦请求超时,即可中断当前请求;
    在处理构建response过程中如果发生错误,可直接调用response对象的cancelCtx方法结束当前请求;
    在处理构建response完成之后,调用response对象的cancelCtx方法结束当前请求。

    在整个server处理流程中,使用了一条context链贯穿Server、Connection、Request,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

    ValueContext实现请求中间件

    package main
    
    import (
        "net/http"
        "context"
    )
    
    type FooKey string
    
    var UserName = FooKey("user-name")
    var UserId = FooKey("user-id")
    
    func foo(next http.HandlerFunc) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            ctx := context.WithValue(r.Context(), UserId, "1")
            ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
            next(w, r.WithContext(ctx2))
        }
    }
    
    func GetUserName(context context.Context) string {
        if ret, ok := context.Value(UserName).(string); ok {
            return ret
        }
        return ""
    }
    
    func GetUserId(context context.Context) string {
        if ret, ok := context.Value(UserId).(string); ok {
            return ret
        }
        return ""
    }
    
    func test(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("welcome: "))
        w.Write([]byte(GetUserId(r.Context())))
        w.Write([]byte(" "))
        w.Write([]byte(GetUserName(r.Context())))
    }
    
    func main() {
        http.Handle("/", foo(test))
        http.ListenAndServe(":8080", nil)
    }
    在使用ValueCtx的时候需要注意一点,这里的key不应该设置成为普通的String或者Int类型,为了防止不同的中间件对这个key的覆盖。
    最好的情况是每个中间件使用一个自定义的key类型,比如这里的FooKey,而且获取Value的逻辑尽量也抽取出来作为一个函数,放在这个middleware的同包中。这样,就会有效避免不同包设置相同的key的冲突问题了。

    TimeoutContext实现超时控制

    请求端

    uri := "https://httpbin.org/delay/3"
    req, err := http.NewRequest("GET", uri, nil)
    if err != nil {
        log.Fatalf("http.NewRequest() failed with '%s'
    ", err)
    }
    
    ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
    req = req.WithContext(ctx)
    
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatalf("http.DefaultClient.Do() failed with:
    '%s'
    ", err)
    }
    defer resp.Body.Close()

    服务端

    package main
    
    import (
        "net/http"
        "time"
    )
    
    func test(w http.ResponseWriter, r *http.Request) {
        time.Sleep(20 * time.Second)
        w.Write([]byte("test"))
    }
    
    
    func main() {
        http.HandleFunc("/", test)
        timeoutHandler := http.TimeoutHandler(http.DefaultServeMux, 5 * time.Second, "timeout")
        http.ListenAndServe(":8080", timeoutHandler)
    }

    CancelContext实现任务调度

    package main
    
    import (
        "context"
        "sync"
        "github.com/pkg/errors"
    )
    
    func Rpc(ctx context.Context, url string) error {
        result := make(chan int)
        err := make(chan error)
    
        go func() {
            // 进行RPC调用,并且返回是否成功,成功通过result传递成功信息,错误通过error传递错误信息
            isSuccess := true
            if isSuccess {
                result <- 1
            } else {
                err <- errors.New("some error happen")
            }
        }()
    
        select {//协程会一直等待,直到推出
            case <- ctx.Done():
                // 其他RPC调用调用失败
                return ctx.Err()
            case e := <- err:
                // 本RPC调用失败,返回错误信息
                return e
            case <- result:
                // 本RPC调用成功,不返回错误信息
                return nil
        }
    }
    
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background()) //核心
    
        // RPC1调用
        err := Rpc(ctx, "http://rpc_1_url")
        if err != nil {
            return
        }
    
        wg := sync.WaitGroup{}
    
        // RPC2调用
        wg.Add(1)
        go func(){
            defer wg.Done()
            err := Rpc(ctx, "http://rpc_2_url")
            if err != nil {
                cancel()//发起去掉调度
            }
        }()
    
        // RPC3调用
        wg.Add(1)
        go func(){
            defer wg.Done()
            err := Rpc(ctx, "http://rpc_3_url")
            if err != nil {
                cancel()
            }
        }()
    
        // RPC4调用
        wg.Add(1)
        go func(){
            defer wg.Done()
            err := Rpc(ctx, "http://rpc_4_url")
            if err != nil {
                cancel()
            }
        }()
    
        wg.Wait()
    }
     
  • 相关阅读:
    Kali linux installation instruction
    Source Insight(C/C++/Java编辑器)用法
    Wear OS软件安装指南
    外卖ERP管理系统(一)
    邮轮ERP系统
    使用CEfSharp 下载文件 弹出保存框 IDownloadHandler
    cefSharp通过js操控页面,含跨域操控
    C# Winform 未能加载文件或程序集"System.Data.SQLite"或它的某一个依赖项。试图加载格式不正确的程序
    无法将类型为“System.__ComObject”的 COM 对象强制转换为类类型“mshtml.HTMLInputElementClass
    COMBOBOX绑定DICTIONARY做为数据源
  • 原文地址:https://www.cnblogs.com/tkzc2013/p/15184368.html
Copyright © 2011-2022 走看看