zoukankan      html  css  js  c++  java
  • go 操作RabbitMQ

    1.RMQ的安装

    docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 rabbitmq:3-management

    • 三个端口映射,分别表示
    5672:连接生产者、消费者的端口
    15672:WEB管理页面的端口
    25672:分布式集群的端口
    

    2.基本概念

    • amqp:高级消息队列协议,即一种消息中间件协议,RMQ是amqp协议的一个具体实现。RMQ使用Erlang语言实现的,具有很好的并发能力,具体历史请百度,这里主要关心怎么用。
    • 生产者将消息发送至交换器;交换器再发送至队列,最后发送至消费者
    • 交换器有四种类型,fanout、direct、topic三种类型,header类型没用过,不关注。
    fanout
    一对多,根据绑定发送到每一个队列,
    常用于发布订阅
    
    direct
    默认模式,一对一关系,根据routingkey与bindingjkey
    一一对应匹配,发送消息
    
    关于topic模式
    以 ‘.’ 来分割单词。
    ‘#’ 表示一个或多个单词。
    ‘*’ 表示一个单词。
    如:
    RoutingKey为:
    aaa.bbb.ccc
    BindingKey可以为:
    *.bbb.ccc
    aaa.#
    

    3.库中重要的方法

    • 创建交换器
    func (ch *Channel) ExchangeDeclare(
    	name string,  //交换器的名称
    	kind string, //表示交换器的类型。有四种常用类型:direct、fanout、topic、headers
    	durable bool, //是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器
    	autoDelete bool, //是否自动删除,true表示,当所有绑定都与交换器解绑后,会自动删除此交换器。
    	internal bool,  //是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
    	noWait bool, //是否非阻塞, 阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ
    	args Table
    ) error
    
    • 创建队列
    func (ch *Channel) QueueDeclare(
    	name string,  //队列名称
    	durable bool,  //是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息
    	autoDelete bool,  //是否删除,当所有消费者都断开时,队列会自动删除。
    	exclusive bool,   //是否排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。
    	noWait bool, //是否非阻塞
    	args Table) (Queue, error)
    
    • 队列与交换器绑定,key,表示要绑定的键,交换器以此来分发
    func (ch *Channel) QueueBind(
    	name,  //队列名字,确定哪个队列
    	key, // 对应图中BandingKey,表示要绑定的键。
    	exchange string,  //交换器的名字
    	noWait bool,  //是否非阻塞
    	args Table) error
    
    • 交换器之间的绑定
    func (ch *Channel) ExchangeBind(
    	destination,  //目的交换器,通常是内部交换器。
    	key,    //对应BandingKey,表示要绑定的键。
    	source string,  //源交换器
    	noWait bool,   //是否非阻塞
    	args Table) error
    
    • 发送消息
    func (ch *Channel) Publish(
    		exchange,  //要发送的交换机
    		key string,  //路由键,与之相关的绑定键对应
    		mandatory, 
    		immediate bool, 
    		msg Publishing   //要发送的消息,msg对应一个Publishing结构
    		) error
    		
    //Publishing 结构体
    type Publishing struct {
            Headers Table
            // Properties
            ContentType     string  //消息的类型,通常为“text/plain”
            ContentEncoding string  //消息的编码,一般默认不用写
            DeliveryMode    uint8   //消息是否持久化,2表示持久化,0或1表示非持久化。
            Body []byte  //消息主体
            Priority        uint8  //消息的优先级 0 to 9
            CorrelationId   string    // correlation identifier
            ReplyTo         string    // address to to reply to (ex: RPC)
            Expiration      string    // message expiration spec
            MessageId       string    // message identifier
            Timestamp       time.Time // message timestamp
            Type            string    // message type name
            UserId          string    // creating user id - ex: "guest"
            AppId           string    // creating application id
    }
    		
    
    • 消费者接收消息--推模式
    func (ch *Channel) Consume(
    	queue string,  //队列名称 
    	consumer string,  //消费者标签,用于区分不同的消费者
    	autoAck string,  //是否自动回复ACK,true为是,回复ACK表示高速服务器我收到消息了。建议为false,手动回复,这样可控性强
    	exclusive bool,  //设置是否排他,排他表示当前队列只能给一个消费者使用
    	noLocal bool, //如果为true,表示生产者和消费者不能是同一个connect
    	noWait bool, //是否非阻塞
    	args Table) (<-chan Delivery, error)
    
    • 消费者接收消息--拉模式
    func (ch *Channel) Get(
    	queue string, 
    	autoAck bool) (msg Delivery, ok bool, err error)
    
    • 手动回复消息
    func (ch *Channel) Ack(tag uint64, multiple bool) error
    
    func (me Delivery) Ack(multiple bool) error {
            if me.Acknowledger == nil {
                    return errDeliveryNotInitialized
            }
            return me.Acknowledger.Ack(me.DeliveryTag, multiple)
    }
    
    func (d Delivery) Reject(requeue bool) error
    
    Publish – mandatory参数
    • false:当消息无法通过交换器匹配到队列时,会丢弃消息。
    • true:当消息无法通过交换器匹配到队列时,会调用basic.return通知生产者。
    • 注:不建议使用,因会使程序逻辑变得复杂,可以通过备用交换机来实现类似的功能。
    Publish – immediate参数
    • true:当消息到达Queue后,发现队列上无消费者时,通过basic.Return返回给生产者。

    • false:消息一直缓存在队列中,等待生产者。

    • 注:不建议使用此参数,遇到这种情况,可用TTL和DLX方法代替(后面会介绍

    • Qos
      func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

    • 注意:这个在推送模式下非常重要,通过设置Qos用来防止消息堆积。

    • prefetchCount:消费者未确认消息的个数。

    • prefetchSize :消费者未确认消息的大小。

    • global :是否全局生效,true表示是。全局生效指的是针对当前connect里的所有channel都生效

    4.代码示例

    生产者

    package main
    
    import (
            "fmt"
            "log"
            "os"
            "strings"
    
            "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
            if err != nil {
                    log.Fatalf("%s: %s", msg, err)
            }
    }
    
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
    
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
    
            err = ch.ExchangeDeclare(
                    "logs",   // name
                    "fanout", // type
                    true,     // durable
                    false,    // auto-deleted
                    false,    // internal
                    false,    // no-wait
                    nil,      // arguments
            )
            failOnError(err, "Failed to declare an exchange")
    
            body := bodyFrom(os.Args)
            err = ch.Publish(
                    "logs", // exchange
                    "",     // routing key
                    false,  // mandatory
                    false,  // immediate
                    amqp.Publishing{
                            ContentType: "text/plain",
                            Body:        []byte(body),
                    })
            failOnError(err, "Failed to publish a message")
    
            log.Printf(" [x] Sent %s", body)
    }
    
    func bodyFrom(args []string) string {
            var s string
            if (len(args) < 2) || os.Args[1] == "" {
                    s = "hello"
            } else {
                    s = strings.Join(args[1:], " ")
            }
            return s
    }
    

    消费者

    package main
    
    import (
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main() {
    	conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    	DealWithError(err,"Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch,err := conn.Channel()
    	DealWithError(err,"Failed to open a channel")
    	defer ch.Close()
    	//声明交换器
    	ch.ExchangeDeclare(
    		"logs",
    		"fanout",
    		true,
    		false,
    		false,
    		false,
    		nil,
    		)
    	DealWithError(err,"Failed to declare an exchange")
    	//声明了队列
    	q,err := ch.QueueDeclare(
    		"", //队列名字为rabbitMQ自动生成
    		false,
    		false,
    		true,
    		false,
    		nil,
    		)
    	DealWithError(err,"Failed to declare an exchange")
    	//交换器跟队列进行绑定,交换器将接收到的消息放进队列中
    	err = ch.QueueBind(
    		q.Name,
    		"",
    		"logs",
    		false,
    		nil,
    		)
    	DealWithError(err,"Failed to bind a queue")
    	msgs,err := ch.Consume(
    		q.Name,
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    		)
    	DealWithError(err,"Failed to register a consumer")
    	forever := make(chan bool)
    	go func() {
    		for d := range msgs{
    			log.Printf(" [x] %s",d.Body)
    		}
    	}()
    	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    	<-forever
    }
    
    func DealWithError(err error,msg string)  {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
  • 相关阅读:
    idea炫酷主题下载网站
    You have not concluded your merge (MERGE_HEAD exists)
    内部接口
    Nginx初尝试
    泛型和反射
    使用idea创建web项目
    <转>如果你报createSQLQuery is not valid without active transaction,请看这里
    android 通过pull解析xml文件
    shiro环境搭建
    springmvc文件上传
  • 原文地址:https://www.cnblogs.com/fanzou/p/13479482.html
Copyright © 2011-2022 走看看