zoukankan      html  css  js  c++  java
  • golang rabbitmq 的学习



    package main
    import (
    const (
        //AMQP URI
        uri = "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-idoall-queues-task"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main() {
        bodyMsg := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello idoall.org"
        } else {
            s = strings.Join(args[1:], " ")
        return s
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string) {
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        // Producer只能发送到exchange,它是不能直接发送到queue的。
        // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
        // routing_key就是指定的queue名字。
        tick := time.NewTicker(time.Millisecond * time.Duration(rand.Intn(1000)))
        for {
            err = channel.Publish(
                exchange, // exchange
                q.Name,   // routing key
                false,    // mandatory
                false,    // immediate
                    Headers:         amqp.Table{},
                    ContentType:     "text/plain",
                    ContentEncoding: "",
                    Body:            []byte(body),
        failOnError(err, "Failed to publish a message")
    package main
    import (
    const (
        //AMQP URI
        uri = "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange nam
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-idoall-queues-task"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main() {
        consumer(uri, exchangeName, queueName)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    func consumer(amqpURI string, exchange string, queue string) {
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")


    Message acknowledgment 消息确认

    package main
    import (
     * use
     * go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message.....
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  ""
        //Durable AMQP queue name
        queueName    = "test-idoall-queues-acknowledgments"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        bodyMsg := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello idoall.org"
        } else {
            s = strings.Join(args[1:], " ")
        return s
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        // Producer只能发送到exchange,它是不能直接发送到queue的。
        // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
        // routing_key就是指定的queue名字。
        err = channel.Publish(
            exchange,     // exchange
            q.Name, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                Headers:         amqp.Table{},
                ContentType: "text/plain",
                ContentEncoding: "",
                Body:        []byte(body),
        failOnError(err, "Failed to publish a message")
    package main
    import (
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange nam
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-acknowledgments"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        consumer(uri, exchangeName, queueName)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    func consumer(amqpURI string, exchange string, queue string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    Message durability消息持久化

    package main
    import (
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  ""
        //Durable AMQP queue name
        queueName    = "test-idoall-queues-durability"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        bodyMsg := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello idoall.org"
        } else {
            s = strings.Join(args[1:], " ")
        return s
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            true,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        // Producer只能发送到exchange,它是不能直接发送到queue的。
        // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
        // routing_key就是指定的queue名字。
        err = channel.Publish(
            exchange,     // exchange
            q.Name, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                Headers:         amqp.Table{},
                DeliveryMode: amqp.Persistent,
                ContentType: "text/plain",
                ContentEncoding: "",
                Body:        []byte(body),
        failOnError(err, "Failed to publish a message")
    package main
    import (
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange nam
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-durability"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        consumer(uri, exchangeName, queueName)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    func consumer(amqpURI string, exchange string, queue string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            true,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    Fair dispatch 公平分发

    package main

    import (

    const (
    //AMQP URI
    uri = "amqp://guest:guest@localhost:5672/"
    //Durable AMQP exchange name
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-idoall-queues-fair_dispatch"

    func failOnError(err error, msg string) {
    if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))

    func main(){
    bodyMsg := bodyFrom(os.Args)
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))

    func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
    s = "hello idoall.org"
    } else {
    s = strings.Join(args[1:], " ")
    return s

    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string){
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    q, err := channel.QueueDeclare(
    queueName, // name
    true, // durable
    false, // delete when unused
    false, // exclusive
    false, // no-wait
    nil, // arguments
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能发送到exchange,它是不能直接发送到queue的。
    // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
    // routing_key就是指定的queue名字。
    err = channel.Publish(
    exchange, // exchange
    q.Name, // routing key
    false, // mandatory
    false, // immediate
    amqp.Publishing {
    Headers: amqp.Table{},
    DeliveryMode: amqp.Persistent,
    ContentType: "text/plain",
    ContentEncoding: "",
    Body: []byte(body),
    failOnError(err, "Failed to publish a message")
    package main
    import (
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange nam
        exchangeName  = ""
        //Durable AMQP queue name
        queueName     = "test-idoall-queues-fair_dispatch"
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        consumer(uri, exchangeName, queueName)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    func consumer(amqpURI string, exchange string, queue string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            true,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        err = channel.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        failOnError(err, "Failed to set QoS")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    Exchanges & Bindings

    package main
    import (
    const (
        //AMQP URI
        uri          =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  "test-idoall-exchange-logs"
        //Exchange type - direct|fanout|topic|x-custom
        exchangeType = "fanout"
        //AMQP routing key
        routingKey   = ""
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        bodyMsg := bodyFrom(os.Args)
        publish(uri, exchangeName, exchangeType, routingKey, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello idoall.org"
        } else {
            s = strings.Join(args[1:], " ")
        return s
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@exchangeType, exchangeType的类型direct|fanout|topic
    //@routingKey, routingKey的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
        err = channel.ExchangeDeclare(
            exchange,     // name
            exchangeType, // type
            true,         // durable
            false,        // auto-deleted
            false,        // internal
            false,        // noWait
            nil,          // arguments
        failOnError(err, "Failed to declare a queue")
        // 发布消息
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        err = channel.Publish(
            exchange,     // exchange
            routingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                Headers:         amqp.Table{},
                ContentType: "text/plain",
                ContentEncoding: "",
                Body:        []byte(body),
        failOnError(err, "Failed to publish a message")


    package main
    import (
    const (
        //AMQP URI
        uri           =  "amqp://guest:guest@localhost:5672/"
        //Durable AMQP exchange name
        exchangeName =  "test-idoall-exchange-logs"
        //Exchange type - direct|fanout|topic|x-custom
        exchangeType = "fanout"
        //AMQP binding key
        bindingKey   = ""
        //Durable AMQP queue name
        queueName     = ""
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
    func main(){
        consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@exchangeType, exchangeType的类型direct|fanout|topic
    //@queue, queue的名称
    //@key , 绑定的key名称
    func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got Channel, declaring Exchange (%q)", exchange)
        err = channel.ExchangeDeclare(
            exchange,     // name of the exchange
            exchangeType, // type
            true,         // durable
            false,        // delete when complete
            false,        // internal
            false,        // noWait
            nil,          // arguments
        failOnError(err, "Exchange Declare:")
        q, err := channel.QueueDeclare(
            queueName, // name
            false,   // durable
            false,   // delete when unused
            true,   // exclusive 当Consumer关闭连接时,这个queue要被deleted
            false,   // no-wait
            nil,     // arguments
        failOnError(err, "Failed to declare a queue")
        err = channel.QueueBind(
            q.Name, // name of the queue
            key,        // bindingKey
            exchange,   // sourceExchange
            false,      // noWait
            nil,        // arguments
        failOnError(err, "Failed to bind a queue")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf(" [x] %s", d.Body)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")


    package main
    import (
    func failOnError(err error, msg string) {
            if err != nil {
                    log.Fatalf("%s: %s", msg, err)
    func fib(n int) int {
            if n == 0 {
                    return 0
            } else if n == 1 {
                    return 1
            } else {
                    return fib(n-1) + fib(n-2)
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
            q, err := ch.QueueDeclare(
                    "rpc_queue", // name
                    false,       // durable
                    false,       // delete when usused
                    false,       // exclusive
                    false,       // no-wait
                    nil,         // arguments
            failOnError(err, "Failed to declare a queue")
            err = ch.Qos(
                    1,     // prefetch count
                    0,     // prefetch size
                    false, // global
            failOnError(err, "Failed to set QoS")
            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    false,  // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
            failOnError(err, "Failed to register a consumer")
            forever := make(chan bool)
            go func() {
                    for d := range msgs {
                            n, err := strconv.Atoi(string(d.Body))
                            failOnError(err, "Failed to convert body to integer")
                            log.Printf(" [.] fib(%d)", n)
                            response := fib(n)
                            err = ch.Publish(
                                    "",        // exchange
                                    d.ReplyTo, // routing key
                                    false,     // mandatory
                                    false,     // immediate
                                            ContentType:   "text/plain",
                                            CorrelationId: d.CorrelationId,
                                            Body:          []byte(strconv.Itoa(response)),
                            failOnError(err, "Failed to publish a message")
            log.Printf(" [*] Awaiting RPC requests")
    package main
    import (
    func failOnError(err error, msg string) {
            if err != nil {
                    log.Fatalf("%s: %s", msg, err)
    func randomString(l int) string {
            bytes := make([]byte, l)
            for i := 0; i < l; i++ {
                    bytes[i] = byte(randInt(65, 90))
            return string(bytes)
    func randInt(min int, max int) int {
            return min + rand.Intn(max-min)
    func fibonacciRPC(n int) (res int, err error) {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
            q, err := ch.QueueDeclare(
                    "",    // name
                    false, // durable
                    false, // delete when usused
                    true,  // exclusive
                    false, // noWait
                    nil,   // arguments
            failOnError(err, "Failed to declare a queue")
            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    true,   // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
            failOnError(err, "Failed to register a consumer")
            corrId := randomString(32)
            err = ch.Publish(
                    "",          // exchange
                    "rpc_queue", // routing key
                    false,       // mandatory
                    false,       // immediate
                            ContentType:   "text/plain",
                            CorrelationId: corrId,
                            ReplyTo:       q.Name,
                            Body:          []byte(strconv.Itoa(n)),
            failOnError(err, "Failed to publish a message")
            for d := range msgs {
                    if corrId == d.CorrelationId {
                            res, err = strconv.Atoi(string(d.Body))
                            failOnError(err, "Failed to convert body to integer")
    func main() {
            n := bodyFrom(os.Args)
            log.Printf(" [x] Requesting fib(%d)", n)
            res, err := fibonacciRPC(n)
            failOnError(err, "Failed to handle RPC request")
            log.Printf(" [.] Got %d", res)
    func bodyFrom(args []string) int {
            var s string
            if (len(args) < 2) || os.Args[1] == "" {
                    s = "30"
            } else {
                    s = strings.Join(args[1:], " ")
            n, err := strconv.Atoi(s)
            failOnError(err, "Failed to convert arg to integer")
            return n
  • 相关阅读:
    CentOS 7下修改rabbitmq打开文件数量方法
    zabbix 监控zookeeper
    rabbitmq最大连接数(Socket Descriptors)
    Ubuntu14.04 x64 zabbix 3.0 安装
  • 原文地址:https://www.cnblogs.com/jackluo/p/10797459.html
Copyright © 2011-2022 走看看