zoukankan      html  css  js  c++  java
  • golang 操作 Redis & Mysql & RabbitMQ

    golang 操作 Redis & Mysql & RabbitMQ

    Redis

    安装导入

    go get github.com/garyburd/redigo/redis
    import "github.com/garyburd/redigo/redis"

    链接

    github:https://github.com/garyburd/redigo (Redis 服务端:https://github.com/antirez/redis)

    Doc:http://godoc.org/github.com/garyburd/redigo/redis

    Redis全套使用:http://www.cnblogs.com/suoning/p/5807247.html

    使用

    连接

    import "github.com/garyburd/redigo/redis"
    
    // main dials the local Redis server over TCP and closes the connection
    // when it returns. The later snippets in this article are meant to be
    // placed inside this function and reuse c and err.
    func main() {
        const network, address = "tcp", "localhost:6379"

        c, err := redis.Dial(network, address)
        if err != nil {
            fmt.Println("conn redis failed, err:", err)
            return
        }
        // Release the connection once main is done.
        defer c.Close()
    }

    set & get

            _, err = c.Do("Set", "name", "nick")
        if err != nil {
            fmt.Println(err)
            return
        }
    
        r, err := redis.String(c.Do("Get", "name"))
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(r)

    mset & mget

    批量设置

            _, err = c.Do("MSet", "name", "nick", "age", "18")
        if err != nil {
            fmt.Println("MSet error: ", err)
            return
        }
    
        r2, err := redis.Strings(c.Do("MGet", "name", "age"))
        if err != nil {
            fmt.Println("MGet error: ", err)
            return
        }
        fmt.Println(r2)

    hset & hget

    hash操作

        _, err = c.Do("HSet", "names", "nick", "suoning")
        if err != nil {
            fmt.Println("hset error: ", err)
            return
        }
    
        r, err = redis.String(c.Do("HGet", "names", "nick"))
        if err != nil {
            fmt.Println("hget error: ", err)
            return
        }
        fmt.Println(r)

    expire

    设置过期时间

        _, err = c.Do("expire", "names", 5)
        if err != nil {
            fmt.Println("expire error: ", err)
            return
        }

    lpush & lpop & llen

    队列

        // Queue: push three values onto the head of the list "Queue".
        _, err = c.Do("lpush", "Queue", "nick", "dawn", 9)
        if err != nil {
            fmt.Println("lpush error: ", err)
            return
        }

        // Drain the list one element at a time. Popping from an empty list
        // yields a nil reply, which redis.String reports as redis.ErrNil; that
        // is the normal end of the queue, so it is not printed as an error.
        for {
            r, err = redis.String(c.Do("lpop", "Queue"))
            if err != nil {
                if err != redis.ErrNil {
                    fmt.Println("lpop error: ", err)
                }
                break
            }
            fmt.Println(r)
        }

        // Report the remaining length. Printing r3 also keeps the variable
        // used, which Go requires for the snippet to compile.
        r3, err := redis.Int(c.Do("llen", "Queue"))
        if err != nil {
            fmt.Println("llen error: ", err)
            return
        }
        fmt.Println(r3)

    连接池

    各参数的解释如下:

    MaxIdle:最大的空闲连接数,表示即使没有redis连接时依然可以保持N个空闲的连接,而不被清除,随时处于待命状态。

    MaxActive:最大的激活连接数,表示同时最多有N个连接

    IdleTimeout:最大的空闲连接等待时间,超过此时间后,空闲连接将被关闭(类型为 time.Duration,必须带单位,例如 300 * time.Second)

        // Build a connection pool for the local Redis server.
        // This snippet needs the standard "time" package to be imported.
        pool := &redis.Pool{
            MaxIdle:   16,   // idle connections kept ready in the pool
            MaxActive: 1024, // upper bound on connections handed out at once
            // IdleTimeout is a time.Duration, so a bare 300 would mean 300
            // nanoseconds; presumably 300 seconds was intended.
            IdleTimeout: 300 * time.Second,
            // Dial is how the pool creates a new connection.
            Dial: func() (redis.Conn, error) {
                return redis.Dial("tcp", "localhost:6379")
            },
        }

    连接池栗子

    package main

    import (
        "fmt"
        "time"

        "github.com/garyburd/redigo/redis"
    )

    // pool is the Redis connection pool shared by the whole program.
    var pool *redis.Pool

    // init builds the pool before main runs.
    func init() {
        pool = &redis.Pool{
            MaxIdle:   16,   // idle connections kept ready in the pool
            MaxActive: 1024, // upper bound on connections handed out at once
            // IdleTimeout is a time.Duration, so a bare 300 would mean 300
            // nanoseconds; presumably 300 seconds was intended.
            IdleTimeout: 300 * time.Second,
            // Dial is how the pool creates a new connection.
            Dial: func() (redis.Conn, error) {
                return redis.Dial("tcp", "localhost:6379")
            },
        }
    }

    // main borrows a connection from the pool, writes the key "name", reads it
    // back and prints it. Closing the connection hands it back to the pool.
    func main() {
        c := pool.Get()
        defer c.Close()

        _, err := c.Do("Set", "name", "nick")
        if err != nil {
            fmt.Println(err)
            return
        }

        r, err := redis.String(c.Do("Get", "name"))
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(r)
    }

    管道操作

    请求/响应服务可以实现持续处理新请求,客户端可以发送多个命令到服务器而无需等待响应,最后再一次性读取多个响应。

    使用Send(),Flush(),Receive()方法支持管道化操作

    Send向连接的输出缓冲中写入命令。

    Flush将连接的输出缓冲清空并写入服务器端。

    Receive按照FIFO顺序依次读取服务器的响应。

    func main() {
        c, err := redis.Dial("tcp", "localhost:6379")
        if err != nil {
            fmt.Println("conn redis failed, err:", err)
            return
        }
        defer c.Close()
    
        c.Send("SET", "name1", "sss1")
        c.Send("SET", "name2", "sss2")
    
        c.Flush()
    
        v, err := c.Receive()
        fmt.Printf("v:%v,err:%v
    ", v, err)
        v, err = c.Receive()
        fmt.Printf("v:%v,err:%v
    ", v, err)
    
        v, err = c.Receive()    // 夯住,一直等待
        fmt.Printf("v:%v,err:%v
    ", v, err)
    }

    Mysql

    安装导入

    go get "github.com/go-sql-driver/mysql"
    go get "github.com/jmoiron/sqlx"
    
    import (
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )

    链接:

    github:

      https://github.com/go-sql-driver/mysql

      https://github.com/jmoiron/sqlx

    Doc:

          http://godoc.org/github.com/jmoiron/sqlx

      http://jmoiron.github.io/sqlx/

    Mysql全套使用:http://www.cnblogs.com/suoning/p/5769141.html

    连接

    import (
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )
    
    // Db is the package-level database handle shared by the examples.
    var Db *sqlx.DB

    // init opens the MySQL handle for the "test" database and stores it in Db.
    // On failure it prints the error and leaves Db unset.
    func init() {
        handle, openErr := sqlx.Open("mysql", "root:@tcp(127.0.0.1:3306)/test")
        if openErr != nil {
            fmt.Println("open mysql failed,", openErr)
            return
        }
        Db = handle
    }

    栗子建表

    -- Example table used by the insert/update/select/delete programs below.
    -- Every column is nullable and defaults to NULL; no primary key is defined.
    CREATE TABLE `person` (
      `user_id` int(128) DEFAULT NULL,
      `username` varchar(255) DEFAULT NULL,
      `sex` varchar(16) DEFAULT NULL,
      `email` varchar(128) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    栗子(insert)

    package main
    
    import (
        "fmt"
    
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )
    
    // Person maps one row of the person table; the db tags name the columns.
    type Person struct {
        // UserId is stored in the user_id column.
        UserId int `db:"user_id"`
        // Username is stored in the username column.
        Username string `db:"username"`
        // Sex is stored in the sex column.
        Sex string `db:"sex"`
        // Email is stored in the email column.
        Email string `db:"email"`
    }
    
    // Db is the database handle used by main.
    var Db *sqlx.DB

    // init opens the MySQL handle and publishes it through Db; when opening
    // fails the error is printed and Db keeps its zero value.
    func init() {
        const (
            driver = "mysql"
            dsn    = "root:@tcp(127.0.0.1:3306)/test"
        )

        db, err := sqlx.Open(driver, dsn)
        if err != nil {
            fmt.Println("open mysql failed,", err)
            return
        }
        Db = db
    }
    
    // main inserts one row into the person table and prints the id reported
    // by the driver for that insert.
    func main() {
        // init prints the error and leaves Db nil when the handle could not be
        // opened; stop here instead of dereferencing a nil handle.
        if Db == nil {
            return
        }

        r, err := Db.Exec("insert into person(username, sex, email)values(?, ?, ?)", "suoning", "man", "suoning@net263.com")
        if err != nil {
            fmt.Println("exec failed, ", err)
            return
        }

        // Report this failure under its own message so it is not mistaken for
        // a failed INSERT.
        id, err := r.LastInsertId()
        if err != nil {
            fmt.Println("get last insert id failed, ", err)
            return
        }

        fmt.Println("insert succ:", id)
    }

    栗子(update)

    package main
    
    import (
        "fmt"
    
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )
    
    // Person describes a row of the person table. Each db tag is the column
    // name the field is bound to.
    type Person struct {
        UserId   int    `db:"user_id"`  // user_id column
        Username string `db:"username"` // username column
        Sex      string `db:"sex"`      // sex column
        Email    string `db:"email"`    // email column
    }
    
    // Db holds the handle opened by init.
    var Db *sqlx.DB

    // init opens the MySQL handle. Success stores it in Db; failure is printed
    // and Db stays unset.
    func init() {
        if database, err := sqlx.Open("mysql", "root:@tcp(127.0.0.1:3306)/test"); err != nil {
            fmt.Println("open mysql failed,", err)
        } else {
            Db = database
        }
    }
    
    // main sets user_id to 20170808 for every row whose username is "suoning".
    // It prints only when the statement fails.
    func main() {
        const stmt = "update person set user_id=? where username=?"

        if _, err := Db.Exec(stmt, 20170808, "suoning"); err != nil {
            fmt.Println("exec failed, ", err)
        }
    }

    栗子(select)

    package main
    
    import (
        "fmt"
    
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )
    
    // Person is the Go-side shape of a person row, with one field per column
    // selected by the queries in main.
    type Person struct {
        // UserId is bound to user_id.
        UserId int `db:"user_id"`

        // Username is bound to username.
        Username string `db:"username"`

        // Sex is bound to sex.
        Sex string `db:"sex"`

        // Email is bound to email.
        Email string `db:"email"`
    }
    
    // Place carries db tags for the columns country, city and telcode. It is
    // declared here but not used by this example's main.
    type Place struct {
        Country string `db:"country"` // country column
        City    string `db:"city"`    // city column
        TelCode int    `db:"telcode"` // telcode column
    }
    
    // Db is the handle the queries in main run against.
    var Db *sqlx.DB

    // init opens the MySQL handle; a failure is printed and Db is left unset.
    func init() {
        dsn := "root:@tcp(127.0.0.1:3306)/test"

        opened, err := sqlx.Open("mysql", dsn)
        if err != nil {
            fmt.Println("open mysql failed,", err)
            return
        }

        Db = opened
    }
    
    func main() {
    
        var person []Person
        err := Db.Select(&person, "select user_id, username, sex, email from person where user_id=?", 1)
        if err != nil {
            fmt.Println("exec failed, ", err)
            return
        }
        fmt.Println("select succ:", person)
    
        people := []Person{}
        Db.Select(&people, "SELECT * FROM person ORDER BY user_id ASC")
        fmt.Println(people)
        jason, john := people[0], people[1]
        fmt.Printf("%#v
    %#v", jason, john)
    }

    栗子(delete)

    package main
    
    import (
        "fmt"
    
        _ "github.com/go-sql-driver/mysql"
        "github.com/jmoiron/sqlx"
    )
    
    // Person mirrors the person table's columns through its db tags. It is
    // declared here but not used by this example's main.
    type Person struct {
        UserId   int    `db:"user_id"`  // bound to user_id
        Username string `db:"username"` // bound to username
        Sex      string `db:"sex"`      // bound to sex
        Email    string `db:"email"`    // bound to email
    }
    
    // Db is the handle used by the delete statement in main.
    var Db *sqlx.DB

    // init opens the MySQL handle and assigns it to Db. If opening fails, the
    // error is printed and Db is not assigned.
    func init() {
        conn, openErr := sqlx.Open("mysql", "root:@tcp(127.0.0.1:3306)/test")
        switch {
        case openErr != nil:
            fmt.Println("open mysql failed,", openErr)
        default:
            Db = conn
        }
    }
    
    // main deletes at most one row whose username is "suoning" and prints the
    // outcome.
    func main() {
        query := "delete from person where username=? limit 1"

        if _, err := Db.Exec(query, "suoning"); err != nil {
            fmt.Println("exec failed, ", err)
            return
        }
        fmt.Println("delete succ")
    }

    RabbitMQ

    安装

    go get "github.com/streadway/amqp"

    文档:

    https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

    栗子一(普通模式)

    生产者:

    package main
    
    import (
        "fmt"
        "log"
        "os"
        "strings"
    
        "github.com/streadway/amqp"
        "time"
    )
    
    /*
    默认点对点模式
    */
    
    // failOnError does nothing when err is nil. Otherwise it logs
    // "<msg>: <err>" through log.Fatalf, which terminates the program.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Fatalf("%s: %s", msg, err)
        // Kept from the original; log.Fatalf exits before this is reached.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes a single persistent text message to the durable queue
    // "task_queue" through the default exchange. The message text comes from
    // the command line via bodyFrom.
    func main() {
        // Connect to the broker.
        connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection to work with messages.
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // Declare the queue.
        queue, err := channel.QueueDeclare(
            "task_queue", // name
            true,         // durable (author's note: a queue declared earlier cannot be declared again)
            false,        // delete when unused
            false,        // exclusive: when true the queue is deleted once the connection closes
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        body := bodyFrom(os.Args)

        // Persistent delivery goes with the durable queue. Author's note: a
        // message can still be lost if the broker dies before it is persisted.
        message := amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        }

        // Publish: the default exchange ("") routes to the queue whose name
        // equals the routing key, i.e. task_queue.
        err = channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            message,
        )
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
    }
    
    // bodyFrom builds the message text from an argument list shaped like
    // os.Args: element 0 is the program name and the rest are joined with
    // single spaces. When there is no argument after the program name, or the
    // first one is empty, it returns "hello-" followed by the current time.
    //
    // Only args is inspected; reading os.Args here would give wrong results
    // (or an index panic) for any caller passing a different slice.
    func bodyFrom(args []string) string {
        if len(args) < 2 || args[1] == "" {
            return fmt.Sprintf("%s-%v", "hello", time.Now())
        }
        return strings.Join(args[1:], " ")
    }

    消费者:

    package main
    
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    /*
    默认点对点模式
    工作方,多个,拿发布方的消息
    */
    
    // failOnError stops the program when err is non-nil, logging msg and err
    // joined by ": ". A nil err is ignored.
    func failOnError(err error, msg string) {
        if err != nil {
            text := fmt.Sprintf("%s: %s", msg, err)
            log.Fatal(text) // logs and exits
            panic(text)     // kept from the original; not reached after log.Fatal
        }
    }
    
    // main is a worker: it consumes messages from the durable queue
    // "task_queue", sleeps one second per '.' in the body to simulate work,
    // and acknowledges each message once it is done.
    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        // Name the queue to consume from.
        q, err := ch.QueueDeclare(
            "task_queue", // name
            true,         // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        // Fair dispatch: prefetch one message per worker, so the next one is
        // delivered only after the current one has been acknowledged.
        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        // Consume by queue name.
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack: true would acknowledge messages automatically
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dots := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dots) * time.Second)
                log.Printf("Done")

                // Acknowledge this message only. Passing true would also
                // acknowledge every earlier unacknowledged message on the same
                // channel, which suits batch confirmation (e.g. every ten).
                if ackErr := d.Ack(false); ackErr != nil {
                    // Do not drop the failure silently.
                    log.Printf("Failed to ack message: %s", ackErr)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
    }

    栗子二(订阅模式)

    订阅 生产者:

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "os"
        "strings"
        "time"
    )
    
    /*
    广播模式
    发布方
    */
    
    // failOnError is the shared fatal-error helper: with a nil err it returns
    // at once, otherwise it logs "<msg>: <err>" and the process exits.
    func failOnError(err error, msg string) {
        failed := err != nil
        if !failed {
            return
        }
        log.Fatalf("%s: %s", msg, err)
        // Kept from the original; log.Fatalf exits before this is reached.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main is the publisher of the broadcast example: it declares the fanout
    // exchange "logs" and publishes one text message to it.
    func main() {
        connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // The default mode uses the default exchange; for broadcasting we
        // declare our own exchange, to which queues can then be bound.
        const exchange = "logs"
        err = channel.ExchangeDeclare(
            exchange, // name
            "fanout", // type: broadcast
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        message := amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        }

        // Publish to the exchange. Author's note: if no queue is bound to it
        // at this moment, the message is discarded.
        err = channel.Publish(
            exchange, // exchange
            "",       // routing key: unused in fanout mode, every bound queue gets the message
            false,    // mandatory
            false,    // immediate
            message,
        )
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
    }
    
    // bodyFrom turns an os.Args-style slice into the message text. Everything
    // after the program name is joined with spaces; if nothing follows the
    // program name, or the first argument is empty, the result is "hello-"
    // plus the current time.
    //
    // The decision is based on args alone. The previous check looked at
    // os.Args[1], which is wrong whenever the two slices differ and can index
    // out of range.
    func bodyFrom(args []string) string {
        hasText := len(args) >= 2 && args[1] != ""
        if !hasText {
            return fmt.Sprintf("%s-%v", "hello", time.Now())
        }
        return strings.Join(args[1:], " ")
    }

    订阅 消费者:

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
    )
    
    /*
    广播模式
    订阅方
    */
    
    // failOnError logs msg together with err and terminates the program when
    // err is non-nil; with a nil err it has no effect.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        report := fmt.Sprintf("%s: %s", msg, err)
        log.Fatal(report) // logs and exits
        panic(report)     // kept from the original; not reached after log.Fatal
    }
    
    // main is a subscriber of the broadcast example: it binds a fresh,
    // server-named queue to the fanout exchange "logs" and logs every message
    // that arrives.
    func main() {
        connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // The exchange is declared on the subscriber side as well.
        const exchange = "logs"
        err = channel.ExchangeDeclare(
            exchange, // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        // Create a queue with an empty name so that a random name is
        // generated for it.
        queue, err := channel.QueueDeclare(
            "",    // name
            false, // durable
            false, // delete when unused
            true,  // exclusive: the queue is removed once the connection closes
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        // Bind the queue to the exchange, i.e. subscribe it to everything
        // published there.
        err = channel.QueueBind(
            queue.Name, // queue name
            "",         // routing key: not needed in fanout mode
            exchange,   // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")

        // Start consuming. Several subscribers can run at once; each has its
        // own temporary queue, so each receives the same messages.
        deliveries, err := channel.Consume(
            queue.Name, // queue
            "",         // consumer
            true,       // auto-ack
            false,      // exclusive
            false,      // no-local
            false,      // no-wait
            nil,        // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for delivery := range deliveries {
                log.Printf(" [x] %s", delivery.Body)
            }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
    }

    栗子三(RPC模式)

    RPC 应答方:

    package main
    
    import (
        "fmt"
        "log"
        "strconv"
    
        "github.com/streadway/amqp"
    )
    
    /*
    RPC模式
    应答方
    */
    
    // failOnError aborts the program on a non-nil err after logging
    // "<msg>: <err>". It returns normally when err is nil.
    func failOnError(err error, msg string) {
        switch {
        case err == nil:
            return
        default:
            log.Fatalf("%s: %s", msg, err)
            // Kept from the original; log.Fatalf exits before this is reached.
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }
    
    // fib returns the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.
    //
    // n arrives from a request body, so it may be negative; the recursive
    // version never terminated for such input. Any n <= 0 now yields 0. The
    // value is computed with a linear loop instead of exponential recursion.
    func fib(n int) int {
        if n <= 0 {
            return 0
        }
        prev, cur := 0, 1
        for i := 1; i < n; i++ {
            prev, cur = cur, prev+cur
        }
        return cur
    }
    
    // main is the RPC server: it consumes requests from "rpc_queue", computes
    // fib of the integer in each request body, and publishes the result to
    // the request's ReplyTo queue with the request's CorrelationId.
    func main() {

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
            "rpc_queue", // name
            false,       // durable
            false,       // delete when unused
            false,       // exclusive
            false,       // no-wait
            nil,         // arguments
        )
        failOnError(err, "Failed to declare a queue")

        // Fair dispatch; without this, messages are handed out round-robin.
        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        // Consume, i.e. wait for requests.
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            // A request has arrived.
            for d := range msgs {
                n, err := strconv.Atoi(string(d.Body))
                if err != nil {
                    // One malformed request must not take the whole server
                    // down: log it, reject it without requeueing, move on.
                    log.Printf("Failed to convert body to integer: %s", err)
                    if rejectErr := d.Reject(false); rejectErr != nil {
                        log.Printf("Failed to reject message: %s", rejectErr)
                    }
                    continue
                }

                log.Printf(" [.] fib(%d)", n)

                // Compute.
                response := fib(n)

                // Reply.
                err = ch.Publish(
                    "",        // exchange
                    d.ReplyTo, // routing key
                    false,     // mandatory
                    false,     // immediate
                    amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: d.CorrelationId, // lets the caller match reply to request
                        Body:          []byte(strconv.Itoa(response)),
                    })
                failOnError(err, "Failed to publish a message")

                // Confirm that the request has been answered.
                if ackErr := d.Ack(false); ackErr != nil {
                    log.Printf("Failed to ack message: %s", ackErr)
                }
            }
        }()

        log.Printf(" [*] Awaiting RPC requests")
        <-forever
    }

    RPC 请求方:

    package main
    
    import (
        "fmt"
        "log"
        "math/rand"
        "os"
        "strconv"
        "strings"
        "time"
    
        "github.com/streadway/amqp"
    )
    
    /*
    RPC模式
    请求方
    */
    
    // failOnError is a no-op for a nil err. For any other err it logs the
    // message and the error, separated by ": ", and the program exits.
    func failOnError(err error, msg string) {
        if err != nil {
            detail := fmt.Sprintf("%s: %s", msg, err)
            log.Fatal(detail) // logs and exits
            panic(detail)     // kept from the original; not reached after log.Fatal
        }
    }
    
    // randomString returns l random upper-case ASCII letters.
    //
    // randInt's upper bound is exclusive, so the range passed is 'A' to
    // 'Z'+1; the old bounds (65, 90) could never produce 'Z'. The buffer is
    // no longer named bytes, which shadowed the standard package name.
    func randomString(l int) string {
        buf := make([]byte, l)
        for i := range buf {
            buf[i] = byte(randInt('A', 'Z'+1))
        }
        return string(buf)
    }
    
    // randInt returns a pseudo-random integer in the half-open range
    // [min, max), drawn from the math/rand package-level source.
    func randInt(min int, max int) int {
        span := max - min
        offset := rand.Intn(span)
        return offset + min
    }
    
    // fibonacciRPC asks the RPC server for fib(n). It publishes n to
    // "rpc_queue", naming a private reply queue and a random correlation id,
    // then waits on that queue for the reply carrying the same id.
    //
    // It returns the parsed result, or an error if the reply channel is closed
    // before a matching reply arrives. Broker and parsing failures end the
    // program through failOnError.
    func fibonacciRPC(n int) (res int, err error) {

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        // Declare the private reply queue.
        q, err := ch.QueueDeclare(
            "",    // name
            false, // durable
            false, // delete when unused
            true,  // exclusive: the queue is removed once the connection closes
            false, // noWait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive: true would make the server treat this as the queue's only consumer
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        corrId := randomString(32)

        err = ch.Publish(
            "",          // exchange
            "rpc_queue", // routing key
            false,       // mandatory
            false,       // immediate
            amqp.Publishing{
                ContentType:   "text/plain",
                CorrelationId: corrId,
                ReplyTo:       q.Name,
                Body:          []byte(strconv.Itoa(n)),
            })
        failOnError(err, "Failed to publish a message")

        // Skip replies that belong to other requests.
        for d := range msgs {
            if corrId != d.CorrelationId {
                continue
            }
            res, err = strconv.Atoi(string(d.Body))
            failOnError(err, "Failed to convert body to integer")
            return res, nil
        }

        // The loop ends only when msgs is closed; report that instead of
        // returning a zero result that looks like a valid answer.
        return 0, fmt.Errorf("reply channel closed before a response for request %s arrived", corrId)
    }
    
    // main seeds the random source, reads the requested index from the command
    // line, performs the RPC call and logs the result.
    func main() {
        seed := time.Now().UTC().UnixNano()
        rand.Seed(seed)

        arg := bodyFrom(os.Args)
        log.Printf(" [x] Requesting fib(%d)", arg)

        answer, err := fibonacciRPC(arg)
        failOnError(err, "Failed to handle RPC request")
        log.Printf(" [.] Got %d", answer)
    }
    
    func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "30"
        } else {
            s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
    }
  • 相关阅读:
    96. Unique Binary Search Trees1和2
    576. Out of Boundary Paths
    686. Repeated String Match判断字符串重复几次可以包含另外一个
    650. 2 Keys Keyboard
    Penetration Test
    Penetration Test
    Penetration Test
    Penetration Test
    CISSP 考试经验分享
    2019-2020 ICPC Asia Hong Kong Regional Contest J—Junior Mathematician 数位dp
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/7428391.html
Copyright © 2011-2022 走看看