zoukankan      html  css  js  c++  java
  • Linux线程池在server上简单应用

    一、问题描写叙述

    如今以C/S架构为例。client向server端发送要查找的数字,server端启动线程中的线程进行对应的查询。将查询结果显示出来。

    二、实现方案

    1. 整个project以client、server、lib组织。例如以下图所看到的:


    2. 进入lib。


    socket.h、socket.c

    /**
      @file		socket.h
      @brief	Socket API header file
    
      TCP socket utility functions, it provides simple functions that helps
      to build TCP client/server.
    
      @author wangzhicheng
     */
    #ifndef SOCKET_H
    #define SOCKET_H
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <resolv.h>
    #include <fcntl.h>
    
    #define MAX_CONNECTION				20
    
    int	TCPServerInit(int port, int *serverfd);
    int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr);
    int TCPServerSelect(int* serverfdlist, int num, int *clientfd, char *clientaddr);
    int	TCPClientInit(int *clientfd);
    int	TCPClientConnect(const int clientfd, const char *addr, int port);
    int	TCPNonBlockRead(int clientfd, char* buf, int size);
    int TCPBlockRead(int clientfd, char* buf, int size);
    int	TCPWrite(int clientfd, char* buf, int size);
    void TCPClientClose(int sockfd);
    void TCPServerClose(int sockfd);
    
    #endif
    


    socket.c

    #include "socket.h"
    /*
     * @brief	initialize TCP server
     * @port		port number for socket
     * @serverfd	server socket fd
     * return server socked fd for success, on error return error code
     * */
    int	TCPServerInit(int port, int *serverfd) {
    	struct sockaddr_in dest;
    	// create socket , same as client
    	*serverfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    	if(*serverfd < 0) return -1;
    	/// initialize structure dest
    	memset((void*)&dest, '', sizeof(dest));
    	dest.sin_family = PF_INET;
    	dest.sin_port = htons(port);
    	dest.sin_addr.s_addr = INADDR_ANY;
    	// Assign a port number to socket
    	bind( *serverfd, (struct sockaddr*)&dest, sizeof(dest));
    
    	return *serverfd;
    }
    /*
     * @brief	wait client connect
     * @serverfd	server socket fd
     * @clientfd	client socket fd
     * @clientaddr	client address which connect to server
     * return client fd, on error return error code
     * */
    int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr) {
    	struct sockaddr_in client_addr;
    	socklen_t addrlen = sizeof(client_addr);
    	// make it listen to socket
    	listen( serverfd, 20);
    	// Wait and Accept connection
    	*clientfd = accept(serverfd, (struct sockaddr*)&client_addr, &addrlen);
    	strcpy( clientaddr, (const char *)( inet_ntoa( client_addr.sin_addr)));
    
    	return *clientfd;
    }
    /*
     * @brief	initialize TCP client
     * @clientfd	client socket fd
     * return client socked fd for success, on error return error code
     */
    int	TCPClientInit(int *clientfd) {
    	*clientfd = socket(PF_INET, SOCK_STREAM, 0);
    
    	return *clientfd;
    }
    /*
     * @brief	connect to TCP server
     * @clientfd	client socket fd
     * @addr		server address
     * @port		server port number
     * return 0 for success, on error -1 is returned
     */
    int	TCPClientConnect(const int clientfd, const char *addr, int port) {
    	struct sockaddr_in dest;
    	// initialize value in dest
    	memset(&dest, '', sizeof(dest));
    	dest.sin_family = PF_INET;
    	dest.sin_port = htons(port);
    	inet_aton(addr, &dest.sin_addr);
    
    	// Connecting to server
    	return connect(clientfd, (struct sockaddr*)&dest, sizeof(dest));
    }
    /*
     * @brief	non-block read from TCP socket
     * @clientfd	socket fd
     * @buf	     	input buffer
     * @size		buffer size
     * return	    the length of read data
     */
    int	TCPNonBlockRead(int clientfd, char* buf, int size) {
    	int opts;
    	opts = fcntl(clientfd, F_GETFL);
    	opts = (opts | O_NONBLOCK);
    	fcntl(clientfd, F_SETFL, opts);
    
    	return recv(clientfd, buf, size, 0);
    }
    /*
     * @brief	block read from TCP socket
     * @clientfd	socket fd
     * @buf	  	    input buffer
     * @size		buf size
     * return	    the length of read data
     */
    int	TCPBlockRead(int clientfd, char* buf, int size) {
    	int opts;
    	opts = fcntl(clientfd, F_GETFL);
    	opts = (opts & ~O_NONBLOCK);
    	fcntl(clientfd, F_SETFL, opts);
    
    	return recv(clientfd, buf, size, 0);
    }
    /*
     * @brief	write to TCP socket
     * @clientfd	socket fd
     * @buf		    output buf
     * @size		output buf length
     * return	    the length of the actual written data, -1: disconnected
     */
    int	TCPWrite(int clientfd, char* buf, int size) {
    	int len= 0;
    	/* set socket to nonblock */
    	int ret = fcntl(clientfd, F_GETFL);
    	ret |= O_NONBLOCK;
    	if (fcntl(clientfd, F_SETFL, ret) < 0 ) {
    		printf("set socket to nonblock fail [%d] !
    ", errno);
    	}
    	len = send(clientfd, buf, size, MSG_NOSIGNAL);
    
    	return len;
    }
    /*
     * @brief	close the tcp connection
     * @sockfd	socket fd
     * return	none
     */
    void TCPConnectionClose(int sockfd) {
    	close(sockfd);
    }


    threadpool.h

    #ifndef THREADPOOL_H
    #define THREADPOOL_H
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    struct job
    {
        void* (*callback_function)(void *arg);    //线程回调函数
        void *arg;                                //回调函数參数
        struct job *next;
    };
    
    struct threadpool
    {
        int thread_num;                   //线程池中开启线程的个数
        int queue_max_num;                //队列中最大job的个数
        struct job *head;                 //指向job的头指针
        struct job *tail;                 //指向job的尾指针
        pthread_t *pthreads;              //线程池中全部线程的pthread_t
        pthread_mutex_t mutex;            //相互排斥信号量
        pthread_cond_t queue_empty;       //队列为空的条件变量
        pthread_cond_t queue_not_empty;   //队列不为空的条件变量
        pthread_cond_t queue_not_full;    //队列不为满的条件变量
        int queue_cur_num;                //队列当前的job个数
        int queue_close;                  //队列是否已经关闭
        int pool_close;                   //线程池是否已经关闭
    };
    
    //================================================================================================
    //函数名:                   threadpool_init
    //函数描写叙述:                 初始化线程池
    //输入:                    [in] thread_num     线程池开启的线程个数
    //                         [in] queue_max_num  队列的最大job个数 
    //输出:                    无
    //返回:                    成功:线程池地址 失败:NULL
    //================================================================================================
    struct threadpool* threadpool_init(int thread_num, int queue_max_num);
    
    //================================================================================================
    //函数名:                    threadpool_add_job
    //函数描写叙述:                  向线程池中加入任务
    //输入:                     [in] pool                  线程池地址
    //                          [in] callback_function     回调函数
    //                          [in] arg                     回调函数參数
    //输出:                     无
    //返回:                     成功:0 失败:-1
    //================================================================================================
    int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
    
    //================================================================================================
    //函数名:                    threadpool_destroy
    //函数描写叙述:                   销毁线程池
    //输入:                      [in] pool                  线程池地址
    //输出:                      无
    //返回:                      成功:0 失败:-1
    //================================================================================================
    int threadpool_destroy(struct threadpool *pool);
    
    //================================================================================================
    //函数名:                    threadpool_function
    //函数描写叙述:                  线程池中线程函数
    //输入:                     [in] arg                  线程池地址
    //输出:                     无  
    //返回:                     无
    //================================================================================================
    void* threadpool_function(void* arg);
    #endif

    threadpool.c

    #include "threadpool.h"
    
    struct threadpool* threadpool_init(int thread_num, int queue_max_num) {
        struct threadpool *pool = NULL;
        do 
        {
            pool = malloc(sizeof(struct threadpool));
            if (NULL == pool)
            {
                printf("failed to malloc threadpool!
    ");
                break;
            }
            pool->thread_num = thread_num;
            pool->queue_max_num = queue_max_num;
            pool->queue_cur_num = 0;
            pool->head = NULL;
            pool->tail = NULL;
            if (pthread_mutex_init(&(pool->mutex), NULL))
            {
                printf("failed to init mutex!
    ");
                break;
            }
            if (pthread_cond_init(&(pool->queue_empty), NULL))
            {
                printf("failed to init queue_empty!
    ");
                break;
            }
            if (pthread_cond_init(&(pool->queue_not_empty), NULL))
            {
                printf("failed to init queue_not_empty!
    ");
                break;
            }
            if (pthread_cond_init(&(pool->queue_not_full), NULL))
            {
                printf("failed to init queue_not_full!
    ");
                break;
            }
            pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
            if (NULL == pool->pthreads)
            {
                printf("failed to malloc pthreads!
    ");
                break;
            }
            pool->queue_close = 0;
            pool->pool_close = 0;
            int i;
            for (i = 0; i < pool->thread_num; ++i)
            {
                pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool);
            }
            
            return pool;    
        } while (0);
        
        return NULL;
    }
    int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg) {
    	if(pool == NULL || callback_function == NULL || arg == NULL) return -1;
    
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
        {
            pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex));   //队列满的时候就等待
        }
        if (pool->queue_close || pool->pool_close)    //队列关闭或者线程池关闭就退出
        {
            pthread_mutex_unlock(&(pool->mutex));
            return -1;
        }
        struct job *pjob =(struct job*) malloc(sizeof(struct job));
        if (NULL == pjob)
        {
            pthread_mutex_unlock(&(pool->mutex));
            return -1;
        } 
        pjob->callback_function = callback_function;    
        pjob->arg = arg;
        pjob->next = NULL;
        if (pool->head == NULL)   
        {
            pool->head = pool->tail = pjob;
            pthread_cond_broadcast(&(pool->queue_not_empty));  //队列空的时候,有任务来时就通知线程池中的线程:队列非空
        }
        else
        {
            pool->tail->next = pjob;
            pool->tail = pjob;    
        }
        pool->queue_cur_num++;
        pthread_mutex_unlock(&(pool->mutex));
        return 0;
    }
    
    void* threadpool_function(void* arg) {
        struct threadpool *pool = (struct threadpool*)arg;
        struct job *pjob = NULL;
        while (1)  //死循环
        {
            pthread_mutex_lock(&(pool->mutex));
            while ((pool->queue_cur_num == 0) && !pool->pool_close)   //队列为空时,就等待队列非空
            {
                pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
            }
            if (pool->pool_close)   //线程池关闭,线程就退出
            {
                pthread_mutex_unlock(&(pool->mutex));
                pthread_exit(NULL);
            }
            pool->queue_cur_num--;
            pjob = pool->head;
            if (pool->queue_cur_num == 0)
            {
                pool->head = pool->tail = NULL;
            }
            else 
            {
                pool->head = pjob->next;
            }
            if (pool->queue_cur_num == 0)
            {
                pthread_cond_signal(&(pool->queue_empty));        //队列为空,就能够通知threadpool_destroy函数,销毁线程函数
            }
            if (pool->queue_cur_num == pool->queue_max_num - 1)
            {
                pthread_cond_broadcast(&(pool->queue_not_full));  //队列非满。就能够通知threadpool_add_job函数,加入新任务
            }
            pthread_mutex_unlock(&(pool->mutex));
            
            (*(pjob->callback_function))(pjob->arg);   //线程真正要做的工作,回调函数的调用
            free(pjob);
            pjob = NULL;    
        }
    }
    int threadpool_destroy(struct threadpool *pool) {
    	if(pool == NULL) return -1;
        pthread_mutex_lock(&(pool->mutex));
        if (pool->queue_close || pool->pool_close)   //线程池已经退出了,就直接返回
        {
            pthread_mutex_unlock(&(pool->mutex));
            return -1;
        }
        
        pool->queue_close = 1;        //置队列关闭标志
        while (pool->queue_cur_num != 0)
        {
            pthread_cond_wait(&(pool->queue_empty), &(pool->mutex));  //等待队列为空
        }    
        
        pool->pool_close = 1;      //置线程池关闭标志
        pthread_mutex_unlock(&(pool->mutex));
        pthread_cond_broadcast(&(pool->queue_not_empty));  //唤醒线程池中正在堵塞的线程
        pthread_cond_broadcast(&(pool->queue_not_full));   //唤醒加入任务的threadpool_add_job函数
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
            pthread_join(pool->pthreads[i], NULL);    //等待线程池的全部线程运行完成
        }
        
        pthread_mutex_destroy(&(pool->mutex));          //清理资源
        pthread_cond_destroy(&(pool->queue_empty));
        pthread_cond_destroy(&(pool->queue_not_empty));   
        pthread_cond_destroy(&(pool->queue_not_full));    
        free(pool->pthreads);
        struct job *p;
        while (pool->head != NULL)
        {
            p = pool->head;
            pool->head = p->next;
            free(p);
        }
        free(pool);
        return 0;
    }

    3.进入client


    client.c

    /*************************************************************************
        > File Name: test.c
        > Author: wangzhicheng
        > Mail: 2363702560@qq.com 
        > Created Time: Fri 03 Oct 2014 09:43:59 PM WST
     ************************************************************************/
    
    #include "socket.h"
    const char * serveraddr = "127.0.0.1";
    #define TCPPORT 4001
    int main() {
    	int clientfd = -1;
    	char buf[256];
    	strcpy(buf, "1");
    	if(TCPClientInit(&clientfd) < 0) {
    		perror("client init failed...!
    ");
    		exit(EXIT_FAILURE);
    	}
    	if(TCPClientConnect(clientfd, serveraddr, TCPPORT)) {
    		perror("can not connect to server...!
    ");
    		exit(EXIT_FAILURE);
    	}
    	if(TCPWrite(clientfd, buf, strlen(buf) == 1)) {
    		printf("send successfully...!
    ");
    	}
    	else printf("send failed...!
    ");
    
    	return 0;
    }
    

    Makefile

    CC=gcc
    LIBRARY=../lib
    CFLAGS=-I$(LIBRARY)
    CXXFLAGS=
    OBJS1=client.o  socket.o 
    
    all:	client 
    
    
    client: $(OBJS1)
    	$(CC) -o   $@ $(OBJS1) 
    
    socket.o: $(LIBRARY)/socket.c
    	$(CC) -c $(LIBRARY)/socket.c
    
    clean:
    	rm *.o client  > /dev/null 2>&1

    4. 进入server


    server.c

    /*************************************************************************
        > File Name: server.c
        > Author: ma6174
        > Mail: ma6174@163.com 
        > Created Time: Sat 04 Oct 2014 09:46:30 PM WST
     ************************************************************************/
    
    #include "socket.h"
    #include "threadpool.h"
    
    #define TCPPORT 4001
    #define SIZE 256
    #define N 10
    int array[N] = {1, 2, 6, 8, 12, 88, 208, 222, 688, 1018};
    int find(int low, int high, int m) {
    	int mid;
    	if(low <= high) {
    		mid = (low + high) >> 1;
    		if(array[mid] == m) return 1;
    		else if(array[mid] > m) return find(low, mid - 1, m);
    		else return find(mid + 1, high, m);
    	}
    	return 0;
    }
    void* work(void* arg)
    {
        int *p = (int *) arg;
    	int m = *p;
    	if(find(0, N - 1, m)) printf("%d has been found...!
    ", m);
    	else printf("%d has not been found...!
    ", m);
        sleep(1);
    }
    int main() {
    	int serverfd = -1, clientfd = -1;
    	char clientaddr[SIZE];
    	char buf[SIZE];
    	int num;
        struct threadpool *pool = NULL;
    	TCPServerInit(TCPPORT, &serverfd);
    	if(serverfd < 0) {
    		perror("server init failed...!
    ");
    		exit(EXIT_FAILURE);
    	}
        pool = threadpool_init(10, 20);
    	while(1) {
    		TCPServerWaitConnection(serverfd, &clientfd, clientaddr);
    		if(clientfd < 0) {
    			perror("can not connect the clients...!
    ");
    			exit(EXIT_FAILURE);
    		}
    		if(TCPBlockRead(clientfd, buf, SIZE) <= 0) {
    			perror("can not read from client...!
    ");
    			sleep(1);
    		}
    		else {
    			num = atoi(buf);
    			threadpool_add_job(pool, work, &num);
    		}
    	}
        threadpool_destroy(pool);
    
    	return 0;
    }
    

    Makefile

    CC=gcc
    LIBRARY=../lib
    CFLAGS=-I$(LIBRARY)
    CXXFLAGS=
    OBJS1=server.o  socket.o threadpool.o
    
    all:	server
    
    
    server: $(OBJS1)
    	$(CC) -o   $@ $(OBJS1) -lpthread
    
    socket.o: $(LIBRARY)/socket.c
    	$(CC) -c $(LIBRARY)/socket.c
    
    threadpool.o: $(LIBRARY)/threadpool.c
    	$(CC) -c $(LIBRARY)/threadpool.c
    clean:
    	rm *.o client  > /dev/null 2>&1
    三、測试


    四、有关线程池的说明


    当线程池被创建时,线程池中有些“空”的线程。即不运行任务,每当一个任务被增加进来时,任务就被组织成任务队列,线程依照队列队头出。队尾进的原则取出头任务运行。

    任务队列中所含任务数必须控制在一个上限内。超过上限时。任务被堵塞。当全部任务被运行完,销毁线程池。

  • 相关阅读:
    使用gdb跟踪Linux内核启动过程(从start_kernel到init进程启动)
    对一个简单的时间片轮转多道程序内核代码的浅析
    初识计算机工作过程
    React 中 路由 react-router-dom 的用法
    Vue Nuxt.js项目启动后可以在局域网内访问的配置方法
    node express async regeneratorRuntime is not defined (已解决)
    node+vue实现微信支付(沙箱)完整版,亲测可用
    node+vue实现支付宝支付(沙箱)完整版,亲测可用
    Vue.js中Line第三方登录api实现[亲测可用]
    React中WebSocket使用以及服务端崩溃重连
  • 原文地址:https://www.cnblogs.com/jhcelue/p/7110479.html
Copyright © 2011-2022 走看看