zoukankan      html  css  js  c++  java
  • golang操作Rabbit

    GOLANG操作rabbitmq

    简单模式

    一个生产者对应一个消费者!!!

    生产者

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    	/*
    		name:queue的名称
    		durable:是否持久化
    		autoDelete: 是否自动删除(当没有customer时会自动删除)
    		exclusive:
    			1.是否独占,只有一个消费者监听这个队列
    			2.当coon关闭的时候,是否删除队列
    		noWait: 是否等待
    		args: 额外的参数
    	*/
    	// 发布消息
    	message := amqp.Publishing{
    		Body: []byte("hello world"),
    	}
    	if err := channel.Publish("", "hello", false, false, message); err != nil {
    		fmt.Println(err)
    	}
    	/*
    		exchange:交换机的名称,简单模式下使用默认的""
    		routerKey: 路由名称, 简单模式下使用和队列名称一样
    	*/
    }
    

    消费者

    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    
    	delivery, err := channel.Consume("hello", "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Println(string(message.Body))
    	}
    }
    

    工作模式

    一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    	"strconv"
    )
    
    const (
    	url   = "amqp://guest:guest@localhost:5672/"
    	queue = "workMode"
    )
    
    func Consume(name string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s   body:%s
    ", name, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	for i := 0; i < 10; i++ {
    		message := amqp.Publishing{
    			Body: []byte("hello world" + strconv.Itoa(i)),
    		}
    		if err := channel.Publish("", queue, false, false, message); err != nil {
    			fmt.Println(err)
    		}
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	for i := 0; i < 2; i++ {
    		go Consume(fmt.Sprintf("name%d", i))
    	}
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    

    测试结果

    name:name1   body:hello world0
    name:name1   body:hello world2
    name:name1   body:hello world4
    name:name1   body:hello world6
    name:name1   body:hello world8
    name:name0   body:hello world1
    name:name0   body:hello world3
    name:name0   body:hello world5
    name:name0   body:hello world7
    name:name0   body:hello world9
    

    发布订阅模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "TestFanout"
    	queue1       = "TestFanoutQueue1"
    	queue2       = "TestFanoutQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeFanout, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	message := amqp.Publishing{
    		Body: []byte("ExchangeFanout"),
    	}
        // 使用FANOUT时,routeingKey设置为空字符串
    	if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    路由模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url             = "amqp://guest:guest@localhost:5672/"
    	exchangeName    = "direct"
    	errorRouteKey   = "error"
    	infoRouteKey    = "info"
    	warningRouteKey = "warning"
    	queue1          = "directQueue1"
    	queue2          = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeDirect, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    此时生产者发送的路由key为info,只有queue2才能收到,如果把路由key改为error,则两个队列都能收到。

    通配符模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "topic"
    	queue1       = "directQueue1"
    	queue2       = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,       // name
    		amqp.ExchangeTopic, // kind
    		false,              // durable
    		true,               // autoDelete
    		false,              // internal 是否rabbitmq内部使用
    		true,               // noWait
    		nil,                // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    通配符模式是路由模式的进阶版,它能够让一个队列监听动态的路由规则。

  • 相关阅读:
    linux-log-files/
    SSL SSH
    C++学习的书籍
    Linux IO 分析
    LINUX 常用操作
    Find Large Files in Linux
    Linux 常见操作
    Linux Performance tool
    /linux-command-line-bash-shortcut-keys/
    Sed
  • 原文地址:https://www.cnblogs.com/ivy-blogs/p/14201907.html
Copyright © 2011-2022 走看看