zoukankan      html  css  js  c++  java
  • go使用go-redis操作redis 连接类型,pipline, 发布订阅

    内容:

    一 . 客户端Client(普通模式,主从模式,哨兵模式)
    二. conn连接(连接, pipline, 发布订阅等)
    三. 示例程序(连接, pipline, 发布订阅等)
    客户端
    Client 普通模式的客户端
    go redis依据用途提供了多种客户端创建的函数, 如下:

    func NewClient(opt *Options) *Client
    func NewFailoverClient(failoverOpt *FailoverOptions) *Client
    func (c *Client) Context() context.Context
    func (c *Client) Do(args ...interface{}) *Cmd
    func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd
    func (c *Client) Options() *Options
    func (c *Client) PSubscribe(channels ...string) *PubSub
    func (c *Client) Pipeline() Pipeliner
    func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)
    func (c *Client) PoolStats() *PoolStats
    func (c *Client) Process(cmd Cmder) error
    func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error
    func (c *Client) SetLimiter(l Limiter) *Client
    func (c *Client) Subscribe(channels ...string) *PubSub
    func (c *Client) TxPipeline() Pipeliner
    func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)
    func (c *Client) Watch(fn func(*Tx) error, keys ...string) error
    func (c *Client) WithContext(ctx context.Context) *Client
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    NewClient 创建一个普通连接

    NewFailoverClient 具有故障检测以及故障转移的client

    PSubscribe / Subscribe 发布订阅模式的client

    Pipeline 启用pipline管道模式的client

    PoolStats 连接池状态

    Close 关闭连接

    集群模式的ClusterClient
    func NewClusterClient(opt *ClusterOptions) *ClusterClient
    func (c *ClusterClient) Close() error
    func (c *ClusterClient) Context() context.Context
    func (c *ClusterClient) DBSize() *IntCmd
    func (c *ClusterClient) Do(args ...interface{}) *Cmd
    func (c *ClusterClient) DoContext(ctx context.Context, args ...interface{}) *Cmd
    func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error
    func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error
    func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error
    func (c *ClusterClient) Options() *ClusterOptions
    func (c *ClusterClient) PSubscribe(channels ...string) *PubSub
    func (c *ClusterClient) Pipeline() Pipeliner
    func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)
    func (c *ClusterClient) PoolStats() *PoolStats
    func (c *ClusterClient) Process(cmd Cmder) error
    func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error
    func (c *ClusterClient) ReloadState() error
    func (c *ClusterClient) Subscribe(channels ...string) *PubSub
    func (c *ClusterClient) TxPipeline() Pipeliner
    func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)
    func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error
    func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    方法与client大致相同

    哨兵SentinelClient
    func NewSentinelClient(opt *Options) *SentinelClient
    func (c *SentinelClient) CkQuorum(name string) *StringCmd
    func (c SentinelClient) Close() error
    func (c *SentinelClient) Context() context.Context
    func (c *SentinelClient) Failover(name string) *StatusCmd
    func (c *SentinelClient) FlushConfig() *StatusCmd
    func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd
    func (c *SentinelClient) Master(name string) *StringStringMapCmd
    func (c *SentinelClient) Masters() *SliceCmd
    func (c *SentinelClient) Monitor(name, ip, port, quorum string) *StringCmd
    func (c *SentinelClient) PSubscribe(channels ...string) *PubSub
    func (c *SentinelClient) Ping() *StringCmd
    func (c *SentinelClient) Process(cmd Cmder) error
    func (c *SentinelClient) ProcessContext(ctx context.Context, cmd Cmder) error
    func (c *SentinelClient) Remove(name string) *StringCmd
    func (c *SentinelClient) Reset(pattern string) *IntCmd
    func (c *SentinelClient) Sentinels(name string) *SliceCmd
    func (c *SentinelClient) Set(name, option, value string) *StringCmd
    func (c *SentinelClient) Slaves(name string) *SliceCmd
    func (c SentinelClient) String() string
    func (c *SentinelClient) Subscribe(channels ...string) *PubSub
    func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    连接:
    创建量客户端之后,需要与redis建立连接才能够发送请求进行使用

    在连接层, 提供了很多封装后的函数

    普通连接Conn
    例如:

    涉及很多函数, 函数名称基本可以通过redis的命令进行类比找到,不一一列举

    func (c Conn) Append(key, value string) *IntCmd
    func (c Conn) Auth(password string) *StatusCmd
    func (c Conn) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd
    func (c Conn) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd
    func (c Conn) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd
    func (c Conn) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
    func (c Conn) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
    func (c Conn) BgRewriteAOF() *StatusCmd
    func (c Conn) BgSave() *StatusCmd
    func (c Conn) BitCount(key string, bitCount *BitCount) *IntCmd
    func (c Conn) BitField(key string, args ...interface{}) *IntSliceCmd
    func (c Conn) BitOpAnd(destKey string, keys ...string) *IntCmd

    ....

    func (c Conn) HKeys(key string) *StringSliceCmd
    func (c Conn) HLen(key string) *IntCmd
    func (c Conn) HMGet(key string, fields ...string) *SliceCmd
    func (c Conn) HMSet(key string, fields map[string]interface{}) *StatusCmd
    func (c Conn) HScan(key string, cursor uint64, match string, count int64) *ScanCmd
    func (c Conn) HSet(key, field string, value interface{}) *BoolCmd
    func (c Conn) HSetNX(key, field string, value interface{}) *BoolCmd
    func (c Conn) HVals(key string) *StringSliceCmd
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    Pipeline
    pipline 管道批量执行命令,可以节约带宽

    提供的方法与conn基本一致

    func (c Pipeline) Append(key, value string) *IntCmd
    func (c Pipeline) Auth(password string) *StatusCmd
    func (c Pipeline) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd
    func (c Pipeline) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd
    func (c Pipeline) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd
    func (c Pipeline) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
    func (c Pipeline) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
    func (c Pipeline) BgRewriteAOF() *StatusCmd

    .....


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    PubSub
    发布订阅模式

    func (c *PubSub) Channel() <-chan *Message
    func (c *PubSub) ChannelSize(size int) <-chan *Message
    func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{}
    func (c *PubSub) Close() error
    func (c *PubSub) PSubscribe(patterns ...string) error
    func (c *PubSub) PUnsubscribe(patterns ...string) error
    func (c *PubSub) Ping(payload ...string) error
    func (c *PubSub) Receive() (interface{}, error)
    func (c *PubSub) ReceiveMessage() (*Message, error)
    func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error)
    func (c *PubSub) String() string
    func (c *PubSub) Subscribe(channels ...string) error
    func (c *PubSub) Unsubscribe(channels ...string) error
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    可以建立channel Channel()

    设置超时时间ReceiveTimeout

    Subscribe订阅

    Unsubscribe取消订阅

    PSubscribe 发布消息等

    排他 TX
    func (c Tx) Get(key string) *StringCmd
    func (c Tx) GetBit(key string, offset int64) *IntCmd
    func (c Tx) GetRange(key string, start, end int64) *StringCmd
    func (c Tx) GetSet(key string, value interface{}) *StringCmd
    func (c Tx) HDel(key string, fields ...string) *IntCmd
    func (c Tx) HExists(key, field string) *BoolCmd
    func (c Tx) HGet(key, field string) *StringCmd
    func (c Tx) HGetAll(key string) *StringStringMapCmd
    func (c Tx) HIncrBy(key, field string, incr int64) *IntCmd
    func (c Tx) HIncrByFloat(key, field string, incr float64) *FloatCmd
    func (c Tx) HKeys(key string) *StringSliceCmd
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    在conn的基础上加入排他性功能

    提供的函数及方法与conn基本相同, 可以参考conn进行使用

    示例程序
    示例:

    func ExampleClient() {
    redisdb := redis.NewClient(&redis.Options{
    Addr: "192.168.137.18:6379",
    Password: "", // no password set
    DB: 0, // use default DB
    })

    err := redisdb.Set("key", "value", 0).Err()
    if err != nil {
    panic(err)
    }

    val, err := redisdb.Get("key").Result()
    if err != nil {
    panic(err)
    }
    fmt.Println("key", val)

    val2, err := redisdb.Get("missing_key").Result()
    if err == redis.Nil {
    fmt.Println("missing_key does not exist")
    } else if err != nil {
    panic(err)
    } else {
    fmt.Println("missing_key", val2)
    }

    }

    func main() {
    ExampleClient()
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    输出:

    key value
    missing_key does not exist
    1
    2
    NewClient创建连接:
    redisdb := redis.NewClient(&redis.Options{
    Addr: "localhost:6379", // use default Addr
    Password: "", // no password set
    DB: 0, // use default DB
    })

    pong, err := redisdb.Ping().Result()
    fmt.Println(pong, err)
    1
    2
    3
    4
    5
    6
    7
    8
    如果使用tls/ssl 则在Options参数 TLSConfig *tls.Config 中指定

    主从故障转移NewFailoverClient
    redisdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName: "master",
    SentinelAddrs: []string{":26379"},
    })
    redisdb.Ping()
    1
    2
    3
    4
    5
    集群NewClusterClient
    redisdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    redisdb.Ping()
    1
    2
    3
    4
    url样式建立连接 ParseURL
    opt, err := redis.ParseURL("redis://:qwerty@localhost:6379/1")
    if err != nil {
    panic(err)
    }
    fmt.Println("addr is", opt.Addr)
    fmt.Println("db is", opt.DB)
    fmt.Println("password is", opt.Password)

    // Create client as usually.
    _ = redis.NewClient(opt)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    pipline
    pipe := redisdb.Pipeline()

    incr := pipe.Incr("pipeline_counter")
    pipe.Expire("pipeline_counter", time.Hour)

    // Execute
    //
    // INCR pipeline_counter
    // EXPIRE pipeline_counts 3600
    //
    // using one redisdb-server roundtrip.
    _, err := pipe.Exec()
    fmt.Println(incr.Val(), err)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    Pipelined

    var incr *redis.IntCmd
    _, err := redisdb.Pipelined(func(pipe redis.Pipeliner) error {
    incr = pipe.Incr("pipelined_counter")
    pipe.Expire("pipelined_counter", time.Hour)
    return nil
    })
    fmt.Println(incr.Val(), err)
    1
    2
    3
    4
    5
    6
    7
    排他TxPipeline

    pipe := redisdb.TxPipeline()

    incr := pipe.Incr("tx_pipeline_counter")
    pipe.Expire("tx_pipeline_counter", time.Hour)

    // Execute
    //
    // MULTI
    // INCR pipeline_counter
    // EXPIRE pipeline_counts 3600
    // EXEC
    //
    // using one redisdb-server roundtrip.
    _, err := pipe.Exec()
    fmt.Println(incr.Val(), err)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    rdb := redis.NewClient(&redis.Options{
    Addr: ":6379",
    })
    rdb.AddHook(redisHook{})

    rdb.Pipelined(func(pipe redis.Pipeliner) error {
    pipe.Ping()
    pipe.Ping()
    return nil
    })
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    发布订阅
    sub := client.Subscribe(queryResp)
    iface, err := sub.Receive()
    if err != nil {
    // handle error
    }

    // Should be *Subscription, but others are possible if other actions have been
    // taken on sub since it was created.
    switch iface.(type) {
    case *Subscription:
    // subscribe succeeded
    case *Message:
    // received first message
    case *Pong:
    // pong received
    default:
    // handle error
    }

    ch := sub.Channel()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    示例程序

    package main

    import (
    "github.com/go-redis/redis"
    "time"
    "fmt"
    )


    func ExampleClient() {
    redisdb := redis.NewClient(&redis.Options{
    Addr: "192.168.137.18:6379",
    Password: "", // no password set
    DB: 0, // use default DB
    })
    //rdb.AddHook()

    pubsub := redisdb.Subscribe("mychannel1")

    // Wait for confirmation that subscription is created before publishing anything.
    _, err := pubsub.Receive()
    if err != nil {
    panic(err)
    }

    // Go channel which receives messages.
    ch := pubsub.Channel()

    // Publish a message.
    err = redisdb.Publish("mychannel1", "hello").Err()
    if err != nil {
    panic(err)
    }

    time.AfterFunc(time.Second, func() {
    // When pubsub is closed channel is closed too.
    _ = pubsub.Close()
    })

    // Consume messages.
    for msg := range ch {
    fmt.Println(msg.Channel, msg.Payload)
    }
    }

    func main() {
    ExampleClient()
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    输出:

    mychannel1 hello
    1
    示例2

    func ExampleClient2() {
    redisdb := redis.NewClient(&redis.Options{
    Addr: "192.168.137.18:6379",
    Password: "", // no password set
    DB: 0, // use default DB
    })
    //rdb.AddHook()
    pubsub := redisdb.Subscribe("mychannel2")
    defer pubsub.Close()

    for i := 0; i < 2; i++ {
    // ReceiveTimeout is a low level API. Use ReceiveMessage instead.
    msgi, err := pubsub.ReceiveTimeout(time.Second)
    if err != nil {
    break
    }

    switch msg := msgi.(type) {
    case *redis.Subscription:
    fmt.Println("subscribed to", msg.Channel)

    _, err := redisdb.Publish("mychannel2", "hello").Result()
    if err != nil {
    panic(err)
    }
    case *redis.Message:
    fmt.Println("received", msg.Payload, "from", msg.Channel)
    default:
    panic("unreached")
    }
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    输出:

    subscribed to mychannel2
    received hello from mychannel2
    ————————————————
    版权声明:本文为CSDN博主「comprel」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/comprel/article/details/96716708

  • 相关阅读:
    shell 10流程控制
    shell 9test命令
    shell 8字符串与文件内容处理
    shell 7输入输出
    shell 6基本运算符
    JS-JQ实现TAB选项卡
    JS-JQ实现页面滚动时元素智能定位(顶部-其他部位)
    js获取框架(IFrame)的内容
    codeforces 660C C. Hard Process(二分)
    codeforces 660B B. Seating On Bus(模拟)
  • 原文地址:https://www.cnblogs.com/ExMan/p/11493192.html
Copyright © 2011-2022 走看看