要了解 connection 和 session 的概念,可以先从 ConnectionState 和 SessionState 入手:
// 省略部分代码 public class ConnectionState { ConnectionInfo info; private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>(); private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>(); private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>()); private final AtomicBoolean shutdown = new AtomicBoolean(false); private boolean connectionInterruptProcessingComplete = true; private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers; public ConnectionState(ConnectionInfo info) { this.info = info; // Add the default session id. addSession(new SessionInfo(info, -1)); } }
从代码可以看出,连接里有事务集合、会话集合、临时队列集合等,这说明:
1. 事务属于一个连接; 2. 会话属于一个连接; 3. 临时队列的生存期是连接的有效期
// 省略部分代码 public class SessionState { final SessionInfo info; private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>(); private final Map<ConsumerId, ConsumerState> consumers = new ConcurrentHashMap<ConsumerId, ConsumerState>(); private final AtomicBoolean shutdown = new AtomicBoolean(false); public SessionState(SessionInfo info) { this.info = info; } }
从上面能看出,producer 和 consumer 是属于某个会话的,producer 和 consumer 都有唯一的 ID 。
// 省略部分代码 public class ProducerState { final ProducerInfo info; private TransactionState transactionState; } public class ConsumerState { final ConsumerInfo info; }
ProducerState 和 ConsumerState 只是做了简单的封装。
其中 ConnectionInfo, SessionInfo, ProducerInfo, ConsumerInfo 都是消息类型,均继承自 BaseCommand 接口。