zoukankan      html  css  js  c++  java
  • 开源包mqtt源码_token

    开源包mqtt源码_token

    github.com/eclipse/paho.mqtt.golang

    测试代码

    /*
     * Copyright (c) 2021 IBM Corp and others.
     *
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v2.0
     * and Eclipse Distribution License v1.0 which accompany this distribution.
     *
     * The Eclipse Public License is available at
     *    https://www.eclipse.org/legal/epl-2.0/
     * and the Eclipse Distribution License is available at
     *   http://www.eclipse.org/org/documents/edl-v10.php.
     *
     * Contributors:
     *    Allan Stockdill-Mander
     */
    
    package mqtt
    
    import (
    	"errors"
    	"testing"
    	"time"
    )
    
    func TestWaitTimeout(t *testing.T) {
    	b := baseToken{}
    
    	if b.WaitTimeout(time.Second) {
    		t.Fatal("Should have failed")
    	}
    
    	// Now lets confirm that WaitTimeout returns
    	// setError() grabs the mutex which previously caused issues
    	// when there is a result (it returns true in this case)
    	b = baseToken{complete: make(chan struct{})}
    	go func(bt *baseToken) {
    		bt.setError(errors.New("test error"))
    	}(&b)
    	if !b.WaitTimeout(5 * time.Second) {
    		t.Fatal("Should have succeeded")
    	}
    }
    
    

    token代码

    /*
     * Copyright (c) 2021 IBM Corp and others.
     *
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v2.0
     * and Eclipse Distribution License v1.0 which accompany this distribution.
     *
     * The Eclipse Public License is available at
     *    https://www.eclipse.org/legal/epl-2.0/
     * and the Eclipse Distribution License is available at
     *   http://www.eclipse.org/org/documents/edl-v10.php.
     *
     * Contributors:
     *    Allan Stockdill-Mander
     */
    
    package mqtt
    
    import (
    	"sync"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang/packets"
    )
    
    // PacketAndToken is a struct that contains both a ControlPacket and a
    // Token. This struct is passed via channels between the client interface
    // code and the underlying code responsible for sending and receiving
    // MQTT messages.
    type PacketAndToken struct {
    	p packets.ControlPacket
    	t tokenCompletor
    }
    
    // Token defines the interface for the tokens used to indicate when
    // actions have completed.
    type Token interface {
    	// Wait will wait indefinitely for the Token to complete, ie the Publish
    	// to be sent and confirmed receipt from the broker.
    	Wait() bool
    
    	// WaitTimeout takes a time.Duration to wait for the flow associated with the
    	// Token to complete, returns true if it returned before the timeout or
    	// returns false if the timeout occurred. In the case of a timeout the Token
    	// does not have an error set in case the caller wishes to wait again.
    	WaitTimeout(time.Duration) bool
    
    	// Done returns a channel that is closed when the flow associated
    	// with the Token completes. Clients should call Error after the
    	// channel is closed to check if the flow completed successfully.
    	//
    	// Done is provided for use in select statements. Simple use cases may
    	// use Wait or WaitTimeout.
    	Done() <-chan struct{}
    
    	Error() error
    }
    
    type TokenErrorSetter interface {
    	setError(error)
    }
    
    type tokenCompletor interface {
    	Token
    	TokenErrorSetter
    	flowComplete()
    }
    
    type baseToken struct {
    	m        sync.RWMutex
    	complete chan struct{}
    	err      error
    }
    
    // Wait implements the Token Wait method.
    func (b *baseToken) Wait() bool {
    	<-b.complete
    	return true
    }
    
    // WaitTimeout implements the Token WaitTimeout method.
    func (b *baseToken) WaitTimeout(d time.Duration) bool {
    	timer := time.NewTimer(d)
    	select {
    	case <-b.complete:
    		if !timer.Stop() {
    			<-timer.C
    		}
    		return true
    	case <-timer.C:
    	}
    
    	return false
    }
    
    // Done implements the Token Done method.
    func (b *baseToken) Done() <-chan struct{} {
    	return b.complete
    }
    
    func (b *baseToken) flowComplete() {
    	select {
    	case <-b.complete:
    	default:
    		close(b.complete)
    	}
    }
    
    func (b *baseToken) Error() error {
    	b.m.RLock()
    	defer b.m.RUnlock()
    	return b.err
    }
    
    func (b *baseToken) setError(e error) {
    	b.m.Lock()
    	b.err = e
    	b.flowComplete()
    	b.m.Unlock()
    }
    
    func newToken(tType byte) tokenCompletor {
    	switch tType {
    	case packets.Connect:
    		return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
    	case packets.Subscribe:
    		return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
    	case packets.Publish:
    		return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
    	case packets.Unsubscribe:
    		return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
    	case packets.Disconnect:
    		return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
    	}
    	return nil
    }
    
    // ConnectToken is an extension of Token containing the extra fields
    // required to provide information about calls to Connect()
    type ConnectToken struct {
    	baseToken
    	returnCode     byte
    	sessionPresent bool
    }
    
    // ReturnCode returns the acknowledgement code in the connack sent
    // in response to a Connect()
    func (c *ConnectToken) ReturnCode() byte {
    	c.m.RLock()
    	defer c.m.RUnlock()
    	return c.returnCode
    }
    
    // SessionPresent returns a bool representing the value of the
    // session present field in the connack sent in response to a Connect()
    func (c *ConnectToken) SessionPresent() bool {
    	c.m.RLock()
    	defer c.m.RUnlock()
    	return c.sessionPresent
    }
    
    // PublishToken is an extension of Token containing the extra fields
    // required to provide information about calls to Publish()
    type PublishToken struct {
    	baseToken
    	messageID uint16
    }
    
    // MessageID returns the MQTT message ID that was assigned to the
    // Publish packet when it was sent to the broker
    func (p *PublishToken) MessageID() uint16 {
    	return p.messageID
    }
    
    // SubscribeToken is an extension of Token containing the extra fields
    // required to provide information about calls to Subscribe()
    type SubscribeToken struct {
    	baseToken
    	subs      []string
    	subResult map[string]byte
    	messageID uint16
    }
    
    // Result returns a map of topics that were subscribed to along with
    // the matching return code from the broker. This is either the Qos
    // value of the subscription or an error code.
    func (s *SubscribeToken) Result() map[string]byte {
    	s.m.RLock()
    	defer s.m.RUnlock()
    	return s.subResult
    }
    
    // UnsubscribeToken is an extension of Token containing the extra fields
    // required to provide information about calls to Unsubscribe()
    type UnsubscribeToken struct {
    	baseToken
    	messageID uint16
    }
    
    // DisconnectToken is an extension of Token containing the extra fields
    // required to provide information about calls to Disconnect()
    type DisconnectToken struct {
    	baseToken
    }
    
    

    分析

    去除业务,这个token包主要是为了表示动作完成的操作

    附加

    可以综合mqtt文档看一下mqtt包控制报文类型

    // mqtt 控制包类型
    // Below are the constants assigned to each of the MQTT packet type
    // 下面来自mqtt 中文文档
    //Table 2.1 - Control packet types
    //
    //|Name           |Value          |Direction of flow                   |Description
    //|Reserved       |0              |Forbidden                   |Reserved
    //|CONNECT        |1              |Client to Server                       |
    //|CONNACK        |2              |Server to Client                       |Connect acknowledgment
    //|PUBLISH        |3              |Client to Server or Server to Client   |Publish message
    //|PUBACK         |4              |Client to Server or Server to Client   |Publish acknowledgment
    //|PUBREC         |5              |Client to Server or Server to Client   |Publish received (assured delivery part 1)
    //|PUBREL         |6              |Client to Server or Server to Client   |Publish release (assured delivery part 2)
    //|PUBCOMP        |7              |Client to Server or Server to Client   |Publish complete (assured delivery part 3)
    //|SUBSCRIBE      |8              |Client to Server                       |Client subscribe request
    //|SUBACK         |9              |Server to Client                       |Subscribe acknowledgment
    //|UNSUBSCRIBE    |10             |Client to Server                       |Unsubscribe request
    //|UNSUBACK       |11             |Server to Client                       |Unsubscribe acknowledgment
    //|PINGREQ        |12             |Client to Server                       |PING request
    //|PINGRESP       |13             |Server to Client                       |PING response
    //|DISCONNECT     |14             |Client to Server                       |Client is disconnecting
    //|Reserved       |15             |Forbidden                              |Reserved
    const (
    	Connect     = 1
    	Connack     = 2
    	Publish     = 3
    	Puback      = 4
    	Pubrec      = 5
    	Pubrel      = 6
    	Pubcomp     = 7
    	Subscribe   = 8
    	Suback      = 9
    	Unsubscribe = 10
    	Unsuback    = 11
    	Pingreq     = 12
    	Pingresp    = 13
    	Disconnect  = 14
    )
    
  • 相关阅读:
    【Linux】freetds安装配置连接MSSQL
    【MySQL】Sysbench性能测试
    【MySQL】mysql buffer pool结构分析
    【MySQL】MySQL锁和隔离级别浅析一
    Spring Boot 1.4测试的改进
    Spring Boot 定时任务的使用
    linux:nohup 不生成 nohup.out的方法
    Spring Boot应用的后台运行配置
    深入理解Session与Cookie(一)
    学习Maven之Cobertura Maven Plugin
  • 原文地址:https://www.cnblogs.com/maomaomaoge/p/15136000.html
Copyright © 2011-2022 走看看