zoukankan      html  css  js  c++  java
  • emitter-Client

    生成子 key:channel 中使用通配符 #

    订阅:channel 中使用通配符 +

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"os"
    	"time"
    	emitter "github.com/emitter-io/go"
    )
    
    // MyBody is the structure that received message payloads are JSON-decoded
    // into by the message handler in main.
    type MyBody struct {
    	// Id is the sequence number of the message; the handler requires each
    	// received Id to be exactly one greater than the previous one.
    	Id uint64
    	// Body is the message text. It is decoded but not otherwise inspected
    	// by the code in this listing.
    	Body string
    }
    
    // localID holds the Id of the most recently accepted message. The message
    // handler in main compares every incoming Id against localID+1 and then
    // stores the new Id here.
    var localID uint64
    
    func main() {
    	localID = 0
    	var mykey string
    	rMsg := &MyBody{}
    	o := emitter.NewClientOptions()
    	o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
    		fmt.Printf("Received message: %s
    ", msg.Payload())
    		err := json.Unmarshal(msg.Payload(), rMsg)
    		if err != nil {
    			fmt.Println("parse json data err!")
    		}
    		fmt.Printf("Received message: %d
    ", rMsg.Id)
    
    		if (localID + 1) != rMsg.Id {
    			fmt.Println("seq err!")
    			os.Exit(1)
    		}
    		localID = rMsg.Id
    		//fmt.Printf("Received message: %d
    ", rMsg.Id)
    	})
    	o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
    		fmt.Printf("get key: %s
    ", key)
    		mykey = key.Key
    	})
    
    	o.ClientID = "tttt_123456"
    	o.AddBroker("tcp://192.168.162.34:49250")
    	o.AddBroker("tcp://192.168.163.200:8081")
    	o.AddBroker("tcp://119.3.108.139:8081")
    	c := emitter.NewClient(o)
    	sToken := c.Connect()
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on Client.Connect(): " + sToken.Error().Error())
    	}
    
    	getKey := emitter.NewKeyGenRequest()
    	getKey.Channel = "shuguo/#/"
    	getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
    	getKey.TTL = 0
    	getKey.Type = "rwlsp"
    	sToken = c.GenerateKey(getKey)
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on client generateKey(): " + sToken.Error().Error())
    	}
    	time.Sleep(2 * time.Second)
    	//c.Subscribe(mykey, "shuguo/unicast/user@111/")
    	c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
    	time.Sleep(1 * time.Second)
    	c.Publish(mykey, "shuguo/unicast/user@111", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
    	select {}
    }
    

      emitter 的 client 库封装了 paho.mqtt.golang;直接使用 paho.mqtt.golang 库也可以对接 emitter 服务端(需在 topic 中携带 subKey)。

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	fmt.Printf("TOPIC: %s
    ", msg.Topic())
    	fmt.Printf("MSG: %s
    ", msg.Payload())
    }
    
    // main connects to an emitter broker with the plain paho MQTT client,
    // subscribes to one topic, publishes five messages to it, waits three
    // seconds, then unsubscribes and disconnects.
    //
    // Connect failures panic. Subscribe and unsubscribe failures are printed
    // and terminate the process with status 1. Publish failures are printed
    // and the loop continues.
    func main() {
    	// The first topic segment is the channel key; per the accompanying
    	// note, the key has to be carried in the topic when talking to emitter
    	// through a plain MQTT client.
    	const topic = "1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"

    	// Route the library's debug and error logs to stdout.
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetDefaultPublishHandler(f)
    	opts.SetPingTimeout(1 * time.Second)

    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}

    	for i := 0; i < 5; i++ {
    		text := fmt.Sprintf("this is msg #%d!", i)
    		// Report a failed publish instead of dropping the error, but keep
    		// sending the remaining messages.
    		if token := c.Publish(topic, 0, false, text); token.Wait() && token.Error() != nil {
    			fmt.Println(token.Error())
    		}
    	}

    	// Leave time for the published messages to be delivered back to f.
    	time.Sleep(3 * time.Second)

    	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}

    	c.Disconnect(250)

    	time.Sleep(1 * time.Second)
    }
    

      

  • 相关阅读:
    【PAT甲级】1043 Is It a Binary Search Tree (25 分)(判断是否为BST的先序遍历并输出后序遍历)
    Educational Codeforces Round 73 (Rated for Div. 2)F(线段树,扫描线)
    【PAT甲级】1042 Shuffling Machine (20 分)
    【PAT甲级】1041 Be Unique (20 分)(多重集)
    【PAT甲级】1040 Longest Symmetric String (25 分)(cin.getline(s,1007))
    【PAT甲级】1039 Course List for Student (25 分)(vector嵌套于map,段错误原因未知)
    Codeforces Round #588 (Div. 2)E(DFS,思维,__gcd,树)
    2017-3-9 SQL server 数据库
    2017-3-8 学生信息展示习题
    2017-3-5 C#基础 函数--递归
  • 原文地址:https://www.cnblogs.com/yorkyang/p/12145910.html
Copyright © 2011-2022 走看看