zoukankan      html  css  js  c++  java
  • golang rabbitmq 的学习

    https://www.rabbitmq.com/tutorials/tutorial-one-go.html

    Rabbitmq的任务分发机制

    producer_task.go
    
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "math/rand"
        "os"
        "strings"
        "time"
    )
    
    const (
        //AMQP URI
        uri = "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-idoall-queues-task"
    )
    
    // failOnError terminates the program when err is non-nil, reporting msg
    // followed by the error text. A nil err is a no-op.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf logs and then exits the process, so the panic below is
        // only a fallback and is not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main builds the message body from the command line and hands it to
    // publish together with the connection settings declared above.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, payload)
        log.Printf("published %dB OK", len(payload))
    }
    
    // bodyFrom derives the message body from command-line style arguments.
    //
    // args[0] is treated as the program name. The remaining arguments are
    // joined with single spaces. When there are no further arguments, or the
    // first one is empty, the default greeting is returned.
    func bodyFrom(args []string) string {
        // Inspect the slice that was passed in (not os.Args) so the result
        // depends only on the function's own input.
        if len(args) < 2 || args[1] == "" {
            return "hello idoall.org"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish connects to the broker, declares the queue and then keeps
    // publishing body to it at a fixed, randomly chosen interval.
    //
    // amqpURI is the broker address, exchange is the exchange to publish to,
    // queue is the queue to declare (its name is used as the routing key) and
    // body is the message payload.
    //
    // The publishing loop never finishes on its own; the function only ends
    // when failOnError terminates the process.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only publish to an exchange, never directly to a
        // queue. With the default exchange (empty name) the routing key is
        // the name of the target queue.
        //
        // rand.Intn(1000) may return 0 and time.NewTicker panics on a
        // non-positive interval, so shift the range to 1..1000 ms.
        interval := time.Duration(1+rand.Intn(1000)) * time.Millisecond
        tick := time.NewTicker(interval)
        defer tick.Stop()
        for range tick.C {
            err = channel.Publish(
                exchange, // exchange
                q.Name,   // routing key
                false,    // mandatory
                false,    // immediate
                amqp.Publishing{
                    Headers:         amqp.Table{},
                    ContentType:     "text/plain",
                    ContentEncoding: "",
                    Body:            []byte(body),
                })
            // Check every publish; a check placed after the endless loop
            // would never run.
            failOnError(err, "Failed to publish a message")
        }
    }
    consumer_task.go
    
    package main
    
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    const (
        //AMQP URI
        uri = "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-idoall-queues-task"
    )
    
    // failOnError stops the program if err is set, logging msg and the error.
    // It returns immediately when err is nil.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf exits the process after logging; the panic that follows
        // is therefore not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main starts the message consumer with the connection settings declared
    // in the const block above.
    func main() {
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker, declares the queue and processes every
    // delivery it receives, then blocks forever.
    //
    // amqpURI is the broker address, exchange is the exchange name (not used
    // by this function) and queue is the name of the queue to consume from.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // Never written to; receiving from it keeps this function blocked.
        forever := make(chan bool)

        // Handle deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                // Simulate work: sleep one second per '.' in the body.
                dotCount := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dotCount) * time.Second)
                log.Printf("Done")
                // The consumer is registered with auto-ack disabled, so each
                // delivery has to be acknowledged explicitly once handled.
                if err := d.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Block so the program keeps running while the goroutine consumes.
        <-forever
    }

    开了三个任务窗口接收,发现是并行接收的

    Message acknowledgment 消息确认

    producer_acknowledgments.go
    
    package main
    
    import (
        "fmt"
        "log"
        "os"
        "strings"
        "github.com/streadway/amqp"
    )
    
    /**
     * use
     * go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message.....
     */
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  ""
        //Durable AMQP queue name
        queueName    = "test-idoall-queues-acknowledgments"
    )
    
    // failOnError aborts the program when err is non-nil, printing msg and
    // the error. Nothing happens for a nil err.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf ends the process after logging, so the panic is only a
        // fallback that is not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes one message built from the command-line arguments and
    // logs the size of the body that was sent.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, payload)
        log.Printf("published %dB OK", len(payload))
    }
    
    // bodyFrom derives the message body from command-line style arguments.
    //
    // args[0] is treated as the program name. The remaining arguments are
    // joined with single spaces. When there are no further arguments, or the
    // first one is empty, the default greeting is returned.
    func bodyFrom(args []string) string {
        // Check args itself instead of os.Args so the result depends only on
        // what the caller passed in.
        if len(args) < 2 || args[1] == "" {
            return "hello idoall.org"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish connects to the broker, declares the queue and publishes a
    // single message to it.
    //
    // amqpURI is the broker address, exchange is the exchange to publish to,
    // queue is the queue to declare (its name is used as the routing key) and
    // body is the message payload.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only publish to an exchange, never directly to a
        // queue. With the default exchange (empty name) the routing key is
        // the name of the target queue.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    consumer_acknowledgments.go
    
    package main
    
    import (
        "fmt"
        "log"
        "bytes"
        "time"
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-acknowledgments"
    )
    /**
     *
     */
    // failOnError terminates the program when err is non-nil, reporting msg
    // followed by the error text. A nil err is a no-op.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf logs and exits; the panic below is not reached in
        // practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main starts the message consumer with the connection settings declared
    // in the const block above.
    func main() {
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker, declares the queue and processes every
    // delivery it receives, acknowledging each one after the work is done.
    // It then blocks forever.
    //
    // amqpURI is the broker address, exchange is the exchange name (not used
    // by this function) and queue is the name of the queue to consume from.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue with manual acknowledgements.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // Never written to; receiving from it keeps this function blocked.
        forever := make(chan bool)

        // Handle deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                // Simulate work: sleep one second per '.' in the body.
                dotCount := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dotCount) * time.Second)
                log.Printf("Done")
                // Acknowledge only after the work finished, and surface a
                // failed acknowledgement instead of dropping the error.
                if err := d.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Block so the program keeps running while the goroutine consumes.
        <-forever
    }

    Message durability消息持久化

    producer_durability.go
    package main
    
    import (
        "fmt"
        "log"
        "os"
        "strings"
        "github.com/streadway/amqp"
    )
    
    
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  ""
        //Durable AMQP queue name
        queueName    = "test-idoall-queues-durability"
    )
    
    // failOnError stops the program if err is set, logging msg and the error.
    // It returns immediately when err is nil.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf exits the process after logging; the panic that follows
        // is therefore not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes one message built from the command-line arguments and
    // logs the size of the body that was sent.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, payload)
        log.Printf("published %dB OK", len(payload))
    }
    
    // bodyFrom derives the message body from command-line style arguments.
    //
    // args[0] is treated as the program name. The remaining arguments are
    // joined with single spaces. When there are no further arguments, or the
    // first one is empty, the default greeting is returned.
    func bodyFrom(args []string) string {
        // Check args itself instead of os.Args so the result depends only on
        // what the caller passed in.
        if len(args) < 2 || args[1] == "" {
            return "hello idoall.org"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish connects to the broker, declares a durable queue and publishes
    // a single persistent message to it.
    //
    // amqpURI is the broker address, exchange is the exchange to publish to,
    // queue is the queue to declare (its name is used as the routing key) and
    // body is the message payload.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only publish to an exchange, never directly to a
        // queue. With the default exchange (empty name) the routing key is
        // the name of the target queue.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                DeliveryMode:    amqp.Persistent,
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    consumer_durability.go
    package main
    
    import (
        "fmt"
        "log"
        "bytes"
        "time"
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-durability"
    )
    
    // failOnError aborts the program when err is non-nil, printing msg and
    // the error. Nothing happens for a nil err.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf ends the process after logging, so the panic is only a
        // fallback that is not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main starts the message consumer with the connection settings declared
    // in the const block above.
    func main() {
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker, declares a durable queue and processes
    // every delivery it receives, acknowledging each one after the work is
    // done. It then blocks forever.
    //
    // amqpURI is the broker address, exchange is the exchange name (not used
    // by this function) and queue is the name of the queue to consume from.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue with manual acknowledgements.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // Never written to; receiving from it keeps this function blocked.
        forever := make(chan bool)

        // Handle deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                // Simulate work: sleep one second per '.' in the body.
                dotCount := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dotCount) * time.Second)
                log.Printf("Done")
                // Acknowledge only after the work finished, and surface a
                // failed acknowledgement instead of dropping the error.
                if err := d.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Block so the program keeps running while the goroutine consumes.
        <-forever
    }

    Fair dispatch 公平分发

    producer_fair_dispatch.go
    package main

    import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
    )

    const (
    //AMQP URI
    uri = "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-idoall-queues-fair_dispatch"
    )

    // failOnError terminates the program when err is non-nil, reporting msg
    // followed by the error text. A nil err is a no-op.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf logs and exits; the panic below is not reached in
        // practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }

    // main publishes one message built from the command-line arguments and
    // logs the size of the body that was sent.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, payload)
        log.Printf("published %dB OK", len(payload))
    }

    // bodyFrom derives the message body from command-line style arguments.
    //
    // args[0] is treated as the program name. The remaining arguments are
    // joined with single spaces. When there are no further arguments, or the
    // first one is empty, the default greeting is returned.
    func bodyFrom(args []string) string {
        // Check args itself instead of os.Args so the result depends only on
        // what the caller passed in.
        if len(args) < 2 || args[1] == "" {
            return "hello idoall.org"
        }
        return strings.Join(args[1:], " ")
    }

    // publish connects to the broker, declares a durable queue and publishes
    // a single persistent message to it.
    //
    // amqpURI is the broker address, exchange is the exchange to publish to,
    // queue is the queue to declare (its name is used as the routing key) and
    // body is the message payload.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only publish to an exchange, never directly to a
        // queue. With the default exchange (empty name) the routing key is
        // the name of the target queue.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                DeliveryMode:    amqp.Persistent,
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
     
    consumer_fair_dispatch.go
    package main
    
    import (
        "fmt"
        "log"
        "bytes"
        "time"
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-fair_dispatch"
    )
    
    // failOnError stops the program if err is set, logging msg and the error.
    // It returns immediately when err is nil.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf exits the process after logging; the panic that follows
        // is therefore not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main starts the message consumer with the connection settings declared
    // in the const block above.
    func main() {
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker, declares a durable queue, limits the
    // channel to one unacknowledged delivery at a time and processes every
    // delivery it receives, acknowledging each one after the work is done.
    // It then blocks forever.
    //
    // amqpURI is the broker address, exchange is the exchange name (not used
    // by this function) and queue is the name of the queue to consume from.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        // Fetch only one message at a time.
        err = channel.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue with manual acknowledgements.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // Never written to; receiving from it keeps this function blocked.
        forever := make(chan bool)

        // Handle deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                // Simulate work: sleep one second per '.' in the body.
                dotCount := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dotCount) * time.Second)
                log.Printf("Done")
                // Acknowledge only after the work finished, and surface a
                // failed acknowledgement instead of dropping the error.
                if err := d.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Block so the program keeps running while the goroutine consumes.
        <-forever
    }

    Exchanges & Bindings

    producer_exchange_logs.go
    package main
    
    import (
        "fmt"
        "log"
        "os"
        "strings"
        "github.com/streadway/amqp"
    )
    
    
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  "test-idoall-exchange-logs"
        //Exchange type - direct|fanout|topic|x-custom
        exchangeType = "fanout"
        //AMQP routing key
        routingKey   = ""
    )
    
    // failOnError aborts the program when err is non-nil, printing msg and
    // the error. Nothing happens for a nil err.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf ends the process after logging, so the panic is only a
        // fallback that is not reached in practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes one message built from the command-line arguments to
    // the configured exchange and logs the size of the body that was sent.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, exchangeType, routingKey, payload)
        log.Printf("published %dB OK", len(payload))
    }
    
    
    // bodyFrom derives the message body from command-line style arguments.
    //
    // args[0] is treated as the program name. The remaining arguments are
    // joined with single spaces. When there are no further arguments, or the
    // first one is empty, the default greeting is returned.
    func bodyFrom(args []string) string {
        // Check args itself instead of os.Args so the result depends only on
        // what the caller passed in.
        if len(args) < 2 || args[1] == "" {
            return "hello idoall.org"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish connects to the broker, declares a durable exchange and
    // publishes a single message to it.
    //
    // amqpURI is the broker address, exchange is the exchange name,
    // exchangeType is its type (direct|fanout|topic), routingKey is the
    // routing key attached to the message and body is the message payload.
    func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // Declare the exchange. No queue is declared by the producer here.
        log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
        err = channel.ExchangeDeclare(
            exchange,     // name
            exchangeType, // type
            true,         // durable
            false,        // auto-deleted
            false,        // internal
            false,        // noWait
            nil,          // arguments
        )
        // Report the operation that actually failed: an exchange declaration.
        failOnError(err, "Failed to declare an exchange")

        // Publish the message.
        log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body)
        err = channel.Publish(
            exchange,   // exchange
            routingKey, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }

    consumer_exchange_logs.go

    package main
    
    import (
        "fmt"
        "log"
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  "test-idoall-exchange-logs"
        //Exchange type - direct|fanout|topic|x-custom
        exchangeType = "fanout"
        //AMQP binding key
        bindingKey   = ""
        //Durable AMQP queue name
        queueName     = ""
    )
    
    // failOnError terminates the program when err is non-nil, reporting msg
    // followed by the error text. A nil err is a no-op.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Fatalf logs and exits; the panic below is not reached in
        // practice.
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main starts the log consumer with the exchange, queue and binding
    // settings declared in the const block above.
    func main() {
        consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
    }
    
    // consumer connects to the broker, declares the exchange and an exclusive
    // queue, binds the queue to the exchange and logs every delivery it
    // receives. It then blocks forever.
    //
    // amqpURI is the broker address, exchange is the exchange name,
    // exchangeType is its type (direct|fanout|topic), queue is the name of
    // the queue to declare and key is the binding key.
    func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // Declare the exchange.
        log.Printf("got Channel, declaring Exchange (%q)", exchange)
        err = channel.ExchangeDeclare(
            exchange,     // name of the exchange
            exchangeType, // type
            true,         // durable
            false,        // delete when complete
            false,        // internal
            false,        // noWait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        // Declare the queue named by the caller rather than the package-level
        // constant, so the queue argument is actually honoured.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            true,  // exclusive: the queue is deleted when the consumer's connection closes
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        // Bind the queue to the exchange.
        err = channel.QueueBind(
            q.Name,   // name of the queue
            key,      // bindingKey
            exchange, // sourceExchange
            false,    // noWait
            nil,      // arguments
        )
        failOnError(err, "Failed to bind a queue")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue with manual acknowledgements.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // Never written to; receiving from it keeps this function blocked.
        forever := make(chan bool)

        // Handle deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf(" [x] %s", d.Body)
                // The consumer is registered with auto-ack disabled, so each
                // delivery has to be acknowledged explicitly once handled.
                if err := d.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Block so the program keeps running while the goroutine consumes.
        <-forever
    }

    远程调用RPC

     rpc_server.go 
    package main
    
    import (
            "fmt"
            "log"
            "strconv"
    
            "github.com/streadway/amqp"
    )
    
    // failOnError logs msg together with err and exits the program when err
    // is non-nil. A nil err is a no-op.
    func failOnError(err error, msg string) {
            if err == nil {
                    return
            }
            log.Fatalf("%s: %s", msg, err)
    }
    
    // fib returns the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.
    //
    // Values of n below zero yield 0 instead of recursing without end. The
    // result is computed iteratively, in a number of steps linear in n.
    func fib(n int) int {
            if n <= 0 {
                    return 0
            }
            prev, cur := 0, 1
            for i := 1; i < n; i++ {
                    prev, cur = cur, prev+cur
            }
            return cur
    }
    
    // main runs the Fibonacci RPC server: it consumes requests from the
    // "rpc_queue" queue, computes fib for the integer carried in each request
    // body and publishes the result to the queue named in the request's
    // ReplyTo field, echoing the request's CorrelationId.
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()

            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()

            q, err := ch.QueueDeclare(
                    "rpc_queue", // name
                    false,       // durable
                    false,       // delete when unused
                    false,       // exclusive
                    false,       // no-wait
                    nil,         // arguments
            )
            failOnError(err, "Failed to declare a queue")

            err = ch.Qos(
                    1,     // prefetch count
                    0,     // prefetch size
                    false, // global
            )
            failOnError(err, "Failed to set QoS")

            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    false,  // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
            )
            failOnError(err, "Failed to register a consumer")

            // Never written to; receiving from it keeps main blocked.
            forever := make(chan bool)

            go func() {
                    for d := range msgs {
                            n, err := parseRequest(d.Body)
                            if err != nil {
                                    // One malformed request must not take the
                                    // whole server down: drop it without
                                    // requeueing and keep serving.
                                    log.Printf("Failed to convert body to integer: %s", err)
                                    if rejErr := d.Reject(false); rejErr != nil {
                                            log.Printf("Failed to reject message: %s", rejErr)
                                    }
                                    continue
                            }

                            log.Printf(" [.] fib(%d)", n)
                            response := fib(n)

                            err = ch.Publish(
                                    "",        // exchange
                                    d.ReplyTo, // routing key
                                    false,     // mandatory
                                    false,     // immediate
                                    amqp.Publishing{
                                            ContentType:   "text/plain",
                                            CorrelationId: d.CorrelationId,
                                            Body:          []byte(strconv.Itoa(response)),
                                    })
                            failOnError(err, "Failed to publish a message")

                            if err := d.Ack(false); err != nil {
                                    log.Printf("Failed to ack message: %s", err)
                            }
                    }
            }()

            log.Printf(" [*] Awaiting RPC requests")
            <-forever
    }

    // parseRequest decodes the non-negative decimal integer carried in an RPC
    // request body.
    func parseRequest(body []byte) (int, error) {
            n, err := strconv.Atoi(string(body))
            if err != nil {
                    return 0, fmt.Errorf("parsing request body %q: %w", body, err)
            }
            if n < 0 {
                    return 0, fmt.Errorf("request body %q is negative", body)
            }
            return n, nil
    }
     rpc_client.go
    package main
    
    import (
            "fmt"
            "log"
            "math/rand"
            "os"
            "strconv"
            "strings"
            "time"
    
            "github.com/streadway/amqp"
    )
    
    // failOnError logs msg together with err and exits the program when err
    // is non-nil. A nil err is a no-op.
    func failOnError(err error, msg string) {
            if err == nil {
                    return
            }
            log.Fatalf("%s: %s", msg, err)
    }
    
    // randomString returns a string of l random upper-case ASCII letters
    // ('A' through 'Z', both included).
    func randomString(l int) string {
            buf := make([]byte, l)
            for i := range buf {
                    // randInt excludes its upper bound, so pass 'Z'+1 to make
                    // 'Z' reachable.
                    buf[i] = byte(randInt('A', 'Z'+1))
            }
            return string(buf)
    }

    // randInt returns a random integer in the half-open range [min, max).
    // When the range is empty (max <= min) it returns min instead of
    // panicking inside rand.Intn.
    func randInt(min int, max int) int {
            if max <= min {
                    return min
            }
            return min + rand.Intn(max-min)
    }
    
    // fibonacciRPC asks the RPC server for fib(n) and returns its answer.
    //
    // It publishes n to "rpc_queue" with a random correlation id and an
    // exclusive, server-named reply queue, then waits on that queue for the
    // delivery carrying the same correlation id. Any failure is returned to
    // the caller as an error (so the deferred closes still run) instead of
    // exiting the process from inside this function.
    func fibonacciRPC(n int) (res int, err error) {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            if err != nil {
                    return 0, fmt.Errorf("connecting to RabbitMQ: %w", err)
            }
            defer conn.Close()

            ch, err := conn.Channel()
            if err != nil {
                    return 0, fmt.Errorf("opening a channel: %w", err)
            }
            defer ch.Close()

            q, err := ch.QueueDeclare(
                    "",    // name
                    false, // durable
                    false, // delete when unused
                    true,  // exclusive
                    false, // noWait
                    nil,   // arguments
            )
            if err != nil {
                    return 0, fmt.Errorf("declaring the reply queue: %w", err)
            }

            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    true,   // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
            )
            if err != nil {
                    return 0, fmt.Errorf("registering a consumer: %w", err)
            }

            corrId := randomString(32)

            err = ch.Publish(
                    "",          // exchange
                    "rpc_queue", // routing key
                    false,       // mandatory
                    false,       // immediate
                    amqp.Publishing{
                            ContentType:   "text/plain",
                            CorrelationId: corrId,
                            ReplyTo:       q.Name,
                            Body:          []byte(strconv.Itoa(n)),
                    })
            if err != nil {
                    return 0, fmt.Errorf("publishing the request: %w", err)
            }

            for d := range msgs {
                    // Skip deliveries that do not answer this request.
                    if d.CorrelationId != corrId {
                            continue
                    }
                    res, err = strconv.Atoi(string(d.Body))
                    if err != nil {
                            return 0, fmt.Errorf("converting reply body %q to integer: %w", d.Body, err)
                    }
                    return res, nil
            }

            // The delivery channel was closed before a matching reply showed
            // up; report that rather than returning a zero result as success.
            return 0, fmt.Errorf("reply channel closed before a response for correlation id %q arrived", corrId)
    }
    
    // main seeds the random generator, reads the requested index from the
    // command line, performs the RPC call and logs the result.
    func main() {
            seed := time.Now().UTC().UnixNano()
            rand.Seed(seed)

            index := bodyFrom(os.Args)
            log.Printf(" [x] Requesting fib(%d)", index)

            answer, err := fibonacciRPC(index)
            failOnError(err, "Failed to handle RPC request")
            log.Printf(" [.] Got %d", answer)
    }
    
    func bodyFrom(args []string) int {
            var s string
            if (len(args) < 2) || os.Args[1] == "" {
                    s = "30"
            } else {
                    s = strings.Join(args[1:], " ")
            }
            n, err := strconv.Atoi(s)
            failOnError(err, "Failed to convert arg to integer")
            return n
    }
  • 相关阅读:
    SQL基础用法(实例二)
    SQL基础用法(实例一)
    CentOS 7下修改rabbitmq打开文件数量方法
    CentOS7下安装RabbitMQ
    zabbix 监控zookeeper
    使用Zabbix监控ZooKeeper服务的健康状态
    rabbitmq最大连接数(Socket Descriptors)
    zabbix如何添加主机监控
    Ubuntu下Zabbix服务器监控工具部署
    Ubuntu14.04 x64 zabbix 3.0 安装
  • 原文地址:https://www.cnblogs.com/jackluo/p/10797459.html
Copyright © 2011-2022 走看看