libevent是一个轻量级的开源高性能事件驱动网络库,是一个典型的Reactor模型。其主要特点有事件驱动,高性能,跨平台,统一事件源等等。
网上关于libevent的源码分析有很多相关博客,本人在学习过程中也是借助了网络。所以,在此,关于libevent中的许多具体实现部分就不做介绍,主要是从相关数据结构层面上去分析。仅供参考。
libevent中的事件处理类型是event结构类型,event结构体封装了句柄,事件类型,回调函数,以及其他必要的标志和数据,是整个libevent库的核心。
该结构的定义如下:
struct event{ /* * ev_next, ev_active_next都是双向链表节点指针 * 它们是libevent对不同事件类型和在不同时期,对事件的管理时使用到的字段 * * libevent使用双向链表保存所有注册的IO和signal事件 * ev_next 就是该IO事件在链表中的位置,称此链表为已注册事件链表 * ev_active_next: libevent将所有激活事件放入链表active list中,然后遍历active list * 执行调度,ev_active_next就指明了event在active list中的位置 */ TAILQ_ENTRY(event) ev_next; TAILQ_ENTRY(event) ev_active_next; /* * _ev 是一个联合体,所有具有相同描述符的IO事件通过ev.ev_io.ev_io_next成员串联成一个 * 尾队列,称之为IO事件队列,所有具有相同信号值的信号事件通过ev.ev_signal.ev_signal_next * 串联成一个尾队列,称之为信号事件队列。ev.ev_signal.ev_ncalls成员指定时间发生时,Reactor * 需要执行多少次该事件对应的回调函数,ev.ev_signal.ev_pcalls要么是NULL,要么执行ev.ev_signal.ev_ncalls */ union{ struct { TAILQ_ENTRY(event) ev_io_next; struct timeval ev_timeout; }ev_io; struct { TAILQ_ENTRY(event) ev_signal_next; short ev_ncalls; short *ev_pcalls; }ev_signal; } _ev; /* * ev_timeout_pos是一个联合体,它仅用于定时事件处理器,老版本libevnet中使用最小堆管理 * 定时器,但是开发者认为有时简单链表的管理更加高效。所以新版本中引入了“通用定时器”的 * 概念。这些定时器不是存储在时间堆中,而是存储在尾队列中,我们称之为通用定时器队列。 * 对于通用定时器而言,ev_timeout_pos中的ev_next_with_common_timeout成员指出了该定时器 * 在队列中的位置;对于其他定时器,min_heap_idx成员指出了该定时器在时间堆中的位置。一个 * 定时器是否是通用定时器,取决于其超时值的大小。具体参考event.c中的is_common_timeout函数。 */ union{ TAILQ_ENTRY(event) ev_next_with_common_timeout; int min_heap_idx; }ev_timeout_pos; //如果是超时事件ev_timeout超时值 struct timeval ev_timeout; //ev_base :该事件所属的反应堆实例,这是一个event_base结构体 struct event_base *ev_base; //对于IO事件,是绑定的文件描述符,对于signal事件,是绑定的信号 int ev_fd; /* * ev_events : event关注的事件类型,它可以是以下三种类型: * IO事件:EV_WRITE / EV_READ * 定时事件: EV_TIMEOUT * 信号: EV_SIGANL *辅助选项: EV_PERSIST, 表明是一个永久事件 */ short ev_events; //各个事件可以使用 "|"运算符进行组合,信号和IO事件不能同时设置 //事件就绪执行时,调用ev_callback的次数,通常为1 short ev_ncalls; //指针,指向ev_ncalls或NULL short *ev_pncalls; //allows deletes in callback int ev_pri; //smaller numbers are higher priority //ev_callback:event回调函数,被ev_base调用,执行事件处理程序,这是一个函数指针 //其中fd对应ev_fd, events对应ev_events, arg对应ev_arg void (*ev_callback)(int , short, void *arg); //void* 表明可以是任意类型,在设置event时指定 void *ev_arg; //记录了当前激活事件的类型 int ev_res; //result passed to event callback /* * libevent用于标记event信息的字段,表明当前的状态 */ int ev_flags; };
从event结构的定义可以看出,event中封装了句柄,回调函数,和事件类型。包括该事件在相应链表或时间堆中的索引位置。宏TAILQ_ENTRY是尾队列的节点类型,其定义为:
#define TAILQ_ENTRY(type) \ struct { \ struct type *tqe_next; \ /*下一个元素*/ struct type **tqe_prev; \ /*前一个元素的地址*/ }
每当有事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级;接着libevent会根据自己的调度策略选择就绪事件,调用其cb_callback()函数执行事件处理。
结构体event_base是libevent的Reactor,其声明如下:
struct event_base { /* 初始化Reactor的时候选择一种后端IO复用机制,并记录在如下字段中*/ const struct eventop *evsel; /*指向IO复用机制真正存储的数据,它通过evsel成员的init函数来初始化*/ void *evbase; /* 指向信号的后端处理机制,目前仅在signal.h文件中定义了一种处理方法*/ const struct eventop *evsigsel; void *evsigbase; /*信号处理器使用到的数据结构,其中封装了一个socketpair创建的管道,它用于信号处理函数和事件多路分发器之间的通信。*/ struct evsig_info sig; /* 添加到该event_base的所有事件和激活事件的数量*/ int event_count; /**< counts number of total events */ int event_count_active; /**< counts number of active events */ /* 是否执行完活动事件队列上的剩余任务之后就退出事件循环 */ int event_gotterm; /**< Set to terminate loop once done * processing events. */ /* 是否立即退出事件循环,而不管是否还有任务需要处理 */ int event_break; /**< Set to exit loop immediately */ /* 活动事件队列数组。索引越小的队列,优先级越高。高优先级的活动事件队列中的事件处理器将被优先处理*/ struct event_list **activequeues; /* 活动事件队列数组的大小,即该event_base一共有nactivequeues个不同优先级的活动事件队列*/ int nactivequeues; /*是否应该启动一个新的事件循环*/ int event_continue; //目前正在处理的活动事件队列的优先级 int event_running_priority; //事件循环是否已经启动 int running_loop; /** Deferred callback management: a list of deferred callbacks to * run active the active events. */ TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; //文件描述符和IO事件之间的映射关系表 struct event_io_map io; /*信号值和信号事件之间的映射关系表*/ struct event_signal_map sigmap; /*注册时间队列,存放IO事件处理器和信号事件处理器*/ struct event_list eventqueue; /*时间堆*/ struct min_heap timeheap; //系统管理时间的一些成员 struct timeval event_tv; struct timeval tv_cache; };
其中:
evsel和evbase这两个字段的额设置可能会让人迷惑,可以将其看作是类和静态函数的关系,比如添加事件时的调用行为:evsel->add(evbase, ev),实际上执行操作的是evbase,这相当于class::add(instance, ev),instance就是class的一个实例。evsel指向全局变量static const struct eventop *eventops[]中的一个。eventops[]包含了select,poll,kequeue和epoll等等其中的若干个全局实例对象。evbase实际上是一个eventop实例对象。
eventop结构体,是一系列的函数指针,定义如下:
struct eventop{ const char* name; void *(*init)(struct event_base *); //初始化 int (*add)(void *, struct event *); //注册事件 int (*del)(void *, struct event *); //删除事件 int (*dispatch)(struct event_base*, void *, struct timeval *); //事件分发 void (*dealloc)(struct event_base*, void *); //注销,释放资源 //set if we need to reinitialize the event_base int need_reinit; };
在libevent中,每个IO事件分发机制的实现都必须提供这五个函数接口。
事件主循环主要是通过event_base_loop()函数来完成的。其代码如下:
int event_base_loop(struct event_base *base, int flags){ const struct eventop *evsel = base->evsel; void *evbase = base->evbase; struct timeval tv; struct timeval *tv_p; int res, done; //清空时间缓存 base->tv_cache.tv_sec = 0; //evsignal_base是全局变量,在处理signal时,用于指明signal所属的event_base实例 if(base->sig.ev_signal_added) evsignal_base = base; done = 0; //事件主循环 while(!done){ //查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记 //调用event_base_loopbreak设置event_break标志 if(base->event_gotterm){ base->event_gotterm = 0; break; } if(base->event_break){ base->event_break = 0; break; } //you cannot use this interface for multi-threaded apps while(event_gotsig){ event_gotsig = 0; if(event_sigcb){ res = (*event_sigcb)(); if(res == -1){ errno = EINTR; return -1; } } } //校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间 //在timeout_correct函数中,比较last wait time和当前事件,如果 //当前时间 < last wait time //表明时间有问题,这需要更新timer_heap中所有定时事件的超时时间 timeout_correct(base, &tv); //根据time heap中事件的最小超时时间,计算系统IO demultiplexer的最大等待时间 tp_p = &tv; if(!base->event_count_active && !(flags & EVLOOP_NONBLOCK)){ timeout_next(base, &tv_p); } else{ //依然有未处理的就绪时间,就让IO demultiplexer立即返回,不必等待 //下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理 evutil_timerclear(&tv); } //如果当前没有注册事件,就退出 if(!event_haveevents(base)){ event_debug("%s: no events registered.", __func__); return 1; } //更新last wait time,并清空time cache gettime(base, &base->event_tv); base->tv_cache.tv_sec = 0; //调用系统IO demultiplexer等待就绪IO events,可能是epoll_wait,或者select等 //在evsel->dispatch()中,会把就绪signal event / IO event插入到激活链表中 res = evsel->dispatch(base, evbase, tv_p); if(res == -1) return -1; //将time cache 赋值为当前系统时间 gettime(base, &base->tv_cache); //检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中 timeout_process(base); //调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理 //该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表 //然后处理链表中的所有就绪事件 //因此低优先级的就绪事件可能得不得及时处理 if(base->event_count_active){ event_process_active(base); if(!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1; } else if(flags & EVLOOP_NONBLOCK) done = 1; } //循环结束,清空时间缓存 base->tv_cache.tv_sec = 0; event_debug("%s: asked to terminate loop.", __func__); return 0; }
统一事件源,libevent将timer和signal事件都统一到了系统的IO demultiplex机制中
通过socketpair来实现的。即一个socket对,其中有两个socket,一个读,一个写。
将读socket在事件主循环实例中注册一个读事件,当信号发生时,往写socket中写入一个字符,通常为信号值,此时读socket上有读事件,触发IO demultiplex读事件,然后同普通的IO事件一起被处理即可。
timer和IO事件的统一。因为系统的IO机制,例如select()和epoll_wait()都允许程序制定一个最大的等待时间,根据所有timer事件中的最小超时时间来设置IO demultiplex的最大等待时间,当IO返回时,再激活所有就绪的timer事件就可以了,这样就将timer事件完美融合到了系统的IO机制中了。
IO和signal的统一。因为signal的出现对进程来说是完全随机的。所以当signal发生时,并不立即调用event的callback函数处理信号,而是设法通知系统的IO机制,让其返回,然后再统一和IO事件,以及timer一起处理。
事件主循环的流程如下
1) 开始 2) 调整系统时间与否 3) 根据timer heap中的event的最小超时时间计算系统IO demultiplexer的最大等待时间 4) 更新last wait time, 并清空time cache5) 调用系统I/O demultiplexer等待就绪I/O events6) 检查signal的激活标记,如果被设置,则检查激活signal event,并将event插入到激活链表中7) 将就绪的I/O event插入到激活链表中8) 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中9) 根据优先级处理激活链表中的就绪event,调用其回调函数执行事件处理(优先级越小越高)10) 结束
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。