您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關nginx中worker進程循環的實現示例的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
worker進程啟動后,其首先會初始化自身運行所需要的環境,然后會進入一個循環,在該循環中不斷檢查是否有需要執行的事件,然后處理事件。在這個過程中,worker進程也是需要與master進程交互的,更有甚者,worker進程作為一個子進程,也是可以接收命令行指令(比如kill等)以進行相應邏輯的處理的。那么worker進程是如何與master或者命令行指令進行交互的呢?本文首先會對worker進程與master進程交互方式,以及worker進程如何處理命令行指令的流程進行講解,然后會從源碼上對worker進程交互的整個工作流程進行介紹。
1. worker與master進程交互方式
這里首先需要說明的是,無論是master還是外部命令的方式,nginx都是通過標志位的方式來處理相應的指令的,也即在接收到一個指令(無論是master還是外部命令)的時候,worker會在其回調方法中設置與該指令相對應的標志位,然后在worker進程在其自身的循環中處理完事件之后會依次檢查這些標志位是否為真,是則根據該標志位的作用執行相應的邏輯。
對于worker進程與master進程的交互,其是通過socket管道的方式進行的。在ngx_process.h文件中聲明了一個ngx_process_t結構體,這里我們主要關注其channel屬性:
typedef struct { // 其余屬性... ngx_socket_t channel[2]; } ngx_process_t;
這里的ngx_process_t結構體的作用是存儲某個進程相關的信息的,比如pid、channel、status等。每個進程中都有一個ngx_processes數組,數組元素就是這里的ngx_process_t結構體,也就是說每個進程都會通過ngx_processes數組保存其余進程的基本信息。其聲明如下:
// 存儲了nginx中所有的子進程數組,每個子進程都有一個對應的ngx_process_t結構體進行標記
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
這里我們就可以看出,每個進程都會一個與之對應的channel數組,這個數組的長度為2,其是與master進程進行交互的管道流。在master進程創建每一個子進程的之前,都會創建一個channel數組,該數組的創建方法為:
int socketpair(int domain, int type, int protocol, int sv[2]);
這個方法的主要作用是創建一對匿名的已經連接的套接字,也就是說,如果在一個套接字中寫入數據,那么在另一個套接字中就可以接收到寫入的數據。通過這種方式,如果在父進程中往管道的一邊寫入數據,那么在子進程就可以在另一邊接收到數據,這樣就可以實現父子進程的數據通信了。
在master進程啟動完子進程之后,子進程會保有master進程中相應的數據,也包括這里的channel數組。如此,master進程就可以通過channel數組實現與子進程的通信了。
2. worker處理外部命令
對于外部命令,其本質上是通過signals數組中定義的各個信號以及回調方法進行處理的。在master進程初始化基本環境的時候,會將signals數組中指定的信號回調方法設置到對應的信號中。由于worker進程會繼承master進程的基本環境,因而worker進程在接收到這里設置的信號之后,也會調用對應的回調方法。而該回調方法的主要邏輯也僅僅只是設置相應的標志位的值。關于nginx接收到信號之后如何設置對應的標志位,可以參照本人前面的文章(nginx master工作循環 超鏈接),這里不再贅述。
3. 源碼講解
master進程是通過ngx_start_worker_processes()方法啟動各個子進程的,如下是該方法源碼:
/** * 啟動n個worker子進程,并設置好每個子進程與master父進程之間使用socketpair * 系統調用建立起來的socket句柄通信機制 */ static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i; ngx_channel_t ch; ngx_memzero(&ch, sizeof(ngx_channel_t)); ch.command = NGX_CMD_OPEN_CHANNEL; for (i = 0; i < n; i++) { // spawn是產卵的意思,這里就是生成一個子進程的意思,而該子進程所進行的事件循環就是 // ngx_worker_process_cycle()方法,這里的ngx_worker_process_cycle是worker進程處理事件的循環, // worker進程在一個無限for循環中,不斷的檢查相應的事件模型中是否存在對應的事件, // 然后將accept事件和read、write事件分開放入兩個隊列中,最后在事件循環中不斷的處理事件 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); // 下面的這段代碼的主要作用是將新建進程這個事件通知到其他的進程,上面的 // ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是當前是新建了一個進程, // 而ngx_process_slot存儲的就是該新建進程所存放的數組位置,這里需要進行廣播的原因在于, // 每個子進程被創建后,其內存數據都是復制的父進程的,但是ngx_processes數組是每個進程都有一份的, // 因而數組中先創建的子進程是沒有后創建的子進程的數據的,但是master進程是有所有子進程的數據的, // 因而這里master進程創建子進程之后,其就會向ngx_processes數組的每個進程的channel[0]上 // 寫入當前廣播的事件,也即這里的ch,通過這種方式,每個子進程接收到這個事件之后, // 都會嘗試更新其所保存的ngx_processes數據信息 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; // 廣播事件 ngx_pass_open_channel(cycle, &ch); } }
這里我們主要需要關注上面的啟動子進程的方法調用,也即這里的ngx_spawn_process()方法,該方法的第二個參數是一個方法,在啟動子進程之后,子進程就會進入該方法所指定的循環中。而在ngx_spawn_process()方法中,master進程會為當前新創建的子進程創建一個channel數組,以用于與當前子進程進行通信。如下是ngx_spawn_process()方法的源碼:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s; if (respawn >= 0) { s = respawn; } else { // 在ngx_processes數組中存儲了當前創建的所有進程,而ngx_last_process則是當前當前記錄的最后一個 // process在ngx_processes中的下一個位置的索引,只不過ngx_processes中記錄的進程有可能有部分 // 已經失效了。當前循環就是從頭開始查找是否有某個進程已經失效了,如果已經失效了,則復用該進程位置, // 否則直接使用ngx_last_process所指向的位置 for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } } // 這里說明所創建的進程數達到了最大限度 if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } // NGX_PROCESS_DETACHED標志表示當前fork出來的進程與原來的父進程沒有任何關系,比如進行nginx升級時, // 新生成的master進程就與原先的master進程沒有關系 if (respawn != NGX_PROCESS_DETACHED) { /* Solaris 9 still has no AF_LOCAL */ // 這里的socketpair()方法的主要作用是生成一對套接字流,用于主進程和子進程的通信,這一對套接字會 // 存儲在ngx_processes[s].channel中,本質上這個字段是一個長度為2的整型數組。在主進程和子進程 // 進行通信的之前,主進程會關閉其中一個,而子進程會關閉另一個,然后相互之間往未關閉的另一個文件描述符中 // 寫入或讀取數據即可實現通信。 // AF_UNIX表示當前使用的是UNIX文件形式的socket地址族 // SOCK_STREAM指定了當前套接字建立的通信方式是管道流,并且這個管道流是雙向的, // 即管道雙方都可以進行讀寫操作 // 第三個參數protocol必須為0 if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); // 將ngx_processes[s].channel[0]設置為非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 將ngx_processes[s].channel[1]設置為非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } on = 1; // 將ngx_processes[s].channel[0]套接字管道設置為異步模式 if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 當前還處于主進程中,這里的ngx_pid指向了主進程的進程id,當前方法的作用主要是將 // ngx_processes[s].channel[0]的操作權限設置給主進程,也就是說主進程通過向 // ngx_processes[s].channel[0]寫入和讀取數據來與子進程進行通信 if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示當前指定的套接字管道在子進程中可以使用,但是在execl()執行的程序中不可使用 if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示當前指定的套接字管道在子進程中可以使用,但是在execl()執行的程序中不可使用 if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // ngx_processes[s].channel[1]是用于給子進程監聽相關事件使用的,當父進程向 // ngx_processes[s].channel[0]發布事件之后,ngx_processes[s].channel[1]中就會接收到 // 對應的事件,從而進行相應的處理 ngx_channel = ngx_processes[s].channel[1]; } else { // 如果是NGX_PROCESS_DETACHED模式,則表示當前是另外新起的一個master進程,因而將其管道值都置為-1 ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; // fork()方法將產生一個新的進程,這個進程與父進程的關系是子進程的內存數據將完全復制父進程的。 // 還需要注意的是,fork()出來的子進程執行的代碼是從fork()之后開始執行的,而對于父進程而言, // 該方法的返回值為父進程id,而對于子進程而言,該方法返回值為0,因而通過if-else語句就可以讓父進程 // 和子進程分別調用后續不同的代碼片段 pid = fork(); switch (pid) { case -1: // fork出錯 ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; case 0: // 子進程執行的分支,這里的proc()方法是外部傳進來的,也就是說,當前方法只是創建一個新的進程, // 具體的進程處理邏輯,將交由外部代碼塊進行定義ngx_getpid()方法獲取的就是當前新創建的子進程的進程id ngx_pid = ngx_getpid(); proc(cycle, data); break; default: // 父進程會走到這里 break; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid); // 父進程會走到這里,當前的pid是fork()之后父進程得到的新創建的子進程的pid ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; if (respawn >= 0) { return pid; } // 設置當前進程的各個屬性,并且存儲到ngx_processes數組中的對應位置 ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; switch (respawn) { case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; } if (s == ngx_last_process) { ngx_last_process++; } return pid; }
ngx_spawn_process()方法最后會fork()一個子進程以執行其第二個參數所指定的回調方法。但是在這之前,我們需要說明的是,其通過socketpair()方法調用會創建一對匿名的socket,然后將其存儲在當前進程的channel數組中,如此就完成了channel數組的創建。
worker進程啟動之后會執行ngx_worker_process_cycle()方法,該方法首先會對worker進程進行初始化,其中就包括對繼承而來的channel數組的處理。由于master進程和worker進程都保有channel數組所指代的socket描述符,而本質上master進程和各個worker進程只需要保有該數組的某一邊的描述符即可。因而這里worker進程在初始化過程中,會關閉其所保存的另一邊的描述符。在nginx中,master進程統一的會保留channel數組的0號位的socket描述符,關閉1號位的socket描述符,而worker進程則會關閉0號位的socket描述符,保留1號位的描述符。這樣master進程需要與worker進程通信時,就只需要往channel[0]中寫入數據,而worker進程則會監聽channel[1],從而接收到master進程的數據寫入。這里我們首先看一下worker進程的初始化方法ngx_worker_process_init()的源碼:
/** * 這里主要是對當前進程進行初始化,為其設置優先級和打開的文件限制等參數。 * 最后會為當前進程添加一個監聽channel[1]的連接,以不斷讀取master進程的消息,從而進行相應的處理 */ static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) { sigset_t set; ngx_int_t n; ngx_time_t *tp; ngx_uint_t i; ngx_cpuset_t *cpu_affinity; struct rlimit rlmt; ngx_core_conf_t *ccf; ngx_listening_t *ls; // 設置時區相關的信息 if (ngx_set_environment(cycle, NULL) == NULL) { /* fatal */ exit(2); } ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 設置當前進程的優先級 if (worker >= 0 && ccf->priority != 0) { if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setpriority(%d) failed", ccf->priority); } } // 設置當前進程能夠打開的文件句柄數 if (ccf->rlimit_nofile != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile; rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile; if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_NOFILE, %i) failed", ccf->rlimit_nofile); } } // Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes. // 簡而言之就是設置核心文件能夠使用的最大大小 if (ccf->rlimit_core != NGX_CONF_UNSET) { rlmt.rlim_cur = (rlim_t) ccf->rlimit_core; rlmt.rlim_max = (rlim_t) ccf->rlimit_core; if (setrlimit(RLIMIT_CORE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setrlimit(RLIMIT_CORE, %O) failed", ccf->rlimit_core); } } // geteuid()返回執行當前程序的用戶id,這里的0表示是否為root用戶 if (geteuid() == 0) { // setgid()方法的作用是更改組的id if (setgid(ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setgid(%d) failed", ccf->group); /* fatal */ exit(2); } // initgroups()是更改附加組的id if (initgroups(ccf->username, ccf->group) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "initgroups(%s, %d) failed", ccf->username, ccf->group); } // 更改用戶的id if (setuid(ccf->user) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, "setuid(%d) failed", ccf->user); /* fatal */ exit(2); } } // 需要注意的是,對于cache manager和cache loader進程,這里的worker傳入的是-1, // 表示這兩個進程不需要設置親核性 if (worker >= 0) { // 獲取當前worker的CPU親核性 cpu_affinity = ngx_get_cpu_affinity(worker); if (cpu_affinity) { // 設置worker的親核心 ngx_setaffinity(cpu_affinity, cycle->log); } } #if (NGX_HAVE_PR_SET_DUMPABLE) if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "prctl(PR_SET_DUMPABLE) failed"); } #endif if (ccf->working_directory.len) { // chdir()的作用是將當前的工作目錄更改為其參數所傳入的路徑 if (chdir((char *) ccf->working_directory.data) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "chdir(\"%s\") failed", ccf->working_directory.data); /* fatal */ exit(2); } } // 初始化空的set指令集合 sigemptyset(&set); // ◆ SIG_BLOCK:將 set 參數指向信號集中的信號加入到信號掩碼中。 // ◆ SIG_UNBLOCK:將 set 參數指向的信號集中的信號從信號掩碼中刪除。 // ◆ SIG_SETMASK:將 set 參數指向信號集設置為信號掩碼。 // 這里就是直接初始化要阻塞的信號集,默認為空集 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigprocmask() failed"); } tp = ngx_timeofday(); srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec); ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ls[i].previous = NULL; } // 這里調用各個模塊的init_process()方法進行進程模塊的初始化 for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } // 這里主要是關閉當前進程中其他各個進程的channel[1]管道句柄 for (n = 0; n < ngx_last_process; n++) { if (ngx_processes[n].pid == -1) { continue; } if (n == ngx_process_slot) { continue; } if (ngx_processes[n].channel[1] == -1) { continue; } if (close(ngx_processes[n].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } } // 關閉當前進程的channel[0]管道句柄 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } #if 0 ngx_last_process = 0; #endif // ngx_channel指向的是當前進程的channel[1]句柄,也即監聽master進程發送消息的句柄。 // 當前方法中,首先會為當前的句柄創建一個connection對象,并且將其封裝為一個事件,然后將該事件添加到 // 對應的事件模型隊列中以監聽當前句柄的事件,事件的處理邏輯則主要有這里的ngx_channel_handler() // 方法進行。這里的ngx_channel_handler的主要處理邏輯是,根據當前收到的消息設置當前進程的一些標志位, // 或者更新某些緩存數據,如此,在當前進行的事件循環中,通過不斷檢查這些標志位,從而實現在事件進程中 // 處理真正的邏輯。因而這里的ngx_channel_handler的處理效率是非常高的 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } }
該方法主要是對worker進程進行初始化,這里我們主要需要關注最后會遍歷ngx_processes數組,這個數組中保存了當前nginx中各個進程的相關信息。在遍歷過程中,會關閉當前進程保有的其余進程的channel[1]句柄,而保留有channel[0]句柄,這樣當前進程如果需要與其他進程通信,也只需要往目標進程的channel[0]中寫入數據即可。在遍歷完成之后,當前進程就會關閉自身的channel[0]句柄,而保留channel[1]句柄。最后,會通過ngx_add_channel_event()方法為當前進程添加對channel[1]的監聽事件,這里在調用ngx_add_channel_event()方法時傳入的第二個參數是ngx_channel,該參數是在前面的ngx_spawn_process()方法中賦值的,指向的就是當前進程的channel[1]的socket句柄。
關于ngx_add_channel_event()方法,其本質就是創建一個ngx_event_t結構體的事件,然后將其添加到當前所使用的事件模型(比如epoll)句柄中。這里不再贅述該方法的實現源碼,不過我們需要關注的是該事件觸發時的回調方法,即調用ngx_add_channel_event()方法時傳入的第三個參數ngx_channel_handler()方法。如下是該方法的源碼:
static void ngx_channel_handler(ngx_event_t *ev) { ngx_int_t n; ngx_channel_t ch; ngx_connection_t *c; if (ev->timedout) { ev->timedout = 0; return; } c = ev->data; for (;;) { // 在無限for循環中不斷讀取master進程發過來的消息 n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log); // 如果讀取消息出錯,說明當前的句柄可能失效了,就需要關閉當前連接 if (n == NGX_ERROR) { if (ngx_event_flags & NGX_USE_EPOLL_EVENT) { ngx_del_conn(c, 0); } ngx_close_connection(c); return; } if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) { if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) { return; } } if (n == NGX_AGAIN) { return; } // 對發送過來的消息進行處理 switch (ch.command) { // 如果是quit消息,則設置quit標志位 case NGX_CMD_QUIT: ngx_quit = 1; break; // 如果terminate消息,則設置terminate標志位 case NGX_CMD_TERMINATE: ngx_terminate = 1; break; // 如果是reopen消息,則設置reopen標志位 case NGX_CMD_REOPEN: ngx_reopen = 1; break; // 如果是新建進程消息,則更新當前ngx_processes數組對應位置的數據 case NGX_CMD_OPEN_CHANNEL: ngx_processes[ch.slot].pid = ch.pid; ngx_processes[ch.slot].channel[0] = ch.fd; break; // 如果是關閉channel的消息,則關閉ngx_processes數組對應位置的句柄 case NGX_CMD_CLOSE_CHANNEL: if (close(ngx_processes[ch.slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "close() channel failed"); } ngx_processes[ch.slot].channel[0] = -1; break; } } }
在ngx_channel_handler()方法中,主要是讀取所監聽的socket句柄中的數據,而數據是以一個ngx_channel_t結構體所承載的,這個ngx_channel_t是nginx所統一使用的master與worker進程進行通信的結構體,其會指定當前發生的事件類型,以及發生該事件的進程信息。如下是ngx_channel_t結構體的聲明:
typedef struct { // 當前發生的事件類型 ngx_uint_t command; // 發生事件的pid ngx_pid_t pid; // 發生事件的進程在ngx_processes數組中的下標 ngx_int_t slot; // 發生事件的進程的channel[0]描述符的值 ngx_fd_t fd; } ngx_channel_t;
在從當前進程的channel[1]中讀取了ngx_channel_t結構體的數據之后,ngx_channel_handler()方法會根據發生的事件類型更新相應的標志位的狀態,并且會更新當前進程的ngx_processes數組中對應的發生事件的進程的狀態信息。
在處理了master進程所發送的事件之后,worker進程就會繼續其循環,在該循環中會檢查其所關注的標志位的狀態,然后會根據這些狀態執行對應的邏輯。如下是worker進程工作的循環的源碼:
/** * 進入worker進程工作的循環 */ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { ngx_int_t worker = (intptr_t) data; ngx_process = NGX_PROCESS_WORKER; ngx_worker = worker; // 初始化worker進程,前面對該方法的源碼進行了講解 ngx_worker_process_init(cycle, worker); ngx_setproctitle("worker process"); for (;;) { if (ngx_exiting) { // 這里主要是檢查有沒有事件是非cancelable狀態的,也就是說是否所有的事件都已經取消了,如果取消了, // 就會返回NGX_OK。這里的邏輯可以理解為,如果被標記為了ngx_exiting,那么此時,如果還有未取消的 // 事件存在,則會走到下面的ngx_process_events_and_timers()方法,如此就會處理未完成的事件, // 然后在循環中再次走到這個位置,最終if條件為true,從而執行退出worker進程的工作 if (ngx_event_no_timers_left() == NGX_OK) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); // 這里通過檢查相應的事件模型中是否存在對應的事件,然后將其放入隊列中進行處理, // 這里是worker進程處理事件的核心方法 ngx_process_events_and_timers(cycle); // 這里ngx_terminate是強制關閉nginx的選項,如果向nginx發送了強制關閉nginx命令,則當前進程會直接退出 if (ngx_terminate) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } // 這里ngx_quit是優雅退出的選項。這里主要是將ngx_exiting置為1,用于表征當前進程需要退出, // 然后會執行如下三個工作: // 1. 往事件隊列中添加一個事件,用于處理當前處于活躍狀態的連接,將其close標志位置為1,并且執行該連接 // 當前的處理方法,以盡快完成連接事件; // 2. 關閉當前cycle中監聽的socket句柄; // 3. 將當前所有處于空閑狀態的連接的close狀態標記為1,然后調用其連接處理方法. if (ngx_quit) { ngx_quit = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down"); ngx_setproctitle("worker process is shutting down"); if (!ngx_exiting) { ngx_exiting = 1; ngx_set_shutdown_timer(cycle); ngx_close_listening_sockets(cycle); ngx_close_idle_connections(cycle); } } // ngx_reopen主要是重新打開nginx的所有文件,比如切換nginx的日志文件等等 if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, -1); } } }
可以看到,worker進程主要處理了nginx是否退出相關的標志位,還處理了nginx是否重新讀取了配置文件的標志位。
感謝各位的閱讀!關于“nginx中worker進程循環的實現示例”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。