zoukankan      html  css  js  c++  java
  • 源码rabbit_2_通过架构来看数据结构

    源码rabbit_2_通过架构来看数据结构

    架构参考文章: https://www.cnblogs.com/arthinking/p/15422958.html

    纠正上一篇博客理解上的错误: 用一条tcp连接不是所有客户端都用一个,而是每个客户端都自己有一个,订阅不同主题啥的,不用创建一个连接,而是用虚拟管道(这和mqtt主题没啥不同,太像了)

    生产端:一般地,同一个客户端(client)里面的每个生产者(producer)创建一个专门的通道(channel),复用同一个TCP连接(connection),每个生产者可以往Broker发布消息,发布消息的时候,需指定虚拟主机,以及虚拟主机上的交换机,并且消息需要带上routing_key;

    架构图

    https://cdn.itzhai.com/image-20211010222005614-a.png-itzhai

    数据结构解析

    Conn

    // Connection represents AMQP-connection
    type Connection struct {
    	id               uint64
    	server           *Server
    	netConn          *net.TCPConn
    	logger           *log.Entry
    	channelsLock     sync.RWMutex
    	channels         map[uint16]*Channel
    	outgoing         chan *amqp.Frame
    	clientProperties *amqp.Table
    	maxChannels      uint16
    	maxFrameSize     uint32
    	statusLock       sync.RWMutex
    	status           int
    	qos              *qos.AmqpQos
    	virtualHost      *VirtualHost
    	vhostName        string
    	closeCh          chan bool
    	srvMetrics       *SrvMetricsState
    	metrics          *ConnMetricsState
    	userName         string
    
    	wg        *sync.WaitGroup
    	ctx       context.Context
    	cancelCtx context.CancelFunc
    
    	heartbeatInterval uint16
    	heartbeatTimeout  uint16
    	heartbeatTimer    *time.Ticker
    
    	lastOutgoingTS chan time.Time
    }
    

    虚拟机

    // VirtualHost represents AMQP virtual host
    // Each virtual host is "parent" for its queues and exchanges
    type VirtualHost struct {
    	name            string
    	system          bool
    	exLock          sync.RWMutex
    	exchanges       map[string]*exchange.Exchange
    	quLock          sync.RWMutex
    	queues          map[string]*queue.Queue
    	msgStorageP     *msgstorage.MsgStorage
    	msgStorageT     *msgstorage.MsgStorage
    	srv             *Server
    	srvStorage      *srvstorage.SrvStorage
    	srvConfig       *config.Config
    	logger          *log.Entry
    	autoDeleteQueue chan string
    }
    

    队列

    // Queue is an implementation of the AMQP-queue entity
    type Queue struct {
    	safequeue.SafeQueue
    	name        string
    	connID      uint64
    	exclusive   bool
    	autoDelete  bool
    	durable     bool
    	cmrLock     sync.RWMutex
    	consumers   []interfaces.Consumer
    	consumeExcl bool
    	call        chan struct{}
    	wasConsumed bool
    	shardSize   int
    	actLock     sync.RWMutex
    	active      bool
    	// persistent storage
    	msgPStorage interfaces.MsgStorage
    	// transient storage
    	msgTStorage     interfaces.MsgStorage
    	currentConsumer int
    	metrics         *MetricsState
    	autoDeleteQueue chan string
    	queueLength     int64
    
    	// lock for sync load swapped-messages from disk
    	loadSwapLock           sync.Mutex
    	maxMessagesInRAM       uint64
    	lastStoredMsgID        uint64
    	lastMemMsgID           uint64
    	swappedToDisk          bool
    	maybeLoadFromStorageCh chan struct{}
    	wg                     *sync.WaitGroup
    }
    

    管道

    // Channel is an implementation of the AMQP-channel entity
    // Within a single socket connection, there can be multiple
    // independent threads of control, called "channels"
    type Channel struct {
    	active             bool
    	confirmMode        bool
    	id                 uint16
    	conn               *Connection
    	server             *Server
    	incoming           chan *amqp.Frame
    	outgoing           chan *amqp.Frame
    	logger             *log.Entry
    	status             int
    	protoVersion       string
    	currentMessage     *amqp.Message
    	cmrLock            sync.RWMutex
    	consumers          map[string]*consumer.Consumer
    	qos                *qos.AmqpQos
    	consumerQos        *qos.AmqpQos
    	deliveryTag        uint64
    	confirmDeliveryTag uint64
    	confirmLock        sync.Mutex
    	confirmQueue       []*amqp.ConfirmMeta
    	ackLock            sync.Mutex
    	ackStore           map[uint64]*UnackedMessage
    	metrics            *ChannelMetricsState
    
    	bufferPool *pool.BufferPool
    
    	closeCh chan bool
    }
    
  • 相关阅读:
    js冒泡排序
    总结常见疑问
    第八课 课程重点(js语句:条件、循环)
    第九课 课程重点(js语句:获取时间、数字取值、定时器)
    利用NABCD模型进行竞争性需求分析
    运行结果及总结~张萍萍
    测试与调试 滕娟
    面向对象分析设计
    设计类图 201303014029 于海飞
    SRS文档 王倩倩 201303014004
  • 原文地址:https://www.cnblogs.com/maomaomaoge/p/15606319.html
Copyright © 2011-2022 走看看