虽然个人不怎么推荐 ActiveMQ,但由于项目需要,还是做一个简单的整理。订阅的时候,一般我们的业务处理的都是字符串;但有时候 AckMode 不能设置为 AckAuto,而是需要客户端处理完业务后再发回确认,所以订阅封装了 2 个方法:SubscribeAuto(自动确认,处理函数直接拿到字符串)和 Subscribe(可指定 AckMode,由处理函数自行确认)。
utils.go如下:
//Usage: // // //Send // if err := utils.NewActiveMQ("localhost:61613").Send("/queue/test-1", "test from 1"); err != nil { // fmt.Println("AMQ ERROR:", err) // // //this func will handle the messges get from activeMQ server. // handler := func(msg string,err error) { fmt.Println("AMQ MSG:", err, msg) } // if err := utils.NewActiveMQ("localhost:61613").Subscribe("/queue/test-1", handler); err != nil { // fmt.Println("AMQ ERROR:", err) // } // package utils import ( "time" "github.com/go-stomp/stomp" ) type ActiveMQ struct { Addr string } var options = []func(*stomp.Conn) error{ //设置读写超时,超时时间为1个小时 stomp.ConnOpt.HeartBeat(7200*time.Second, 7200*time.Second), stomp.ConnOpt.HeartBeatError(360 * time.Second), } //New activeMQ with addr[eg:localhost:61613] as host address. func NewActiveMQ(addr string) *ActiveMQ { if addr == "" { addr = "localhost:61613" } return &ActiveMQ{addr} } // Used for health check func (this *ActiveMQ) Check() error { conn, err := this.Connect() if err == nil { defer conn.Disconnect() return nil } else { return err } } // Connect to activeMQ func (this *ActiveMQ) Connect() (*stomp.Conn, error) { return stomp.Dial("tcp", this.Addr, options...) 
} // Send msg to destination func (this *ActiveMQ) Send(destination string, msg string) error { conn, err := this.Connect() if err != nil { return err } defer conn.Disconnect() return conn.Send( destination, // destination "text/plain", // content-type []byte(msg)) // body } // Subscribe Message from destination // func handler handle msg reveived from destination func (this *ActiveMQ) Subscribe(destination string, ack stomp.AckMode, handler func(msg *stomp.Message, con *stomp.Conn, err error)) error { conn, err := this.Connect() if err != nil { return err } //sub, err := conn.Subscribe(destination, stomp.AckAuto) sub, err := conn.Subscribe(destination, ack) if err != nil { return err } defer conn.Disconnect() defer sub.Unsubscribe() for { m := <-sub.C handler(m, conn, m.Err) } return err } // func (this *ActiveMQ) SubscribeAuto(destination string, handler func(msg string, err error)) error { conn, err := this.Connect() if err != nil { return err } sub, err := conn.Subscribe(destination, stomp.AckAuto) if err != nil { return err } defer conn.Disconnect() defer sub.Unsubscribe() for { m := <-sub.C handler(string(m.Body), m.Err) } return err }
调用就非常简单了:
package main import ( "fmt" "main/utils" "strconv" "github.com/go-stomp/stomp" ) func main() { //生产者 go func() { mq := utils.NewActiveMQ("localhost:61613") for i := 0; i < 100; i++ { mq.Send("main", "demo"+strconv.Itoa(i+1)) } }() //消费者 go func() { mq := utils.NewActiveMQ("localhost:61613") //mq.SubscribeAuto("main", handler) mq.Subscribe("main", stomp.AckClient, handler2) }() fmt.Println("activeMQ test") var s string fmt.Scan(&s) } func handler(msg string, err error) { if err != nil { fmt.Println(err) } else { fmt.Println("AMQ MSG:", msg) } } func handler2(msg *stomp.Message, con *stomp.Conn, err error) { if err != nil { fmt.Println(err) } else { fmt.Println("AMQ MSG:", string(msg.Body)) con.Ack(msg) } }