zoukankan      html  css  js  c++  java
  • Php5.6.15-fpm的运行机制源码剖析

    源码版本:Php5.6.15

    源码目录:sapi/fpm/fpm

    说明:源码的主要功能在上面直接注解

    =============>>start<<================================

    主进程信号初始化,依据收到的信号类型,进行处理

    /*
     * Set up signal handling for the master process.
     *
     * Creates the socket pair `sp`, switches both ends to non-blocking and
     * close-on-exec, then installs sig_handler() for SIGTERM, SIGINT, SIGUSR1,
     * SIGUSR2, SIGCHLD and SIGQUIT.
     *
     * Returns 0 on success, -1 (after logging the failure) otherwise.
     */
    int fpm_signals_init_main() /* {{{ */
    {
        struct sigaction act;

        /* sig_handler() writes one character per received signal into sp[1] */
        if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {
            zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");
            return -1;
        }

        /* put both ends into non-blocking mode */
        if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {
            zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");
            return -1;
        }

        /*
         * FD_CLOEXEC marks the descriptors close-on-exec: they are closed
         * automatically when the process calls one of the exec functions.
         */
        if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {
            zlog(ZLOG_SYSERROR, "failed to init signals: fcntl(F_SETFD, FD_CLOEXEC)");
            return -1;
        }

        memset(&act, 0, sizeof(act));
        act.sa_handler = sig_handler;
        /* block every other signal while the handler is running */
        sigfillset(&act.sa_mask);

        /* the || chain stops at the first sigaction() call that fails */
        if (0 > sigaction(SIGTERM, &act, 0) ||
            0 > sigaction(SIGINT, &act, 0) ||
            0 > sigaction(SIGUSR1, &act, 0) ||
            0 > sigaction(SIGUSR2, &act, 0) ||
            0 > sigaction(SIGCHLD, &act, 0) ||
            0 > sigaction(SIGQUIT, &act, 0)) {

            zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");
            return -1;
        }
        return 0;
    }
    /* }}} */

    信号处理函数,根据相应的信号,把相对应的字符写入sp[1]
    static void sig_handler(int signo) /* {{{ */
    {
    static const char sig_chars[NSIG + 1] = {
    [SIGTERM] = 'T',
    [SIGINT] = 'I',
    [SIGUSR1] = '1',
    [SIGUSR2] = '2',
    [SIGQUIT] = 'Q',
    [SIGCHLD] = 'C'
    };
    char s;
    int saved_errno;

    if (fpm_globals.parent_pid != getpid()) {
    /* prevent a signal race condition when child process
    have not set up it's own signal handler yet */
    return;
    }

    saved_errno = errno;
    s = sig_chars[signo];
    write(sp[1], &s, sizeof(s));
    errno = saved_errno;
    }
    /* }}} */


    子进程信号初始化,依据收到的信号类型,进行处理

    int fpm_signals_init_child() /* {{{ */
    {
    struct sigaction act, act_dfl;

    memset(&act, 0, sizeof(act));
    memset(&act_dfl, 0, sizeof(act_dfl));

    act.sa_handler = &sig_soft_quit; //调用信号处理函数,只处理SIGQUIT信号
    act.sa_flags |= SA_RESTART;

    act_dfl.sa_handler = SIG_DFL; //默认信号处理方法

    //关闭socket对
    close(sp[0]);
    close(sp[1]);

    if (0 > sigaction(SIGTERM, &act_dfl, 0) ||
    0 > sigaction(SIGINT, &act_dfl, 0) ||
    0 > sigaction(SIGUSR1, &act_dfl, 0) ||
    0 > sigaction(SIGUSR2, &act_dfl, 0) ||
    0 > sigaction(SIGCHLD, &act_dfl, 0) ||
    0 > sigaction(SIGQUIT, &act, 0)) {

    zlog(ZLOG_SYSERROR, "failed to init child signals: sigaction()");
    return -1;
    }
    return 0;
    }
    /* }}} */

    /*
     * SIGQUIT handler used in child processes (installed by
     * fpm_signals_init_child()). errno is preserved across the call.
     */
    static void sig_soft_quit(int signo) /* {{{ */
    {
        const int errno_backup = errno;
        int replacement_fd;

        /* closing fastcgi listening socket will force fcgi_accept() exit immediately */
        close(0);

        /* NOTE(review): presumably re-occupies descriptor 0 so that it is not reused - confirm */
        replacement_fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (replacement_fd < 0) {
            zlog(ZLOG_WARNING, "failed to create a new socket");
        }

        fpm_php_soft_quit();
        errno = errno_backup;
    }
    /* }}} */

    /*
     * Event loop of the FPM master process.
     *
     * Registers a read event on the descriptor returned by fpm_signals_get_fd()
     * (callback: fpm_got_signal), calls the heartbeat functions, then loops
     * forever: wait for events until the nearest timer is due, then fire every
     * timer whose deadline has passed.
     *
     * err: when zero, the idle-server-maintenance heartbeat is called, the
     *      "ready to handle connections" notice is logged and, with
     *      HAVE_SYSTEMD, the systemd heartbeat is called; when non-zero these
     *      steps are skipped.
     *
     * Returns only when the current pid differs from fpm_globals.parent_pid
     * or when module->wait() returns -2.
     */
    void fpm_event_loop(int err) /* {{{ */
    {
    static struct fpm_event_s signal_fd_event;

    /* sanity check */
    if (fpm_globals.parent_pid != getpid()) {
    return;
    }

    /* watch the signal descriptor for readability; fpm_got_signal is the callback */
    fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
    fpm_event_add(&signal_fd_event, 0);

    /* add timers */
    if (fpm_globals.heartbeat > 0) {
    fpm_pctl_heartbeat(NULL, 0, NULL);
    }

    if (!err) {
    fpm_pctl_perform_idle_server_maintenance_heartbeat(NULL, 0, NULL);

    zlog(ZLOG_DEBUG, "%zu bytes have been reserved in SHM", fpm_shm_get_size_allocated());
    zlog(ZLOG_NOTICE, "ready to handle connections");

    #ifdef HAVE_SYSTEMD
    fpm_systemd_heartbeat(NULL, 0, NULL);
    #endif
    }

    while (1) {
    struct fpm_event_queue_s *q, *q2;
    struct timeval ms;
    struct timeval tmp;
    struct timeval now;
    unsigned long int timeout;
    int ret;

    /* sanity check */
    if (fpm_globals.parent_pid != getpid()) {
    return;
    }

    fpm_clock_get(&now);
    /* ms will hold the earliest deadline found in the timer queue (zero = none) */
    timerclear(&ms);

    /* search in the timeout queue for the next timer to trigger */
    q = fpm_event_queue_timer;
    while (q) {
    if (!timerisset(&ms)) {
    ms = q->ev->timeout;
    } else {
    if (timercmp(&q->ev->timeout, &ms, <)) {
    ms = q->ev->timeout;
    }
    }
    q = q->next;
    }

    /* 1s timeout if none has been set */
    /* the same 1000 ms wait is used when the earliest deadline is already due */
    if (!timerisset(&ms) || timercmp(&ms, &now, <) || timercmp(&ms, &now, ==)) {
    timeout = 1000;
    } else {
    /* remaining time converted to milliseconds, plus one extra millisecond */
    timersub(&ms, &now, &tmp);
    timeout = (tmp.tv_sec * 1000) + (tmp.tv_usec / 1000) + 1;
    }

    /* wait for events on the fd queue, for at most `timeout` milliseconds */
    ret = module->wait(fpm_event_queue_fd, timeout);

    /* is a child, nothing to do here */
    if (ret == -2) {
    return;
    }

    if (ret > 0) {
    zlog(ZLOG_DEBUG, "event module triggered %d events", ret);
    }

    /* trigger timers */
    q = fpm_event_queue_timer;
    while (q) {
    /* the clock is re-read for every entry */
    fpm_clock_get(&now);
    if (q->ev) {
    /* fire when the deadline is now or in the past */
    if (timercmp(&now, &q->ev->timeout, >) || timercmp(&now, &q->ev->timeout, ==)) {
    fpm_event_fire(q->ev);
    /* sanity check */
    if (fpm_globals.parent_pid != getpid()) {
    return;
    }
    if (q->ev->flags & FPM_EV_PERSIST) {
    /* persistent timer: re-arm it through fpm_event_set_timeout() */
    fpm_event_set_timeout(q->ev, now);
    } else { /* delete the event */
    /* unlink q from the doubly linked list, fixing the head if needed */
    q2 = q;
    if (q->prev) {
    q->prev->next = q->next;
    }
    if (q->next) {
    q->next->prev = q->prev;
    }
    if (q == fpm_event_queue_timer) {
    fpm_event_queue_timer = q->next;
    if (fpm_event_queue_timer) {
    fpm_event_queue_timer->prev = NULL;
    }
    }
    /* advance before freeing the node, then skip the advance at the loop end */
    q = q->next;
    free(q2);
    continue;
    }
    }
    }
    q = q->next;
    }
    }
    }
    /* }}} */

    /*
     * Read callback for the signal descriptor: drains the one-character
     * markers written by sig_handler() and dispatches on each of them.
     *
     * ev:    event whose fd is read from
     * which: unused
     * arg:   unused
     *
     * Returns when read() reports no more data or an error, or as soon as
     * fpm_globals.is_child is set after a marker has been handled.
     */
    static void fpm_got_signal(struct fpm_event_s *ev, short which, void *arg) /* {{{ */
    {
        const int signal_fd = ev->fd;

        for (;;) {
            char marker;
            int nread;
            int log_status;

            /* fetch one marker, retrying when interrupted */
            do {
                nread = read(signal_fd, &marker, 1);
            } while (nread == -1 && errno == EINTR);

            if (nread <= 0) {
                /* EAGAIN / EWOULDBLOCK only mean that the pipe is empty */
                if (nread < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                    zlog(ZLOG_SYSERROR, "unable to read from the signal pipe");
                }
                return;
            }

            switch (marker) {
                case 'T' : /* SIGTERM */
                    zlog(ZLOG_DEBUG, "received SIGTERM");
                    zlog(ZLOG_NOTICE, "Terminating ...");
                    fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
                    break;

                case 'I' : /* SIGINT */
                    zlog(ZLOG_DEBUG, "received SIGINT");
                    zlog(ZLOG_NOTICE, "Terminating ...");
                    fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
                    break;

                case 'Q' : /* SIGQUIT */
                    zlog(ZLOG_DEBUG, "received SIGQUIT");
                    zlog(ZLOG_NOTICE, "Finishing ...");
                    fpm_pctl(FPM_PCTL_STATE_FINISHING, FPM_PCTL_ACTION_SET);
                    break;

                case '2' : /* SIGUSR2 */
                    zlog(ZLOG_DEBUG, "received SIGUSR2");
                    zlog(ZLOG_NOTICE, "Reloading in progress ...");
                    fpm_pctl(FPM_PCTL_STATE_RELOADING, FPM_PCTL_ACTION_SET);
                    break;

                case '1' : /* SIGUSR1: re-open the log files */
                    zlog(ZLOG_DEBUG, "received SIGUSR1");
                    if (fpm_stdio_open_error_log(1) == 0) {
                        zlog(ZLOG_NOTICE, "error log file re-opened");
                    } else {
                        zlog(ZLOG_ERROR, "unable to re-opened error log file");
                    }

                    log_status = fpm_log_open(1);
                    if (log_status == 0) {
                        zlog(ZLOG_NOTICE, "access log file re-opened");
                    } else if (log_status == -1) {
                        zlog(ZLOG_ERROR, "unable to re-opened access log file");
                    }
                    /* else no access log are set */
                    break;

                case 'C' : /* SIGCHLD */
                    zlog(ZLOG_DEBUG, "received SIGCHLD");
                    fpm_children_bury();
                    break;

                default :
                    /* any other character is ignored */
                    break;
            }

            /* stop draining once this process is flagged as a child */
            if (fpm_globals.is_child) {
                return;
            }
        }
    }
    /* }}} */

    下面对各种信号的处理做一下分析

    子进程退出或者暂停时会向主进程发送SIGCHLD信号,最终会调用fpm_children_bury函数,该函数位于fpm_children.c中

    /*
     * Collects every child whose state change is reported by waitpid(),
     * logs how it ended, releases its bookkeeping and, unless it was an idle
     * kill, asks for a replacement through fpm_children_make().
     * Crashes by SIGSEGV / SIGBUS are additionally recorded and may trigger a
     * reload (emergency restart).
     */
    void fpm_children_bury() /* {{{ */
    {
    int status;
    pid_t pid;
    struct fpm_child_s *child;
    /* WNOHANG: never block; WUNTRACED: stopped children are reported as well.
     * The loop ends as soon as waitpid() has nothing (more) to report. */
    while ( (pid = waitpid(-1, &status, WNOHANG | WUNTRACED)) > 0) {
    char buf[128];
    int severity = ZLOG_NOTICE;
    int restart_child = 1;

    /* look up the record of this pid; the NULL case is handled further down */
    child = fpm_child_find(pid);

    /* the child called exit / returned from main */
    if (WIFEXITED(status)) {

    snprintf(buf, sizeof(buf), "with code %d", WEXITSTATUS(status));

    /* if it's been killed because of dynamic process management
    * don't restart it automatically
    */
    if (child && child->idle_kill) {
    restart_child = 0;
    }

    if (WEXITSTATUS(status) != FPM_EXIT_OK) {
    severity = ZLOG_WARNING;
    }
    /* the child was terminated by a signal */
    } else if (WIFSIGNALED(status)) {
    const char *signame = fpm_signal_names[WTERMSIG(status)];
    const char *have_core = WCOREDUMP(status) ? " - core dumped" : "";

    if (signame == NULL) {
    signame = "";
    }

    snprintf(buf, sizeof(buf), "on signal %d (%s%s)", WTERMSIG(status), signame, have_core);

    /* if it's been killed because of dynamic process management
    * don't restart it automatically
    */
    if (child && child->idle_kill && WTERMSIG(status) == SIGQUIT) {
    restart_child = 0;
    }

    if (WTERMSIG(status) != SIGQUIT) { /* possible request loss */
    severity = ZLOG_WARNING;
    }
    /* the child is stopped, not dead (reported because of WUNTRACED) */
    } else if (WIFSTOPPED(status)) {

    zlog(ZLOG_NOTICE, "child %d stopped for tracing", (int) pid);

    if (child && child->tracer) {
    /* hand the stopped child over to its tracer callback */
    child->tracer(child);
    }

    /* nothing to bury: go on with the next waitpid() result */
    continue;
    }

    if (child) {
    /* keep the pool pointer in a local before the child record is closed below */
    struct fpm_worker_pool_s *wp = child->wp;
    struct timeval tv1, tv2;

    fpm_child_unlink(child);

    fpm_scoreboard_proc_free(wp->scoreboard, child->scoreboard_i);

    fpm_clock_get(&tv1);

    /* tv2 = time elapsed since the child was started */
    timersub(&tv1, &child->started, &tv2);

    if (restart_child) {
    if (!fpm_pctl_can_spawn_children()) {
    severity = ZLOG_DEBUG;
    }
    zlog(severity, "[pool %s] child %d exited %s after %ld.%06d seconds from start", child->wp->config->name, (int) pid, buf, tv2.tv_sec, (int) tv2.tv_usec);
    } else {
    zlog(ZLOG_DEBUG, "[pool %s] child %d has been killed by the process management after %ld.%06d seconds from start", child->wp->config->name, (int) pid, tv2.tv_sec, (int) tv2.tv_usec);
    }

    fpm_child_close(child, 1 /* in event_loop */);

    /* tell the process control code that a child has exited */
    fpm_pctl_child_exited();
    /* emergency restart: remember the time of each SIGSEGV / SIGBUS crash in
     * last_faults, which is used as a circular buffer of
     * emergency_restart_threshold entries */
    if (last_faults && (WTERMSIG(status) == SIGSEGV || WTERMSIG(status) == SIGBUS)) {
    time_t now = tv1.tv_sec;
    int restart_condition = 1;
    int i;

    last_faults[fault++] = now;

    /* wrap the write index around */
    if (fault == fpm_global_config.emergency_restart_threshold) {
    fault = 0;
    }

    /* reload only if every recorded crash lies within emergency_restart_interval of now */
    for (i = 0; i < fpm_global_config.emergency_restart_threshold; i++) {
    if (now - last_faults[i] > fpm_global_config.emergency_restart_interval) {
    restart_condition = 0;
    break;
    }
    }

    if (restart_condition) {

    zlog(ZLOG_WARNING, "failed processes threshold (%d in %d sec) is reached, initiating reload", fpm_global_config.emergency_restart_threshold, fpm_global_config.emergency_restart_interval);

    fpm_pctl(FPM_PCTL_STATE_RELOADING, FPM_PCTL_ACTION_SET);
    }
    }

    if (restart_child) {
    /* NOTE(review): presumably forks a replacement child for this pool - confirm in fpm_children_make() */
    fpm_children_make(wp, 1 /* in event loop */, 1, 0);

    /* leave the reaping loop when this process is flagged as a child */
    if (fpm_globals.is_child) {
    break;
    }
    }
    } else {
    /* no record exists for this pid */
    zlog(ZLOG_ALERT, "oops, unknown child (%d) exited %s. Please open a bug report (https://bugs.php.net).", pid, buf);
    }
    }
    }
    /* }}} */

    PHP用的最多的是reload操作,对应于SIGUSR2信号,对应处理函数fpm_pctl

    /*
     * Entry point of the process-control state machine.
     *
     * new_state: state to switch to; only read for FPM_PCTL_ACTION_SET
     * action:    FPM_PCTL_ACTION_SET               - try to switch to new_state,
     *                                                then run the next step
     *            FPM_PCTL_ACTION_TIMEOUT           - run the next step
     *            FPM_PCTL_ACTION_LAST_CHILD_EXITED - run the final step
     */
    void fpm_pctl(int new_state, int action) /* {{{ */
    {
        if (action == FPM_PCTL_ACTION_SET) {
            int override_allowed;

            if (new_state == fpm_state) { /* already in progress - just ignore duplicate signal */
                return;
            }

            /* check which states can be overridden */
            switch (fpm_state) {
                case FPM_PCTL_STATE_RELOADING :
                    /* 'reloading' can be overridden by 'finishing' and by 'terminating' */
                    override_allowed = (new_state == FPM_PCTL_STATE_FINISHING
                        || new_state == FPM_PCTL_STATE_TERMINATING);
                    break;
                case FPM_PCTL_STATE_FINISHING :
                    /* 'finishing' can be overridden by 'terminating' only */
                    override_allowed = (new_state == FPM_PCTL_STATE_TERMINATING);
                    break;
                case FPM_PCTL_STATE_TERMINATING :
                    /* nothing can override 'terminating' state */
                    override_allowed = 0;
                    break;
                default :
                    /* 'normal' (and any state not listed above) can be overridden by any other state */
                    override_allowed = 1;
                    break;
            }

            if (!override_allowed) {
                zlog(ZLOG_DEBUG, "not switching to '%s' state, because already in '%s' state",
                    fpm_state_names[new_state], fpm_state_names[fpm_state]);
                return;
            }

            /* a new state starts its signal escalation from scratch */
            fpm_signal_sent = 0;
            fpm_state = new_state;

            zlog(ZLOG_DEBUG, "switching to '%s' state", fpm_state_names[fpm_state]);
        }

        switch (action) {
            case FPM_PCTL_ACTION_SET :
                /* a successful state switch is followed by the next step right away */
            case FPM_PCTL_ACTION_TIMEOUT :
                fpm_pctl_action_next();
                break;
            case FPM_PCTL_ACTION_LAST_CHILD_EXITED :
                fpm_pctl_action_last();
                break;
            default :
                break;
        }
    }
    /* }}} */


    /*
     * Performs one step of the signal escalation towards the children:
     *  1. picks the signal to send and sends it through fpm_pctl_kill_all();
     *  2. remembers it in fpm_signal_sent and calls fpm_pctl_timeout_set().
     *
     * First step (fpm_signal_sent == 0): SIGTERM when terminating, SIGQUIT
     * otherwise, with fpm_global_config.process_control_timeout as timeout.
     * Later steps: SIGTERM if SIGQUIT was sent before, SIGKILL otherwise,
     * with a timeout of 1.
     *
     * NOTE(review): the timeout presumably ends up calling fpm_pctl() with
     * FPM_PCTL_ACTION_TIMEOUT, which runs this function again - confirm in
     * fpm_pctl_timeout_set().
     */
    static void fpm_pctl_action_next() /* {{{ */
    {
        int next_signal, delay;

        /* no child left: run the final action; if it returns, execution continues below */
        if (!fpm_globals.running_children) {
            fpm_pctl_action_last();
        }

        if (fpm_signal_sent == 0) {
            next_signal = (fpm_state == FPM_PCTL_STATE_TERMINATING) ? SIGTERM : SIGQUIT;
            delay = fpm_global_config.process_control_timeout;
        } else {
            next_signal = (fpm_signal_sent == SIGQUIT) ? SIGTERM : SIGKILL;
            delay = 1;
        }

        fpm_pctl_kill_all(next_signal);
        fpm_signal_sent = next_signal;
        fpm_pctl_timeout_set(delay);
    }
    /* }}} */


    /*
     * Callback with the (ev, which, arg) event-handler signature; all three
     * arguments are ignored. It forwards to fpm_pctl() with
     * FPM_PCTL_ACTION_TIMEOUT.
     * NOTE(review): presumably the callback armed by fpm_pctl_timeout_set() -
     * confirm against its definition.
     */
    static void fpm_pctl_action(struct fpm_event_s *ev, short which, void *arg) /* {{{ */
    {
    fpm_pctl(FPM_PCTL_STATE_UNSPECIFIED, FPM_PCTL_ACTION_TIMEOUT);
    }
    /* }}} */


    如果子进程正常退出,主进程会收到SIGCHLD信号,信号处理函数向sp[1]写入字符C,进入新一轮的循环,执行 fpm_children_bury-->fpm_pctl_child_exited
    当所有子进程都处理完毕时,会调用fpm_pctl_action_last,在其中执行reload操作

    /*
     * Final step of a state change: calls fpm_pctl_exec() when reloading and
     * fpm_pctl_exit() when finishing or terminating. Any other state is a no-op.
     */
    static void fpm_pctl_action_last() /* {{{ */
    {
        if (fpm_state == FPM_PCTL_STATE_RELOADING) {
            fpm_pctl_exec();
        } else if (fpm_state == FPM_PCTL_STATE_FINISHING
                || fpm_state == FPM_PCTL_STATE_TERMINATING) {
            fpm_pctl_exit();
        }
    }
    /* }}} */

    /*
     * Re-executes the program with the saved command line (reload).
     *
     * Logs the command line, runs the FPM_CLEANUP_PARENT_EXEC cleanups and
     * calls execvp(). Does not return: if execvp() fails the error is logged
     * and the process exits with FPM_EXIT_SOFTWARE.
     */
    static void fpm_pctl_exec() /* {{{ */
    {
        /*
         * The quotes inside the format string must be escaped. Each of the
         * ten "%s%s%s" groups is fed by one optional_arg(n).
         * NOTE(review): optional_arg() presumably expands to three string
         * arguments - confirm against its definition.
         */
        zlog(ZLOG_NOTICE, "reloading: execvp(\"%s\", {\"%s\""
                "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s"
                "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s"
            "})",
            saved_argv[0], saved_argv[0],
            optional_arg(1),
            optional_arg(2),
            optional_arg(3),
            optional_arg(4),
            optional_arg(5),
            optional_arg(6),
            optional_arg(7),
            optional_arg(8),
            optional_arg(9),
            optional_arg(10)
        );

        fpm_cleanups_run(FPM_CLEANUP_PARENT_EXEC);

        /*
         * On success execvp() replaces the image of the current process
         * (no new process is created) and never returns.
         */
        execvp(saved_argv[0], saved_argv);

        /* only reached when execvp() failed */
        zlog(ZLOG_SYSERROR, "failed to reload: execvp() failed");
        exit(FPM_EXIT_SOFTWARE);
    }
    /* }}} */

    以上是源码追踪,最后总结一下reload的逻辑过程,其他信号的处理逻辑不再赘述。

    1.fpm主进程收到SIGUSR2信号,回调函数向sp[1] 写入2这个字符;
    2.fpm fpm_event_loop 监听到数据写入(IO复用)调用回调函数fpm_got_signal读取信号标识符2;
    3.执行 fpm_pctl--> fpm_pctl_action_next,向每个子进程发信号并注册一个定时器;
    4.如果子进程退出超时,主进程会重发信号;正常退出则主进程收到SIGCHLD信号,信号处理函数向sp[1]写入对应的标识符C;
    5.重复2,取出C信号标识符执行fpm_children_bury-->fpm_pctl_child_exited;
    6.所有子进程都结束后,进入fpm_pctl_action_last执行reload操作,主要是调用execvp重新执行程序(execvp不会创建新进程,而是在当前进程内替换代码空间、数据空间和堆栈);

  • 相关阅读:
    什么是tomcat集群?
    cmd黑客入侵命令大全
    Linix基本命令
    Windows CMD命令大全
    python 函数1
    Python 集合(set)使用
    python 数据字典应用
    python 数据运算
    python 数据类型(元组(不可变列表),字符串
    python 数据类型(列表)学习笔记
  • 原文地址:https://www.cnblogs.com/virgree/p/5111220.html
Copyright © 2011-2022 走看看