zoukankan      html  css  js  c++  java
  • golang源码分析:redcon基于redis协议的框架

    https://github.com/tidwall/redcon 是一个 Go实现 的 Redis 兼容服务器框架。它实现了redis协议,封装了网络连接,我们可以基于这个库快速实现一个基于redis协议的服务器。简单的redis服务器https://github.com/redis-go/redis 就是基于这个包实现的。

    package main


    import (
    "log"
    "strings"
    "sync"


    "github.com/tidwall/redcon"
    )


    var addr = ":6380"


    func main() {
    var mu sync.RWMutex
    var items = make(map[string][]byte)
    var ps redcon.PubSub
    go log.Printf("started server at %s", addr)
    err := redcon.ListenAndServe(addr,
    func(conn redcon.Conn, cmd redcon.Command) {
    switch strings.ToLower(string(cmd.Args[0])) {
    default:
    conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
    case "ping":
    conn.WriteString("PONG")
    case "quit":
    conn.WriteString("OK")
    conn.Close()
    case "set":
    if len(cmd.Args) != 3 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.Lock()
    items[string(cmd.Args[1])] = cmd.Args[2]
    mu.Unlock()
    conn.WriteString("OK")
    case "get":
    if len(cmd.Args) != 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.RLock()
    val, ok := items[string(cmd.Args[1])]
    mu.RUnlock()
    if !ok {
    conn.WriteNull()
    } else {
    conn.WriteBulk(val)
    }
    case "del":
    if len(cmd.Args) != 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.Lock()
    _, ok := items[string(cmd.Args[1])]
    delete(items, string(cmd.Args[1]))
    mu.Unlock()
    if !ok {
    conn.WriteInt(0)
    } else {
    conn.WriteInt(1)
    }
    case "publish":
    if len(cmd.Args) != 3 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    conn.WriteInt(ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])))
    case "subscribe", "psubscribe":
    if len(cmd.Args) < 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    command := strings.ToLower(string(cmd.Args[0]))
    for i := 1; i < len(cmd.Args); i++ {
    if command == "psubscribe" {
    ps.Psubscribe(conn, string(cmd.Args[i]))
    } else {
    ps.Subscribe(conn, string(cmd.Args[i]))
    }
    }
    }
    },
    func(conn redcon.Conn) bool {
    // Use this function to accept or deny the connection.
    // log.Printf("accept: %s", conn.RemoteAddr())
    return true
    },
    func(conn redcon.Conn, err error) {
    // This is called when the connection has been closed
    // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
    },
    )
    if err != nil {
    log.Fatal(err)
    }
    }

    下面看下源码实现,源码很简单,主要是两个文件:redcon/redcon.go,redcon/resp.go前者实现了网络连接的包装,后者实现了redis协议。依赖了两个网络包https://github.com/tidwall/btree,https://github.com/tidwall/match

    我们还是从例子的入口函数ListenAndServe开始学习

    func ListenAndServe(addr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) error {
    return ListenAndServeNetwork("tcp", addr, handler, accept, closed)
    }

    传入了4个参数,地址、服务handler(服务核心逻辑实现的地方,处理请求并返回结果)、accept函数和close函数。核心逻辑只是对ListenAndServeNetwork的一个包装,确定了网络协议是tcp协议

    func ListenAndServeNetwork(
    net, laddr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) error {
    return NewServerNetwork(net, laddr, handler, accept, closed).ListenAndServe()
    }

    NewServerNetwort函数初始化了server,最终调用的是server的ListenAndServe()函数。

    func NewServerNetwork(
    net, laddr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) *Server {
    if handler == nil {
    panic("handler is nil")
    }
    s := &Server{
    net: net,
    laddr: laddr,
    handler: handler,
    accept: accept,
    closed: closed,
    conns: make(map[*conn]bool),
    }
    return s
    }
    func (s *Server) ListenAndServe() error {
    return s.ListenServeAndSignal(nil)
    }
    func (s *Server) ListenServeAndSignal(signal chan error) error {
    ln, err := net.Listen(s.net, s.laddr)
    if err != nil {
    if signal != nil {
    signal <- err
    }
    return err
    }
    s.ln = ln
    if signal != nil {
    signal <- nil
    }
    return serve(s)
    }

    在这里初始化了网络连接,侦听网络端口,最后调用serve服务

    func serve(s *Server) error {
          for {
    lnconn, err := s.ln.Accept()
    if s.accept != nil && !s.accept(c) {
    go handle(s, c)
              }
            }
    }

    serve是整个服务的大循环,里面不断accept请求,对每个连接,启用一个协程去处理请求内容。

    func handle(s *Server, c *conn) {
    for {
    cmds, err := c.rd.readCommands(nil)
    for len(c.cmds) > 0 {
    cmd := c.cmds[0]
    s.handler(c, cmd)
          }
    }

    在handle函数内部调用server的handler去处理服务端请求的内容。至此整个服务端的框架基本介绍完毕。里面还封装了一套TLS的server逻辑,内容基本相似。

    func NewServerTLS(addr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    config *tls.Config,
    ) *TLSServer {
    return NewServerNetworkTLS("tcp", addr, handler, accept, closed, config)
    }

    下面重点介绍下handler函数,它是server结构体的一个属性

    type Server struct {
    mu sync.Mutex
    net string
    laddr string
    handler func(conn Conn, cmd Command)
    accept func(conn Conn) bool
    closed func(conn Conn, err error)
    conns map[*conn]bool
    ln net.Listener
    done bool
    idleClose time.Duration
    // AcceptError is an optional function used to handle Accept errors.
    AcceptError func(err error)
    }

     有两个参数Conn 网络连接、Command请求参数

    type Conn interface {}
    type conn struct {
    conn net.Conn
    wr *Writer
    rd *Reader
    addr string
    ctx interface{}
    detached bool
    closed bool
    cmds []Command
    idleClose time.Duration
    }

    包裹了网络连接和reader、writer

    redis协议resp的定义如下

    type RESP struct {
    Type Type
    Raw []byte
    Data []byte
    Count int
    }

    并且也实现了相关协议的解析函数

        func ReadNextRESP(b []byte) (n int, resp RESP) 
    func ReadNextCommand(packet []byte, argsbuf [][]byte)
    func readTelnetCommand(packet []byte, argsbuf [][]byte)
    func AppendAny(b []byte, v interface{}) []byte

    redcon只是一个server框架,基于这个框架,我们可以向开发httpserver一样非常方便地开发出一个兼容redis协议的服务端。

  • 相关阅读:
    Luogu 1080 【NOIP2012】国王游戏 (贪心,高精度)
    Luogu 1314 【NOIP2011】聪明的质检员 (二分)
    Luogu 1315 【NOIP2011】观光公交 (贪心)
    Luogu 1312 【NOIP2011】玛雅游戏 (搜索)
    Luogu 1525 【NOIP2010】关押罪犯 (贪心,并查集)
    Luogu 1514 引水入城 (搜索,动态规划)
    UVA 1394 And Then There Was One / Gym 101415A And Then There Was One / UVAlive 3882 And Then There Was One / POJ 3517 And Then There Was One / Aizu 1275 And Then There Was One (动态规划,思维题)
    Luogu 1437 [HNOI2004]敲砖块 (动态规划)
    Luogu 1941 【NOIP2014】飞扬的小鸟 (动态规划)
    HDU 1176 免费馅饼 (动态规划)
  • 原文地址:https://www.cnblogs.com/ExMan/p/15574047.html
Copyright © 2011-2022 走看看