zoukankan      html  css  js  c++  java
  • golang操作Rabbit

    GOLANG操作rabbitmq

    简单模式

    一个生产者对应一个消费者!!!

    生产者

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    	/*
    		name:queue的名称
    		durable:是否持久化
    		autoDelete: 是否自动删除(当没有customer时会自动删除)
    		exclusive:
    			1.是否独占,只有一个消费者监听这个队列
    			2.当coon关闭的时候,是否删除队列
    		noWait: 是否等待
    		args: 额外的参数
    	*/
    	// 发布消息
    	message := amqp.Publishing{
    		Body: []byte("hello world"),
    	}
    	if err := channel.Publish("", "hello", false, false, message); err != nil {
    		fmt.Println(err)
    	}
    	/*
    		exchange:交换机的名称,简单模式下使用默认的""
    		routerKey: 路由名称, 简单模式下使用和队列名称一样
    	*/
    }
    

    消费者

    func main() {
    	url := "amqp://guest:guest@localhost:5672/"
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
    
    	delivery, err := channel.Consume("hello", "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Println(string(message.Body))
    	}
    }
    

    工作模式

    一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    	"strconv"
    )
    
    const (
    	url   = "amqp://guest:guest@localhost:5672/"
    	queue = "workMode"
    )
    
    func Consume(name string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s   body:%s
    ", name, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	for i := 0; i < 10; i++ {
    		message := amqp.Publishing{
    			Body: []byte("hello world" + strconv.Itoa(i)),
    		}
    		if err := channel.Publish("", queue, false, false, message); err != nil {
    			fmt.Println(err)
    		}
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	for i := 0; i < 2; i++ {
    		go Consume(fmt.Sprintf("name%d", i))
    	}
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    

    测试结果

    name:name1   body:hello world0
    name:name1   body:hello world2
    name:name1   body:hello world4
    name:name1   body:hello world6
    name:name1   body:hello world8
    name:name0   body:hello world1
    name:name0   body:hello world3
    name:name0   body:hello world5
    name:name0   body:hello world7
    name:name0   body:hello world9
    

    发布订阅模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "TestFanout"
    	queue1       = "TestFanoutQueue1"
    	queue2       = "TestFanoutQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeFanout, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	message := amqp.Publishing{
    		Body: []byte("ExchangeFanout"),
    	}
        // 使用FANOUT时,routeingKey设置为空字符串
    	if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    路由模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url             = "amqp://guest:guest@localhost:5672/"
    	exchangeName    = "direct"
    	errorRouteKey   = "error"
    	infoRouteKey    = "info"
    	warningRouteKey = "warning"
    	queue1          = "directQueue1"
    	queue2          = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,        // name
    		amqp.ExchangeDirect, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    此时生产者发送的路由key为info,只有queue2才能收到,如果把路由key改为error,则两个队列都能收到。

    通配符模式

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    const (
    	url          = "amqp://guest:guest@localhost:5672/"
    	exchangeName = "topic"
    	queue1       = "directQueue1"
    	queue2       = "directQueue2"
    )
    
    func Consume(queue string, action string) {
    
    	// 创建链接
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	// 获取channel
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 注意:这里一定要写上,否则有异常,ioException
    	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
    
    	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    	for message := range delivery {
    		fmt.Printf("name:%s  action:%s body:%s
    ", queue, action, string(message.Body))
    	}
    }
    
    func Publish() {
    	coon, err := amqp.Dial(url)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer coon.Close()
    
    	channel, err := coon.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer channel.Close()
    
    	// 创建交换机
    	if err := channel.ExchangeDeclare(
    		exchangeName,       // name
    		amqp.ExchangeTopic, // kind
    		false,              // durable
    		true,               // autoDelete
    		false,              // internal 是否rabbitmq内部使用
    		true,               // noWait
    		nil,                // args
    	); err != nil {
    		log.Fatal(err)
    	}
    
    	// 声明队列,没有则创建
    	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
    	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
    
    	// 队列1绑定error
    	if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	// 队列2绑定error,info,warning
    	if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    	if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
    		log.Fatal(err)
    	}
    
    	message := amqp.Publishing{
    		Body: []byte("ExchangeDirect"),
    	}
    	// 发送消息指定交换机和路由key
    	if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
    		log.Fatal(err)
    	}
    }
    
    func main() {
    	// 开启两个消费者
    	go Consume(queue1, "记录日志")
    	go Consume(queue2, "保存信息")
    	// 开启一个生产者
    	go Publish()
    	// 阻塞主goroutine
    	<-make(chan int)
    }
    
    

    通配符模式是路由模式的进阶版,它能够让一个队列监听动态的路由规则。

  • 相关阅读:
    今天开始用 VSU 2010
    Visual Studio 2010 模型设计工具 基本应用
    Asp.Net访问Oracle 数据库 执行SQL语句和调用存储过程
    Enterprise Library 4.1 Security Block 快速使用图文笔记
    解决“System.Data.OracleClient 需要 Oracle 客户端软件 8.1.7 或更高版本。”(图)
    一个Oracle存储过程示例
    Enterprise Library 4.1 Application Settings 快速使用图文笔记
    Oracle 10g for Windows 简体中文版的安装过程
    Oracle 11g for Windows 简体中文版的安装过程
    Oracle 9i 数据库 创建数据库 Net 配置 创建表 SQL查询 创建存储过程 (图)
  • 原文地址:https://www.cnblogs.com/ivy-blogs/p/14201907.html
Copyright © 2011-2022 走看看