zoukankan      html  css  js  c++  java
  • emitter-Client

    生成子key: 通配符号 #

    订阅:通配符 +

    package main
    import (
    	emitter "github.com/emitter-io/go"
    type MyBody struct {
    	Id   uint64
    	Body string
    var localID uint64
    func main() {
    	localID = 0
    	var mykey string
    	rMsg := &MyBody{}
    	o := emitter.NewClientOptions()
    	o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
    		fmt.Printf("Received message: %s
    ", msg.Payload())
    		err := json.Unmarshal(msg.Payload(), rMsg)
    		if err != nil {
    			fmt.Println("parse json data err!")
    		fmt.Printf("Received message: %d
    ", rMsg.Id)
    		if (localID + 1) != rMsg.Id {
    			fmt.Println("seq err!")
    		localID = rMsg.Id
    		//fmt.Printf("Received message: %d
    ", rMsg.Id)
    	o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
    		fmt.Printf("get key: %s
    ", key)
    		mykey = key.Key
    	o.ClientID = "tttt_123456"
    	c := emitter.NewClient(o)
    	sToken := c.Connect()
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on Client.Connect(): " + sToken.Error().Error())
    	getKey := emitter.NewKeyGenRequest()
    	getKey.Channel = "shuguo/#/"
    	getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
    	getKey.TTL = 0
    	getKey.Type = "rwlsp"
    	sToken = c.GenerateKey(getKey)
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on client generateKey(): " + sToken.Error().Error())
    	time.Sleep(2 * time.Second)
    	//c.Subscribe(mykey, "shuguo/unicast/user@111/")
    	c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
    	time.Sleep(1 * time.Second)
    	c.Publish(mykey, "shuguo/unicast/user@111", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
    	select {}

      用 emitter 的client库封装了  paho.mqtt.golang ,直接用 paho.mqtt.golang 库 也可以对接 emitter 服务端(携带subKey)

    package main
    import (
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	fmt.Printf("TOPIC: %s
    ", msg.Topic())
    	fmt.Printf("MSG: %s
    ", msg.Payload())
    func main() {
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker("tcp://").SetClientID("vm-11111")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetPingTimeout(1 * time.Second)
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    	if token := c.Subscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, nil); token.Wait() && token.Error() != nil {
    	for i := 0; i < 5; i++ {
    		text := fmt.Sprintf("this is msg #%d!", i)
    		token := c.Publish("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, false, text)
    	time.Sleep(3 * time.Second)
    	if token := c.Unsubscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"); token.Wait() && token.Error() != nil {
    	time.Sleep(1 * time.Second)


  • 相关阅读:
    30 Day Challenge Day 18 | Leetcode 200. Number of Islands (BFS)
    30 Day Challenge Day 18 | Leetcode 701. Insert into a Binary Search Tree
    30 Day Challenge Day 17 | Leetcode 261. Graph Valid Tree
    30 Day Challenge Day 17 | Leetcode 559. Maximum Depth of N-ary Tree
    30 Day Challenge Day 17 | Leetcode 133. Clone Graph
    30 Day Challenge Day 17 | Leetcode 126. Word Ladder II
    30 Day Challenge Day 17 | Leetcode 489. Robot Room Cleaner
    30 Day Challenge Day 17 | Leetcode 127. Word Ladder
    30 Day Challenge Day 17 | Leetcode 624. Maximum Distance in Arrays
    30 Day Challenge Day 16 | Leetcode 701. Insert into a Binary Search Tree
  • 原文地址:https://www.cnblogs.com/yorkyang/p/12145910.html
Copyright © 2011-2022 走看看