ActiveRouter::update():
1 /**
2 * Checks out all sending connections to finalize the ready ones
3 * and abort those whose connection went down. Also drops messages
4 * whose TTL <= 0 (checking every one simulated minute).
5 * @see #addToSendingConnections(Connection)
6 */
7 @Override
8 public void update() {
9
10 super.update();
11
12 /* in theory we can have multiple sending connections even though
13 currently all routers allow only one concurrent sending connection */
14 for (int i=0; i<this.sendingConnections.size(); ) {
15 boolean removeCurrent = false;
16 Connection con = sendingConnections.get(i);
17
18 /* finalize ready transfers */
19 if (con.isMessageTransferred()) {
20 if (con.getMessage() != null) {
21 transferDone(con);
22 con.finalizeTransfer();
23 } /* else: some other entity aborted transfer */
24 removeCurrent = true;
25 }
26 /* remove connections that have gone down */
27 else if (!con.isUp()) {
28 if (con.getMessage() != null) {
29 transferAborted(con);
30 con.abortTransfer();
31 }
32 removeCurrent = true;
33 }
34
35 if (removeCurrent) {
36 // if the message being sent was holding excess buffer, free it
37 if (this.getFreeBufferSize() < 0) {
38 this.makeRoomForMessage(0);
39 }
40 sendingConnections.remove(i);
41 }
42 else {
43 /* index increase needed only if nothing was removed */
44 i++;
45 }
46 }
47
48 /* time to do a TTL check and drop old messages? Only if not sending */
49 if (SimClock.getTime() - lastTtlCheck >= TTL_CHECK_INTERVAL &&
50 sendingConnections.size() == 0) {
51 dropExpiredMessages();
52 lastTtlCheck = SimClock.getTime();
53 }
2 * Checks out all sending connections to finalize the ready ones
3 * and abort those whose connection went down. Also drops messages
4 * whose TTL <= 0 (checking every one simulated minute).
5 * @see #addToSendingConnections(Connection)
6 */
7 @Override
8 public void update() {
9
10 super.update();
11
12 /* in theory we can have multiple sending connections even though
13 currently all routers allow only one concurrent sending connection */
14 for (int i=0; i<this.sendingConnections.size(); ) {
15 boolean removeCurrent = false;
16 Connection con = sendingConnections.get(i);
17
18 /* finalize ready transfers */
19 if (con.isMessageTransferred()) {
20 if (con.getMessage() != null) {
21 transferDone(con);
22 con.finalizeTransfer();
23 } /* else: some other entity aborted transfer */
24 removeCurrent = true;
25 }
26 /* remove connections that have gone down */
27 else if (!con.isUp()) {
28 if (con.getMessage() != null) {
29 transferAborted(con);
30 con.abortTransfer();
31 }
32 removeCurrent = true;
33 }
34
35 if (removeCurrent) {
36 // if the message being sent was holding excess buffer, free it
37 if (this.getFreeBufferSize() < 0) {
38 this.makeRoomForMessage(0);
39 }
40 sendingConnections.remove(i);
41 }
42 else {
43 /* index increase needed only if nothing was removed */
44 i++;
45 }
46 }
47
48 /* time to do a TTL check and drop old messages? Only if not sending */
49 if (SimClock.getTime() - lastTtlCheck >= TTL_CHECK_INTERVAL &&
50 sendingConnections.size() == 0) {
51 dropExpiredMessages();
52 lastTtlCheck = SimClock.getTime();
53 }
54 }
注:
transferDone()是消息传输完成时的回调函数,默认实现为空;如果需要在消息传输完成后做额外处理,可以在子类中重写这个方法。
getFreeBufferSize()的返回值可能为负,这点不是很理解。接收消息的时候应该会考虑到这一点吧。
以下是ActiveRouter::makeRoomForMessage(int size)的函数体:
1 /**
2 * Removes messages from the buffer (oldest first) until
3 * there's enough space for the new message.
4 * @param size Size of the new message
5 * transferred, the transfer is aborted before message is removed
6 * @return True if enough space could be freed, false if not
7 */
8 protected boolean makeRoomForMessage(int size){
9 if (size > this.getBufferSize()) {
10 return false; // message too big for the buffer
11 }
12
13 int freeBuffer = this.getFreeBufferSize();
14 /* delete messages from the buffer until there's enough space */
15 while (freeBuffer < size) {
16 Message m = getOldestMessage(true); // don't remove msgs being sent
17
18 if (m == null) {
19 return false; // couldn't remove any more messages
20 }
21
22 /* delete message from the buffer as "drop" */
23 deleteMessage(m.getId(), true);
24 freeBuffer += m.getSize();
25 }
26
27 return true;
2 * Removes messages from the buffer (oldest first) until
3 * there's enough space for the new message.
4 * @param size Size of the new message
5 * transferred, the transfer is aborted before message is removed
6 * @return True if enough space could be freed, false if not
7 */
8 protected boolean makeRoomForMessage(int size){
9 if (size > this.getBufferSize()) {
10 return false; // message too big for the buffer
11 }
12
13 int freeBuffer = this.getFreeBufferSize();
14 /* delete messages from the buffer until there's enough space */
15 while (freeBuffer < size) {
16 Message m = getOldestMessage(true); // don't remove msgs being sent
17
18 if (m == null) {
19 return false; // couldn't remove any more messages
20 }
21
22 /* delete message from the buffer as "drop" */
23 deleteMessage(m.getId(), true);
24 freeBuffer += m.getSize();
25 }
26
27 return true;
28 }
makeRoomForMessage函数的默认做法是从最旧的消息开始删除(不删除正在发送的消息),直到腾出足够的空间。
Connection::finalizeTransfer:
1 /**
2 * Finalizes the transfer of the currently transferred message.
3 * The message that was being transferred can <STRONG>not</STRONG> be
4 * retrieved from this connections after calling this method (using
5 * {@link #getMessage()}).
6 */
7 public void finalizeTransfer() {
8 assert this.msgOnFly != null : "Nothing to finalize in " + this;
9 assert msgFromNode != null : "msgFromNode is not set";
10
11 this.bytesTransferred += msgOnFly.getSize();
12
13 getOtherNode(msgFromNode).messageTransferred(this.msgOnFly.getId(),
14 msgFromNode);
15 clearMsgOnFly();
2 * Finalizes the transfer of the currently transferred message.
3 * The message that was being transferred can <STRONG>not</STRONG> be
4 * retrieved from this connections after calling this method (using
5 * {@link #getMessage()}).
6 */
7 public void finalizeTransfer() {
8 assert this.msgOnFly != null : "Nothing to finalize in " + this;
9 assert msgFromNode != null : "msgFromNode is not set";
10
11 this.bytesTransferred += msgOnFly.getSize();
12
13 getOtherNode(msgFromNode).messageTransferred(this.msgOnFly.getId(),
14 msgFromNode);
15 clearMsgOnFly();
16 }
MessageRouter::messageTransferred:
1 /**
2 * This method should be called (on the receiving host) after a message
3 * was successfully transferred. The transferred message is put to the
4 * message buffer unless this host is the final recipient of the message.
5 * @param id Id of the transferred message
6 * @param from Host the message was from (previous hop)
7 * @return The message that this host received
8 */
9 public Message messageTransferred(String id, DTNHost from) {
10 Message incoming = removeFromIncomingBuffer(id, from);
11 boolean isFinalRecipient;
12 boolean isFirstDelivery; // is this first delivered instance of the msg
13
14
15 if (incoming == null) {
16 throw new SimError("No message with ID " + id + " in the incoming "+
17 "buffer of " + this.host);
18 }
19
20 incoming.setReceiveTime(SimClock.getTime());
21
22 // Pass the message to the application (if any) and get outgoing message
23 Message outgoing = incoming;
24 for (Application app : getApplications(incoming.getAppID())) {
25 // Note that the order of applications is significant
26 // since the next one gets the output of the previous.
27 outgoing = app.handle(outgoing, this.host);
28 if (outgoing == null) break; // Some app wanted to drop the message
29 }
30
31 Message aMessage = (outgoing==null)?(incoming):(outgoing);
32 // If the application re-targets the message (changes 'to')
33 // then the message is not considered as 'delivered' to this host.
34 isFinalRecipient = aMessage.getTo() == this.host;
35 isFirstDelivery = isFinalRecipient &&
36 !isDeliveredMessage(aMessage);
37
38 if (!isFinalRecipient && outgoing!=null) {
39 // not the final recipient and app doesn't want to drop the message
40 // -> put to buffer
41 addToMessages(aMessage, false);
42 }
43 else if (isFirstDelivery) {
44 this.deliveredMessages.put(id, aMessage);
45 }
46
47 for (MessageListener ml : this.mListeners) {
48 ml.messageTransferred(aMessage, from, this.host,
49 isFirstDelivery);
50 }
51
52 return aMessage;
2 * This method should be called (on the receiving host) after a message
3 * was successfully transferred. The transferred message is put to the
4 * message buffer unless this host is the final recipient of the message.
5 * @param id Id of the transferred message
6 * @param from Host the message was from (previous hop)
7 * @return The message that this host received
8 */
9 public Message messageTransferred(String id, DTNHost from) {
10 Message incoming = removeFromIncomingBuffer(id, from);
11 boolean isFinalRecipient;
12 boolean isFirstDelivery; // is this first delivered instance of the msg
13
14
15 if (incoming == null) {
16 throw new SimError("No message with ID " + id + " in the incoming "+
17 "buffer of " + this.host);
18 }
19
20 incoming.setReceiveTime(SimClock.getTime());
21
22 // Pass the message to the application (if any) and get outgoing message
23 Message outgoing = incoming;
24 for (Application app : getApplications(incoming.getAppID())) {
25 // Note that the order of applications is significant
26 // since the next one gets the output of the previous.
27 outgoing = app.handle(outgoing, this.host);
28 if (outgoing == null) break; // Some app wanted to drop the message
29 }
30
31 Message aMessage = (outgoing==null)?(incoming):(outgoing);
32 // If the application re-targets the message (changes 'to')
33 // then the message is not considered as 'delivered' to this host.
34 isFinalRecipient = aMessage.getTo() == this.host;
35 isFirstDelivery = isFinalRecipient &&
36 !isDeliveredMessage(aMessage);
37
38 if (!isFinalRecipient && outgoing!=null) {
39 // not the final recipient and app doesn't want to drop the message
40 // -> put to buffer
41 addToMessages(aMessage, false);
42 }
43 else if (isFirstDelivery) {
44 this.deliveredMessages.put(id, aMessage);
45 }
46
47 for (MessageListener ml : this.mListeners) {
48 ml.messageTransferred(aMessage, from, this.host,
49 isFirstDelivery);
50 }
51
52 return aMessage;
53 }
评论:对这里消息要先经过应用程序处理的流程不是很明白。应用程序可以产生消息,但如果本节点不是最终接收者,为什么还要让应用程序接收消息并产生新的消息呢?