聊天程序的底层socket实现我们用开源的GCDAsyncUdpSocket,本文依据GCDAsyncUdpSocket源码来解析UDP socket通信。
socket通信的整体流程是:
创建并初始化一个socket进行相应的配置 -> 本地地址和端口的绑定 -> 连接socket到被给的地址和端口 -> 发送数据 -> 接收数据 -> 关闭socket
1.创建并初始化socket并进行相应的配置
初始化GCDAsyncUdpSocket时,我们需要设置一个接收数据的delegate和delegateQueue,还需要设置一个发送队列(socket queue),也可以不指定发送队列,这时GCDAsyncUdpSocket内不会创建一个默认的发送队列,发送队列应为串行队列,这样此socket发送数据的操作都会在此串行队列中操作。
GCDAsyncUdpSocket是支持IPV4和IPV6的,如果DNS只返回IPV4地址,则GCDAsyncUdpSocket自动使用IPV4,如果DNS只返回IPV6地址,则GCDAsyncUdpSocket自动使用IPV6,如果DNS返回IPV4和IPV6地址,则GCDAsyncUdpSocket使用你设置的优先使用的选项。
我们可以设置接收数据包的大小,每次读取接收数据时,只从接收缓存中读取之前设定大小的数据。
2.绑定目的地址和端口
应该在服务器socket发送数据前,对socket进行绑定本地的地址和端口号。一般情况下对客户端而言可以不用绑定地址和端口,在socket发送数据时,操作系统会自动分配一个可用的端口给socket,但这种情况只适用于客户端先给服务器发送消息,如果是服务器创建socket后先给客户端发送消息,客户端也需要给socket绑定端口号,否则客户端收到消息后也不知道分配给哪个应用程序。
绑定只能进行一次,绑定只能在socket建立连接之前,建立连接后不能在对socket进行绑定
3.socket建立连接
UDP是无连接的协议,建立连接并不是必须的。
建立连接到一个指定的host/port,有如下作用:
-你只能发送数据到连接到的host/port(即发送消息使用sendto函数时不能指定目标地址,可以使用send函数)
-你只能从连接到的host/port接收数据(接收数据时不必使用recvfrom函数来指定对端地址(IP和端口号),可以使用read, recv或recvmsg函数,除了连接的对端地址外的地址到达的数据包都不被传递到连接的socket上)
-你只能从连接到的host/port接收ICMP报文消息,像“连接拒绝”(未连接的UDP socket不会收到异步错误)
udp socket的connect函数并不会像TCP socket的connect函数一样与对端进行通信,进行三次握手。相反内核只检查任何能立即发现的错误(如,明显无法到达的目的地),从传递给connect函数的socket地址结构中记录对等体的ip地址和端口号,并立即返回到调用进程。
多次调用connect函数主要有两个目的:
-重现指定peername
-断开连接,即删除peername(也可能会删除socketname)
对于第一个目的来说,很简单,只要设置好正确的套接字地址,传参给connect即可。
对于第二个目的来说,需要将socket地址结构struct sockaddr中的sin_family成员设置成AF_UNSPEC,如下:
struct sockaddr_in disconnaddr; memset(&disconnaddr, 0, sizeof(disconnaddr)); disconnaddr.sin_family = AF_UNSPEC; // 断开连接 connect(sockfd, &disconnaddr, sizeof(disconnaddr));
从性能上来说,当应用程序在未连接的UDP socket上调用sendto函数时,Berkeley派生的内核会临时连接socket,发送数据报,然后取消socket连接。
// 连接两次调用 sendto sendto(sockfd, buf, 100, 0, &servaddr, sizeof(servaddr)); sendto(sockfd, buf, 200, 0, &servaddr, sizeof(servaddr));
在未连接的UDP socket上调用sendto函数以获取两个数据报,内核涉及以下六个步骤:
-连接socket
-输出第一个数据报
-断开socket连接
-连接socket
-输出第二个socket
-断开socket连接
另一个考虑的因素是路由表的搜索次数。
当应用程序直到它将向同一个地址发送多个数据报时,显式连接socket更有效。
connect(sockfd, &servaddr, sizeof(servaddr)); write(sockfd, buf, 100); write(sockfd, buf, 200);
调用connect然后调用write两次,涉及内核以下步骤:
-连接socket
-输出第一个数据报
-输出第二个socket
在这种情况下内核只复制一次含有目的IP和port的socket地址,而使用sendto函数时,需要复制两次,临时连接未连接的UDP socket大约会消耗每个UDP传输三分之一的开销。
4.发送数据
在发送数据时尤其需要注意的一点是:在发送完数据的回调方法调用之前都不应改变被发送的数据。
在业务上可以在发送时设置过滤器,(外部设置,发送时判断是否有过滤器,有就先执行过滤器,根据结果执行后续操作)。
5.接收数据
在接收数据时也可设置过滤器,(同样是外部设置,在接收到数据后,只有通过过滤器的数据才交给上层应用进程)。
在接收数据时,应合理设置接收缓存的大小,设置的过大会浪费存储空间;设置的过小不足以容纳接收回来的数据时,则会丢弃容不下的数据,而且此时recvfrom函数并不会返回一个错误的值。
在业务上还可以设置一次接收全部数据,还可以设置分多次接收数据,例如源码中的receiveOnce和beginReceiving函数可切换是否多次接收数据。
另外我们还可以暂停接收数据,这里需要注意的问题是,因为接收数据是异步进行的,所以调用pauseReceiving方法时,接收数据的代理方法可能已经触发,此时这些方法仍会继续调用。
6.关闭socket
主要是关闭发送和接收的stream及注销其在runloop中注册的监听,以及释放相关资源。
在这里可以选择立即关闭socket或将未发送数据发送完后再关闭socket。
基于UDP socket的通信还可以发多播(组播)和广播消息。
IGMP协议是IP组播的基础。
加入和离开多播组只需要调用以下代码:
//加入多播组 int status = setsockopt(socket4FD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(imreq)); //离开多播组 int status = setsockopt(socket4FD, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&imreq, sizeof(imreq));
上面介绍了UDP socket通信的大致流程下面来看GCDAsyncUdpSocket的源码。
它定义了一个发送数据包的结构GCDAsyncUdpSendPacket,在发送数据包时,用相应的数据去填充该数据结构,然后将其压缩发送出去。
/** * The GCDAsyncUdpSendPacket encompasses the instructions for a single send/write. **/ @interface GCDAsyncUdpSendPacket : NSObject { @public NSData *buffer; NSTimeInterval timeout; long tag; BOOL resolveInProgress; BOOL filterInProgress; NSArray *resolvedAddresses; NSError *resolveError; NSData *address; int addressFamily; } - (id)initWithData:(NSData *)d timeout:(NSTimeInterval)t tag:(long)i; @end @implementation GCDAsyncUdpSendPacket - (id)initWithData:(NSData *)d timeout:(NSTimeInterval)t tag:(long)i { if ((self = [super init])) { buffer = d; timeout = t; tag = i; resolveInProgress = NO; } return self; } @end
- (void)sendData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag { LogTrace(); if ([data length] == 0) { LogWarn(@"Ignoring attempt to send nil/empty data."); return; } GCDAsyncUdpSendPacket *packet = [[GCDAsyncUdpSendPacket alloc] initWithData:data timeout:timeout tag:tag]; dispatch_async(socketQueue, ^{ @autoreleasepool { [sendQueue addObject:packet]; [self maybeDequeueSend]; }}); }
同时还定义了一个用于连接的数据包的结构GCDAsyncUdpSpecialPacket
@interface GCDAsyncUdpSpecialPacket : NSObject { @public // uint8_t type; BOOL resolveInProgress; NSArray *addresses; NSError *error; } - (id)init; @end @implementation GCDAsyncUdpSpecialPacket - (id)init { self = [super init]; return self; } @end
GCDAsyncUdpSocket底层是基于stream来实现的,在使用socket时我们需要创建4个stream(readStream4, writeStream4, readStream6 , writeStream6),分别用于IPV4和IPV6收发数据:
首先需要获取readSrteam和writeStream:
- (CFReadStreamRef)readStream { if (! dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)) { LogWarn(@"%@: %@ - Method only available from within the context of a performBlock: invocation", THIS_FILE, THIS_METHOD); return NULL; } NSError *err = nil; if (![self createReadAndWriteStreams:&err]) { LogError(@"Error creating CFStream(s): %@", err); return NULL; } // Todo... if (readStream4) return readStream4; else return readStream6; } - (CFWriteStreamRef)writeStream { if (! dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)) { LogWarn(@"%@: %@ - Method only available from within the context of a performBlock: invocation", THIS_FILE, THIS_METHOD); return NULL; } NSError *err = nil; if (![self createReadAndWriteStreams:&err]) { LogError(@"Error creating CFStream(s): %@", err); return NULL; } if (writeStream4) return writeStream4; else return writeStream6; }
这里默认使用IPV4。
dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)
上面这个方法是判断当前是否是在socketQueue,这里涉及到以下两个方法:
dispatch_queue_set_specific() dispatch_get_specific() // The dispatch_queue_set_specific() and dispatch_get_specific() functions take a "void *key" parameter. // From the documentation: // // > Keys are only compared as pointers and are never dereferenced. // > Thus, you can use a pointer to a static variable for a specific subsystem or // > any other value that allows you to identify the value uniquely. // // We're just going to use the memory address of an ivar. // Specifically an ivar that is explicitly named for our purpose to make the code more readable. // // However, it feels tedious (and less readable) to include the "&" all the time: // dispatch_get_specific(&IsOnSocketQueueOrTargetQueueKey) // // So we're going to make it so it doesn't matter if we use the '&' or not, // by assigning the value of the ivar to the address of the ivar. // Thus: IsOnSocketQueueOrTargetQueueKey == &IsOnSocketQueueOrTargetQueueKey;
以下是创建read/write stream函数:
- (BOOL)createReadAndWriteStreams:(NSError **)errPtr { LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); NSError *err = nil; if (readStream4 || writeStream4 || readStream6 || writeStream6) { // Streams already created return YES; } if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL) { err = [self otherError:@"Cannot create streams without a file descriptor"]; goto Failed; } // Create streams LogVerbose(@"Creating read and write stream(s)..."); if (socket4FD != SOCKET_NULL) { CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socket4FD, &readStream4, &writeStream4); if (!readStream4 || !writeStream4) { err = [self otherError:@"Error in CFStreamCreatePairWithSocket() [IPv4]"]; //使用goto语句跳转到Failed. goto Failed; } } if (socket6FD != SOCKET_NULL) { CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socket6FD, &readStream6, &writeStream6); if (!readStream6 || !writeStream6) { err = [self otherError:@"Error in CFStreamCreatePairWithSocket() [IPv6]"]; goto Failed; } } // Ensure the CFStream's don't close our underlying socket CFReadStreamSetProperty(readStream4, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFWriteStreamSetProperty(writeStream4, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFReadStreamSetProperty(readStream6, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFWriteStreamSetProperty(writeStream6, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); return YES; Failed: if (readStream4) { CFReadStreamClose(readStream4); CFRelease(readStream4); readStream4 = NULL; } if (writeStream4) { CFWriteStreamClose(writeStream4); CFRelease(writeStream4); writeStream4 = NULL; } if (readStream6) { CFReadStreamClose(readStream6); CFRelease(readStream6); readStream6 = NULL; } if (writeStream6) { CFWriteStreamClose(writeStream6); CFRelease(writeStream6); writeStream6 = NULL; } if (errPtr) *errPtr = err; return NO; }
如果相应的stream已经创建了,直接返回YES,之后判断socket状态不都为SOCKET_NULL(为SOCKET_NULL则用goto语句跳转到Failed),之后调用CFStreamCreatePairWithSocket函数创建read/write streams,并于socket绑定。