zoukankan      html  css  js  c++  java
  • golang rabbitmq 工具类

    package mq
    
    import (
    	"bytes"
    	"errors"
    	"github.com/streadway/amqp"
    	"strings"
    )
    
    var conn *amqp.Connection
    var channel *amqp.Channel
    var exchanges string
    var topics string
    var hasMQ bool = false
    var mqAddr string
    
    type Reader interface {
    	Read(msg *string) (err error)
    }
    
    // 初始化 参数格式:amqp://用户名:密码@地址:端口号/host
    func SetupRMQ(rmqAddr string) (err error) {
    	//用于重连
    	mqAddr = rmqAddr
    
    	if channel == nil || conn == nil {
    		conn, err = amqp.Dial(rmqAddr)
    		if err != nil {
    			return err
    		}
    
    		channel, err = conn.Channel()
    		if err != nil {
    			return err
    		}
    
    		hasMQ = true
    	}
    	if conn.IsClosed() {
    		conn, err = amqp.Dial(rmqAddr)
    		if err != nil {
    			return err
    		}
    
    		channel, err = conn.Channel()
    		if err != nil {
    			return err
    		}
    
    		hasMQ = true
    	}
    	return nil
    }
    
    // 是否已经初始化
    func HasMQ() bool {
    	return hasMQ
    }
    
    // 测试连接是否正常
    func Ping() (err error) {
    
    	if !hasMQ || channel == nil {
    		return errors.New("RabbitMQ is not initialize")
    	}
    
    	err = channel.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil)
    	if err != nil {
    		return err
    	}
    
    	msgContent := "ping.ping"
    
    	err = channel.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{
    		ContentType: "text/plain",
    		Body:        []byte(msgContent),
    	})
    
    	if err != nil {
    		return err
    	}
    
    	err = channel.ExchangeDelete("ping.ping", false, false)
    
    	return err
    }
    
    // 发布消息
    func Publish(exchange, routeKey string, msg string, priority uint8) (err error) {
    	if conn == nil {
    		_ = SetupRMQ(mqAddr)
    	}
    
    	if conn.IsClosed() {
    		_ = SetupRMQ(mqAddr)
    	}
    
    	if exchanges == "" || !strings.Contains(exchanges, exchange) {
    		err = channel.ExchangeDeclare(exchange, "topic", true, false, false, true, nil)
    		if err != nil {
    			return err
    		}
    		err = channel.ExchangeDeclare(exchange+"_dlx", "topic", true, false, false, true, nil)
    		if err != nil {
    			return err
    		}
    		exchanges += "  " + exchange + "  "
    	}
    
    	err = channel.Publish(exchange, routeKey, false, false, amqp.Publishing{
    		Priority:     priority,
    		DeliveryMode: amqp.Persistent,
    		ContentType:  "text/plain",
    		Body:         []byte(msg),
    	})
    	if err != nil {
    		_ = SetupRMQ(mqAddr)
    	}
    	return err
    }
    
    // 监听接收到的消息
    func Receive(exchange, topic string, reader func(msg *string)) (err error) {
    	if exchanges == "" || !strings.Contains(exchanges, exchange) {
    		err = channel.ExchangeDeclare(exchange, "topic", true, false, false, true, nil)
    		if err != nil {
    			return err
    		}
    		exchanges += "  " + exchange + "  "
    	}
    	if topics == "" || !strings.Contains(topics, topic) {
    		//声明队列为优先级队列
    		queeuDeclareArgs := make(map[string]interface{})
    		queeuDeclareArgs["x-max-priority"] = 255
    		_, err = channel.QueueDeclare(topic, true, false, false, true, queeuDeclareArgs)
    		if err != nil {
    			return err
    		}
    		err = channel.QueueBind(topic, exchange, exchange, true, nil)
    		if err != nil {
    			return err
    		}
    		topics += "  " + topic + "  "
    	}
    
    	msgs, err := channel.Consume(topic, "", true, false, false, false, nil)
    	if err != nil {
    		return err
    	}
    
    	go func() {
    		//fmt.Println(*msgs)
    		for d := range msgs {
    			s := bytesToString(&(d.Body))
    			reader(s)
    		}
    	}()
    
    	return nil
    }
    
    // 关闭连接
    func Close() {
    	channel.Close()
    	conn.Close()
    	hasMQ = false
    }
    
    func bytesToString(b *[]byte) *string {
    	s := bytes.NewBuffer(*b)
    	r := s.String()
    	return &r
    }
    

      

  • 相关阅读:
    218. The Skyline Problem
    327. 区间和的个数
    37 Sudoku Solver
    36. Valid Sudoku
    差分数组(1109. 航班预订统计)
    android开发里跳过的坑——onActivityResult在启动另一个activity的时候马上回调
    重启系统media服务
    android源码mm时的编译错误no ruler to make target `out/target/common/obj/JAVA_LIBRARIES/xxxx/javalib.jar', needed by `out/target/common/obj/APPS/xxxx_intermediates/classes-full-debug.jar'. Stop.
    关于android系统启动不同activity默认过渡动画不同的一些认识
    android开发里跳过的坑——android studio 错误Error:Execution failed for task ':processDebugManifest'. > Manifest merger failed with multiple errors, see logs
  • 原文地址:https://www.cnblogs.com/zipon/p/12980937.html
Copyright © 2011-2022 走看看