`
wsql
  • 浏览: 11775923 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

nginx 源码学习笔记(二十二)—— event 模块(三) ——epoll模块

 
阅读更多

上一节我们讲到了事件驱动的模块,它把我们引入epoll模块,今天我们主要学习下nginx如何使用epoll完成时间驱动,实现高并发;这里不详细讲解epoll原理,如果有机会再做一次单独的epoll的学习。

本文来自于:http://blog.csdn.net/lengzijian

回忆一下上一节的内容,在我们讲到ngx_process_events_and_timers时,在源码最后提到了ngx_process_events,这里是把我们引入epoll的入口:

1.先来看下ngx_process_events的宏定义:

src/event/ngx_event.h

#define ngx_process_events   ngx_event_actions.process_events


2.继续查找ngx_event_actions,我们找到如下结构体:

src/event/ngx_event.h

typedef struct {
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                   ngx_uint_t flags);

    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;


a.我们去源代码中搜索下关键字ngx_event_actions

modules/ngx_epoll_module.c:    ngx_event_actions = ngx_epoll_module_ctx.actions;
modules/ngx_select_module.c:    ngx_event_actions = ngx_select_module_ctx.actions;
modules/ngx_poll_module.c:    ngx_event_actions = ngx_poll_module_ctx.actions;

ngx_event.c:ngx_event_actions_t   ngx_event_actions;

前面三行表示:所有event模块对象中的actions就是ngx_event_actions_t对象,而ngx_event_action在第四行定义为全局变量,用于同一接口,下面又存在一个疑问,event模块到底做了些什么?

b.先找到ngx_event_module_t的结构体:

src/event/ngx_event.h

typedef struct {
    ngx_str_t              *name;           //模块名

    void                 *(*create_conf)(ngx_cycle_t *cycle);  //钩子函数,之前讲过
    char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);//同上

    ngx_event_actions_t     actions;       //接下来主要看
} ngx_event_module_t;


我们找一个例子来详细讲解下

src/event/modules/ngx_epoll_module.c

ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

这里有注释就不详细讲解了。

ngx_process_events这个函数就是我们要找的,要了好大一圈,ngx_process_events实际上就是调用这个函数,此处本人纠结,为什么作者不加点注释呢。

3.下面正式观察下ngx_epoll_init函数:

src/event/modules/ngx_epoll_module.c

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;
    //获取epoll模块的配置结构
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
    //ep是epoll模块定义的一个全局变量,初始化为-1
    if (ep == -1) {
        //创建一个epoll对象,容量为总连接数的一半
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }
    }
    //nevents也是epoll模块的全局变量,初始化为0
    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }
        //event_list存储产生时间的数组
        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;
    /*初始化全局变量ngx_io,ngx_os_io定义为:
    ngx_os_io_t ngx_os_io = {
        ngx_unix_recv,
        ngx_readv_chain,
        ngx_udp_unix_recv,
        ngx_unix_send,
        ngx_writev_chain,
        0
    };(src/os/unix/ngx_posix_init.c)
    */
    ngx_io = ngx_os_io;
    //这里之前讲过
    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    //实现边沿触发
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    //实现水平出发
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT  //io是知道收到DAGAIN为止
                      |NGX_USE_EPOLL_EVENT;  //epoll标志

    return NGX_OK;
}


4.下面观察下主要的函数ngx_epoll_process_events:

src/event/modules/ngx_epoll_module.c

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev, **queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);
    //一开支就等待时间,最长等待时间为timer,这里的timer下一节会详细讲解
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        //执行一次时间更新,nginx将时间缓存到一组全局变量中,方便程序高效获取事件
        ngx_time_update();
    }
    //wait出错
    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }
    //wait返回事件数0,可能是timeout返回,如果不是timeout返回,那么就是error
    if (events == 0) {
        //这里限定timer不是无线超时
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }
    //ngx_posted_events_mutex上锁
    ngx_mutex_lock(ngx_posted_events_mutex);
    //
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        //从发生的epoll事件对象中取得ngx_connection_t对象
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
        //取出读事件
        rev = c->read;
        //....
        //取得发生的时间
        revents = event_list[i].events;
        //记录wait的错误返回状态
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }
        
        if ((revents & (EPOLLERR|EPOLLHUP))
             && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }
        //读取一个事件,并且该连接上注册的读时间是active的
        if ((revents & EPOLLIN) && rev->active) {
            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                rev->ready = 1;
            }
            //时间放入相应的队列中;之前有说过nginx是先放入队列,释放锁之后再做处理
            if (flags & NGX_POST_EVENTS) {
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);
                /*
                这里根据accept状态
                如果accept为真:加入到ngx_posted_accept_events事件队列中
                如果accept为假:加入到ngx_posted_events事件队列中
                */
                ngx_locked_post_event(rev, queue);//加入到ngx_posted_accept_events队里面

            } else {
                rev->handler(rev);//调用读事件处理函数,通常就是读取事件了
            }
        }
        //取出写事件
        wev = c->write;
        //如果是写事件并且active状态
        if ((revents & EPOLLOUT) && wev->active) {
        
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                //写事件设为posted_ready状态
                wev->posted_ready = 1;

            } else {
                写事件设为ready状态
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                //不立即处理,要时间排队,加入队列
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                //调用写事件处理函数,通常就是写入数据
                wev->handler(wev);
            }
        }
    }
    //ngx_mutex_unlock解锁
    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}
//其中用到了ngx_locked_post_event()这个宏,它把事件放到事件队列的头部。










分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics