zoukankan      html  css  js  c++  java
  • RabbitMQ的各种模式

     前面我们已经说明了Simple模式的实现,下面我们会说明其他模式的使用和实现

     1、Work 工作模式

      特点:一个消息只能被一个消费者获取,也就是说一个消息只能被消费一次。其实就是多了些消费者

      

      使用场景: 生产消息的速度 大于 消费消费消息的速度,还句话说就是减少系统的负载

    代码实现

     和Simple模式类似,就是多了一个mainWorkReceive.go,代码都是一样的。

    • RabbitMQWork/mainWorkPublish.go
      package main
      
      import (
      	"demo/RabbitMQ"
      	"fmt"
      	"strconv"
      	"time"
      )
      
      func main() {
      	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
      
      	for i := 0; i <= 100; i++ {
      		rabbitmq.PublishSimple("Hello Test! "+strconv.Itoa(i))
      		time.Sleep(1*time.Second)
      		fmt.Println(i)
      	}
      }
      
    • RabbitMQWork/mainWorkRecieve.go
      package main
      
      import "demo/RabbitMQ"
      
      func main() {
      	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
      	rabbitmq.ConsumeSimple()
      }
      

    2、Publish/Subscribe 订阅模式

      消费被路由投递给多个队列,一个消息被多个消费者获取, 下图的x为交换机,通过规则去匹配到各自的对队列上,再转给消费者

    代码实现

    •  RabbitMQ/rabbitmq.go
      package RabbitMQ
      
      import (
      	"fmt"
      	"github.com/streadway/amqp"
      	"log"
      )
      
      // 创建连接url
      const MQURL = "amqp://admin:admin@127.0.0.1:5672/test"
      
      type RabbitMQ struct {
      	conn    *amqp.Connection
      	channel *amqp.Channel
      
      	// 队列名称
      	QueueName string
      	// 交换机
      	Exchange string
      	// key
      	Key string
      	// 连接信息
      	Mqurl string
      }
      
      // 创建连接实例
      func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
      	// exchange 为空会使用默认的default
      	rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
      	var err error
      	// 创建rabbitmq来连接
      	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
      	rabbitmq.failOnErr(err, "创建连接错误!")
      
      	// 创建channel
      	rabbitmq.channel, err = rabbitmq.conn.Channel()
      	rabbitmq.failOnErr(err, "获取channel失败")
      	return rabbitmq
      }
      
      // 断开连接:channel和conn
      func (r *RabbitMQ) Destory() {
      	r.channel.Close()
      	r.conn.Close()
      }
      
      // 错误处理的函数
      func (r *RabbitMQ) failOnErr(err error, message string) {
      	if err != nil {
      		log.Fatalf("%s: %s", message, err)
      		panic(fmt.Sprintf("%s", message))
      	}
      }
      
      // step1: simple style 创建简单模式的实例, 只需要队列名
      func NewRabbitMQSimple(queueName string) *RabbitMQ {
      	return NewRabbitMQ(queueName, "", "")
      }
      
      // step2: 简单模式下生产
      func (r *RabbitMQ) PublishSimple(message string) {
      	// 固定用法  申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据
      	_, err := r.channel.QueueDeclare(
      		r.QueueName, // 队列名
      		false,       // 控制是否持久化
      		false,       // 是否自动删除,当最后一个消费者断开连接后是否删除
      		false,       // 是否具有排他性,其他用户不可访问
      		false,       // 是否阻塞
      		nil,         //  额外属性
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 发送消息到队列中
      	err = r.channel.Publish(
      		r.Exchange,
      		r.QueueName,
      		false, // mandatory 如果为true,会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把消息返回给发送者
      		false, // immediate 如果为true,当exchange发送消息到队列后发现队列没有绑定消费者后会把消息返回
      		amqp.Publishing{
      			ContentType: "text/plain",
      			Body:        []byte(message),
      		})
      }
      
      // 简单模式的消费消息
      func (r *RabbitMQ) ConsumeSimple() {
      	// 固定用法  申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据
      	_, err := r.channel.QueueDeclare(
      		r.QueueName, // 队列名
      		false,       // 控制是否持久化
      		false,       // 是否自动删除,当最后一个消费者断开连接后是否删除
      		false,       // 是否具有排他性,其他用户不可访问
      		false,       // 是否阻塞
      		nil,         //  额外属性
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 接受消息
      	msgs, err := r.channel.Consume(
      		r.QueueName,
      		"",    // 用来区分多个消费在
      		true,  // 是否自动应答, 主动的告诉mq自己已经消费完了,如果false,需要回调函数
      		false, // 是否排他性
      		false, // 如果设置为true, 表示不能将同一个connection中发送的消息传递给这个connect中的消费者
      		false, // 设置为阻塞,一个一个消费
      		nil,   // 附加信息
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 消费时的固定写法,用来阻塞
      	forever := make(chan bool)
      	// 启用协程处理消息
      	go func() {
      		for d := range msgs {
      			// 实现我们要处理的逻辑函数
      			log.Printf("Received a message: %s", d.Body)
      		}
      	}()
      
      	log.Printf("[*] Waiting for message,To exit press CTRL + C")
      	<-forever
      }
      
      // 订阅模式创建RabbitMQ实例, 需要指定exchange
      func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
      	// 创建RabbitMQ实例
      	rabbitmq := NewRabbitMQ("", exchangeName, "")
      	var err error
      	// 获取connection
      	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
      	rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
      	// 获取channel
      	rabbitmq.channel, err = rabbitmq.conn.Channel()
      	rabbitmq.failOnErr(err, "failed to open a channel")
      	return rabbitmq
      }
      
      // 订阅模式生产
      func (r *RabbitMQ) PublishPub(message string) {
      	// 尝试创建交换机 如果存在就直接发送,没存在就创建
      	err := r.channel.ExchangeDeclare(
      		r.Exchange, // 交换机名字
      		"fanout",   // 订阅模式下,需要设置为fanout, 广播类型
      		true,       //  是否持久化
      		false,      //  是否自动删除
      		false,      //  true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间绑定
      		false,      //   是否阻塞
      		nil,
      	)
      
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 发送信息
      	err = r.channel.Publish(
      		r.Exchange,
      		"",
      		false, // mandatory 如果为true,会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把消息返回给发送者
      		false, // 和simple模式类似
      		amqp.Publishing{
      			ContentType: "text/plain",
      			Body:        []byte(message),
      		})
      }
      
      // 订阅模式消费端代码,
      /*
      	凡是消费者都需要将有exchange, queue对象 且将他们绑定到一个exchange上
      */
      func (r *RabbitMQ) RecieveSub() {
      	// 1 试探性创建交换机
      	err := r.channel.ExchangeDeclare(
      		r.Exchange, // 交换机名字
      		"fanout",   // 订阅模式下,需要设置为fanout, 广播类型
      		true,       //  是否持久化
      		false,      //  是否自动删除
      		false,      //  true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间绑定
      		false,      //   是否阻塞
      		nil,
      	)
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 试探性创建队列,这里注意队列名称不要写
      	q, err := r.channel.QueueDeclare(
      		"",    // 队列名这里为空 表示随机生产队列名称 因为每个消费者就需要一个队列
      		false, // 控制是否持久化
      		false, // 是否自动删除,当最后一个消费者断开连接后是否删除
      		true,  // 是否具有排他性 这里设置为true
      		false, // 是否阻塞
      		nil,   //  额外属性
      	)
      	r.failOnErr(err, "Failed to declare a queue!")
      
      	// 绑定队列到exchange中
      	err = r.channel.QueueBind(
      		q.Name,      // 这里就是上面创建的/存在的交换机随即名字
      		"",     // 在pub/sub模式下,这里的key为空
      		r.Exchange,  //  绑定到一个exchange上
      		false,
      		nil,
      	)
      
      	// 和Simple模式类似
      	messages, err := r.channel.Consume(
      		q.Name,
      		"",
      		true,
      		false,
      		false,
      		false,
      		nil,
      	)
      
      	forever := make(chan bool)
      	go func() {
      		for d := range messages {
      			log.Printf("Received a message: %s", d.Body)
      		}
      	}()
      	fmt.Println("退出按:CTRL + C 
      ")
      	<-forever
      
      }
      
    • RabbitMQSubPub/mainSub1.go
      package main
      
      import "demo/RabbitMQ"
      
      func main() {
      	rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
      	rabbitmq.RecieveSub()
      }
      
    • RabbitMQSubPub/mainPub.go
      package main
      
      import (
      	"demo/RabbitMQ"
      	"strconv"
      	"time"
      )
      
      func main() {
      	rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
      
      	for i := 0; i < 100; i ++ {
      		rabbitmq.PublishPub("订阅模式生产第" +strconv.Itoa(i) +"条数据")
      		time.Sleep(1 * time.Second)
      	}
      }
      

    3、Routing 路由模式

      一个消息被多个消费在获取。并且消息的目标队列可以被生产者指定,按照指定规则匹配到相应的队列中; 作用就是不同的key去取不同的数据

    •  RabbitMQ/rabbitmq.go
      package RabbitMQ
      
      import (
      	"fmt"
      	"github.com/streadway/amqp"
      	"log"
      )
      
      // 创建连接url
      const MQURL = "amqp://admin:admin@127.0.0.1:5672/test"
      
      type RabbitMQ struct {
      	conn    *amqp.Connection
      	channel *amqp.Channel
      
      	// 队列名称
      	QueueName string
      	// 交换机
      	Exchange string
      	// key
      	Key string
      	// 连接信息
      	Mqurl string
      }
      
      // 创建连接实例
      func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
      	// exchange 为空会使用默认的default
      	rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
      	var err error
      	// 创建rabbitmq来连接
      	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
      	rabbitmq.failOnErr(err, "创建连接错误!")
      
      	// 创建channel
      	rabbitmq.channel, err = rabbitmq.conn.Channel()
      	rabbitmq.failOnErr(err, "获取channel失败")
      	return rabbitmq
      }
      
      // 断开连接:channel和conn
      func (r *RabbitMQ) Destory() {
      	r.channel.Close()
      	r.conn.Close()
      }
      
      // 错误处理的函数
      func (r *RabbitMQ) failOnErr(err error, message string) {
      	if err != nil {
      		log.Fatalf("%s: %s", message, err)
      		panic(fmt.Sprintf("%s", message))
      	}
      }
      
      // step1: simple style 创建简单模式的实例, 只需要队列名
      func NewRabbitMQSimple(queueName string) *RabbitMQ {
      	return NewRabbitMQ(queueName, "", "")
      }
      
      // step2: 简单模式下生产
      func (r *RabbitMQ) PublishSimple(message string) {
      	// 固定用法  申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据
      	_, err := r.channel.QueueDeclare(
      		r.QueueName, // 队列名
      		false,       // 控制是否持久化
      		false,       // 是否自动删除,当最后一个消费者断开连接后是否删除
      		false,       // 是否具有排他性,其他用户不可访问
      		false,       // 是否阻塞
      		nil,         //  额外属性
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 发送消息到队列中
      	err = r.channel.Publish(
      		r.Exchange,
      		r.QueueName,
      		false, // mandatory 如果为true,会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把消息返回给发送者
      		false, // immediate 如果为true,当exchange发送消息到队列后发现队列没有绑定消费者后会把消息返回
      		amqp.Publishing{
      			ContentType: "text/plain",
      			Body:        []byte(message),
      		})
      }
      
      // 简单模式的消费消息
      func (r *RabbitMQ) ConsumeSimple() {
      	// 固定用法  申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据
      	_, err := r.channel.QueueDeclare(
      		r.QueueName, // 队列名
      		false,       // 控制是否持久化
      		false,       // 是否自动删除,当最后一个消费者断开连接后是否删除
      		false,       // 是否具有排他性,其他用户不可访问
      		false,       // 是否阻塞
      		nil,         //  额外属性
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 接受消息
      	msgs, err := r.channel.Consume(
      		r.QueueName,
      		"",    // 用来区分多个消费在
      		true,  // 是否自动应答, 主动的告诉mq自己已经消费完了,如果false,需要回调函数
      		false, // 是否排他性
      		false, // 如果设置为true, 表示不能将同一个connection中发送的消息传递给这个connect中的消费者
      		false, // 设置为阻塞,一个一个消费
      		nil,   // 附加信息
      	)
      	if err != nil {
      		fmt.Println(err)
      	}
      
      	// 消费时的固定写法,用来阻塞
      	forever := make(chan bool)
      	// 启用协程处理消息
      	go func() {
      		for d := range msgs {
      			// 实现我们要处理的逻辑函数
      			log.Printf("Received a message: %s", d.Body)
      		}
      	}()
      
      	log.Printf("[*] Waiting for message,To exit press CTRL + C")
      	<-forever
      }
      
      // 订阅模式创建RabbitMQ实例, 需要指定exchange
      func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
      	// 创建RabbitMQ实例
      	rabbitmq := NewRabbitMQ("", exchangeName, "")
      	var err error
      	// 获取connection
      	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
      	rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
      	// 获取channel
      	rabbitmq.channel, err = rabbitmq.conn.Channel()
      	rabbitmq.failOnErr(err, "failed to open a channel")
      	return rabbitmq
      }
      
      // 订阅模式生产
      func (r *RabbitMQ) PublishPub(message string) {
      	// 尝试创建交换机 如果存在就直接发送,没存在就创建
      	err := r.channel.ExchangeDeclare(
      		r.Exchange, // 交换机名字
      		"fanout",   // 订阅模式下,需要设置为fanout, 广播类型
      		true,       //  是否持久化
      		false,      //  是否自动删除
      		false,      //  true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间绑定
      		false,      //   是否阻塞
      		nil,
      	)
      
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 发送信息
      	err = r.channel.Publish(
      		r.Exchange,
      		"",
      		false, // mandatory 如果为true,会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把消息返回给发送者
      		false, // 和simple模式类似
      		amqp.Publishing{
      			ContentType: "text/plain",
      			Body:        []byte(message),
      		})
      }
      
      // 订阅模式消费端代码,
      /*
      	凡是消费者都需要将有exchange, queue对象 且将他们绑定到一个exchange上
      */
      func (r *RabbitMQ) RecieveSub() {
      	// 1 试探性创建交换机
      	err := r.channel.ExchangeDeclare(
      		r.Exchange, // 交换机名字
      		"fanout",   // 订阅模式下,需要设置为fanout, 广播类型
      		true,       //  是否持久化
      		false,      //  是否自动删除
      		false,      //  true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间绑定
      		false,      //   是否阻塞
      		nil,
      	)
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 试探性创建队列,这里注意队列名称不要写
      	q, err := r.channel.QueueDeclare(
      		"",    // 队列名这里为空 表示随机生产队列名称 因为每个消费者就需要一个队列
      		false, // 控制是否持久化
      		false, // 是否自动删除,当最后一个消费者断开连接后是否删除
      		true,  // 是否具有排他性 这里设置为true
      		false, // 是否阻塞
      		nil,   //  额外属性
      	)
      	r.failOnErr(err, "Failed to declare a queue!")
      
      	// 绑定队列到exchange中
      	err = r.channel.QueueBind(
      		q.Name,      // 这里就是上面创建的/存在的交换机随即名字
      		"",     // 在pub/sub模式下,这里的key为空
      		r.Exchange,  //  绑定到一个exchange上
      		false,
      		nil,
      	)
      
      	// 和Simple模式类似
      	messages, err := r.channel.Consume(
      		q.Name,
      		"",
      		true,
      		false,
      		false,
      		false,
      		nil,
      	)
      
      	forever := make(chan bool)
      	go func() {
      		for d := range messages {
      			log.Printf("Received a message: %s", d.Body)
      		}
      	}()
      	fmt.Println("退出按:CTRL + C 
      ")
      	<-forever
      
      }
      
      /*
      	路由模式:需要将使用交换机上该为'direct', 然后将队列的key绑定到交换机上
      */
      // 路由模式, 创建RabbitMQ实例, routingKey 用来匹配规则
      func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {
      	// 创建RabbitMQ实例
      	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
      	var err error
      	// 获取connection
      	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
      	rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
      	// 获取channel
      	rabbitmq.channel, err = rabbitmq.conn.Channel()
      	rabbitmq.failOnErr(err, "failed to open channel")
      	return rabbitmq
      }
      
      // 路由模式发送消息
      func (r *RabbitMQ) PublishRouting(message string) {
      	// 1 尝试创建交换机
      	err := r.channel.ExchangeDeclare(
      			r.Exchange,
      			"direct",  // 改成direct
      			true,
      			false,
      			false,
      			false,
      			nil,
      		)
      
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 2 发送消息加上key
      	err = r.channel.Publish(
      			r.Exchange,
      			r.Key,   // 在设置key 将生产者中逃入这个key
      			false,
      			false,
      			amqp.Publishing{
      				ContentType:     "text/plain",
      				Body:            []byte(message),
      			})
      }
      
      //路由模式接受消息
      func (r *RabbitMQ) RecieveRouting() {
      	// 1 试探性创建交换机
      	err := r.channel.ExchangeDeclare(
      			r.Exchange,
      			"direct",
      			true,
      			false,
      			false,
      			false,
      			nil,
      		)
      	r.failOnErr(err, "Failed to declare an exchange!")
      
      	// 试探性创建队列,这里注意队列名称不要写
      	q, err := r.channel.QueueDeclare(
      		"",    // 队列名这里为空 表示随机生产队列名称 因为每个消费者就需要一个队列
      		false, // 控制是否持久化
      		false, // 是否自动删除,当最后一个消费者断开连接后是否删除
      		true,  // 是否具有排他性 这里设置为true
      		false, // 是否阻塞
      		nil,   //  额外属性
      	)
      	r.failOnErr(err, "Failed to declare a queue!")
      
      	// 绑定队列到exchange中
      	err = r.channel.QueueBind(
      		q.Name,      // 这里就是上面创建的/存在的交换机随即名字
      		r.Key,      // 需要设置key到交换机上
      		r.Exchange,  //  绑定到一个exchange上
      		false,
      		nil,
      	)
      
      	// 消费消息
      	messages, err := r.channel.Consume(
      		q.Name,
      		"",
      		true,
      		false,
      		false,
      		false,
      		nil,
      	)
      
      	forever := make(chan bool)
      	go func() {
      		for d := range messages {
      			log.Printf("Received a message: %s", d.Body)
      		}
      	}()
      	fmt.Println("退出按:CTRL + C 
      ")
      	<-forever
      }
      
    • RabbitMQRouting/publishRouting.go
      package main
      
      import (
      	"demo/RabbitMQ"
      	"fmt"
      	"strconv"
      	"time"
      )
      
      func main() {
      	pub := RabbitMQ.NewRabbitMQRouting("exTest","key1")
      	pub2 := RabbitMQ.NewRabbitMQRouting("exTest","key2")
      
      	for i:=0;i<10;i++{
      		pub.PublishRouting("Hello Test one!" +strconv.Itoa(i))
      		pub2.PublishRouting("Hello Test Two!" +strconv.Itoa(i))
      
      		time.Sleep(1*time.Second)
      		fmt.Println(i)
      	}
      }
      
    • RabbitMQRouting/receiveOne.go
      package main
      
      import "demo/RabbitMQ"
      
      func main() {
      	one := RabbitMQ.NewRabbitMQRouting("exTest","key1")
      	one.RecieveRouting()
      }
      
    • RabbitMQRouting/receiveTwo.go
      package main
      
      import "demo/RabbitMQ"
      
      func main() {
      	one := RabbitMQ.NewRabbitMQRouting("exTest","key2")
      	one.RecieveRouting()
      }
      
  • 相关阅读:
    StopAllSounds
    GotoAndPlay
    区间(interval)
    因数(factor)
    [HAOI2009]逆序对数列
    生物分子gene
    数轴line
    [SCOI2008]配对
    精力(power)
    bzoj4987: Tree(树形dp)
  • 原文地址:https://www.cnblogs.com/double-W/p/12587773.html
Copyright © 2011-2022 走看看