zoukankan      html  css  js  c++  java
  • 豌豆夹Redis解决方案Codis源码剖析:Proxy代理

    豌豆夹Redis解决方案Codis源码剖析:Proxy代理

    1.预备知识

    1.1 Codis

    Codis就不详细说了,摘抄一下GitHub上的一些项目描述

    Codis is a proxy based high performance Redis cluster solution written in Go/C, an alternative to Twemproxy. It supports multiple stateless proxy with multiple redis instances and is engineered to elastically scale, Easily add or remove redis or proxy instances on-demand/dynamicly.

    • Auto rebalance
    • Support both redis or rocksdb transparently
    • GUI dashboard & admin tools
    • Supports most of Redis commands, Fully compatible with twemproxy
    • Native Redis clients are supported
    • Safe and transparent data migration, Easily add or remove nodes on-demand
    • Command-line interface is also provided
    • RESTful APIs

    安装步骤官方网站上也写的很清楚了:

    // Golang环境安装配置
    [root@vm root]$ tar -C /usr/local -zxf go1.4.2.linux-amd64.tar.gz
    [root@vm root]$ vim /etc/profile
    export GOROOT=/usr/local/go
    export PATH=$GOROOT/bin:$PATH
    export GOPATH=/home/user/go
    [root@vm root]$ source /etc/profile
    [root@vm root]$ go version
    
    // 下载Codis依赖,编译Codis
    [root@vm root]$ cd codis-1.92
    [root@vm root]$ ./bootstrap.sh

    1.2 Golang

    Codis的核心代码都是用Golang开发的,所以在一头扎进源代码里之前,先了解Golang的语法特性是必不可少的!好在Golang除了少数一些“古怪之处”外,还算容易上手。具体请参考笔者的另一篇文章《 Java程序员的Golang入门指南(上)》

    1.3 Redis通信协议

    Redis通信协议简称为RESP,在分析网络通信时需要这一部分的知识。RESP本身的设计非常简单,所以还是快速过一下吧。具体请参考笔者的另一篇文章《用Netty解析Redis网络协议》以及官网上的协议具体规范

    1.4 Zookeeper

    Codis以及现今很多后端中间件都使用Zookeeper来协调分布式通信,所以在阅读源码前我们至少要知道Zookeeper是干什么的,有哪些基本操作和监听器。具体请参考笔者的另一篇文章《Apache Curator入门实战》

    2.Proxy源码剖析

    Codis可以分为客户端Jodis、代理中间件Codis Proxy、Zookeeper协调、监控界面、Redis定制版Codis Server等组件。这里第一部分主要关注最核心的Proxy部分的源码。

    2.1 程序入口main.go

    codis-1.92/cmd/proxy/main.go是Proxy组件的main函数入口,完成的主要工作就是设置日志级别、解析命令行参数(CPU核数、绑定地址等)、加载配置文件、Golang环境(runtime.GOMAXPROCS并发数)、启动Socket监听等常规任务。顺藤摸瓜,我们要分析的关键应该就在router中。

    func main() {
        // 1.打印banner,设置日志级别
        fmt.Print(banner)
        log.SetLevelByString("info")
    
        // 2.解析命令行参数
        args, err := docopt.Parse(usage, nil, true, "codis proxy v0.1", true)
        if err != nil {
            log.Error(err)
        }
        if args["-c"] != nil {
            configFile = args["-c"].(string)
        }
        ...
    
        dumppath := utils.GetExecutorPath()
    
        log.Info("dump file path:", dumppath)
        log.CrashLog(path.Join(dumppath, "codis-proxy.dump"))
    
        // 3.设置Golang并发数等
        router.CheckUlimit(1024)
        runtime.GOMAXPROCS(cpus)
    
        // 4.启动Http监听
        http.HandleFunc("/setloglevel", handleSetLogLevel)
        go http.ListenAndServe(httpAddr, nil)
        log.Info("running on ", addr)
        conf, err := router.LoadConf(configFile)
        if err != nil {
            log.Fatal(err)
        }
    
        // 5.创建Server,启动Socket监听
        s := router.NewServer(addr, httpAddr, conf)
        s.Run()
        log.Warning("exit")
    }

    2.2 核心类Server

    打开codis-1.92/pkg/proxy/router/router.go,在分析请求接收和分发前,先来看一个最核心的类Server,它就是在main.go中调用router.NewServer()时创建的。说一下比较重要的几个字段:

    • reqCh:Pipeline请求的Channel。
    • pools:Slot与cachepool的map。
    • evtbus/top:处理Zookeeper消息,更新拓扑结构。
    • bufferedReq:Slot处于migrate期间被缓冲的请求。
    • pipeConns:Slot对应的taskrunner。

    注意interface{},它表示空interface,按照Golang的Duck Type继承方式,任何类都是空接口的子类。所以interface{}有点像C语言中的void*/char*。

    因为Codis是先启动监听再开始接收Socket请求,所以对go s.handleTopoEvent()的分析放到后面。在下一节我们先看一下Codis是如何启动对Socket端口监听并将接收到的请求放入到Server的reqCh管道中的。

    type Server struct {
        slots  [models.DEFAULT_SLOT_NUM]*Slot
        top    *topo.Topology
        evtbus chan interface{}
        reqCh  chan *PipelineRequest
    
        lastActionSeq int
        pi            models.ProxyInfo
        startAt       time.Time
        addr          string
    
        moper       *MultiOperator
        pools       *cachepool.CachePool
        counter     *stats.Counters
        OnSuicide   OnSuicideFun
        bufferedReq *list.List
        conf        *Conf
    
        pipeConns map[string]*taskRunner //redis->taskrunner
    }
    
    func NewServer(addr string, debugVarAddr string, conf *Conf) *Server {
        log.Infof("start with configuration: %+v", conf)
    
        // 1.创建Server类
        s := &Server{
            conf:          conf,
            evtbus:        make(chan interface{}, 1000),
            top:           topo.NewTopo(conf.productName, conf.zkAddr, conf.f, conf.provider),
            counter:       stats.NewCounters("router"),
            lastActionSeq: -1,
            startAt:       time.Now(),
            addr:          addr,
            moper:         NewMultiOperator(addr),
            reqCh:         make(chan *PipelineRequest, 1000),
            pools:         cachepool.NewCachePool(),
            pipeConns:     make(map[string]*taskRunner),
            bufferedReq:   list.New(),
        }
        ...
    
        // 2.启动Zookeeper监听器
        s.RegisterAndWait()
        _, err = s.top.WatchChildren(models.GetWatchActionPath(conf.productName), s.evtbus)
        if err != nil {
            log.Fatal(errors.ErrorStack(err))
        }
    
        // 3.初始化所有Slot的信息
        s.FillSlots()
    
        // 4.启动对reqCh和evtbus中事件的监听
        go s.handleTopoEvent()
    
        return s
    }

    2.2.1 Zookeeper通信topology.go

    NewServer()中调用的RegisterAndWait()和WatchChildren()都是处理Zookeeper的。一部分代码在codis-1.92/pkg/proxy/router/topology/topology.go中,一部分底层实现在codis-1.92/pkg/models包下。这里就不具体分析models包是如何与Zookeeper通信的了,以免偏离了主题。现阶段,我们只需知道Zookeeper中结点关系(Proxy拓扑结构)的变化都会反映到evtbus管道中就行了。

    func (s *Server) RegisterAndWait() {
        _, err := s.top.CreateProxyInfo(&s.pi)
        if err != nil {
            log.Fatal(errors.ErrorStack(err))
        }
    
        _, err = s.top.CreateProxyFenceNode(&s.pi)
        if err != nil {
            log.Warning(errors.ErrorStack(err))
        }
    
        s.registerSignal()
        s.waitOnline()
    }
    
    func (top *Topology) WatchChildren(path string, evtbus chan interface{}) ([]string, error) {
        content, _, evtch, err := top.zkConn.ChildrenW(path)
        if err != nil {
            return nil, errors.Trace(err)
        }
    
        // 启动监听器,监听Zookeeper事件
        go top.doWatch(evtch, evtbus)
        return content, nil
    }
    
    func (top *Topology) doWatch(evtch <-chan topo.Event, evtbus chan interface{}) {
        e := <-evtch
        if e.State == topo.StateExpired || e.Type == topo.EventNotWatching {
            log.Fatalf("session expired: %+v", e)
        }
    
        log.Warningf("topo event %+v", e)
    
        switch e.Type {
        //case topo.EventNodeCreated:
        //case topo.EventNodeDataChanged:
        case topo.EventNodeChildrenChanged: //only care children changed
            //todo:get changed node and decode event
        default:
            log.Warningf("%+v", e)
        }
    
        // 将Zookeeper结点变化的事件放入Server的evtbus管道中
        evtbus <- e
    }

    2.2.2 初始化槽信息fillSlots()

    Codis将Redis服务器按照Group划分,每个Group就是一个Master以及至少一个Slave。也就是说每个Group都对应哈希散列的一个Slot。fillSlots()从ZooKeeper中取出注册的Redis后端信息,初始化每个Slot(默认1024个):包括Slot状态、Group信息等。

    func (s *Server) FillSlots() {
        // 为所有默认1024个Slot初始化信息
        for i := 0; i < models.DEFAULT_SLOT_NUM; i++ {
            s.fillSlot(i, false)
        }
    }
    
    func (s *Server) fillSlot(i int, force bool) {
        s.clearSlot(i)
    
        // 1.获得当前Slot的信息和Group信息
        slotInfo, groupInfo, err := s.top.GetSlotByIndex(i)
    
        slot := &Slot{
            slotInfo:  slotInfo,
            dst:       group.NewGroup(*groupInfo),
            groupInfo: groupInfo,
        }
    
        // 2.创建Slot对应的cachepool
        s.pools.AddPool(slot.dst.Master())
    
        if slot.slotInfo.State.Status == models.SLOT_STATUS_MIGRATE {
            //get migrate src group and fill it
            from, err := s.top.GetGroup(slot.slotInfo.State.MigrateStatus.From)
            if err != nil { //todo: retry ?
                log.Fatal(err)
            }
            slot.migrateFrom = group.NewGroup(*from)
            s.pools.AddPool(slot.migrateFrom.Master())
        }
    
        s.slots[i] = slot
        s.counter.Add("FillSlot", 1)
    }

    codis-1.92/pkg/proxy/cachepool/cachepool.go和codis-1.92/pkg/proxy/redispool/redispool.go中负责创建与Redis通信的连接池。

    type LivePool struct {
        pool redispool.IPool
    }
    
    type CachePool struct {
        mu    sync.RWMutex
        pools map[string]*LivePool
    }
    
    func (cp *CachePool) AddPool(key string) error {
        // 1.锁住cachepool
        cp.mu.Lock()
        defer cp.mu.Unlock()
    
        // 2.查找当前Slot的连接池
        pool, ok := cp.pools[key]
        if ok {
            return nil
        }
    
        // 3.若不存在则新建LivePool
        pool = &LivePool{
            //pool: redispool.NewConnectionPool("redis conn pool", 50, 120*time.Second),
            pool: NewSimpleConnectionPool(),
        }
    
        // 4.打开连接
        pool.pool.Open(redispool.ConnectionCreator(key))
    
        // 5.保存新建好的连接池
        cp.pools[key] = pool
    
        return nil
    }

    2.3 请求接收router.go(1)

    下面继续跟踪主流程,main()方法在调用NewServer()创建出Server实例后,调用了其Run()方法。Run()是标准的服务端代码,首先net.Listen()绑定到端口上监听,然后进入死循环Accept(),每接收到一个连接就启动一个goroutine进行处理。

    func (s *Server) Run() {
        log.Infof("listening %s on %s", s.conf.proto, s.addr)
        listener, err := net.Listen(s.conf.proto, s.addr)
        ...
    
        for {
            conn, err := listener.Accept()
            if err != nil {
                log.Warning(errors.ErrorStack(err))
                continue
            }
            go s.handleConn(conn)
        }
    }

    handleConn()接收到客户端的连接,完成三件事儿:

    • 创建session对象:保存当前客户端的Socket连接、读写缓冲区、响应Channel等。
    • 启动响应goroutine:client.WritingLoop()中处理backQ中的响应数据。
    • 建立Redis连接:server.redisTunnel()中打开连接,读取客户端请求并转发给Redis处理。
    func (s *Server) handleConn(c net.Conn) {
        log.Info("new connection", c.RemoteAddr())
        s.counter.Add("connections", 1)
    
        // 1.创建当前客户端的Session实例
        client := &session{
            Conn:        c,
            r:           bufio.NewReaderSize(c, 32*1024),
            w:           bufio.NewWriterSize(c, 32*1024),
            CreateAt:    time.Now(),
            backQ:       make(chan *PipelineResponse, 1000),
            closeSignal: &sync.WaitGroup{},
        }
        client.closeSignal.Add(1)
    
        // 2.启动监视backQ写回响应的子routine
        go client.WritingLoop()
        ...
    
        // 3.循环读取该客户端的请求并处理
        for {
            err = s.redisTunnel(client)
            if err != nil {
                close(client.backQ)
                return
            }
            client.Ops++
        }
    }

    redisTunnel可以说是Proxy服务端的“代码中枢”了,最核心的代码都是在这里共同协作完成任务的,它调用三个最为关键的函数:

    • getRespOpKeys()解析请求:在helper.go中,委托parser.go解析客户端请求。此处对多参数的请求例如hmset进行特殊处理,因为key可能对应多个后端Redis实例。如果是单参数,则可以Pipeline化发送给后端。
    • mapKey2Slot()哈希映射:在mapper.go中,计算key应该分配到哪台Redis服务器的Slot中。
    • PipelineRequest()创建Pipeline请求:根据前面得到的数据新建PipelineRequest,并发送到当前客户端Session中的Channel中。之后调用pr.wg.Wait(),当前go s.handleConn()创建的goroutine休眠等待响应
    func (s *Server) redisTunnel(c *session) error {
        resp, op, keys, err := getRespOpKeys(c)
        k := keys[0]
        ...
    
        if isMulOp(opstr) {
            if len(keys) > 1 { //can not send to redis directly
                var result []byte
                err := s.moper.handleMultiOp(opstr, keys, &result)
                if err != nil {
                    return errors.Trace(err)
                }
    
                s.sendBack(c, op, keys, resp, result)
                return nil
            }
        }
    
        i := mapKey2Slot(k)
    
        //pipeline
        c.pipelineSeq++
        pr := &PipelineRequest{
            slotIdx: i,
            op:      op,
            keys:    keys,
            seq:     c.pipelineSeq,
            backQ:   c.backQ,
            req:     resp,
            wg:      &sync.WaitGroup{},
        }
        pr.wg.Add(1)
    
        s.reqCh <- pr
        pr.wg.Wait()
    
        return nil
    }

    2.3.1 RESP协议解析parser.go

    redisTunnel()调用了helper.go中的getRespOpKeys(),后者使用parser.go解析RESP协议请求,从Parse()函数的代码中能清晰地看到对RESP五种通信格式’-‘,’+’,’:’,’$’,’*’。因为要根据请求中的命令和key做路由,以及特殊处理(例如多参数命令),所以Codis不能简单地透传,而是解析协议获得所需的信息

    注意parser.Parse()的用法,这里parser是包名不是一个对象实例,而Parse是parser包中的一个public函数。所以乍看之下有点困惑了,这也是Golang支持既像C一样面向过程编程,又有高级语言的面向对象甚至Duck Type的缘故。

    Parse()读取网络流,并递归处理整个请求。例如”GET ab”命令:

        *2
    
    
        $3
    
    
        GET
    
    
        $2
    
    
        ab
    
    

    最终Parse()返回时得到:

        Resp{
            Raw: "*2
    
    ", 
            Multi{
                Resp{ Raw: "$3
    
    GET
    
    " },
                Resp{ Raw: "$2
    
    ab
    
    " }
            }
        }

    如果细致分析的话,readLine()中使用readSlice()读取缓冲区的切片,节约了内存。这种设计上的小细节还是很值得关注和学习的,毕竟“天下大事,必作于细”。

    func getRespOpKeys(c *session) (*parser.Resp, []byte, [][]byte, error) {
        resp, err := parser.Parse(c.r) // read client request
        op, keys, err := resp.GetOpKeys()
        ...
    
        return resp, op, keys, nil
    }
    
    type Resp struct {
        Type  int
        Raw   []byte
        Multi []*Resp
    }
    
    func Parse(r *bufio.Reader) (*Resp, error) {
        line, err := readLine(r)
        if err != nil {
            return nil, errors.Trace(err)
        }
    
        resp := &Resp{}
        if line[0] == '$' || line[0] == '*' {
            resp.Raw = make([]byte, 0, len(line)+64)
        } else {
            resp.Raw = make([]byte, 0, len(line))
        }
    
        resp.Raw = append(resp.Raw, line...)
    
        switch line[0] {
        case '-':
            resp.Type = ErrorResp
            return resp, nil
        case '+':
            resp.Type = SimpleString
            return resp, nil
        case ':':
            resp.Type = IntegerResp
            return resp, nil
        case '$':
            resp.Type = BulkResp
            ...
        case '*':
            resp.Type = MultiResp
            ...
    }

    2.3.2 哈希映射mapper.go

    mapKey2Slot()处理HashTag,并使用CRC32计算哈希值。

    const (
        HASHTAG_START = '{'
        HASHTAG_END   = '}'
    )
    
    func mapKey2Slot(key []byte) int {
        hashKey := key
        //hash tag support
        htagStart := bytes.IndexByte(key, HASHTAG_START)
        if htagStart >= 0 {
            htagEnd := bytes.IndexByte(key[htagStart:], HASHTAG_END)
            if htagEnd >= 0 {
                hashKey = key[htagStart+1 : htagStart+htagEnd]
            }
        }
    
        return int(crc32.ChecksumIEEE(hashKey) % models.DEFAULT_SLOT_NUM)
    }

    2.4 请求分发router.go(2)

    NewServer()中执行go s.handleTopoEvent()启动goroutine,对Server数据结构中的reqCh和evtbus两个Channel进行事件监听处理。这里重点看拿到reqCh的事件后是如何dispatch()的。reqCh的事件也就是PipelineRequest,会经dispath()函数放入对应Slot的taskrunner的in管道中。也就是说,reqCh中的请求会被分发到各个Slot自己的Channel中

    另外注意:此处会检查PipelineRequest对应Slot的状态,如果正在migrate,则暂时将请求缓冲到Server类的bufferedReq链表中

    func (s *Server) handleTopoEvent() {
        for {
            select {
            // 1.处理Server.reqCh中事件
            case r := <-s.reqCh:
                // 1.1 如果正在migrate,则将请求r暂时缓冲起来
                if s.slots[r.slotIdx].slotInfo.State.Status == models.SLOT_STATUS_PRE_MIGRATE {
                    s.bufferedReq.PushBack(r)
                    continue
                }
    
                // 1.2 处理缓冲中的请求e
                for e := s.bufferedReq.Front(); e != nil; {
                    next := e.Next()
                    s.dispatch(e.Value.(*PipelineRequest))
                    s.bufferedReq.Remove(e)
                    e = next
                }
    
                // 1.3 处理当前请求r
                s.dispatch(r)
    
            // 2.处理Server.evtbus中请求
            case e := <-s.evtbus:
                switch e.(type) {
                case *killEvent:
                    s.handleMarkOffline()
                    e.(*killEvent).done <- nil
                default:
                    evtPath := GetEventPath(e)
                    ...
                    s.processAction(e)
                }
            }
        }
    }
    
    func (s *Server) dispatch(r *PipelineRequest) {
        s.handleMigrateState(r.slotIdx, r.keys[0])
    
        // 1.查找Slot对应的taskrunner
        tr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()]
    
        // 2.若没有,则新建一个taskrunner
        if !ok {
            // 2.1 新建tr时出错,则向r.backQ放入一个空响应
            if err := s.createTaskRunner(s.slots[r.slotIdx]); err != nil {
                r.backQ <- &PipelineResponse{ctx: r, resp: nil, err: err}
                return
            }
    
            // 2.2 拿到taskrunner
            tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()]
        }
    
        // 3.将请求r放入in管道
        tr.in <- r
    }

    taskrunner.go的createTaskRunner()调用NewTaskRunner()创建当前Slot对应的taskrunner。每个taskrunner都拥有一对in和out管道。之前的PipelineRequest就是放到in管道中。然后启动了两个goroutine,分别调用writeloop()和readloop()函数监听in和out管道,处理其中的请求。

    func (s *Server) createTaskRunner(slot *Slot) error {
        dst := slot.dst.Master()
        if _, ok := s.pipeConns[dst]; !ok {
            tr, err := NewTaskRunner(dst, s.conf.netTimeout)
            if err != nil {
                return errors.Errorf("create task runner failed, %v,  %+v, %+v", err, slot.dst, slot.slotInfo)
            } else {
                s.pipeConns[dst] = tr
            }
        }
        return nil
    }
    
    func NewTaskRunner(addr string, netTimeout int) (*taskRunner, error) {
        // 1.创建TaskRunner实例
        tr := &taskRunner{
            in:         make(chan interface{}, 1000),
            out:        make(chan interface{}, 1000),
            redisAddr:  addr,
            tasks:      list.New(),
            netTimeout: netTimeout,
        }
    
        // 2.创建Redis连接,并绑定到tr
        c, err := redisconn.NewConnection(addr, netTimeout)
        tr.c = c
    
        // 3.开始监听读写管道in和out
        go tr.writeloop()
        go tr.readloop()
    
        return tr, nil
    }
    
    func (tr *taskRunner) writeloop() {
        var err error
        tick := time.Tick(2 * time.Second)
        for {
            ...
    
            select {
            // 1.处理in管道中来自客户端的请求
            case t := <-tr.in:
                tr.processTask(t)
            // 2.处理out管道中来自Redis的响应
            case resp := <-tr.out:
                err = tr.handleResponse(resp)
            // 设置select间隔
            case <-tick:
                if tr.tasks.Len() > 0 && int(time.Since(tr.latest).Seconds()) > tr.netTimeout {
                    tr.c.Close()
                }
            }
        }
    }

    2.5 请求发送taskrunner.go

    终于到了请求的生命周期的最后一个环节了!writeloop()会不断调用processTask()处理in管道中的请求,通过dowrite()函数发送到Redis服务端。当in管道中没有其他请求时,会强制刷新一下缓冲区。

    func (tr *taskRunner) processTask(t interface{}) {
        var err error
        switch t.(type) {
        case *PipelineRequest:
            r := t.(*PipelineRequest)
            var flush bool
            if len(tr.in) == 0 { //force flush
                flush = true
            }
            err = tr.handleTask(r, flush)
        case *sync.WaitGroup: //close taskrunner
            err = tr.handleTask(nil, true) //flush
            ...
        }
        ...
    }
    
    func (tr *taskRunner) handleTask(r *PipelineRequest, flush bool) error {
        if r == nil && flush { //just flush
            return tr.c.Flush()
        }
    
        // 1.将请求保存到链表,接收到响应时再移除
        tr.tasks.PushBack(r)
        tr.latest = time.Now()
    
        // 2.发送请求到Redis
        return errors.Trace(tr.dowrite(r, flush))
    }
    
    type Resp struct {
        Type  int
        Raw   []byte
        Multi []*Resp
    }
    
    func (tr *taskRunner) dowrite(r *PipelineRequest, flush bool) error {
        // 1.通过Bytes()函数取出Resp中的原始字节Raw
        b, err := r.req.Bytes()
        ...
    
        // 2.将原始请求发送到Redis服务端
        _, err = tr.c.Write(b)
        ...
    
        // 3.如果需要,强制刷新缓冲区
        if flush {
            return errors.Trace(tr.c.Flush())
        }
        return nil
    }

    Codis使用Golang的bufio库处理底层的IO流读写操作。在NewConnection()中,用net包创建到Redis的Socket连接,并分别创建大小为512K的读写缓冲流。

    //not thread-safe
    type Conn struct {
        addr string
        net.Conn
        closed     bool
        r          *bufio.Reader
        w          *bufio.Writer
        netTimeout int //second
    }
    
    func NewConnection(addr string, netTimeout int) (*Conn, error) {
        // 1.打开到Redis服务端的TCP连接
        conn, err := net.DialTimeout("tcp", addr, time.Duration(netTimeout)*time.Second)
        ...
    
        // 2.创建Conn实例,及读写缓冲区
        return &Conn{
            addr:       addr,
            Conn:       conn,
            r:          bufio.NewReaderSize(conn, 512*1024),
            w:          bufio.NewWriterSize(deadline.NewDeadlineWriter(conn, time.Duration(netTimeout)*time.Second), 512*1024),
            netTimeout: netTimeout,
        }, nil
    }
    
    func (c *Conn) Flush() error {
        return c.w.Flush()
    }
    
    func (c *Conn) Write(p []byte) (int, error) {
        return c.w.Write(p)
    }

    2.6 返回响应session.go

    当writeloop()在“如火如荼”地向Redis发送请求时,readloop()也没有闲着。它不断地从Redis读取响应。发送每个请求时都不会等待Redis的响应,也就是说发送请求和读取响应完全是异步进行的,所以就充分利用了Pipeline的性能优势

    func (tr *taskRunner) readloop() {
        for {
            // 1.从Redis连接中读取响应
            resp, err := parser.Parse(tr.c.BufioReader())
            if err != nil {
                tr.out <- err
                return
            }
    
            // 2.将解析好的响应放入out管道中
            tr.out <- resp
        }
    }
    
    func (tr *taskRunner) handleResponse(e interface{}) error {
        switch e.(type) {
        ...
        case *parser.Resp:
            // 1.取到out管道中的PipelineResponse
            resp := e.(*parser.Resp)
    
            // 2.取出对应的PipelineRequest
            e := tr.tasks.Front()
            req := e.Value.(*PipelineRequest)
    
            // 3.将响应放入到backQ管道中(req.backQ也就是session中的backQ)
            req.backQ <- &PipelineResponse{ctx: req, resp: resp, err: nil}
    
            // 4.从任务列表中移除已拿到响应的请求
            tr.tasks.Remove(e)
            return nil
        }
        return nil
    }

    因为writeloop()不仅监视in管道,也监视out管道。所以writeloop()会将readloop()放入的响应交给handleResponse()处理。最终PipelineResponse被放入Session对象的backQ管道中。还记得它吗?在最开始NewServer时为当前客户端创建的Session实例。最后,接收到的PipleResponse会转成RESP协议的字节序列,发送回客户端。

    func (s *session) WritingLoop() {
        s.lastUnsentResponseSeq = 1
        for {
            select {
            case resp, ok := <-s.backQ:
                if !ok {
                    s.Close()
                    s.closeSignal.Done()
                    return
                }
    
                flush, err := s.handleResponse(resp)
                ...
            }
        }
    }
    
    func (s *session) handleResponse(resp *PipelineResponse) (flush bool, err error) {
        ...
        if !s.closed {
            if err := s.writeResp(resp); err != nil {
                return false, errors.Trace(err)
            }
            flush = true
        }
        return
    }
    
    func (s *session) writeResp(resp *PipelineResponse) error {
        // 1.取出Resp中的原始字节
        buf, err := resp.resp.Bytes()
        if err != nil {
            return errors.Trace(err)
        }
    
        // 2.写回到客户端
        _, err = s.Write(buf)
        return errors.Trace(err)
    }
    
    //write without bufio
    func (s *session) Write(p []byte) (int, error) {
        return s.w.Write(p)
    }

    2.7 Proxy源码流程总结

    最后以一张Proxy的流程图作结束。经过我们的分析能够看出,关于并发安全方面,Codis唯一需要并发控制的地方就是从reqCh分发到各个Slot的Channel,为了避免竞争,这一部分是由一个goroutine完成的。

    overview

  • 相关阅读:
    Leetcode Binary Tree Preorder Traversal
    Leetcode Minimum Depth of Binary Tree
    Leetcode 148. Sort List
    Leetcode 61. Rotate List
    Leetcode 86. Partition List
    Leetcode 21. Merge Two Sorted Lists
    Leetcode 143. Reorder List
    J2EE项目应用开发过程中的易错点
    JNDI初认识
    奔腾的代码
  • 原文地址:https://www.cnblogs.com/xiaomaohai/p/6157610.html
Copyright © 2011-2022 走看看