zoukankan      html  css  js  c++  java
  • Go RabbitMQ (一)

    RabbitMQ

    简介

    RabbitMQ是一个消息代理,用来负责接收和转发消息。

    术语

    • 生产者:生产者是负责发送消息的
    • 队列:队列是RabbitMQ用来存储消息的,受主机内存和磁盘大小的限制,本质上是一个消息的缓冲区。生产者可以将消息发送至队列中,消费者可以从队列中接收到消息
    • 消费者:消费者是用来等待接收消息

    生产者,消费者,代理可以驻留在不同主机或同一主机,一个应用可以是生产者也可以是消费者

    Hello World

    接下来我们来实现RabbitMQ的“Hello World”,生产者将“Hello World”发送进队列中,消费者将其接收并打印

    • RabbitMQ客户端的安装
      • RabbitMQ实现了很多协议,在这里我们使用的是的AMQP 0-9-1,这是一种用于消息传递的开放通用协议。同时有很多关于RabbitMQ的客户端,在这里我们使用的是Go amqp客户端
      • 安装: **go get github.com/streadway/amqp
    发送
    • 连接RabbitMQ

      conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
      if err != nil {
      	log.Fatal(err)
      }
      defer conn.Close()
      

      RabbitMQ的连接已经为我们抽象了socket的连接,同时为我们处理了协议版本号和身份认证等等

    • 创建通道

      ch,err := conn.Channel()
      if err != nil {
      	log.Fatal(err)
      }
      defer ch.Close()
      

      在使用其他API完成任务的时候我们首先通过以上方式创建通道

    • 在开始发送消息之前我们首先应该声明一个队列。声明队列之后我们就可以将消息发送至队列当中

      q, err := ch.QueueDeclare(
      "hello", // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
      

    )
    if err != nil {
    log.Fatal(err)
    }
    body := "Hello World!"
    err = ch.Publish(
    "", // exchange
    q.Name, // routing key
    false, // mandatory
    false, // immediate
    amqp.Publishing {
    ContentType: "text/plain",
    Body: []byte(body),
    })
    if err != nil {
    log.Fatal(err)
    }
    ```
    队列的声明是一个幂等性操作,如果不存在该队列的话则会创建。此处注意,如果队列存在,修改了队列参数并不会影响已经存在的队列,并且会返回错误。消息内容是一个字节数组,所以我们必须进行编码

    接收
    • 连接,创建通道,队列

      在接收端我们同样需要像发送端一样连接RabbitMQ,创建通道后再创建队列,注意此处队列的创建是跟发送端的队列完全匹配的。队列在接收端也创建是因为我们接收端有可能比发送端先启动,所以为了保证我们要消费的队列存在我们在此处也进行创建

    • 消费消息

      msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
      

    )
    if err != nil {
    log.Fatal(err)
    }
    forever := make(chan bool)
    go func() {
    for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    }
    }()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
    ```
    使用通道消费队列中的消息,当队列有消息的时候将会异步的推送给我们

  • 相关阅读:
    P1582 倒水 (二进制)
    P2014 选课 (树形动规)
    多项式前置技能——复数
    P3694 邦邦的大合唱站队 (状压DP)
    P1754 球迷购票问题 (卡特兰数,递推)
    [SCOI2003]字符串折叠 (区间DP)
    [SDOI2008]仪仗队 (欧拉函数)
    4-字符串
    3.输出,输入,基本数据类型
    2.栈,堆,寄存器的理解
  • 原文地址:https://www.cnblogs.com/develop-SZT/p/10706553.html
Copyright © 2011-2022 走看看