zoukankan      html  css  js  c++  java
  • 第一个C语言的小项目

    这里先写下主要的业务代码,一些库代码稍后补充上。

    /**
     * Feed新闻个性化推送
     */
    
    #include "push_service_news.h"
    
    /**
     * 保证单进程运行
     */
    void single_process() {
        lock_fd = open("logs/lock", O_CREAT | O_RDWR | O_TRUNC, 00664);
        if (push_trylock_fd(lock_fd) == FAILURE) {
            push_sys_notice("push_service_news already exists
    ");
            close(lock_fd);
            exit(0);
        } else {
            push_sys_notice("push_service_news lock success");
        }
    }
    
    /**
     * Release a thread's private data block.
     *
     * Passed to ThreadData_create() in init_main(); per the original note it
     * is invoked automatically when a thread terminates.
     *
     * @param data heap block to release; may be NULL
     * @return always NULL
     */
    void *push_thread_data_del(void *data) {
        if (data) {
            free(data);
            push_sys_notice("thread data free success");
        } else {
            push_sys_notice("thread data free fail");
        }
        /* The function is declared to return void *, so return a defined value
         * instead of falling off the end. */
        return NULL;
    }
    
    /**
     * Process-wide initialisation.
     *
     * Creates the worker thread pool (stored in a_tpool) with
     * PUSH_TPOOL_THREAD_COUNT / PUSH_TPOOL_MAX_TASK, then calls
     * ThreadData_create() with push_thread_data and push_thread_data_del.
     * Exits with status 1 when the pool cannot be created.
     *
     * @param argc unused
     * @param argv unused
     */
    void init_main(int argc, char **argv) {
        a_tpool = push_init_tpool(PUSH_TPOOL_THREAD_COUNT, PUSH_TPOOL_MAX_TASK);
        if (a_tpool) {
            ThreadData_create(push_thread_data, push_thread_data_del);
            return;
        }

        /* No pool means no work can be scheduled at all. */
        push_sys_error("push_init_tpool error");
        exit(1);
    }
    
    /**
     * Fetch the task to execute.
     *
     * Connects to MySQL, loads one task into a_newstask, parses its id into
     * task_id and marks the task as STATUS_DOING. The connection is closed
     * before returning.
     *
     * On any failure (including "no task available") the process exits with
     * status 1. Once the connection has been opened it is closed before
     * exiting, so the atexit handler does not run with a dangling connection.
     */
    void get_task() {
        int ret = push_mysql_connect(PUSH_DB_HOST, PUSH_DB_USER, PUSH_DB_PASSWD,
                PUSH_DB_PORT, PUSH_DB_DBNAME);
        if (ret) {
            push_sys_error("mysql_connect error");
            exit(1);
        }

        ret = push_dao_newstask_getone(&a_newstask);
        if (ret == FAILURE) {
            push_sys_error("push_dao_newstask_getone error");
            goto fail;
        }
        if (ret == 0) {
            push_sys_notice("have no task");
            goto fail;
        }

        task_id = atol(a_newstask.info[0][F_news_task_id]);
        if (!task_id) {
            push_sys_error("task_id error,:%d", task_id);
            goto fail;
        }

        ret = push_dao_newstask_update_status(task_id, STATUS_DOING);
        if (!ret) {
            push_sys_error("update status doing error:%d", task_id);
            goto fail;
        }

        push_mysql_close();
        return;

    fail:
        /* Release the connection opened above before terminating. */
        push_mysql_close();
        exit(1);
    }
    
    /**
     * Set up the per-task paths and log the task details.
     *
     * Re-initialises the logger to "logs/<task_id>", fills the file-level
     * base_path, data_path, cuids_path and bigdata_path buffers, creates the
     * data directory and logs a few fields of the current task.
     *
     * @param platform PUSH_PLATFORM_ANDROID selects the "android" file prefix;
     *                 any other value selects "iphone"
     */
    void init_path(int platform) {
        char log_path[PUSH_MAXLEN_PATH] = {0};
        const char *platform_name;

        /* Point the logger at the task-specific directory. */
        snprintf(log_path, sizeof (log_path), "logs/%u", task_id);
        push_init_logger(log_path);

        /* logs/<task_id>/ and logs/<task_id>/data/ */
        snprintf(base_path, sizeof (base_path), "logs/%u/", task_id);
        snprintf(data_path, sizeof (data_path), "%s%s", base_path, "data/");

        if (platform == PUSH_PLATFORM_ANDROID) {
            platform_name = "android";
        } else {
            platform_name = "iphone";
        }

        /* Per-platform download prefix and merged output file. */
        snprintf(cuids_path, sizeof (cuids_path), "%s%s_cuids", data_path, platform_name);
        snprintf(bigdata_path, sizeof (bigdata_path), "%s%s_bigdata", data_path, platform_name);

        push_mkdir(data_path);
        push_sys_notice("cuids_path[%s] bigdata_path[%s]", cuids_path, bigdata_path);

        /* Record the details of the task being processed. */
        push_sys_notice("task_name[%s]", a_newstask.info[0][F_news_task_task_name]);
        push_sys_notice("android_cuid_files[%s]", a_newstask.info[0][F_news_task_android_cuid_files]);
        push_sys_notice("iphone_cuid_files[%s]", a_newstask.info[0][F_news_task_iphone_cuid_files]);
        push_sys_notice("platform[%s]", a_newstask.info[0][F_news_task_platform]);
    }
    
    /**
     * Download the cuid files of a task and merge them into bigdata_path.
     *
     * src_file is a comma-separated list of URLs. Each entry is fetched with
     * wget into "<cuids_path>_<index>". Afterwards the downloaded files are
     * concatenated (zero or one entry) or sorted with duplicates removed
     * (several entries) into bigdata_path.
     *
     * @notice strtok() must not be used here (the original author observed that
     *         it made the iPhone list parse only one entry); strtok_r() is used
     *         instead. src_file is modified in place by the tokenisation.
     * @todo   the same article can be pushed more than once
     * @param src_file comma-separated URL list; a NULL pointer is logged and
     *                 ignored
     *
     * NOTE(review): each URL is interpolated unquoted into a shell command
     * line. If the task record can contain untrusted text this allows shell
     * injection - confirm the source of these values.
     */
    void download_files(char *src_file) {
        char *url = NULL, *save_ptr = NULL;
        char wget_cmd[PUSH_LEN_256] = {0};
        char uniq_cmd[PUSH_LEN_256] = {0};
        int num = 0, ret = 0, len = 0, saved_errno = 0;

        if (!src_file) {
            /* strtok_r() must not be started with a NULL string. */
            push_sys_error("download_files: src_file is NULL");
            return;
        }

        /* Download every cuid file in the list. */
        for (url = strtok_r(src_file, ",", &save_ptr); url;
                url = strtok_r(NULL, ",", &save_ptr), num++) {
            len = snprintf(wget_cmd, sizeof (wget_cmd), "wget -c -O %s_%d %s >> %swget.log 2>&1", cuids_path, num, url, base_path);
            if (len < 0 || (size_t) len >= sizeof (wget_cmd)) {
                /* Never execute a truncated command line. */
                push_sys_error("wget_cmd too long, skipped:[%s]", url);
                continue;
            }
            push_sys_notice("wget_cmd start");
            errno = 0;
            ret = system(wget_cmd);
            saved_errno = errno; /* logging below may overwrite errno */
            push_sys_notice("wget_cmd end:[%s]", wget_cmd);
            if (ret) {
                /* errno is only meaningful when system() itself failed (-1);
                 * otherwise the command merely reported a non-zero status. */
                push_sys_error("wget file error:[%s] [%s]",
                        ret == -1 ? strerror(saved_errno) : "non-zero exit status",
                        wget_cmd);
            }
        }

        /*
         * Merge the downloaded files.
         * With a single source no de-duplication is needed; it pays off when
         * several lists (e.g. all users) are combined.
         */
        if (num <= 1) {
            len = snprintf(uniq_cmd, sizeof (uniq_cmd), "cat %s_* > %s", cuids_path, bigdata_path);
        } else {
            len = snprintf(uniq_cmd, sizeof (uniq_cmd), "sort %s_* -u -T /home/work/tmp/sort_tmp > %s", cuids_path, bigdata_path);
        }
        if (len < 0 || (size_t) len >= sizeof (uniq_cmd)) {
            push_sys_error("uniq_cmd too long, skipped:[%s]", uniq_cmd);
            return;
        }
        push_sys_notice("uniq_cmd start");
        ret = system(uniq_cmd);
        push_sys_notice("uniq_cmd end:[%s]", uniq_cmd);
        if (ret) {
            push_sys_error("uniq_cmd error:[%s]", uniq_cmd);
        }
    }
    
    /**
     * Thread-pool task: rate-limit the calling thread, then push one batch.
     *
     * The first part keeps a per-thread counter (stored through
     * ThreadData_get/ThreadData_set) of how many tasks ran during the current
     * push_timestamp() value and sleeps in 10 ms steps while that counter has
     * reached PUSH_EVERY_THREAD_QPS. The second part sends the batch with
     * push_mapi_news_batch() and logs the outcome.
     *
     * @param arg heap-allocated push_news_worker_param_t; ownership passes to
     *            this function, which frees it
     */
    void worker(void *arg) {
        /*------------------------------- QPS control: start -------------------------*/
        push_thread_data_t *thread_data = (push_thread_data_t *) ThreadData_get(push_thread_data);
        /* First task on this thread: create its private counter block. */
        if (!thread_data) {
            thread_data = (push_thread_data_t *) calloc(1, sizeof (push_thread_data_t));
            if (!thread_data) {
                push_sys_error("push_thread_data calloc error:%s", strerror(errno));
                /* The parameter block is owned by this function; do not leak
                 * it when the thread bails out. */
                free(arg);
                pthread_exit(NULL);
            }
            /* calloc() already zeroed count. */
            thread_data->timestamp = push_timestamp();
            ThreadData_set(push_thread_data, thread_data);
        }
        while (1) {
            int now_timestamp = push_timestamp();
            if (now_timestamp == thread_data->timestamp) {
                if (thread_data->count >= PUSH_EVERY_THREAD_QPS) {
                    /* Limit reached for this timestamp: wait and re-check. */
                    push_sys_notice("qps limit: %d >= %d", thread_data->count, PUSH_EVERY_THREAD_QPS);
                    usleep(10000);
                    continue;
                } else {
                    /* Still below the limit. */
                    thread_data->count++;
                    push_sys_notice("qps:%d", thread_data->count);
                    break;
                }
            } else {
                /* Timestamp changed, so the limit cannot have been reached. */
                thread_data->count = 1;
                thread_data->timestamp = now_timestamp;
                break;
            }
        }
        /*------------------------------- QPS control: end ---------------------------*/

        /* Actual work: send the batch and classify the response. */
        push_news_worker_param_t *param = (push_news_worker_param_t *) arg;
        char *response = push_mapi_news_batch(param->platform, task_id, param->cuids, &a_newstask);
        /* NOTE(review): response is never freed here - confirm whether
         * push_mapi_news_batch() hands ownership to the caller. */
        if (!response) {
            push_error("response error cuids[%s]", param->cuids);
        } else if (strstr(response, "{\"errno\":0")) {
            push_notice("send success:response[%s]", response);
        } else {
            push_error("send fail:response[%s] cuids[%s]", response, param->cuids);
        }
        free(param);
    }
    
    /**
     * Hand one filled batch to the thread pool and log the result.
     *
     * NOTE(review): when push_tpool_add_task() reports an error the batch is
     * not freed here because it is unclear whether the pool may still hold the
     * pointer - confirm and free it if the pool never takes ownership on error.
     */
    static void read_file_submit_batch(push_news_worker_param_t *batch) {
        if (push_tpool_add_task(a_tpool, worker, batch)) {
            push_sys_error("bigdata_path[%s] tpool add task error", bigdata_path);
        } else {
            push_sys_notice("bigdata_path[%s] tpool add task success", bigdata_path);
        }
    }

    /**
     * Read bigdata_path line by line and queue the cuids on the thread pool.
     *
     * Every PUSH_ONCE_CUID_COUNT consecutive lines are joined (each followed by
     * a comma) into one push_news_worker_param_t, which is submitted as a
     * worker() task. A final, partially filled batch is submitted as well.
     *
     * If the allocation of a batch fails, the lines belonging to that batch are
     * skipped and the next batch is attempted normally. Blank lines contribute
     * no cuid but are still counted.
     *
     * @param platform platform value stored in every batch
     * @return number of lines read from the file, or FAILURE when the file
     *         cannot be opened
     */
    int read_file(int platform) {
        FILE *fp = fopen(bigdata_path, "r");
        if (!fp) {
            push_sys_error("open %s error:%s", bigdata_path, strerror(errno));
            return FAILURE;
        }
        char line_data[PUSH_LEN_256] = {0};
        int count = 0;
        /* Batch currently being filled; NULL when none is pending. */
        push_news_worker_param_t *worker_param = NULL;

        while (fgets(line_data, sizeof (line_data), fp) != NULL) {
            int slot = count % PUSH_ONCE_CUID_COUNT; /* position inside the batch */
            count++;

            if (slot == 0) {
                worker_param = calloc(1, sizeof (*worker_param));
                if (!worker_param) {
                    push_sys_error("calloc worker_param error:%s", strerror(errno));
                } else {
                    worker_param->platform = platform;
                }
            }
            if (!worker_param) {
                /* Allocation for this batch failed: drop its lines rather than
                 * writing through a NULL pointer. */
                continue;
            }

            /* Cut the line at the first newline; skip blank lines so that no
             * stale or empty cuid is appended. */
            line_data[strcspn(line_data, "\n")] = '\0';
            if (line_data[0] != '\0') {
                /* NOTE(review): the bounds limit the characters copied, not the
                 * space left in cuids - confirm the buffer can hold
                 * PUSH_ONCE_CUID_COUNT * (PUSH_MAX_CUID_LEN + 1) + 1 bytes. */
                strncat(worker_param->cuids, line_data, PUSH_MAX_CUID_LEN);
                strncat(worker_param->cuids, ",", 1);
            }

            if (slot == PUSH_ONCE_CUID_COUNT - 1) {
                read_file_submit_batch(worker_param);
                worker_param = NULL;
            }
        }

        /* A batch that was started but not filled completely is still pending,
         * whatever its size. */
        if (worker_param) {
            read_file_submit_batch(worker_param);
        }
        fclose(fp);
        return count;
    }
    
    /**
     * Write the final result of the task back to the database.
     *
     * Does nothing when no task was picked up (task_id == 0). Otherwise logs
     * the per-platform send counters, connects to MySQL, stores the counters,
     * sets the task status to STATUS_SUSS and closes the connection. Failures
     * are logged; nothing is returned to the caller.
     */
    void update_mysql() {
        if (task_id == 0) {
            return;
        }
        push_sys_notice("---android_send_count[%d]---", android_send_count);
        push_sys_notice("---iphone_send_count[%d]---", iphone_send_count);
        int ret = push_mysql_connect(PUSH_DB_HOST, PUSH_DB_USER, PUSH_DB_PASSWD,
                PUSH_DB_PORT, PUSH_DB_DBNAME);
        if (ret) {
            push_sys_error("mysql_connect error");
            return;
        }
        int affect;
        affect = push_dao_newstask_update_send_count(task_id, android_send_count, iphone_send_count);
        if (!affect) {
            push_sys_error("update send_count error");
        }
        affect = push_dao_newstask_update_status(task_id, STATUS_SUSS);
        if (affect > 0) {
            /* Success is informational, not an error. */
            push_sys_notice("update status success, status[%d]", STATUS_SUSS);
        } else {
            push_sys_error("update status error, status[%d]", STATUS_SUSS);
        }
        push_mysql_close();
    }
    
    /**
     * 收尾工作 
     *    关闭mysql连接、销毁线程池、释放查询结果集
     */
    void end_main(void) {
        push_tpool_destroy(a_tpool);
        push_dao_newstask_free_result(&a_newstask);
        update_mysql();
        push_unlock_fd(lock_fd);
        close(lock_fd);
        ThreadData_delete(push_thread_data);
        push_sys_notice("--------------------task:%d end----------------
    ", task_id);
    }
    
    /**
     * Program entry point.
     *
     * Initialises logging, enforces single-instance execution, registers
     * end_main() as exit handler, creates the thread pool and fetches a task.
     * The task's platform field then decides which of the Android / iPhone
     * pipelines (init_path -> download_files -> read_file) are run; an unknown
     * platform value is logged and ends the process with status 1.
     *
     * @todo the task should be fetched before the thread pool is created
     */
    int main(int argc, char **argv) {
        int run_android;
        int run_iphone;

        push_init_logger("logs");
        single_process();
        atexit(end_main);

        init_main(argc, argv);
        get_task();

        /* Work out which platform pipelines this task asks for. */
        platform = atoi(a_newstask.info[0][F_news_task_platform]);
        run_android = (platform == PUSH_PLATFORM_ANDROID || platform == PUSH_PLATFORM_ALL);
        run_iphone = (platform == PUSH_PLATFORM_IPHONE || platform == PUSH_PLATFORM_ALL);

        /* Neither pipeline selected means the platform value is invalid. */
        if (!run_android && !run_iphone) {
            push_sys_error("platform error:%d", platform);
            exit(1);
        }

        if (run_android) {
            init_path(PUSH_PLATFORM_ANDROID);
            download_files(a_newstask.info[0][F_news_task_android_cuid_files]);
            android_send_count = read_file(PUSH_PLATFORM_ANDROID);
        }

        if (run_iphone) {
            init_path(PUSH_PLATFORM_IPHONE);
            download_files(a_newstask.info[0][F_news_task_iphone_cuid_files]);
            iphone_send_count = read_file(PUSH_PLATFORM_IPHONE);
        }

        return 0;
    }
  • 相关阅读:
    Message高级特性 & 内嵌Jetty实现文件服务器
    springboot中使用kindeditor富文本编辑器实现博客功能&vue-elementui使用vue-kindeditor
    Embarcadero RAD Studio XE5
    经典营销故事
    百度竞价教程 借助百度热力图让你的效果翻10倍
    无本借力:他是如何实现年收入70万?
    不用软件快速拥有几百个QQ群并都是管理员
    质保、保修、包修:含义不同
    域名反向解析在自建邮件群发服务器中的应用
    2014年1月1日,马年
  • 原文地址:https://www.cnblogs.com/bai-jimmy/p/5642706.html
Copyright © 2011-2022 走看看