NGINX EVENTS 机制
目录
1 源码分析
auto/configure 生成 obj/ngx_modules.c:
char *ngx_module_names[] = "ngx_http_upstream_module{ "ngx_core_module", "ngx_errlog_module", "ngx_conf_module", "ngx_regex_module", "ngx_events_module", "ngx_event_core_module", "ngx_epoll_module", "ngx_http_module", "ngx_http_core_module", "ngx_http_log_module", , "ngx_http_static_module", /* ... */ }";
nginx.c main function 调用 ngx_preinit_modules() 设置模块数组.
ngx_int_t ngx_preinit_modules(void) { ngx_uint_t i; for (i = 0; ngx_modules[i]; i++) { ngx_modules[i]->index = i; ngx_modules[i]->name = ngx_module_names[i]; } ngx_modules_n = i; ngx_max_module = ngx_modules_n + NGX_MAX_DYNAMIC_MODULES; return NGX_OK; }
ngx_init_cycle 函数调用 ngx_init_cycle(cycle) 初始化相关内存.
ngx_int_t ngx_cycle_modules(ngx_cycle_t *cycle) { /* * create a list of modules to be used for this cycle, * copy static modules to it */ cycle->modules = ngx_pcalloc(cycle->pool, (ngx_max_module + 1) * sizeof(ngx_module_t *)); if (cycle->modules == NULL) { return NGX_ERROR; } ngx_memcpy(cycle->modules, ngx_modules, ngx_modules_n * sizeof(ngx_module_t *)); cycle->modules_n = ngx_modules_n; return NGX_OK; }
检测是否需要调用create_conf:
for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->type != NGX_CORE_MODULE) { continue; } module = cycle->modules[i]->ctx; if (module->create_conf) { rv = module->create_conf(cycle); if (rv == NULL) { ngx_destroy_pool(pool); return NULL; } cycle->conf_ctx[cycle->modules[i]->index] = rv; } }
ngx_events_module, create_conf 是 null, init_conf 是 ngx_event_init_conf.
ngx_event_core_module, create_conf 是 ngx_event_core_create_conf, init_conf 是 ngx_event_core_init_conf.
ngx_epoll_module_ctx, create_conf 是 ngx_epoll_create_conf, init_conf 是 ngx_epoll_init_conf.
nginx 单进程模式下, 进入ngx_single_process_cycle, 初始化各模块, 之后进入事件处理:
void ngx_single_process_cycle(ngx_cycle_t *cycle) { // ... for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } for ( ;; ) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); ngx_process_events_and_timers(cycle); // ... } }
nginx 多进程模式下, 进入 ngx_master_process_cycle, ngx_start_worker_processes, ngx_worker_process_cycle,
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) { // ... for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) { /* fatal */ exit(2); } } } // ... } static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { // ... ngx_int_t worker = (intptr_t) data; ngx_worker_process_init(cycle, worker); for ( ;; ) { // ... ngx_process_events_and_timers(cycle); // ... } }
ngx_process_events_and_timers 获取 accept 锁, 处理各类event事件.
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { if (ngx_use_accept_mutex) { // ... } (void) ngx_process_events(cycle, timer, flags); ngx_event_process_posted(cycle, &ngx_posted_accept_events); ngx_event_expire_timers(); ngx_event_process_posted(cycle, &ngx_posted_events); }
ngx_process_events是个宏定义, 桥接到 ngx_event_actions.process_events:
#define ngx_process_events ngx_event_actions.process_events #define ngx_done_events ngx_event_actions.done #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn #define ngx_notify ngx_event_actions.notify #define ngx_add_timer ngx_event_add_timer #define ngx_del_timer ngx_event_del_timer
ngx_event_actions在ngx_epoll_init内部, 初始化为 ngx_epoll_module_ctx.actions.
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_epoll_conf_t *epcf; epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module); if (nevents < epcf->events) { if (event_list) { ngx_free(event_list); } event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log); } nevents = epcf->events; ngx_io = ngx_os_io; ngx_event_actions = ngx_epoll_module_ctx.actions; return NGX_OK; }
每个模块都有commands, 处理特定event, 这是在conf_parse里完成的:
ngx_conf_handler
至此, 主循环就和 event 处理函数接驳到一起.
1.1 ngx_events_module
类型 NGX_CORE_MODULE. 处理配置相关的命令.
ngx_events_block 处理 events 命令:
- 计算 NGX_EVENT_MODULE 模块的数量;
- 给 NGX_EVENT_MODULE 模块执行 create_conf 操作;
- 调用 ngx_conf_parse 解析配置;
- 给 NGX_EVENT_MODULE 模块执行 init_conf 操作;
1.2 ngx_event_core_module
ngx_event_core_create_conf 初始化 ngx_event_conf_t, 例如 connections, use 等.
ngx_event_core_init_conf 识别具体的异步IO模块, 例如 epoll/poll/select 等. 并根据配置确定 connections 等配置.
1.3 ngx_epoll_module
处理异步IO.
2 侦听流程
ngx_init_cycle 函数里:
- 模块初始化时候在cycle里生成了listening port信息. 在ngx_open_listening_sockets侦听所有的端口;
- ngx_event_process_init 时, 把侦听设置好的fd加到epoll中.
- 调用者在侦听之前已经设置好了accept之后connection handler, 这个handler会设置rpc context, 同时把rev/wev handler设置为预期值.
- accept时还会设置connection recv/send 函数. handler调用这两个函数获取数据.
设置accept rev = ngx_event_acceptex for IOCP, others use ngx_event_accept or ngx_event_recvmsg.
3 RPC实现思考
3.1 Async IO Layer
用epoll/select/poll等实现异步IO, 处理IO事件. 提供事件处理句柄, 提供accept处理句柄. Connection处理函数由上层设置.
RpcClient/RpcServer通用.
线程唤醒, Timer唤醒的Eventfd加入到Epoll.
3.2 IO Handler Layer
数据分片, 协议完整相关逻辑, 处理IO收发, 组包, 编解码. 最终生成完成的数据包(请求/应答).
RpcClient/RpcServer通用.
编解码可由上层提供函数接入.
3.3 应用层
发送请求, 等待应答. 或者处理请求, 发送应答.