当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP网络处理模块FPM源码分析

发布:smiling 来源: PHP粉丝网  添加日期:2023-09-16 16:20:10 浏览: 评论:0 

一个请求从浏览器到达PHP脚本执行中间有个必要模块是网络处理模块,FPM是这个模块的一部分,配合fastcgi协议实现对请求的从监听到转发到PHP处理,并将结果返回这条流程。

FPM采用多进程模型,就是创建一个master进程,在master进程中创建并监听socket,然后fork多个子进程,然后子进程各自accept请求,子进程在启动后阻塞在accept上,有请求到达后开始读取请求 数据,读取完成后开始处理然后再返回,在这期间是不会接收其它请求的,也就是说fpm的子进程同时只能响应 一个请求,只有把这个请求处理完成后才会accept下一个请求,这是一种同步阻塞的模型。master进程负责管理子进程,监听子进程的状态,控制子进程的数量。master进程与worker进程之间通过共享变量同步信息。

从main函数开始

  1. int main(int argc, char *argv[]) 
  2.     zend_signal_startup(); 
  3.     // 将全局变量sapi_module设置为cgi_sapi_module 
  4.     sapi_startup(&cgi_sapi_module); 
  5.     fcgi_init(); 
  6.     // 获取命令行参数,其中php-fpm -D、-i等参数都是在这里被解析出来的 
  7.     // ... 
  8.     cgi_sapi_module.startup(&cgi_sapi_module); 
  9.     fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr); 
  10.     // master进程会在这一步死循环,后面的流程都是子进程在执行。 
  11.     fcgi_fd = fpm_run(&max_requests); 
  12.     fcgi_fd = fpm_run(&max_requests); 
  13.     request = fpm_init_request(fcgi_fd); 
  14.     // accept请求 
  15.     // .... 

main()函数展现了这个fpm运行完整的框架,可见整个fpm主要分为三个部分:

1、运行前的fpm_init();

2、运行函数fpm_run();

3、子进程accept请求处理。

FPM中的事件监听机制

在详细了解fpm工作过程前,我们要先了解fpm中的事件机制。在fpm中事件的监听默认使用kqueue来实现,关于kqueue的介绍可以看看我之前整理的这篇文章kqueue用法简介。

  1. // fpm中的事件结构体 
  2. struct fpm_event_s { 
  3.     // 事件的句柄 
  4.     int fd; 
  5.     // 下一次触发的事件 
  6.     struct timeval timeout; 
  7.     // 频率:多久执行一次 
  8.     struct timeval frequency; 
  9.     // 事件触发时调用的函数 
  10.     void (*callback)(struct fpm_event_s *, short, void *); 
  11.     void *arg;                // 调用callback时的参数 
  12.     // FPM_EV_READ:读;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:; 
  13.     int flags; 
  14.     int index;                // 在fd句柄数组中的索引 
  15.     // 事件的类型 FPM_EV_READ:读;FPM_EV_TIMEOUT:计时器;FPM_EV_PERSIST:;FPM_EV_EDGE:; 
  16.     short which; 
  17. }; 
  18. // 事件队列 
  19. typedef struct fpm_event_queue_s { 
  20.     struct fpm_event_queue_s *prev; 
  21.     struct fpm_event_queue_s *next; 
  22.     struct fpm_event_s *ev; 
  23. } fpm_event_queue; 

以fpm_run()中master进程注册的一个sp[0]的可读事件为例:

  1. void fpm_event_loop(int err) 
  2.     static struct fpm_event_s signal_fd_event; 
  3.     // 创建一个事件:管道sp[0]可读时触发 
  4.     fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL); 
  5.     // 将事件添加进queue 
  6.     fpm_event_add(&signal_fd_event, 0); 
  7.     // 处理定时器等逻辑 
  8.     // 以阻塞的方式获取事件 
  9.     // module->wait()是一个接口定义的方法签名,下面展示kqueue的实现 
  10.     ret = module->wait(fpm_event_queue_fd, timeout); 
  11. int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency) 
  12.     // ... 
  13.     // 如果事件是触发事件则之间添加进queue中 
  14.     // 对于定时器事件先根据事件的frequency设置事件的触发频率和下一次触发的事件 
  15.     if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) { 
  16.         return -1; 
  17.     } 
  18.     return 0; 
  19. static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev) 
  20.     // ... 
  21.     // 构建并将当前事件插入事件队列queue中 
  22.     if (*queue == fpm_event_queue_fd && module->add) { 
  23.         // module->add(ev)是一个接口定义的方法签名,下面展示kqueue的实现 
  24.         module->add(ev); 
  25.     } 
  26.     return 0; 
  27. // kqueue关于添加事件到kqueue的实现 
  28. static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */ 
  29.     struct kevent k; 
  30.     int flags = EV_ADD; 
  31.     if (ev->flags & FPM_EV_EDGE) { 
  32.             flags = flags | EV_CLEAR; 
  33.     } 
  34.     EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev); 
  35.     if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) { 
  36.         zlog(ZLOG_ERROR, "kevent: unable to add event"); 
  37.         return -1; 
  38.     } 
  39.     /* mark the event as registered */ 
  40.     ev->index = ev->fd; 
  41.     return 0; 

FPM中关于kqueue的实现

  1. // kqueue关于从kqueue中监听事件的实现 
  2. static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */ 
  3.     struct timespec t; 
  4.     int ret, i; 
  5.     /* ensure we have a clean kevents before calling kevent() */ 
  6.     memset(kevents, 0, sizeof(struct kevent) * nkevents); 
  7.     /* convert ms to timespec struct */ 
  8.     t.tv_sec = timeout / 1000; 
  9.     t.tv_nsec = (timeout % 1000) * 1000 * 1000; 
  10.     /* wait for incoming event or timeout */ 
  11.     ret = kevent(kfd, NULL, 0, kevents, nkevents, &t); 
  12.     if (ret == -1) { 
  13.         /* trigger error unless signal interrupt */ 
  14.         if (errno != EINTR) { 
  15.             zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno); 
  16.             return -1; 
  17.         } 
  18.     } 
  19.     /* fire triggered events */ 
  20.     for (i = 0; i < ret; i++) { 
  21.         if (kevents[i].udata) { 
  22.             struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata; 
  23.             fpm_event_fire(ev); 
  24.             /* sanity check */ 
  25.             if (fpm_globals.parent_pid != getpid()) { 
  26.                 return -2; 
  27.             } 
  28.         } 
  29.     } 
  30.     return ret; 

fpm_init

fpm_init()负责启动前的初始化工作,包括注册各个模块的销毁时用于清理变量的callback。下面只介绍几个重要的init。

fpm_conf_init_main

负责解析php-fpm.conf配置文件,分配worker pool内存结构并保存到全局变量fpm_worker_all_pools中,各worker pool配置解析到 fpm_worker_pool_s->config 中。

所谓worker pool 是fpm可以同时监听多个端口,每个端口对应一个worker pool。

fpm_scoreboard_init_main

为每个worker pool分配一个fpm_scoreboard_s结构的内存空间scoreboard,用于记录worker进程运行信息。

  1. // fpm_scoreboard_s 结构 
  2. struct fpm_scoreboard_s { 
  3.     union { 
  4.         atomic_t lock; 
  5.         char dummy[16]; 
  6.     }; 
  7.     char pool[32]; 
  8.     int pm;                    // 进程的管理方式 static、dynamic、ondemand 
  9.     time_t start_epoch; 
  10.     int idle;                // 空闲的worker进程数 
  11.     int active;                // 繁忙的worker进程数 
  12.     int active_max;            // 最大繁忙进程数 
  13.     unsigned long int requests; 
  14.     unsigned int max_children_reached; 
  15.     int lq; 
  16.     int lq_max; 
  17.     unsigned int lq_len; 
  18.     unsigned int nprocs; 
  19.     int free_proc; 
  20.     unsigned long int slow_rq; 
  21.     struct fpm_scoreboard_proc_s *procs[]; 
  22. }; 

fpm_signals_init_main

fpm注册自己的信号量,并设置监听函数的处理逻辑。

  1. int fpm_signals_init_main() /* {{{ */ 
  2.     struct sigaction act; 
  3.     // 创建一个全双工套接字 
  4.     // 全双工的套接字是一个可以读、写的socket通道[0]和[1],每个进程固定一个管道。 
  5.     // 写数据时:管道不满不会被阻塞;读数据时:管道里没有数据会阻塞(可设置) 
  6.     // 向sp[0]写入数据时,sp[0]的读取将会被阻塞,sp[1]的写管道会被阻塞,sp[1]中此时读取sp[0]的数据 
  7.     if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) { 
  8.         zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()"); 
  9.         return -1; 
  10.     } 
  11.     if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) { 
  12.         zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()"); 
  13.         return -1; 
  14.     } 
  15.     if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) { 
  16.         zlog(ZLOG_SYSERROR, "falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)"); 
  17.         return -1; 
  18.     } 
  19.     memset(&act, 0, sizeof(act)); 
  20.     act.sa_handler = sig_handler;       // 监听到信号调用这个函数 
  21.     sigfillset(&act.sa_mask); 
  22.     if (0 > sigaction(SIGTERM,  &act, 0) || 
  23.         0 > sigaction(SIGINT,   &act, 0) || 
  24.         0 > sigaction(SIGUSR1,  &act, 0) || 
  25.         0 > sigaction(SIGUSR2,  &act, 0) || 
  26.         0 > sigaction(SIGCHLD,  &act, 0) || 
  27.         0 > sigaction(SIGQUIT,  &act, 0)) { 
  28.         zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()"); 
  29.         return -1; 
  30.     } 
  31.     return 0; 
  32. // 所有信号共用同一个处理函数 
  33. static void sig_handler(int signo) /* {{{ */ 
  34.     static const char sig_chars[NSIG + 1] = { 
  35.         [SIGTERM] = 'T'
  36.         [SIGINT]  = 'I'
  37.         [SIGUSR1] = '1'
  38.         [SIGUSR2] = '2'
  39.         [SIGQUIT] = 'Q'
  40.         [SIGCHLD] = 'C' 
  41.     }; 
  42.     char s; 
  43.     int saved_errno; 
  44.     if (fpm_globals.parent_pid != getpid()) { 
  45.         return
  46.     } 
  47.     saved_errno = errno; 
  48.     s = sig_chars[signo]; 
  49.     zend_quiet_write(sp[1], &s, sizeof(s)); // 将信息对应的字节写进管道sp[1]端,此时sp[1]端的读数据会阻塞;数据可以从sp[0]端读取 
  50.     errno = saved_errno; 

fpm_sockets_init_main

每个worker pool 开启一个socket套接字。

fpm_event_init_main

这里启动master的事件管理器。用于管理IO、定时事件,其中IO事件通过kqueue、epoll、 poll、select等管理,定时事件就是定时器,一定时间后触发某个事件。同样,我们以kqueue的实现为例看下源码。

  1. int fpm_event_init_main() 
  2.     // ... 
  3.     if (module->init(max) < 0) { 
  4.         zlog(ZLOG_ERROR, "Unable to initialize the event module %s", module->name); 
  5.         return -1; 
  6.     } 
  7.     // ... 
  8. // max用于指定kqueue事件数组的大小 
  9. static int fpm_event_kqueue_init(int max) /* {{{ */ 
  10.     if (max < 1) { 
  11.         return 0; 
  12.     } 
  13.     kfd = kqueue(); 
  14.     if (kfd < 0) { 
  15.         zlog(ZLOG_ERROR, "kqueue: unable to initialize"); 
  16.         return -1; 
  17.     } 
  18.     kevents = malloc(sizeof(struct kevent) * max); 
  19.     if (!kevents) { 
  20.         zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max); 
  21.         return -1; 
  22.     } 
  23.     memset(kevents, 0, sizeof(struct kevent) * max); 
  24.     nkevents = max; 
  25.     return 0; 

fpm_run

fpm_init到此结束,下面进入fpm_run阶段,在这个阶段master进程会根据配置fork出多个子进程然后master进程会进入fpm_event_loop(0)函数,并在这个函数内部死循环,也就是说master进程将不再执行后面的代码,后面的逻辑全部是子进程执行的操作。

master进程在fpm_event_loop里通过管道sp来监听子进程的各个事件,同时也要处理自身产生的一些事件、定时器等任务,来响应的管理子进程。内部的逻辑在介绍事件监听机制时已经详细说过。

  1. int fpm_run(int *max_requests) /* {{{ */ 
  2.     struct fpm_worker_pool_s *wp; 
  3.     /* create initial children in all pools */ 
  4.     for (wp = fpm_worker_all_pools; wp; wp = wp->next) { 
  5.         int is_parent; 
  6.         is_parent = fpm_children_create_initial(wp); 
  7.         if (!is_parent) { 
  8.             goto run_child; 
  9.         } 
  10.     } 
  11.     /* run event loop forever */ 
  12.     fpm_event_loop(0); 
  13. run_child: /* only workers reach this point */ 
  14.     fpm_cleanups_run(FPM_CLEANUP_CHILD); 
  15.     *max_requests = fpm_globals.max_requests; 
  16.     return fpm_globals.listening_socket; 

子进程处理请求

回到main函数,fpm_run后面的逻辑都是子进程在运行。首先会初始化一个fpm的request结构的变量,然后子进程会阻塞在fcgi_accept_request(request)函数上等待请求。关于fcgi_accept_request函数就是死循环一个socket编程的accept函数来接收请求,并将请求数据全部取出。

  1. ... 
  2. // 初始化request 
  3. request = fpm_init_request(fcgi_fd); 
  4. zend_first_try { 
  5.     // accept接收请求 
  6.     while (EXPECTED(fcgi_accept_request(request) >= 0)) { 
  7.         init_request_info(); 
  8.         fpm_request_info(); 
  9.         if (UNEXPECTED(php_request_startup() == FAILURE)) { 
  10.             // ... 
  11.         } 
  12.         if (UNEXPECTED(fpm_status_handle_request())) { 
  13.             goto fastcgi_request_done; 
  14.         } 
  15.         ... 
  16.         // 打开配置文件中DOCUMENT_ROOT设置的脚本 
  17.         if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) { 
  18.             ... 
  19.         } 
  20.         fpm_request_executing(); 
  21.         // 执行脚本 
  22.         php_execute_script(&file_handle); 
  23.         ... 
  24.     } 
  25.     // 销毁请求request 
  26.     fcgi_destroy_request(request); 
  27.     // fcgi退出 
  28.     fcgi_shutdown(); 
  29.     if (cgi_sapi_module.php_ini_path_override) { 
  30.         free(cgi_sapi_module.php_ini_path_override); 
  31.     } 
  32.     if (cgi_sapi_module.ini_entries) { 
  33.         free(cgi_sapi_module.ini_entries); 
  34.     } 
  35. } zend_catch { 
  36.     ... 
  37. } zend_end_try();

Tags: PHP网络处理模块 FPM源码分析

分享到: