源码rabbit_2_通过架构来看数据结构
架构参考文章: https://www.cnblogs.com/arthinking/p/15422958.html
纠正上一篇博客中的一个理解错误:并不是所有客户端共用同一条TCP连接,而是每个客户端各自拥有一条TCP连接;同一个客户端内订阅不同主题等操作不需要再新建连接,而是复用这条连接上的虚拟通道(channel)(这一点和MQTT的主题非常相似)。
生产端:一般地,同一个客户端(client)里面的每个生产者(producer)创建一个专门的通道(channel),复用同一个TCP连接(connection)。每个生产者都可以往Broker发布消息;发布消息时需指定虚拟主机以及该虚拟主机上的交换机,并且消息需要带上routing_key。
架构图
https://cdn.itzhai.com/image-20211010222005614-a.png-itzhai
数据结构解析
Conn
// Connection represents an AMQP connection.
//
// It wraps a single TCP connection (netConn) and keeps the set of channels
// opened on top of it in a map keyed by uint16 (presumably the channel id,
// the same type as Channel.id — confirm against the code that fills the map).
type Connection struct {
	// id is the numeric identifier of this connection.
	id uint64
	// server points back to the Server (presumably the one that accepted
	// this connection — confirm).
	server *Server
	// netConn is the underlying TCP connection.
	netConn *net.TCPConn
	logger *log.Entry
	// channelsLock presumably guards channels — verify at the call sites.
	channelsLock sync.RWMutex
	channels map[uint16]*Channel
	// outgoing carries frames; by its name, frames queued to be written to
	// the client (confirm the direction in the writer goroutine).
	outgoing chan *amqp.Frame
	// clientProperties is an AMQP table; presumably the properties the
	// client sent during the handshake — confirm.
	clientProperties *amqp.Table
	// maxChannels and maxFrameSize look like the negotiated connection
	// limits — confirm where they are assigned.
	maxChannels uint16
	maxFrameSize uint32
	// statusLock presumably guards status — verify at the call sites.
	statusLock sync.RWMutex
	status int
	qos *qos.AmqpQos
	// virtualHost and vhostName refer to the virtual host used by this
	// connection (object and name respectively).
	virtualHost *VirtualHost
	vhostName string
	closeCh chan bool
	// srvMetrics and metrics hold server-level and connection-level metric
	// state respectively, going by their types.
	srvMetrics *SrvMetricsState
	metrics *ConnMetricsState
	userName string
	wg *sync.WaitGroup
	// NOTE(review): the Go context documentation advises against storing a
	// Context inside a struct; confirm this is intentional (ctx/cancelCtx are
	// presumably used to stop the connection's goroutines).
	ctx context.Context
	cancelCtx context.CancelFunc
	// Heartbeat settings. Units are not visible here — TODO confirm.
	heartbeatInterval uint16
	heartbeatTimeout uint16
	// heartbeatTimer is a *time.Ticker despite the "Timer" name.
	heartbeatTimer *time.Ticker
	// lastOutgoingTS is a channel of timestamps; presumably the time of the
	// most recent outgoing frame, used for heartbeats — confirm.
	lastOutgoingTS chan time.Time
}
虚拟主机
// VirtualHost represents an AMQP virtual host.
// Each virtual host is the "parent" of its queues and exchanges, which are
// stored in the exchanges and queues maps keyed by string (presumably the
// entity name — confirm against the code that fills the maps).
type VirtualHost struct {
	// name is the virtual host name.
	name string
	// system is a boolean flag; its exact meaning is not visible here
	// (presumably marks a built-in vhost — TODO confirm).
	system bool
	// exLock presumably guards exchanges — verify at the call sites.
	exLock sync.RWMutex
	exchanges map[string]*exchange.Exchange
	// quLock presumably guards queues — verify at the call sites.
	quLock sync.RWMutex
	queues map[string]*queue.Queue
	// msgStorageP and msgStorageT are two message storages; the P/T suffixes
	// presumably stand for persistent and transient — confirm.
	msgStorageP *msgstorage.MsgStorage
	msgStorageT *msgstorage.MsgStorage
	// srv, srvStorage and srvConfig reference the server, its storage and
	// its configuration.
	srv *Server
	srvStorage *srvstorage.SrvStorage
	srvConfig *config.Config
	logger *log.Entry
	// autoDeleteQueue is a channel of strings; presumably names of queues to
	// be auto-deleted — confirm in the consumer of this channel.
	autoDeleteQueue chan string
}
队列
// Queue is an implementation of the AMQP queue entity.
//
// It embeds safequeue.SafeQueue, so that type's exported methods are promoted
// onto Queue. Messages can be backed by two storages (persistent and
// transient), and several fields track swapping messages to disk.
type Queue struct {
	safequeue.SafeQueue
	// name is the queue name.
	name string
	// connID is a connection id; presumably the connection that declared
	// the queue, relevant for exclusive queues — confirm.
	connID uint64
	// exclusive, autoDelete and durable look like the standard AMQP queue
	// declaration flags — confirm where they are set.
	exclusive bool
	autoDelete bool
	durable bool
	// cmrLock presumably guards consumers — verify at the call sites.
	cmrLock sync.RWMutex
	consumers []interfaces.Consumer
	// consumeExcl presumably marks that an exclusive consumer is attached —
	// TODO confirm.
	consumeExcl bool
	// call is a signal-only channel (chan struct{}); its trigger is not
	// visible here.
	call chan struct{}
	wasConsumed bool
	shardSize int
	// actLock presumably guards active — verify at the call sites.
	actLock sync.RWMutex
	active bool
	// persistent storage
	msgPStorage interfaces.MsgStorage
	// transient storage
	msgTStorage interfaces.MsgStorage
	// currentConsumer is an int; presumably an index into consumers used to
	// pick the next consumer — confirm.
	currentConsumer int
	metrics *MetricsState
	// autoDeleteQueue is a channel of strings; presumably used to report
	// this queue's name for auto-deletion — confirm.
	autoDeleteQueue chan string
	queueLength int64
	// lock for synchronizing the load of swapped messages from disk
	loadSwapLock sync.Mutex
	// maxMessagesInRAM presumably caps the number of messages kept in memory
	// before swapping to disk — TODO confirm.
	maxMessagesInRAM uint64
	// lastStoredMsgID and lastMemMsgID are message ids; by their names, the
	// last id written to storage and the last id held in memory — confirm.
	lastStoredMsgID uint64
	lastMemMsgID uint64
	swappedToDisk bool
	// maybeLoadFromStorageCh is a signal-only channel; presumably a hint to
	// try loading swapped messages back from storage — confirm.
	maybeLoadFromStorageCh chan struct{}
	wg *sync.WaitGroup
}
通道
// Channel is an implementation of the AMQP channel entity.
// Within a single socket connection, there can be multiple
// independent threads of control, called "channels".
//
// A Channel keeps a reference to its Connection (conn), frame channels for
// both directions, its consumers keyed by string, and bookkeeping for
// delivery tags, publisher confirms and unacknowledged messages.
type Channel struct {
	active bool
	// confirmMode is a boolean flag; presumably set when publisher confirms
	// are enabled on this channel — confirm.
	confirmMode bool
	// id is the channel number (uint16).
	id uint16
	// conn is the connection this channel refers to; server is the Server.
	conn *Connection
	server *Server
	// incoming and outgoing carry frames; direction is inferred from the
	// names (from/to the client) — confirm in the frame handling code.
	incoming chan *amqp.Frame
	outgoing chan *amqp.Frame
	logger *log.Entry
	status int
	protoVersion string
	// currentMessage is presumably the message currently being assembled
	// from incoming frames — TODO confirm.
	currentMessage *amqp.Message
	// cmrLock presumably guards consumers — verify at the call sites.
	cmrLock sync.RWMutex
	// consumers is keyed by string (presumably the consumer tag — confirm).
	consumers map[string]*consumer.Consumer
	// qos and consumerQos are two separate QoS states; the exact scope of
	// each is not visible here.
	qos *qos.AmqpQos
	consumerQos *qos.AmqpQos
	// deliveryTag and confirmDeliveryTag are uint64 counters, by their names
	// for deliveries and publisher confirms respectively — confirm.
	deliveryTag uint64
	confirmDeliveryTag uint64
	// confirmLock presumably guards confirmQueue — verify at the call sites.
	confirmLock sync.Mutex
	confirmQueue []*amqp.ConfirmMeta
	// ackLock presumably guards ackStore — verify at the call sites.
	ackLock sync.Mutex
	// ackStore maps a uint64 (presumably the delivery tag) to the message
	// awaiting acknowledgement.
	ackStore map[uint64]*UnackedMessage
	metrics *ChannelMetricsState
	bufferPool *pool.BufferPool
	closeCh chan bool
}