zoukankan      html  css  js  c++  java
  • emitter-Client

    生成子key: 通配符号 #

    订阅:通配符 +

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"os"
    	"time"
    	emitter "github.com/emitter-io/go"
    )
    
    type MyBody struct {
    	Id   uint64
    	Body string
    }
    
    var localID uint64
    
    func main() {
    	localID = 0
    	var mykey string
    	rMsg := &MyBody{}
    	o := emitter.NewClientOptions()
    	o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
    		fmt.Printf("Received message: %s
    ", msg.Payload())
    		err := json.Unmarshal(msg.Payload(), rMsg)
    		if err != nil {
    			fmt.Println("parse json data err!")
    		}
    		fmt.Printf("Received message: %d
    ", rMsg.Id)
    
    		if (localID + 1) != rMsg.Id {
    			fmt.Println("seq err!")
    			os.Exit(1)
    		}
    		localID = rMsg.Id
    		//fmt.Printf("Received message: %d
    ", rMsg.Id)
    	})
    	o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
    		fmt.Printf("get key: %s
    ", key)
    		mykey = key.Key
    	})
    
    	o.ClientID = "tttt_123456"
    	o.AddBroker("tcp://192.168.162.34:49250")
    	o.AddBroker("tcp://192.168.163.200:8081")
    	o.AddBroker("tcp://119.3.108.139:8081")
    	c := emitter.NewClient(o)
    	sToken := c.Connect()
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on Client.Connect(): " + sToken.Error().Error())
    	}
    
    	getKey := emitter.NewKeyGenRequest()
    	getKey.Channel = "shuguo/#/"
    	getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
    	getKey.TTL = 0
    	getKey.Type = "rwlsp"
    	sToken = c.GenerateKey(getKey)
    	if sToken.Wait() && sToken.Error() != nil {
    		panic("Error on client generateKey(): " + sToken.Error().Error())
    	}
    	time.Sleep(2 * time.Second)
    	//c.Subscribe(mykey, "shuguo/unicast/user@111/")
    	c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
    	time.Sleep(1 * time.Second)
    	c.Publish(mykey, "shuguo/unicast/user@111", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
    	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
    	select {}
    }
    

      用 emitter 的client库封装了  paho.mqtt.golang ,直接用 paho.mqtt.golang 库 也可以对接 emitter 服务端(携带subKey)

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	fmt.Printf("TOPIC: %s
    ", msg.Topic())
    	fmt.Printf("MSG: %s
    ", msg.Payload())
    }
    
    func main() {
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetDefaultPublishHandler(f)
    	opts.SetPingTimeout(1 * time.Second)
    
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    	if token := c.Subscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, nil); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}
    
    	for i := 0; i < 5; i++ {
    		text := fmt.Sprintf("this is msg #%d!", i)
    		token := c.Publish("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/", 0, false, text)
    		token.Wait()
    	}
    
    	time.Sleep(3 * time.Second)
    
    	if token := c.Unsubscribe("1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}
    
    	c.Disconnect(250)
    
    	time.Sleep(1 * time.Second)
    }
    

      

  • 相关阅读:
    待办
    安卓微信浏览器修改的代码总是不生效
    微信浏览器内核2
    微信浏览器内核
    随记
    三次握手最后一个ack没有收到怎么办?
    判断偶数:
    利用kubeadm工具安装Kubernetes1.15版本
    kubernetes安装Helm
    最大子列和(附加子列初始元素和末尾元素)
  • 原文地址:https://www.cnblogs.com/yorkyang/p/12145910.html
Copyright © 2011-2022 走看看