zoukankan      html  css  js  c++  java
  • 源码实践_实现一个mqtt(1)

    源码实践_实现一个mqtt(1)

    https://gitee.com/maomaomaoge/opmq

    写这个原因

    因为用开源的emqx,背后的语言是erlang,不是计算的语言,但是天生的分布式,单机测试吞吐非常容易卡死,部署分布式,说实话,我不会,小公司也没有那个人力去部署,一般用一个单机搞,能多大,就多大,所以,我想实现一个单机最大效率的mq(当然这和机器的网卡和cpu有关),只是支持简单的发布订阅即可,不正那么多弯弯绕,小公司用不到的东西

    设计topic

    因为我没台关注topic的解析,但是可以为以后考虑到是topic主导,还是tcp连接主导。也就是用连接去找topic,还是topic去找tcp,最终敲定topic下面带着tcp切片。数据结构用谷歌的Btree,因为这个数据结构在内存查询很快。

    下面是初步实现的topic和tcp的关系,结合了btree

    package opmq
    
    import (
    	"github.com/google/btree"
    	"net"
    )
    
    type Topic struct {
    	Name string
    	Conn []net.Conn
    	Hash int64
    }
    
    func NewTopic(name string) *Topic {
    	t :=  &Topic{
    		Name: name,
    		Conn: make([]net.Conn, 0),
    	}
    	t.CalcHash()
    
    	return t
    }
    
    func (t *Topic) Less(b btree.Item) bool {
    	return t.Hash < b.(*Topic).Hash
    }
    
    func (t *Topic) CalcHash()  {
    	b := []byte(t.Name)
    	for _, v := range b {
    		t.Hash += int64(v)
    	}
    }
    

    测试代码

    package opmq
    
    import (
    	"flag"
    	"fmt"
    	"github.com/google/btree"
    	"strconv"
    	"testing"
    )
    
    // all extracts all items from a tree in order as a slice.
    func all(t *btree.BTree) (out []btree.Item) {
    	t.Ascend(func(a btree.Item) bool {
    		out = append(out, a)
    		return true
    	})
    	return
    }
    
    func hashHere(s string) int64 {
    	h := int64(0)
    	b := []byte(s)
    	for _, v := range b {
    		h += int64(v)
    	}
    
    	return h
    }
    
    func TestTopic_CalcHash(t *testing.T) {
    	var btreeDegree = flag.Int("degree", 32, "B-Tree degree")
    	tree := btree.New(*btreeDegree)
    
    	for i := 0; i < 10; i++ {
    		to := &Topic{
    			Name: strconv.Itoa(i),
    		}
    		to.CalcHash()
    
    		tree.ReplaceOrInsert(to)
    	}
    
    	for _, v := range all(tree) {
    		fmt.Println(v)
    	}
    
    	fmt.Println("查找")
    	to2 := &Topic{
    		Name: "1",
    	}
    	to2.CalcHash()
    	fmt.Println(tree.Get(to2))
    }
    
    # 输出代码
    === RUN   TestTopic_CalcHash
    &{0 [] 48}
    &{1 [] 49}
    &{2 [] 50}
    &{3 [] 51}
    &{4 [] 52}
    &{5 [] 53}
    &{6 [] 54}
    &{7 [] 55}
    &{8 [] 56}
    &{9 [] 57}
    查找
    &{1 [] 49}
    --- PASS: TestTopic_CalcHash (0.00s)
    PASS
    
    Process finished with the exit code 0
    

    报文

    直接用开源包的报文解析

    测试报文

    server.go

    package opmq
    
    import (
    	"fmt"
    	"gitee.com/maomaomaoge/opmq/packets"
    	"log"
    	"net"
    )
    
    func Serve() {
    	ln, err := net.Listen("tcp", ":7000")
    	if err != nil {
    		return
    	}
    	for {
    		conn, err := ln.Accept()
    		if err != nil {
    			log.Fatalln(err)
    		}
    
    		go read(conn)
    	}
    }
    
    func read(conn net.Conn)  {
    	for {
    		packet, err := packets.ReadPacket(conn)
    		if err != nil {
    			return
    		}
    
    		fmt.Println("收到的报文: ", packet.String())
    
    		ack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
    		ack.Write(conn)
    	}
    }
    
    

    开源包的最简单代码

    /*
     * Copyright (c) 2021 IBM Corp and others.
     *
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v2.0
     * and Eclipse Distribution License v1.0 which accompany this distribution.
     *
     * The Eclipse Public License is available at
     *    https://www.eclipse.org/legal/epl-2.0/
     * and the Eclipse Distribution License is available at
     *   http://www.eclipse.org/org/documents/edl-v10.php.
     *
     * Contributors:
     *    Seth Hoenig
     *    Allan Stockdill-Mander
     *    Mike Robertson
     */
    
    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	fmt.Printf("TOPIC: %s
    ", msg.Topic())
    	fmt.Printf("MSG: %s
    ", msg.Payload())
    }
    
    func main() {
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:7000")
    
    	c := mqtt.NewClient(opts)
    	token := c.Connect()
    	if token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    
    	if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}
    
    	for i := 0; i < 5; i++ {
    		text := fmt.Sprintf("this is msg #%d!", i)
    		token := c.Publish("go-mqtt/sample", 0, false, text)
    		token.Wait()
    	}
    
    	time.Sleep(6 * time.Second)
    
    	if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}
    
    	c.Disconnect(250)
    
    	time.Sleep(1 * time.Second)
    }
    

    serve收到的报文

    === RUN   TestServe
    收到的报文:  CONNECT: dup: false qos: 0 retain: false rLength: 12 protocolversion: 4 protocolname: MQTT cleansession: true willflag: false WillQos: 0 WillRetain: false Usernameflag: false Passwordflag: false keepalive: 30 clientId:  willtopic:  willmessage:  Username:  Password: 
    

    nice, 初步完成

  • 相关阅读:
    sql server 2008 安装过程与创建建sql server登录用户
    Angularjs之controller 和filter(四)
    Angularjs之表单实例(三)
    antlr应用
    antlr4笔记(转)
    go升级版本
    go安装依赖包
    tsar
    java纤程
    HighLevelRestApi管理ES
  • 原文地址:https://www.cnblogs.com/maomaomaoge/p/15213773.html
Copyright © 2011-2022 走看看