nginx 事件模块初始化(2) - ngx_event_module_init、ngx_event_process_init

2018-07-27 11:52:49

ngx_event_module_init 在 ngx_init_cycle里调用,fork子进程之前,创建共享内存,存放负载均衡锁和统计用的原子变量。

static ngx_int_t
ngx_event_module_init(ngx_cycle_t *cycle){
    void              ***cf;
    u_char              *shared;
    size_t               size, cl;
    ngx_shm_t            shm;
    ngx_time_t          *tp;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;

    // events模块的配置结构体
    cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module);

    // event_core模块的配置结构体
    ecf = (*cf)[ngx_event_core_module.ctx_index];

    if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) {
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                      "using the \"%s\" event method", ecf->name);
    }

    // core模块的配置结构体
    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

    // 获取核心配置的时间精度,用在epoll里更新缓存时间
    ngx_timer_resolution = ccf->timer_resolution;

    // unix专用代码, 可打开的最多文件描述符
#if !(NGX_WIN32)
    {
    ngx_int_t      limit;
    struct rlimit  rlmt;

    // 系统调用getrlimit,Linux内核对进程的限制
    // RLIMIT_NOFILE,进程可打开的最大文件描述符数量,超出将产生EMFILE错误
    if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {

        // 系统调用失败则记录alert级别日志
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "getrlimit(RLIMIT_NOFILE) failed, ignored");

    } else {
        // 成功获取内核参数
        //
        // rlmt.rlim_cur是系统的软限制
        // event里配置的连接数不能超过系统内核限制
        // 或者是配置的rlimit_nofile限制
        if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur
            && (ccf->rlimit_nofile == NGX_CONF_UNSET
                || ecf->connections > (ngx_uint_t) ccf->rlimit_nofile))
        {
            // 如果超过了报警告级别日志
            // limit就是上限
            limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ?
                         (ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile;

            ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
                          "%ui worker_connections exceed "
                          "open file resource limit: %i",
                          ecf->connections, limit);
        }
    }
    }
#endif /* !(NGX_WIN32) */


    // 如果非master/worker进程,即只启动一个进程,那么就没必要使用负载均衡锁
    if (ccf->master == 0) {
        return NGX_OK;
    }

    // 已经有了负载均衡锁,已经初始化过了,就没必要再做操作
    if (ngx_accept_mutex_ptr) {
        return NGX_OK;
    }

    // cl是一个基本长度,可以容纳原子变量
    // 对齐到cache line,操作更快
    cl = 128;

    // 最基本的三个:负载均衡锁,连接计数器,
    size = cl            /* ngx_accept_mutex */
           + cl          /* ngx_connection_counter */
           + cl;         /* ngx_temp_number */

    // 创建共享内存,存放负载均衡锁和统计用的原子变量
    shm.size = size;
    ngx_str_set(&shm.name, "nginx_shared_zone");
    shm.log = cycle->log;

    // 利用 mmap 分配一块共享内存
    if (ngx_shm_alloc(&shm) != NGX_OK) {
        return NGX_ERROR;
    }

    // shared是共享内存的地址指针
    shared = shm.addr;

    // 第一个就是负载均衡锁
    ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;

    // spin是-1则不使用信号量
    // 只会自旋,不会导致进程睡眠等待
    // 这样避免抢accept锁时的性能降低
    ngx_accept_mutex.spin = (ngx_uint_t) -1;

    // 初始化互斥锁
    // spin是-1则不使用信号量
    // 只会自旋,不会导致进程睡眠等待
    if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
                         cycle->lock_file.data) != NGX_OK){
        return NGX_ERROR;
    }

    // 连接计数器
    ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);

    //原子操作,把 ngx_connection_counter 从0设置为1
    (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);

    // 临时文件用
    ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);

    tp = ngx_timeofday();

    // 随机数
    // 每个进程不同
    ngx_random_number = (tp->msec << 16) + ngx_pid;

    return NGX_OK;
}


ngx_event_process_init 是在 fork() 之后,worker进程初始化时调用,即每个worker里都会执行。初始化两个延后处理的事件队列、初始化定时器红黑树、发送定时信号、更新时间用、初始化cycle里的连接和事件数组、设置接受连接的回调函数为 ngx_event_accept,可以接受连接。

static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    // core模块的配置结构体
    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

    // event_core模块的配置结构体
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    // 使用master/worker多进程,使用负载均衡
    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {

        // 设置全局变量
        // 使用负载均衡,刚开始未持有锁,设置抢锁的间隔等待时间
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        // 单进程、未明确指定负载均衡,就不使用负载均衡
        ngx_use_accept_mutex = 0;
    }

    // 初始化两个延后处理的事件队列
    // 如果使用了reuseport,或者不使用负载均衡
    // 那么这两个队列就完全不会用到
    ngx_queue_init(&ngx_posted_accept_events);
    ngx_queue_init(&ngx_posted_events);

    // 初始化定时器红黑树
    if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
        return NGX_ERROR;
    }

    // 遍历事件模块,但只执行实际使用的事件模块对应初始化函数比如 ngx_epoll_module
    for (m = 0; cycle->modules[m]; m++) {
        if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        // 找到use指令使用的事件模型,或者是默认事件模型
        if (cycle->modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = cycle->modules[m]->ctx;

        // 调用事件模块的事件初始化函数比如 ngx_epoll_init
        // 调用epoll_create初始化epoll机制
        // 设置全局变量,操作系统提供的底层数据收发接口
        // 初始化全局的事件模块访问接口,指向epoll的函数
        // 默认使用et模式,边缘触发,高速
        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            exit(2);
        }

        break;
    }

如果设置了时间精度,那么每隔该时间就会发送定时信号来设置全局变量 ngx_event_timer_alarm,这样其他程序只要检查该变量就可以知道是否需要更新时间。

// NGX_USE_TIMER_EVENT标志量 epoll 无此标志位
    // ngx_timer_resolution = ccf->timer_resolution;默认值是0
    // 所以只有使用了timer_resolution指令才会发信号
    if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
        struct sigaction  sa;
        struct itimerval  itv;

        // 设置信号掩码,sigalarm
        ngx_memzero(&sa, sizeof(struct sigaction));
        sa.sa_handler = ngx_timer_signal_handler;
        sigemptyset(&sa.sa_mask);

        if (sigaction(SIGALRM, &sa, NULL) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "sigaction(SIGALRM) failed");
            return NGX_ERROR;
        }

        // 设置信号发送的时间间隔,也就是nginx的时间精度
        // 收到信号会设置设置ngx_event_timer_alarm变量
        // 在epoll的ngx_epoll_process_events里检查,更新时间的标志
        itv.it_interval.tv_sec = ngx_timer_resolution / 1000;
        itv.it_interval.tv_usec = (ngx_timer_resolution % 1000) * 1000;
        itv.it_value.tv_sec = ngx_timer_resolution / 1000;
        itv.it_value.tv_usec = (ngx_timer_resolution % 1000 ) * 1000;

        if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "setitimer() failed");
        }
    }

下面是初始化连接池和监听端口初始化

// 创建连接池数组,大小是cycle->connection_n
    // 直接使用malloc分配内存,没有使用内存池
    cycle->connections =
        ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }

    c = cycle->connections;

    // 创建读事件池数组,大小是cycle->connection_n
    cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                   cycle->log);
    if (cycle->read_events == NULL) {
        return NGX_ERROR;
    }

    // 读事件对象初始化
    rev = cycle->read_events;
    for (i = 0; i < cycle->connection_n; i++) {
        rev[i].closed = 1;
        rev[i].instance = 1;
    }

    // 创建写事件池数组,大小是cycle->connection_n
    cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                    cycle->log);
    if (cycle->write_events == NULL) {
        return NGX_ERROR;
    }

    // 写事件对象初始化
    wev = cycle->write_events;
    for (i = 0; i < cycle->connection_n; i++) {
        wev[i].closed = 1;
    }

    // i是数组的末尾
    i = cycle->connection_n;
    next = NULL;

    // 把连接对象与读写事件关联起来
    // 注意i是数组的末尾,从最后遍历
    do {
        i--;

        // 使用data成员,把连接对象串成链表
        c[i].data = next;

        // 读写事件
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];

        // 连接的描述符是-1,表示无效
        c[i].fd = (ngx_socket_t) -1;

        // next指针指向数组的前一个元素
        next = &c[i];
    } while (i);

    // 连接对象已经串成链表,现在设置空闲链表指针
    // 此时next指向连接对象数组的第一个元素
    cycle->free_connections = next;

    // 连接没有使用,全是空闲连接
    cycle->free_connection_n = cycle->connection_n;

    /* for each listening socket */
    // 监听端口初始化
    // 为每个监听端口分配一个连接对象
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
        // 注意这里
        // 只有worker id是本worker的listen才会enable
        // 也就是说虽然克隆了多个listening,但只有一个会enable
        // 即reuseport的端口只会在某个worker进程监听
        if (ls[i].reuseport && ls[i].worker != ngx_worker) {
            continue;
        }
#endif

        // 获取一个空闲连接
        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->type = ls[i].type;
        c->log = &ls[i].log;

        // 连接的listening对象
        // 两者相互连接
        c->listening = &ls[i];
        ls[i].connection = c;

        // 监听端口只关心读事件
        rev = c->read;

        rev->log = c->log;

        // 设置accept标志,接受连接
        rev->accept = 1;

#if (NGX_HAVE_DEFERRED_ACCEPT)
        rev->deferred_accept = ls[i].deferred_accept;
#endif
        // 重要!!
        // 设置接受连接的回调函数为ngx_event_accept
        // 监听端口上收到连接请求时的回调函数,即事件handler
        // 从cycle的连接池里获取连接
        // 关键操作 ls->handler(c);调用其他模块的业务handler
        rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                                : ngx_event_recvmsg;

#if (NGX_HAVE_REUSEPORT)

        if (ls[i].reuseport) {
            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }

            continue;
        }

#endif

        // 如果使用负载均衡,不向epoll添加事件,只有抢到锁才添加
        if (ngx_use_accept_mutex) {
            continue;
        }

        // 单进程、或未指定负载均衡,则不使用负载均衡
        // 直接加入epoll事件,开始监听,可以接受请求
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }


epoll 初始化

ngx_epoll_init() 执行完之后,事件处理函数和数据收发函数都已经准备好了,nginx 事件机制可以说是初步建立了。

// 调用epoll_create初始化epoll机制, timer无意义
// 设置全局变量,操作系统提供的底层数据收发接口
// 初始化全局的事件模块访问接口,指向epoll的函数
// 默认使用et模式,边缘触发,高速
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    // 获取epoll模块的配置
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    // ep == -1表示还没有创建epoll句柄,需要初始化
    if (ep == -1) {
        // 创建epoll句柄
        // Linux2.6.8 以后该参数会被忽略,
        ep = epoll_create(cycle->connection_n / 2);

        // epoll初始化失败
        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_EVENTFD)
        // 初始化多线程通知用的描述符和事件/连接
        if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {

            // 如果初始化失败,那么notify指针置空
            ngx_epoll_module_ctx.actions.notify = NULL;
        }
#endif

// aio暂不研究
#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, epcf);
#endif

#if (NGX_HAVE_EPOLLRDHUP)
        ngx_epoll_test_rdhup(cycle);
#endif
    }

    // 检查当前事件数组的大小,最开始nevents是0
    if (nevents < epcf->events) {

        // 如果是reload,那么就先释放,再重新分配内存
        if (event_list) {
            ngx_free(event_list);
        }

        // 默认为512个事件
        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    // 设置正确的数组长度
    nevents = epcf->events;

    // 设置全局变量,操作系统提供的底层数据收发接口
    // ngx_posix_init.c里初始化为linux的底层接口
    ngx_io = ngx_os_io;

    // 初始化全局的事件模块访问接口,指向epoll的函数
    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    // 默认使用et模式,边缘触发,高速
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}

流程图大致如下


备注

1.测试环境centos7 64位,nginx版本为 1.14.0。
2..原文地址http://www.freecls.com/a/2712/dc


©著作权归作者所有
收藏
推荐阅读
简介
天降大任于斯人也,必先苦其心志。