zoukankan      html  css  js  c++  java
  • golang源码分析:redcon基于redis协议的框架

    https://github.com/tidwall/redcon 是一个 Go实现 的 Redis 兼容服务器框架。它实现了redis协议,封装了网络连接,我们可以基于这个库快速实现一个基于redis协议的服务器。简单的redis服务器https://github.com/redis-go/redis 就是基于这个包实现的。

    package main


    import (
    "log"
    "strings"
    "sync"


    "github.com/tidwall/redcon"
    )


    var addr = ":6380"


    func main() {
    var mu sync.RWMutex
    var items = make(map[string][]byte)
    var ps redcon.PubSub
    go log.Printf("started server at %s", addr)
    err := redcon.ListenAndServe(addr,
    func(conn redcon.Conn, cmd redcon.Command) {
    switch strings.ToLower(string(cmd.Args[0])) {
    default:
    conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
    case "ping":
    conn.WriteString("PONG")
    case "quit":
    conn.WriteString("OK")
    conn.Close()
    case "set":
    if len(cmd.Args) != 3 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.Lock()
    items[string(cmd.Args[1])] = cmd.Args[2]
    mu.Unlock()
    conn.WriteString("OK")
    case "get":
    if len(cmd.Args) != 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.RLock()
    val, ok := items[string(cmd.Args[1])]
    mu.RUnlock()
    if !ok {
    conn.WriteNull()
    } else {
    conn.WriteBulk(val)
    }
    case "del":
    if len(cmd.Args) != 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    mu.Lock()
    _, ok := items[string(cmd.Args[1])]
    delete(items, string(cmd.Args[1]))
    mu.Unlock()
    if !ok {
    conn.WriteInt(0)
    } else {
    conn.WriteInt(1)
    }
    case "publish":
    if len(cmd.Args) != 3 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    conn.WriteInt(ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])))
    case "subscribe", "psubscribe":
    if len(cmd.Args) < 2 {
    conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
    return
    }
    command := strings.ToLower(string(cmd.Args[0]))
    for i := 1; i < len(cmd.Args); i++ {
    if command == "psubscribe" {
    ps.Psubscribe(conn, string(cmd.Args[i]))
    } else {
    ps.Subscribe(conn, string(cmd.Args[i]))
    }
    }
    }
    },
    func(conn redcon.Conn) bool {
    // Use this function to accept or deny the connection.
    // log.Printf("accept: %s", conn.RemoteAddr())
    return true
    },
    func(conn redcon.Conn, err error) {
    // This is called when the connection has been closed
    // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
    },
    )
    if err != nil {
    log.Fatal(err)
    }
    }

    下面看下源码实现,源码很简单,主要是两个文件:redcon/redcon.go,redcon/resp.go前者实现了网络连接的包装,后者实现了redis协议。依赖了两个网络包https://github.com/tidwall/btree,https://github.com/tidwall/match

    我们还是从例子的入口函数ListenAndServe开始学习

    func ListenAndServe(addr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) error {
    return ListenAndServeNetwork("tcp", addr, handler, accept, closed)
    }

    传入了4个参数,地址、服务handler(服务核心逻辑实现的地方,处理请求并返回结果)、accept函数和close函数。核心逻辑只是对ListenAndServeNetwork的一个包装,确定了网络协议是tcp协议

    func ListenAndServeNetwork(
    net, laddr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) error {
    return NewServerNetwork(net, laddr, handler, accept, closed).ListenAndServe()
    }

    NewServerNetwort函数初始化了server,最终调用的是server的ListenAndServe()函数。

    func NewServerNetwork(
    net, laddr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    ) *Server {
    if handler == nil {
    panic("handler is nil")
    }
    s := &Server{
    net: net,
    laddr: laddr,
    handler: handler,
    accept: accept,
    closed: closed,
    conns: make(map[*conn]bool),
    }
    return s
    }
    func (s *Server) ListenAndServe() error {
    return s.ListenServeAndSignal(nil)
    }
    func (s *Server) ListenServeAndSignal(signal chan error) error {
    ln, err := net.Listen(s.net, s.laddr)
    if err != nil {
    if signal != nil {
    signal <- err
    }
    return err
    }
    s.ln = ln
    if signal != nil {
    signal <- nil
    }
    return serve(s)
    }

    在这里初始化了网络连接,侦听网络端口,最后调用serve服务

    func serve(s *Server) error {
          for {
    lnconn, err := s.ln.Accept()
    if s.accept != nil && !s.accept(c) {
    go handle(s, c)
              }
            }
    }

    serve是整个服务的大循环,里面不断accept请求,对每个连接,启用一个协程去处理请求内容。

    func handle(s *Server, c *conn) {
    for {
    cmds, err := c.rd.readCommands(nil)
    for len(c.cmds) > 0 {
    cmd := c.cmds[0]
    s.handler(c, cmd)
          }
    }

    在handle函数内部调用server的handler去处理服务端请求的内容。至此整个服务端的框架基本介绍完毕。里面还封装了一套TLS的server逻辑,内容基本相似。

    func NewServerTLS(addr string,
    handler func(conn Conn, cmd Command),
    accept func(conn Conn) bool,
    closed func(conn Conn, err error),
    config *tls.Config,
    ) *TLSServer {
    return NewServerNetworkTLS("tcp", addr, handler, accept, closed, config)
    }

    下面重点介绍下handler函数,它是server结构体的一个属性

    type Server struct {
    mu sync.Mutex
    net string
    laddr string
    handler func(conn Conn, cmd Command)
    accept func(conn Conn) bool
    closed func(conn Conn, err error)
    conns map[*conn]bool
    ln net.Listener
    done bool
    idleClose time.Duration
    // AcceptError is an optional function used to handle Accept errors.
    AcceptError func(err error)
    }

     有两个参数Conn 网络连接、Command请求参数

    type Conn interface {}
    type conn struct {
    conn net.Conn
    wr *Writer
    rd *Reader
    addr string
    ctx interface{}
    detached bool
    closed bool
    cmds []Command
    idleClose time.Duration
    }

    包裹了网络连接和reader、writer

    redis协议resp的定义如下

    type RESP struct {
    Type Type
    Raw []byte
    Data []byte
    Count int
    }

    并且也实现了相关协议的解析函数

        func ReadNextRESP(b []byte) (n int, resp RESP) 
    func ReadNextCommand(packet []byte, argsbuf [][]byte)
    func readTelnetCommand(packet []byte, argsbuf [][]byte)
    func AppendAny(b []byte, v interface{}) []byte

    redcon只是一个server框架,基于这个框架,我们可以向开发httpserver一样非常方便地开发出一个兼容redis协议的服务端。

  • 相关阅读:
    .NET5微服务示例-Ocelot网关
    .NET5微服务示例-Polly熔断与降级
    .NET5微服务示例-Consul注册中心
    .NET下使用ELK日志中心
    [ 题解 ] [ 数学 ] [ JZOJ5809 ] 数羊
    [ 题解 ] [ 数学 ] 函数 (sequence) (欧拉函数)
    [ 题解 ] [ JZOJ5777 ] 小 x 玩游戏
    更换谷歌浏览器视频输入源
    axios 封装及 API 接口管理
    小程序代码压缩实践
  • 原文地址:https://www.cnblogs.com/ExMan/p/15574047.html
Copyright © 2011-2022 走看看