zoukankan      html  css  js  c++  java
  • docker快速安装rabbitmq

    一、获取镜像

    #拉取 management 标签的镜像,该镜像自带 web 管理页面
    docker pull rabbitmq:management

    二、运行镜像

    #方式一:默认guest 用户,密码也是 guest
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
    
    #方式二:设置用户名和密码
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

    三、访问ui页面

    http://localhost:15672/

    四、golang案例

    #producer生产者代码
    package main
    
    import (
        "fmt"
    
        "log"
    
        "github.com/streadway/amqp"
    )
    
    // Fixed settings used by the producer example.
    const (
        // uri is the AMQP connection URI; 10.0.0.11 is the host IP.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the exchange messages are published to.
        exchangeName = ""

        // queueName is the name of the queue to declare and route to.
        queueName = "test-queues"

        // bodyMsg is the body of the message that gets published.
        bodyMsg string = "hello angel"
    )
    
    // failOnError does nothing when err is nil. Otherwise it logs "<msg>: <err>"
    // and panics with that same string, so it never returns on a non-nil error.
    //
    // log.Fatalf is deliberately avoided: it calls os.Exit(1), which skips the
    // deferred functions of every caller (for example deferred Close calls).
    // Panicking lets those deferred functions run while the program unwinds.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Panic writes the text to the logger and then panics with it.
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes the fixed bodyMsg through publish and then logs how many
    // bytes were sent.
    func main() {
        publish(uri, exchangeName, queueName, bodyMsg)

        size := len(bodyMsg)
        log.Printf("published %dB OK", size)
    }
    
    // publish connects to the broker at amqpURI, declares the queue named by
    // queue, and publishes body to it as one text/plain message.
    //
    // Parameters:
    //   - amqpURI:  AMQP connection URI.
    //   - exchange: exchange to publish to. A producer can only send to an
    //     exchange, never directly to a queue; the default exchange (empty name)
    //     delivers to the queue whose name equals the routing key.
    //   - queue:    name of the queue to declare.
    //   - body:     message payload.
    //
    // Every error is passed to failOnError, which does not return when the error
    // is non-nil.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // queueName constant) so that the parameter is honored.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // The routing key is the declared queue's name.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    #consumer消费者代码
    package main
    
    import (
        "fmt"
    
        "log"
    
        "github.com/streadway/amqp"
    )
    
    // Fixed settings used by the consumer example.
    const (
        // uri is the AMQP connection URI.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the AMQP exchange name.
        exchangeName = ""

        // queueName is the name of the queue to declare and consume from.
        queueName = "test-queues"
    )
    
    // failOnError does nothing when err is nil. Otherwise it logs "<msg>: <err>"
    // and panics with that same string, so it never returns on a non-nil error.
    //
    // log.Fatalf is deliberately avoided: it calls os.Exit(1), which skips the
    // deferred functions of every caller (for example deferred Close calls).
    // Panicking lets those deferred functions run while the program unwinds.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Panic writes the text to the logger and then panics with it.
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main runs the message receiver with the package-level connection settings.
    func main() {
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker at amqpURI, declares the queue named by
    // queue, subscribes to it with automatic acknowledgement, and logs the body
    // of every message it receives.
    //
    // Parameters:
    //   - amqpURI:  AMQP connection URI.
    //   - exchange: exchange name; currently not used by this function.
    //   - queue:    name of the queue to declare and consume from.
    //
    // The call blocks while deliveries keep arriving and returns once the
    // delivery channel is closed, at which point the deferred Close calls run.
    // Every error is passed to failOnError, which does not return when the error
    // is non-nil.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // queueName constant) so that the parameter is honored.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")

        // Subscribe to the queue.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Ranging in this goroutine keeps the function alive while messages
        // arrive and lets it return when the delivery channel is closed, instead
        // of blocking forever on a channel that nothing ever writes to.
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }

    五、拥有消息确认的代码

    #producer
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "os"
        "strings"
    )
    
    // Fixed settings used by the producer of the acknowledgement example.
    const (
        // uri is the AMQP connection URI.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the exchange messages are published to.
        exchangeName = ""

        // queueName is the name of the queue to declare and route to.
        queueName = "test-queues-acknowledgments"
    )
    
    // failOnError does nothing when err is nil. Otherwise it logs "<msg>: <err>"
    // and panics with that same string, so it never returns on a non-nil error.
    //
    // log.Fatalf is deliberately avoided: it calls os.Exit(1), which skips the
    // deferred functions of every caller (for example deferred Close calls).
    // Panicking lets those deferred functions run while the program unwinds.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Panic writes the text to the logger and then panics with it.
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main builds the message body from the command-line arguments, publishes it
    // through publish, and then logs how many bytes were sent.
    func main() {
        payload := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, payload)

        size := len(payload)
        log.Printf("published %dB OK", size)
    }
    
    // bodyFrom builds the message body from an argument list shaped like os.Args
    // (element 0 is the program name).
    //
    // It returns "hello angel" when there is no element after the program name
    // or when the first such element is empty; otherwise it returns args[1:]
    // joined with single spaces.
    //
    // Only the args parameter is inspected, never os.Args, so the result depends
    // solely on the input and cannot index past the end of a shorter os.Args.
    func bodyFrom(args []string) string {
        if len(args) < 2 || args[1] == "" {
            return "hello angel"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish connects to the broker at amqpURI, declares the queue named by
    // queue, and publishes body to it as one text/plain message.
    //
    // Parameters:
    //   - amqpURI:  AMQP connection URI.
    //   - exchange: exchange to publish to. A producer can only send to an
    //     exchange, never directly to a queue; the default exchange (empty name)
    //     delivers to the queue whose name equals the routing key.
    //   - queue:    name of the queue to declare.
    //   - body:     message payload.
    //
    // Every error is passed to failOnError, which does not return when the error
    // is non-nil.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // queueName constant) so that the parameter is honored.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // The routing key is the declared queue's name.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    #consumer
    package main
    
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    // Fixed settings used by the consumer of the acknowledgement example.
    const (
        // uri is the AMQP connection URI.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the AMQP exchange name.
        exchangeName = ""

        // queueName is the name of the queue to declare and consume from.
        queueName = "test-queues-acknowledgments"
    )
    
    // failOnError does nothing when err is nil. Otherwise it logs "<msg>: <err>"
    // and panics with that same string, so it never returns on a non-nil error.
    //
    // log.Fatalf is deliberately avoided: it calls os.Exit(1), which skips the
    // deferred functions of every caller (for example deferred Close calls).
    // Panicking lets those deferred functions run while the program unwinds.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        // log.Panic writes the text to the logger and then panics with it.
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main runs the message receiver with the package-level connection settings.
    func main() {
        // Start the message receiver.
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer connects to the broker at amqpURI, declares the queue named by
    // queue, and subscribes to it with manual acknowledgement (auto-ack is off).
    // For each message it logs the body, sleeps one second per '.' byte in the
    // body, logs "Done", and then acknowledges that delivery.
    //
    // Parameters:
    //   - amqpURI:  AMQP connection URI.
    //   - exchange: exchange name; currently not used by this function.
    //   - queue:    name of the queue to declare and consume from.
    //
    // The call blocks while deliveries keep arriving and returns once the
    // delivery channel is closed, at which point the deferred Close calls run.
    // Setup errors are passed to failOnError, which does not return when the
    // error is non-nil; a failed acknowledgement is logged and consumption
    // continues.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // queueName constant) so that the parameter is honored.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")

        // Subscribe to the queue; auto-ack is off, so each delivery is
        // acknowledged explicitly below.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Ranging in this goroutine keeps the function alive while messages
        // arrive and lets it return when the delivery channel is closed, instead
        // of blocking forever on a channel that nothing ever writes to.
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)

            // Sleep one second for every '.' in the body.
            dotCount := bytes.Count(d.Body, []byte("."))
            time.Sleep(time.Duration(dotCount) * time.Second)
            log.Printf("Done")

            // Acknowledge this delivery and report a failed ack instead of
            // dropping the error.
            if err := d.Ack(false); err != nil {
                log.Printf("Failed to ack message: %s", err)
            }
        }
    }
  • 相关阅读:
    [转]在SQL Server中,关于with as使用介绍
    系统设计笔记【不断更新中】
    关于PHPZend framework2 框架 学习过程。 阅前须知: ZF2中的配置文件是可以静态文件配置来注册和通过相关函数动态注册。 1.EventManager(事件驱动),关于事件驱动,在ZF2相关资料没有详细说明,可以参考ANDROID的事件驱动,MFC的消息响应/事件驱动。
    http://www.mwop.net/blog/248IntroducingtheZF2PluginBroker.html
    Introducing AkrabatSession zf2
    http://127.0.0.1/loginapi/JsonPacket/example/loginrequestdemo.php
    if ($this->getRequest()->isPost()) {
    email
    Quenz_i997_PDA_CHS.rar
    http://team.aiitec.net/?c=inbox
  • 原文地址:https://www.cnblogs.com/angelyan/p/11218260.html
Copyright © 2011-2022 走看看