zoukankan      html  css  js  c++  java
  • TCP/IP网络编程之多线程服务端的实现(二)

    线程存在的问题和临界区

    上一章TCP/IP网络编程之多线程服务端的实现(一)的thread4.c中,我们发现多线程对同一变量进行加减,最后的结果居然不是我们预料之内的。其实,如果多执行几次程序,会发现每次程序计算的结果都不一样。那么,造成这样的原因是什么呢?

    现在,假设我们一个全局变量sum的值为99,我们创建两个线程,要对sum进行加1操作,那么理想情况下,sum的值应为101。那么要对sum加1并赋值给sum,我们可以简写为:sum+=1。于是多线程可以开始对sum进行操作,但因为是多线程,有可能在一个进程内,一个线程还没执行完,另外一个线程得到CPU时间开始执行。

    所以,让我们把目光放回:sum+=1这行代码,这段代码其实有两个动作,第一个动作是sum+1,第二个是将之前相加的结果重新赋值给sum。那么,有两个线程并发给sum加1,有可能第一个线程执行完相加的结果,得到100,但在赋值之前便失去了CPU时间,轮到另外一个线程获得CPU时间,执行加1的操作,等到执行完相加的操作,第二个线程的CPU时间到头了,最后,两个线程都要执行赋值操作,最后我们看到sum只有100,而并不是我们预想中的101

    线程同步

    为了要解决这一问题,我们必须要求在从sum加1到完成sum的赋值这段临界区,只能有一个线程来完成。而就涉及到线程同步了,这里有两种技术可以实现线程同步,分别是“互斥量”(Mutex)和“信号量”(Semaphore)

    互斥量

    互斥量是“Mutual Exclusion”的简写,表示不允许多个线程同时访问。举个例子,临界区就好比是小房间里的取款机,现在大部分取款机都会装在一个小房间,当需要取款时一个一个人按顺序进入小房间,栓上门开始取款,取款完毕后离开小房间,让下一个人进来取款。把取款的人当做线程,如果多人同时进入小房间(临界区)那肯定会造成安全上的问题,如账号密码泄露。因此,门栓就是互斥量,当栓上门时,不允许其他人(线程)进入小房间(临界区)进行操作

    #include <pthread.h>
    int pthread_mutex_init(pthread_mutex_t * mutex,const pthread_mutexattr_t * attr);
    int pthread_mutex_destroy(pthread_mutex_t * mutex);
    //成功时返回0,失败时返回其他值
    

      

    • mutex:创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址值
    • attr:传递即将创建的互斥量属性,没有特别需要指定的属性时传递NULL

    从上述函数声明中可以看出,为了创建相当于锁系统的互斥量,需要声明如下pthread_mutex_t型变量

    pthread_mutex_t mutex;
    

      

    该变量的地址将传递给pthread_mutex_init函数,用来保存操作系统创建的互斥量(锁系统)。调用pthread_mutex_destroy函数时同样需要该信息,如果不需要配置特殊的互斥量属性,则向第二个参数传递NULL时,可以利用PTHREAD_MUTEX_INITALIZER宏进行如下声明:

    pthread_mutex_t mutex = PTHREAD_MUTEX_INITALIZER;
    

      

    推荐使用pthread_mutex_init函数进行初始化,因为通过宏进行初始化时很难发现发生的错误。接下来介绍利用互斥量锁住或释放临界区时使用的函数

    #include <pthread.h>
    int pthread_mutex_lock(pthread_mutex_t * mutex);
    int pthread_mutex_unlock(pthread_mutex_t * mutex);
    //成功时返回0,失败时返回其他值
    

      

    通过函数名我们很容易理解函数的作用,进入临界区前调用pthread_mutex_lock,调用该函数时,如果发现有其他线程已进入临界区,则pthread_mutex_lock函数会陷入阻塞,直到进入里面的线程调用pthread_mutex_unlock函数退出临界区为止

    mutex.c

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <pthread.h>
    #define NUM_THREAD 100
    
    void *thread_inc(void *arg);
    void *thread_des(void *arg);
    
    long long num = 0;
    pthread_mutex_t mutex;
    
    int main(int argc, char *argv[])
    {
        pthread_t thread_id[NUM_THREAD];
        int i;
    
        pthread_mutex_init(&mutex, NULL);
    
        for (i = 0; i < NUM_THREAD; i++)
        {
            if (i % 2)
                pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
            else
                pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
        }
    
        for (i = 0; i < NUM_THREAD; i++)
            pthread_join(thread_id[i], NULL);
    
        printf("result: %lld 
    ", num);
        pthread_mutex_destroy(&mutex);
        return 0;
    }
    
    void *thread_inc(void *arg)
    {
        int i;
        pthread_mutex_lock(&mutex);
        for (i = 0; i < 50000000; i++)
            num += 1;
        pthread_mutex_unlock(&mutex);
        return NULL;
    }
    void *thread_des(void *arg)
    {
        int i;
        for (i = 0; i < 50000000; i++)
        {
            pthread_mutex_lock(&mutex);
            num -= 1;
            pthread_mutex_unlock(&mutex);
        }
        return NULL;
    }
    

      

    • 第11行:声明了保存互斥量读取值的变量,之所以声明全局变量是因为thread_inc函数和thread_des函数都需要访问互斥量
    • 第32行:销毁互斥量,不需要互斥量时应销毁
    • 第39、42行:实际临界区只是第41行,但此处连同第40行的循环语句一起用作临界区,调用了lock、unlock函数
    • 第50、52行:通过lock、unlock函数围住对应于临界区的第51行语句

    编译mutex.c并运行

    # gcc mutex.c -D_REENTRANT -o mutex -lpthread
    # ./mutex 
    result: 0 
    

      

    从运行结果可以看出,已解TCP/IP网络编程之多线程服务端的实现(一)中thread4.c的问题。但确认运行结果需要等待比较长的时间,因为互斥量lock、unlock函数的调用过程耗时较久。首先 ,分析一下thread_inc函数的同步过程

    void *thread_inc(void *arg)
    {
        int i;
        pthread_mutex_lock(&mutex);
        for (i = 0; i < 50000000; i++)
            num += 1;
        pthread_mutex_unlock(&mutex);
        return NULL;
    }
    

      

     以上临界区划分范围较大,但可以最大限度减少互斥量lock、unlock函数的调用次数,上述示例中,thread_des函数比thread_inc函数多调用49,999,999次互斥量lock、unlock函数;但是thread_inc相比于thread_des也不是全无缺点,因为当循环完成之前,不允许任何线程访问   

    #include <semaphore.h>
    int sem_init(sem_t * sem, int pshared, unsigned int value); 
    int sem_destroy(sem_t * sem); 
    //成功时返回0,失败时返回其他值
    

      

    • sem:创建信号量时传递保存信号量的变量地址值,销毁时传递需要销毁的信号量变量地址值
    • pshared:传递其他值时,创建可由多个进程共享的信号量;传递0时,创建只允许一个进程内部使用的信号量,我们需要完成同一进程内的线程同步,故传0
    • value:指定新创建的信号量初始值

    上述函数的pshared参数超出我们关注的范围,故默认向其传递0。稍后讲解通过value参数初始化的信号量值是多少,接下来介绍信号量中相当于互斥量lock、unlock的函数

    #include <semaphore.h>
    int sem_wait(sem_t *sem); 
    int sem_post(sem_t *sem); 
    

      

    •  sem:传递保存信号量读取值的变量地址值,传递给sem_post时信号量加1,传递给sem_wait时信号量减1

    调用sem_init函数时,操作系统将创建信号量对象,此对象中记录着“信号量值”(整数)。该值在调用sem_post函数时加1,调用sem_wait函数时减1。但信号量的值不能小于0。因此,在信号量为0的情况下调用sem_wait函数时,调用函数的线程将进入阻塞状态,如果此时有其他线程函数调用sem_post函数,信号量的值将变为1,而原本阻塞的线程可以将该信号量重新减为0并跳出阻塞状态。实际上就是通过这种特性完成临界区的同步操作,可以通过如下形式同步临界区

    sem_wait(&sem);
    //临界区开始
    //……
    //临界区结束
    sem_post(&sem);
    

      

    上述代码结构中,调用sem_wait函数进入临界区的线程在调用sem_post函数前不允许其他线程进入临界区。信号量的值在0和1之间跳转。因此,具有这种特性的机制称为“二进制信号量”

    semaphore.c

    #include <stdio.h>
    #include <pthread.h>
    #include <semaphore.h>
    
    void *read(void *arg);
    void *accu(void *arg);
    static sem_t sem_one;
    static sem_t sem_two;
    static int num;
    
    int main(int argc, char *argv[])
    {
        pthread_t id_t1, id_t2;
        sem_init(&sem_one, 0, 0);
        sem_init(&sem_two, 0, 1);
    
        pthread_create(&id_t1, NULL, read, NULL);
        pthread_create(&id_t2, NULL, accu, NULL);
    
        pthread_join(id_t1, NULL);
        pthread_join(id_t2, NULL);
    
        sem_destroy(&sem_one);
        sem_destroy(&sem_two);
        return 0;
    }
    
    void *read(void *arg)
    {
        int i;
        for (i = 0; i < 5; i++)
        {
            fputs("Input num: ", stdout);
    
            sem_wait(&sem_two);
            scanf("%d", &num);
            sem_post(&sem_one);
        }
        return NULL;
    }
    void *accu(void *arg)
    {
        int sum = 0, i;
        for (i = 0; i < 5; i++)
        {
            sem_wait(&sem_one);
            sum += num;
            sem_post(&sem_two);
        }
        printf("Result: %d 
    ", sum);
        return NULL;
    }
    

      

    • 第14、15行:生成两个信号量,一个信号量的值为0,另一个为1
    • 第35、48行:利用信号量变量sem_two调用wait函数和post函数,这是为了防止在调用accu函数的线程还未取走数据的情况下,调用read函数的线程覆盖原值
    • 第37、46行:利用信号量变量sem_one调用wait和post函数,这是为了防止调用read函数的线程写入新值之前,accu函数再取走旧的数据

    编译semaphore.c并运行

    # gcc semaphore.c -D_REENTRANT -o semaphore -lpthread
    # ./semaphore 
    Input num: 1
    Input num: 2
    Input num: 3
    Input num: 4
    Input num: 5
    Result: 15 
    

      

    线程的销毁和多线程并发服务端的实现

    Linux线程并不是在首次调用的线程main函数返回时自动销毁,所以用如下两种方法之一加以明确,否则由线程创建的内存空间将一直存在

    • 调用pthread_join函数
    • 调用pthread_detach函数

    之前调用过pthread_join函数,调用该函数时,不仅会等待线程终止,还会引导线程销毁。但该函数的问题是,线程终止前,调用该函数的线程将进入阻塞状态。因此,通常通过如下函数调用引导线程销毁

    #include <pthread.h>
    int pthread_detach(pthread_t thread);//成功时返回0,失败时返回其他值
    

      

    • thread:终止的同时需要销毁的线程ID

    调用上述函数不会引起线程终止或进入阻塞状态,可以通过该函数引导销毁线程创建的内存空间。调用该函数后不能针对相应线程调用pthread_join函数,需要注意一下。

    多线程并发服务端的实现

    chat_serv.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <pthread.h>
    
    #define BUF_SIZE 100
    #define MAX_CLNT 256
    
    void *handle_clnt(void *arg);
    void send_msg(char *msg, int len);
    void error_handling(char *msg);
    
    int clnt_cnt = 0;
    int clnt_socks[MAX_CLNT];
    pthread_mutex_t mutx;
    
    int main(int argc, char *argv[])
    {
        int serv_sock, clnt_sock;
        struct sockaddr_in serv_adr, clnt_adr;
        int clnt_adr_sz;
        pthread_t t_id;
        if (argc != 2) {
            printf("Usage : %s <port>
    ", argv[0]);
            exit(1);
        }
    
        pthread_mutex_init(&mutx, NULL);
        serv_sock = socket(PF_INET, SOCK_STREAM, 0);
    
        memset(&serv_adr, 0, sizeof(serv_adr));
        serv_adr.sin_family = AF_INET;
        serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
        serv_adr.sin_port = htons(atoi(argv[1]));
    
        if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1)
            error_handling("bind() error");
        if (listen(serv_sock, 5) == -1)
            error_handling("listen() error");
    
        while (1)
        {
            clnt_adr_sz = sizeof(clnt_adr);
            clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &clnt_adr_sz);
    
            pthread_mutex_lock(&mutx);
            clnt_socks[clnt_cnt++] = clnt_sock;
            pthread_mutex_unlock(&mutx);
    
            pthread_create(&t_id, NULL, handle_clnt, (void *)&clnt_sock);
            pthread_detach(t_id);
            printf("Connected client IP: %s 
    ", inet_ntoa(clnt_adr.sin_addr));
        }
        close(serv_sock);
        return 0;
    }
    
    void *handle_clnt(void *arg)
    {
        int clnt_sock = *((int *)arg);
        int str_len = 0, i;
        char msg[BUF_SIZE];
    
        while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
            send_msg(msg, str_len);
    
        pthread_mutex_lock(&mutx);
        for (i = 0; i < clnt_cnt; i++) // remove disconnected client
        {
            if (clnt_sock == clnt_socks[i])
            {
                while (i++ < clnt_cnt - 1)
                    clnt_socks[i] = clnt_socks[i + 1];
                break;
            }
        }
        clnt_cnt--;
        pthread_mutex_unlock(&mutx);
        close(clnt_sock);
        return NULL;
    }
    void send_msg(char *msg, int len) // send to all
    {
        int i;
        pthread_mutex_lock(&mutx);
        for (i = 0; i < clnt_cnt; i++)
            write(clnt_socks[i], msg, len);
        pthread_mutex_unlock(&mutx);
    }
    void error_handling(char *msg)
    {
        fputs(msg, stderr);
        fputc('
    ', stderr);
        exit(1);
    }
    

      

    • 第17、18行:用于管理接入的客户端套接字的变量和数组,访问这两个变量的代码将构成临界区
    • 第51行:每当有新连接时,将相关信息写入变量clnt_cnt和clnt_socks
    • 第54行:创建线程向新接入的客户端提供服务,由该线程执行第62行定义的函数
    • 第55行:调用pthread_detach函数从内存中完全销毁已终止的线程
    • 第86行: 该函数负责向所连接的客户端发送信息

    chat_clnt.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <pthread.h>
    
    #define BUF_SIZE 100
    #define NAME_SIZE 20
    
    void *send_msg(void *arg);
    void *recv_msg(void *arg);
    void error_handling(char *msg);
    
    char name[NAME_SIZE] = "[DEFAULT]";
    char msg[BUF_SIZE];
    
    int main(int argc, char *argv[])
    {
        int sock;
        struct sockaddr_in serv_addr;
        pthread_t snd_thread, rcv_thread;
        void *thread_return;
        if (argc != 4) {
            printf("Usage : %s <IP> <port> <name>
    ", argv[0]);
            exit(1);
        }
    
        sprintf(name, "[%s]", argv[3]);
        sock = socket(PF_INET, SOCK_STREAM, 0);
    
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
        serv_addr.sin_port = htons(atoi(argv[2]));
    
        if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
            error_handling("connect() error");
    
        pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
        pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
        pthread_join(snd_thread, &thread_return);
        pthread_join(rcv_thread, &thread_return);
        close(sock);
        return 0;
    }
    
    void *send_msg(void *arg) // send thread main
    {
        int sock = *((int *)arg);
        char name_msg[NAME_SIZE + BUF_SIZE];
        while (1)
        {
            fgets(msg, BUF_SIZE, stdin);
            if (!strcmp(msg, "q
    ") || !strcmp(msg, "Q
    "))
            {
                close(sock);
                exit(0);
            }
            sprintf(name_msg, "%s %s", name, msg);
            write(sock, name_msg, strlen(name_msg));
        }
        return NULL;
    }
    
    void *recv_msg(void *arg) // read thread main
    {
        int sock = *((int *)arg);
        char name_msg[NAME_SIZE + BUF_SIZE];
        int str_len;
        while (1)
        {
            str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
            if (str_len == -1)
                return (void *)-1;
            name_msg[str_len] = 0;
            fputs(name_msg, stdout);
        }
        return NULL;
    }
    
    void error_handling(char *msg)
    {
        fputs(msg, stderr);
        fputc('
    ', stderr);
        exit(1);
    }
    

       

    编译chat_serv.c并运行 

    # gcc chat_serv.c -D_REENTRANT -o chat_serv -lpthread
    # ./chat_serv 8500
    Connected client IP: 127.0.0.1 
    Connected client IP: 127.0.0.1 
    

      

    编译chat_clnt.c并运行

    # gcc chat_clnt.c -D_REENTRANT -o chat_clnt -lpthread
    # ./chat_clnt 127.0.0.1 8500 Sam
    Hi everyone~
    [Sam] Hi everyone~
    [Amy] Hi Sam!
    Hello Amy!
    [Sam] Hello Amy!
    

      

    # ./chat_clnt 127.0.0.1 8500 Amy
    [Sam] Hi everyone~
    Hi Sam!
    [Amy] Hi Sam!
    [Sam] Hello Amy!
    
  • 相关阅读:
    Mac + Python3 安装scrapy
    Pyqt4+Eric6+python2.7.13(windows)
    js基础⑥
    python模块之os,sys
    Python模块之random
    Python模块之PIL
    js基础⑤
    js基础④
    js基础③
    centOS目录结构详细版
  • 原文地址:https://www.cnblogs.com/beiluowuzheng/p/9709974.html
Copyright © 2011-2022 走看看