生成子 key 时,channel 使用通配符 #(例如 "shuguo/#/")
订阅时,channel 使用通配符 +(例如 "shuguo/aaa/unicast/+/")
package main import ( "encoding/json" "fmt" "os" "time" emitter "github.com/emitter-io/go" ) type MyBody struct { Id uint64 Body string } var localID uint64 func main() { localID = 0 var mykey string rMsg := &MyBody{} o := emitter.NewClientOptions() o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) { fmt.Printf("Received message: %s ", msg.Payload()) err := json.Unmarshal(msg.Payload(), rMsg) if err != nil { fmt.Println("parse json data err!") } fmt.Printf("Received message: %d ", rMsg.Id) if (localID + 1) != rMsg.Id { fmt.Println("seq err!") os.Exit(1) } localID = rMsg.Id //fmt.Printf("Received message: %d ", rMsg.Id) }) o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) { fmt.Printf("get key: %s ", key) mykey = key.Key }) o.ClientID = "tttt_123456" o.AddBroker("tcp://192.168.162.34:49250") o.AddBroker("tcp://192.168.163.200:8081") o.AddBroker("tcp://119.3.108.139:8081") c := emitter.NewClient(o) sToken := c.Connect() if sToken.Wait() && sToken.Error() != nil { panic("Error on Client.Connect(): " + sToken.Error().Error()) } getKey := emitter.NewKeyGenRequest() getKey.Channel = "shuguo/#/" getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server getKey.TTL = 0 getKey.Type = "rwlsp" sToken = c.GenerateKey(getKey) if sToken.Wait() && sToken.Error() != nil { panic("Error on client generateKey(): " + sToken.Error().Error()) } time.Sleep(2 * time.Second) //c.Subscribe(mykey, "shuguo/unicast/user@111/") c.Subscribe(mykey, "shuguo/aaa/unicast/+/") time.Sleep(1 * time.Second) c.Publish(mykey, "shuguo/unicast/user@111", "hello") //c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello") //c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world") //c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!") //c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020") select {} }
emitter 的 client 库是对 paho.mqtt.golang 的封装;直接使用 paho.mqtt.golang 库也可以对接 emitter 服务端,只需在 topic 前携带 subKey(即 "<subKey>/<channel>")。
package main import ( "fmt" "log" "os" "time" "github.com/eclipse/paho.mqtt.golang" ) var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("TOPIC: %s ", msg.Topic()) fmt.Printf("MSG: %s ", msg.Payload()) } func main() { mqtt.DEBUG = log.New(os.Stdout, "", 0) mqtt.ERROR = log.New(os.Stdout, "", 0) opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111") opts.SetKeepAlive(2 * time.Second) opts.SetDefaultPublishHandler(f) opts.SetPingTimeout(1 * time.Second) c := mqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } if token := c.Subscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) token := c.Publish("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, false, text) token.Wait() } time.Sleep(3 * time.Second) if token := c.Unsubscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } c.Disconnect(250) time.Sleep(1 * time.Second) }