zoukankan      html  css  js  c++  java
  • go rabbitmq延时队列

    参考了:https://www.cnblogs.com/mfrank/p/11260355.html#autoid-0-8-0

    demo:

    利用的是RabbitMQ中的TTL(Time To Live)特性

    package new_rabbitmq
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"time"
    )
    
    var (
    	dm  *RabbitMQ
    	dl  *RabbitMQ
    	err error
    	url = "amqp://guest:guest@localhost:5672/?heartbeat=5"
    )
    
    // 获取rabbitmq连接
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	//队列名称
    	QueueName string
    	//交换机
    	Exchange string
    	//key
    	key string
    	//连接信息
    	Url string
    }
    
    //创建RabbitMQ结构体实例
    func NewRabbitMQ(url, queueName, exchange, key string) (*RabbitMQ, error) {
    	rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Url: url}
    	var err error
    	//创建RabbitMQ连接
    	rabbitmq.conn, err = amqp.Dial(rabbitmq.Url)
    	//rabbitmq.failOnErr(err, "创建连接错误!")
    	if err != nil {
    		return nil, err
    	}
    
    	rabbitmq.channel, err = rabbitmq.conn.Channel()
    	if err != nil {
    		return nil, err
    	}
    	return rabbitmq, nil
    }
    // new一个延迟队列 绑定交换机和queue
    func NewDelayMq() *RabbitMQ {
    
    	dm, err = NewRabbitMQ(url, "delay_queue", "delay_exchange", "delay_key")
    	if err != nil {
    		fmt.Println("connect err:" + err.Error())
    	}
    	dm.SetExchange()
    	dm.SetAndBindQueue(map[string]interface{}{
    		"x-message-ttl":             10000, // 10s超时
    		"x-dead-letter-exchange":    "dead_letter_exchange", // 指定超时时发送到哪个死信交换机
    		"x-dead-letter-routing-key": "dead_letter_key", // 哪个死信队列负责处理
    	})
    	return dm
    }
    // new死信队列 绑定交换机和queue
    func NewDeadLetterMq() *RabbitMQ {
    	dl, err = NewRabbitMQ(url, "dead_letter_queue", "dead_letter_exchange", "dead_letter_key")
    	if err != nil {
    		fmt.Println("connect err:" + err.Error())
    	}
    	dl.SetExchange()
    	dl.SetAndBindQueue(nil)
    
    	return dl
    }
    // 延时队列推消息
    func DelayPublish() {
    	dm.DelayPublish("test delay1")
    	dm.DelayPublish("test delay2")
    }
    // 死信消费
    func DeadLetterConsume() {
    	dl.DeadLetterConsume()
    }
    
    func (r *RabbitMQ) SetExchange() {
    	err := r.channel.ExchangeDeclare(r.Exchange, "direct",
    		true,
    		false,
    		false,
    		false,
    		nil)
    
    	if err != nil {
    		fmt.Println("exchange declare err" + err.Error())
    	}
    }
    
    func (r *RabbitMQ) SetAndBindQueue(args amqp.Table) {
    	_, err := r.channel.QueueDeclare(r.QueueName, true,
    		false,
    		false,
    		false,
    		args)
    
    	if err != nil {
    		fmt.Println("queue declare err" + err.Error())
    	}
    
    	err = r.channel.QueueBind(r.QueueName, r.key, r.Exchange, false, nil)
    	if err != nil {
    		fmt.Println("queue bind err" + err.Error())
    	}
    }
    
    func (r *RabbitMQ) DelayPublish(msgStr string) {
    	msg := amqp.Publishing{
    		DeliveryMode: amqp.Persistent,
    		Timestamp:    time.Now(),
    		ContentType:  "text/plain",
    		Body:         []byte(msgStr),
    	}
    	err := r.channel.Publish(r.Exchange, r.key, false, false, msg)
    	if err != nil {
    		fmt.Println("DelayPublish err" + err.Error())
    	}
    	fmt.Println("delay queue push msg: " + msgStr)
    }
    
    func (r *RabbitMQ) DeadLetterConsume() {
    	_ = r.channel.Qos(1, 0, false)
    	msgChan, err := r.channel.Consume(r.QueueName, r.key,
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    	if err != nil {
    		fmt.Println("dead consume err:" + err.Error())
    	}
    
    	for delivery := range msgChan {
    		_ = delivery.Ack(true)
    		msg := string(delivery.Body)
    		fmt.Println("dead letter get msg:" + msg)
    	}
    }
    

      test:

    package new_rabbitmq
    
    import "testing"
    
    func TestDelay(t *testing.T) {
    	NewDelayMq()
    	NewDeadLetterMq()
    
    	DelayPublish()
    	DeadLetterConsume()
    }
    

      

    每天都是不想努力的一天....
  • 相关阅读:
    ftp-server(对象存储)
    zabbix监控VMware6.7
    linux安装中文字体
    vsftpd不支持目录软链接的解决办法
    linux内网IP如果判断出网IP地址
    mysql ANSI_QUOTES 这个sql_mode的作用(字段可以使用双引号)
    查看tomcat项目中,具体占用cpu高的线程。
    nginx ssl 自签证书实验
    Redis复制和哨兵部署
    利用Python脚本备份服务器上所有PostgreSQL数据库
  • 原文地址:https://www.cnblogs.com/Theia/p/15508015.html
Copyright © 2011-2022 走看看