UDP是无状态的,无法用TCP一样的并发服务器。我们可以用消息队列的方式模拟下。
首先,我们看消息队列节点
typedef struct msg_buf { int sockfd; struct sockaddr_in their_addr; /* 连接对方的地址信息 */ int sin_size; char buf[BUFF_SIZE]; size_t len; struct msg_buf *next; }msgbuf_t;
关于分配与释放的接口,比较习惯这样的方式了
msgbuf_t *get_msgbuf() { return (msgbuf_t *)malloc(sizeof(struct msg_buf)); } void put_msgbuf(msgbuf_t *msg) { free(msg); }
服务线程相关,只有一个线程
struct msg_buf *msg_mbox; sem_t recv_sem; pthread_mutex_t recv_mutex; pthread_t recv_pid;
上述参数的初始化
void server_init() { sem_init(&recv_sem,0,0); pthread_mutex_init(&recv_mutex,NULL); pthread_mutex_init(&send_mutex,NULL); pthread_create(&recv_pid,NULL,recv_thread,NULL); }
还有个void server_destroy()处理方法。
通过atexit在main中注册。
消息队列发送
void msg_post(struct msg_buf *msg) { pthread_mutex_lock(&recv_mutex); if(msg_mbox) { //链表插入操作 msg->next = msg_mbox->next; msg_mbox->next = msg; }else{ msg_mbox = msg; msg->next = NULL; } pthread_mutex_unlock(&recv_mutex); sem_post(&recv_sem); }
最后调用
sem_post通知阻塞线程处理。
处理接收数据包线程——线程只是是实现了echo功能。
View Code
void *recv_thread(void *arg) { static int cnt = 0; while(1) { sem_wait(&recv_sem); pthread_mutex_lock(&recv_mutex); struct msg_buf *msg = msg_mbox; msg_mbox = msg->next; pthread_mutex_unlock(&recv_mutex); //处理msg消息 int retval = sendto(msg->sockfd, msg->buf,msg->len, 0, (struct sockaddr *)&(msg->their_addr), msg->sin_size); printf("Received %d from %s\n%s\n",++cnt,inet_ntoa((msg->their_addr).sin_addr),msg->buf); put_msgbuf(msg); } }
main中循环
View Code
int main(int argc,char *argv[]) { int sockfd; /* 数据端口 */ struct sockaddr_in my_addr; /* 自身的地址信息 */ int retval; if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { perror("socket"); exit(1); } /* 设置地址可复用 */ int option = 1; setsockopt( sockfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option) ); my_addr.sin_family = AF_INET; my_addr.sin_port = htons(MYPORT); /* 网络字节顺序 */ my_addr.sin_addr.s_addr = INADDR_ANY; /* 自动填本机IP */ bzero(&(my_addr.sin_zero), 8); /* 其余部分置0 */ if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) { perror("bind"); exit(1); } /** * 其它初始化 * */ server_init(); /* 主循环 */ while(1) { struct msg_buf *recvmsg = get_msgbuf(); size_t len = sizeof(recvmsg->buf); char *buf = recvmsg->buf; struct sockaddr_in their_addr; int sin_size; memset(buf,len,0); retval = recvfrom(sockfd, buf, len, 0, (struct sockaddr *)&their_addr, &sin_size); printf("%s\t%s\n",inet_ntoa(their_addr.sin_addr),buf); if (retval == 0) { perror ("recvfrom"); close(sockfd); break; }// //封装消息 recvmsg->their_addr = their_addr; recvmsg->sin_size = sin_size; recvmsg->sockfd = sockfd; recvmsg->len = retval; //发送给某消息处理 msg_post(recvmsg); } return 0; }
Over!