zoukankan      html  css  js  c++  java
  • 基于消息队列的UDP并发服务器v1

    UDP是无状态的,无法用TCP一样的并发服务器。我们可以用消息队列的方式模拟下。

    首先,我们看消息队列节点

    typedef struct msg_buf
    {
        int sockfd;
        struct sockaddr_in their_addr;  /* 连接对方的地址信息 */
        int sin_size;
        char buf[BUFF_SIZE];    
        size_t len;
        struct msg_buf *next;
    }msgbuf_t;

    关于分配与释放的接口,比较习惯这样的方式了

    msgbuf_t *get_msgbuf()
    {
        return (msgbuf_t *)malloc(sizeof(struct msg_buf));
    }
    void put_msgbuf(msgbuf_t *msg)
    {
        free(msg);
    }

    服务线程相关,只有一个线程

    struct msg_buf *msg_mbox;
    sem_t recv_sem;
    pthread_mutex_t recv_mutex;
    pthread_t recv_pid;

    上述参数的初始化

    void server_init()
    {
        sem_init(&recv_sem,0,0);
        pthread_mutex_init(&recv_mutex,NULL);
        pthread_mutex_init(&send_mutex,NULL);
        pthread_create(&recv_pid,NULL,recv_thread,NULL);
    
    }

    还有个void server_destroy()处理方法。

    通过atexit在main中注册。


    消息队列发送

    void msg_post(struct msg_buf *msg)
    {
        pthread_mutex_lock(&recv_mutex);
        if(msg_mbox)
        {
            //链表插入操作
            msg->next = msg_mbox->next;
            msg_mbox->next = msg;
        }else{
            msg_mbox = msg;
            msg->next = NULL;
        }
        pthread_mutex_unlock(&recv_mutex);
        sem_post(&recv_sem);
    }

    最后调用

    sem_post通知阻塞线程处理。

    处理接收数据包线程——线程只是是实现了echo功能。

    View Code
    void *recv_thread(void *arg)
    {
        static int cnt = 0;
        while(1)
        {
            sem_wait(&recv_sem); 
            pthread_mutex_lock(&recv_mutex);
            struct msg_buf *msg = msg_mbox;
            msg_mbox = msg->next;
            pthread_mutex_unlock(&recv_mutex);
    
            //处理msg消息
            int retval = sendto(msg->sockfd, msg->buf,msg->len, 0,
                            (struct sockaddr *)&(msg->their_addr), msg->sin_size);
            printf("Received %d from %s\n%s\n",++cnt,inet_ntoa((msg->their_addr).sin_addr),msg->buf);
            put_msgbuf(msg);
        }
    }

    main中循环

    View Code
    int main(int argc,char *argv[])
    {
    
        int sockfd;                     /* 数据端口 */
        struct sockaddr_in my_addr;     /* 自身的地址信息 */
       
        int  retval;
        if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
            perror("socket");
            exit(1);
        }
        /* 设置地址可复用 */
        int option = 1;
        setsockopt( sockfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option) );
    
        my_addr.sin_family = AF_INET;
        my_addr.sin_port = htons(MYPORT); /* 网络字节顺序 */
        my_addr.sin_addr.s_addr = INADDR_ANY; /* 自动填本机IP */
        bzero(&(my_addr.sin_zero), 8); /* 其余部分置0 */
    
        if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
            perror("bind");
            exit(1);
        }
    
        /**
         * 其它初始化 
         * */
    
        server_init();
    
        /* 主循环 */
        while(1) {
            struct msg_buf     *recvmsg = get_msgbuf();
            size_t len = sizeof(recvmsg->buf);
            char *buf  = recvmsg->buf;
            struct sockaddr_in their_addr; 
            int sin_size;
            memset(buf,len,0);
    
            retval = recvfrom(sockfd, buf, len, 0,
                              (struct sockaddr *)&their_addr, &sin_size);
            printf("%s\t%s\n",inet_ntoa(their_addr.sin_addr),buf);
            if (retval == 0) {
                perror ("recvfrom");
                close(sockfd);
                break;
            }//
            //封装消息
            recvmsg->their_addr = their_addr;
            recvmsg->sin_size = sin_size;
            recvmsg->sockfd = sockfd;
            recvmsg->len = retval;
            //发送给某消息处理
            msg_post(recvmsg);
        }
        return 0;
    }

    Over!

  • 相关阅读:
    解题报告 poj 1486
    解题报告 比赛
    解题报告 keke 的房子
    解题报告 Tree
    解题报告 聚会
    解题报告 Valentine‘s seat
    解题报告 报数
    解题报告 黑书 Water pail poi 1999
    解题报告 poj 1639
    解题报告 数数
  • 原文地址:https://www.cnblogs.com/westfly/p/2446461.html
Copyright © 2011-2022 走看看