生成子 key 时,channel 使用通配符 `#`(例如 `shuguo/#/`)。
订阅时,channel 使用通配符 `+`(例如 `shuguo/aaa/unicast/+/`)。
package main
import (
"encoding/json"
"fmt"
"os"
"time"
emitter "github.com/emitter-io/go"
)
// MyBody is the JSON payload that the message handler decodes from every
// received message.
type MyBody struct {
	// Id is the sequence number carried by the message.
	Id uint64
	// Body is the message text.
	Body string
}
// localID is the Id of the last message accepted by the message handler in
// main; the handler requires the next message to carry localID+1.
var localID uint64
func main() {
localID = 0
var mykey string
rMsg := &MyBody{}
o := emitter.NewClientOptions()
o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
fmt.Printf("Received message: %s
", msg.Payload())
err := json.Unmarshal(msg.Payload(), rMsg)
if err != nil {
fmt.Println("parse json data err!")
}
fmt.Printf("Received message: %d
", rMsg.Id)
if (localID + 1) != rMsg.Id {
fmt.Println("seq err!")
os.Exit(1)
}
localID = rMsg.Id
//fmt.Printf("Received message: %d
", rMsg.Id)
})
o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
fmt.Printf("get key: %s
", key)
mykey = key.Key
})
o.ClientID = "tttt_123456"
o.AddBroker("tcp://192.168.162.34:49250")
o.AddBroker("tcp://192.168.163.200:8081")
o.AddBroker("tcp://119.3.108.139:8081")
c := emitter.NewClient(o)
sToken := c.Connect()
if sToken.Wait() && sToken.Error() != nil {
panic("Error on Client.Connect(): " + sToken.Error().Error())
}
getKey := emitter.NewKeyGenRequest()
getKey.Channel = "shuguo/#/"
getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
getKey.TTL = 0
getKey.Type = "rwlsp"
sToken = c.GenerateKey(getKey)
if sToken.Wait() && sToken.Error() != nil {
panic("Error on client generateKey(): " + sToken.Error().Error())
}
time.Sleep(2 * time.Second)
//c.Subscribe(mykey, "shuguo/unicast/user@111/")
c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
time.Sleep(1 * time.Second)
c.Publish(mykey, "shuguo/unicast/user@111", "hello")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
select {}
}
emitter 的 client 库是对 paho.mqtt.golang 的封装;直接使用 paho.mqtt.golang 库也可以对接 emitter 服务端(topic 需以 subKey 开头):
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s
", msg.Topic())
fmt.Printf("MSG: %s
", msg.Payload())
}
// main connects to the broker with the plain paho MQTT client, subscribes to
// one topic, publishes five messages to it, waits three seconds for them to
// be delivered, then unsubscribes and disconnects.
//
// It panics when the connection fails and exits with status 1 when
// subscribing or unsubscribing fails; a failed publish is reported and the
// remaining messages are still sent.
func main() {
	// The first topic segment is the emitter channel key; the rest is the
	// channel itself.
	const topic = "1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"

	mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)

	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111")
	opts.SetKeepAlive(2 * time.Second)
	opts.SetDefaultPublishHandler(f)
	opts.SetPingTimeout(1 * time.Second)

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// A nil callback routes received messages to the default publish handler.
	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		// Report a failed publish instead of dropping the error silently.
		if token := c.Publish(topic, 0, false, text); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
		}
	}

	// Give the broker time to deliver the messages back to this client.
	time.Sleep(3 * time.Second)

	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)
	time.Sleep(1 * time.Second)
}