zoukankan      html  css  js  c++  java
  • rabbitmq消息队列——"Hello World!"

    RabbitMQ

    一、”Hello World!”

    1、简介:

             RabbitMQ是一种消息中间件,主要思想很简单:接收消息并转发。你可以将它设想为一个邮局:你往里面发送邮件并确保邮件能实际运达,RabbitMQ好比这里的邮箱、邮局和邮递员的角色。

             RabbitMQ和邮局的一个主要区别是,RabbitMQ仅仅接收、存储、转发这些数据包裹——message

             先来看下RabbitMQ中的一些关键术语:

    a)、生产(者):除了发送什么意义都没有。一个发送消息的应用就是一个生产者,使用如下描述:

     

    b)、队列:储存消息的“容器”,可以储存任意多的message——本质上是一个无限长度的缓冲区,多个生产者可以将消息发送至同一队列,多个消费者也可以从同一队列中接收消息。队列使用如下描述,”queue_name”是该队列的名称:

    c)、消费(者):一个消费者就好比一个用来等待接收消息的程序。使用如下来描述:

    2、”Hello World!”(使用Go RabbitMQ客户端)

             这节我们将使用Go写两个小程序:一个生产者用来发送单一消息,一个消费者用来接收这些消息并打印。图示如下:

    备注:这里使用“amqp”包,自行安装:
    go get github.com/streadway/amqp

    发送:

             首先创建一个send.go和receiver.go分别用来发送和接收,发送方会链接RabbitMQ服务器、发送消息然后退出。

    在send.go,首先导入相关包:

    package main
    
    import (
      "fmt"
      "log"
    
      "github.com/streadway/amqp"
    )

    再加一个出错处理类:

    func failOnError(err error, msg string) {
      if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
      }
    }

    连接到服务器:

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")  //一般默认端口5672
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    这里的conn对象是抽象了底层的socket链接,在conn的基础上我们可以创建多个channel(通道,一个conn可以创建多个channel,使用channel节省了tcp资源,后续的很多操作如:队列声明、消息声明发送、交换器声明等都是在channel基础上操作的),接下来是产生一个channel:

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    为了发送消息到队列,我们还应该声明一个队列,并将消息发送给它:

    q, err := ch.QueueDeclare(
      "hello", // name 队列名称
      false,   // durable    是否持久化,默认为false
      false,   // delete when unused 队列无订阅时是否自动删除队列
      false,   // exclusive 是否队列私有,私有后仅有一个应用可以连接
      false,   // no-wait
      nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")
    
    body := "hello"    //消息主体
    err = ch.Publish(         //发送消息
      "",     // exchange    //指定消息发送的交换器名称
      q.Name, // routing key   //路由键
      false,  // mandatory
      false,  // immediate
      amqp.Publishing {
        ContentType: "text/plain",    //消息类型:文本消息
        Body:        []byte(body),  //消息体,理论上可以发送任意类型数据,因为是byte类型
      })
    failOnError(err, "Failed to publish a message")

    至此发送方代码完毕!

    接收:

    接收跟发送不同的是,接收端一直运行监听发送端消息发送并打印输出,接收端的模型如下:

    在这里,我们仍然使用send.go中的逻辑执行,首先是链接服务器,其次是声明channel和队列(可以防止接收端启动时发送端还没有启动的情况),主要代码如下:

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
    
    q, err := ch.QueueDeclare(
      "hello", // name
      false,   // durable
      false,   // delete when usused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    这里声明的队列名称就是send.go中声明的队列,然后从该队列中读取消息并打印:

    msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
    )
    failOnError(err, "Failed to register a consumer")
    
    forever := make(chan bool)
    
    go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
      }
    }()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

    至此,所有代码书写完毕,完整版代码如下:

    发送:

    package main
    
    import (
        "fmt"
        "log"
    
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s:%s", msg, err)
            panic(fmt.Sprintf("%s:%s", msg, err))
        }
    }
    
    func main() {
        //链接rabbitMQ服务器
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "fail to connect to rabbitmq server")
        defer conn.Close()
    
        //声明一个channel
        ch, err := conn.Channel()
        failOnError(err, "fail to open a channel")
        defer ch.Close()
    
        //根据channel声明一个队列
        q, err := ch.QueueDeclare("queue_name", false, false, false, false, nil)
        failOnError(err, "fail to define a queue")
    
        //使用channel直接发送消息至队列
        body := "hello queue!"
        err = ch.Publish("", q.Name, false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
        failOnError(err, "fail to publish the message")
    }
    

    接收:

    // rabbitmq_1.receiver project main.go
    package main
    
    import (
        "fmt"
        "log"
    
        "github.com/streadway/amqp"
    )
    
    func main() {
        //链接RabbitMQ服务器
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        //声明一个channel
        ch, err := conn.Channel()
        failOnError(err, "failed to open a channel")
        defer ch.Close()
    
        //使用channel声明一个队列
        q, err := ch.QueueDeclare("queue_name", false, false, false, false, nil)
        failOnError(err, "failed to declare a queue")
    
        //注册一个消费者
        msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
        failOnError(err, "failed to register a consumer")
    
        //定义一个never型chan,用于防止进程退出
        var forever chan bool = make(chan bool, 0)
        //开启一个channel,实时打印channel中的消息
        go func() {
            for d := range msgs {
                log.Printf("received a message:%s", d.Body)
            }
        }()
    
        log.Printf("press Ctrl+c to exit!")
        <-forever
    }
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }

    然后我们使用RabbitMQ自带的管理工具查看mq运行情况:

    首先要开启管理工具:
    rabbitmq-plugins enable rabbitmq_management

    然后浏览器访问地址:http://localhost:15672/,这里输入默认用户名密码:guest/guest,进去后界面如下:

    
    
  • 相关阅读:
    ZOJ4125 Sekiro
    ZOJ4118 Stones in the Bucket
    ZOJ4115 Wandering Robot
    ZOJ4113 Calandar
    【递归】N皇后问题 和 2n皇后问题 dfs
    7-18
    7_13
    二维前缀和
    64位整数乘法
    【分治】魔法石的诱惑
  • 原文地址:https://www.cnblogs.com/vipzhou/p/6041685.html
Copyright © 2011-2022 走看看