zoukankan      html  css  js  c++  java
  • golang操作Rabbit

    GOLANG操作rabbitmq

    简单模式

    一个生产者对应一个消费者!!!

    生产者

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    	/*
    		name:queue的名称
    		durable:是否持久化
    		autoDelete: 是否自动删除(当没有customer时会自动删除)
    		exclusive:
    			1.是否独占,只有一个消费者监听这个队列
    			2.当coon关闭的时候,是否删除队列
    		noWait: 是否等待
    		args: 额外的参数
    	*/
    	// 发布消息
    	message := amqp.Publishing{
    		Body: []byte("hello world"),
    	}
    	if err := channel.Publish("", "hello", false, false, message); err != nil {
    		fmt.Println(err)
    	}
    	/*
    		exchange:交换机的名称,简单模式下使用默认的""
    		routerKey: 路由名称, 简单模式下使用和队列名称一样
    	*/
    }
    

    消费者

    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    
    	delivery, err := channel.Consume("hello", "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Println(string(message.Body))
    	}
    }
    

    工作模式

    一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    	"strconv"
    )
    
    const (
    	url   = "amqp://guest:guest@localhost:5672/"
    	queue = "workMode"
    )
    
    func Consume(name string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s   body:%s
    ", name, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	for i := 0; i < 10; i++ {
    		message := amqp.Publishing{
    			Body: []byte("hello world" + strconv.Itoa(i)),
    		}
    		if err := channel.Publish("", queue, false, false, message); err != nil {
    			fmt.Println(err)
    		}
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	for i := 0; i < 2; i++ {
    		go Consume(fmt.Sprintf("name%d", i))
    	}
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    

    测试结果

    name:name1   body:hello world0
    name:name1   body:hello world2
    name:name1   body:hello world4
    name:name1   body:hello world6
    name:name1   body:hello world8
    name:name0   body:hello world1
    name:name0   body:hello world3
    name:name0   body:hello world5
    name:name0   body:hello world7
    name:name0   body:hello world9
    

    发布订阅模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "TestFanout"
    	queue1       = "TestFanoutQueue1"
    	queue2       = "TestFanoutQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeFanout, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	message := amqp.Publishing{
    		Body: []byte("ExchangeFanout"),
    	}
        // 使用FANOUT时,routeingKey设置为空字符串
    	if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    路由模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url             = "amqp://guest:guest@localhost:5672/"
    	exchangeName    = "direct"
    	errorRouteKey   = "error"
    	infoRouteKey    = "info"
    	warningRouteKey = "warning"
    	queue1          = "directQueue1"
    	queue2          = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeDirect, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    此时生产者发送的路由key为info,只有queue2才能收到,如果把路由key改为error,则两个队列都能收到。

    通配符模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "topic"
    	queue1       = "directQueue1"
    	queue2       = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,       // name
    		amqp.ExchangeTopic, // kind
    		false,              // durable
    		true,               // autoDelete
    		false,              // internal 是否rabbitmq内部使用
    		true,               // noWait
    		nil,                // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    通配符模式是路由模式的进阶版,它能够让一个队列监听动态的路由规则。

  • 相关阅读:
    高级特性(4)- 数据库编程
    UVA Jin Ge Jin Qu hao 12563
    UVA 116 Unidirectional TSP
    HDU 2224 The shortest path
    poj 2677 Tour
    【算法学习】双调欧几里得旅行商问题(动态规划)
    南洋理工大学 ACM 在线评测系统 矩形嵌套
    UVA The Tower of Babylon
    uva A Spy in the Metro(洛谷 P2583 地铁间谍)
    洛谷 P1095 守望者的逃离
  • 原文地址:https://www.cnblogs.com/ivy-blogs/p/14201907.html
Copyright © 2011-2022 走看看