libevent cpp 5 封装epoll及epoll内核实现原理






	: event_base()
	if ((_epfd = epoll_create(1)) == -1)

	_epevents = new struct epoll_event[_nfds];

typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */

事件的添加比较容易,主要是通过epoll_ctl接口,并设置相应的参数就能够进行添加和删除,在事件分发时,epoll接口调用的是epoll_wait函数来等待事件通知,有事件时会将一个struct epoll_event的数组传回,然后根据返回值就可以进行相应的事件处理,总的来说epoll的使用非常方便,但是其实现原理却不这么容易。


首先需要注意的是eventpoll是一个文件系统,作为内核的一个模块需要有着相应的初始化以及文件系统的注册操作,这些都是由eventpoll_init进行处理的,这里值得注意的是,在初始化的时候eventpoll虚拟文件系统在内核中创建了两块高速缓存,分别用于分配struct epitem和struct eppoll_entry结构,这两个结构是内核中进行epoll相关事件管理非常重要的结构,基本上struct epitem与要处理的fd相对应,在内核中使用高速缓存进行分配,可以极大地提升执行的效率。

static int __init eventpoll_init(void)
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0,
	pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0,
    error = register_filesystem(& eventpoll_fs_type);
	eventpoll_mnt = kern_mount(& eventpoll_fs_type);



首先用户程序调用epoll_create会在内核中创建一个struct eventpoll *ep实例,用来管理相关的epoll事件,因为eventpoll是内核中的一个虚拟文件系统,所以实际上是创建了一个文件,并分配一个系统中尚未使用的文件描述符与之关联。这里值得注意的是,文件的成员指针private_data用来保存ep,这样就能够将ep和eventpoll文件真正地关联在一起,以后通过文件描述符就可以获取到这个结构。

 * It opens an eventpoll file descriptor by suggesting a storage of "size"
 * file descriptors. The size parameter is just an hint about how to size
 * data structures. It won't prevent the user to store more than "size"
 * file descriptors inside the epoll interface. It is the kernel part of
 * the userspace epoll_create(2).
asmlinkage long sys_epoll_create(int size)
	struct inode *inode;
	struct file *file;
	 * Creates all the items needed to setup an eventpoll file. That is,
	 * a file structure, and inode and a free file descriptor.
	error = ep_getfd(& fd, & inode, & file);

	/* Setup the file internal data structure ( "struct eventpoll" ) */
	error = ep_file_init(file, hashbits);
	return fd;

static int ep_file_init(struct file *file, unsigned int hashbits)
	struct eventpoll *ep;
    // new *ep
	error = ep_init(ep, hashbits);
	file->private_data = ep;
	return 0;

static int ep_init(struct eventpoll *ep, unsigned int hashbits)
	init_waitqueue_head(& ep->wq);
	init_waitqueue_head(& ep->poll_wait);
	INIT_LIST_HEAD(& ep->rdllist);
	return 0;

如上一系列的初始化主要就是创建了struct eventpoll 对象,并初始化了其各成员,如wq、poll_wait、rdllist等分别对应着不同的队列或链表,内部实现其实都是链表。其中比较重要的wq和poll_wait都是等待队列,为什么会有两个等待队列呢,首先wq这个等待队列保存的是调用epoll_wait后在该eventpoll上睡眠等待事件的进程,而后者poll_wait则是用于管理eventpoll本身,因为eventpoll本身是一个虚拟文件系统,并且还实现了相应的f_op->poll()函数,说明eventpoll也能够被poll或者epoll所管理,所以其自身也需要一个等待队列管理自己。

 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
struct eventpoll {
	wait_queue_head_t wq;           /* Wait queue used by sys_epoll_wait() */
	wait_queue_head_t poll_wait;    /* Wait queue used by file->poll() */

	struct list_head rdllist;       /* List of ready file descriptors */
	unsigned int hashbits;          /* Size of the hash */
	char *hpages[EP_MAX_HPAGES];    /* Pages for the "struct epitem" hash */


读写事件的添加和删除都是通过epoll_ctl来实现的,根据不同的参数提供不同的操作,对于添加和删除实际上是分别调用ep_insert等内部函数来实现的,当然这里也会有将用户态数据struct epoll_event *拷贝到内核的操作,epoll中这个拷贝相对poll和select内容要少的多。从如下可以看到,先根据传入的eventpoll文件的描述符epfd来获取管理结构ep,然后根据ep来将对应事件加入。

 * The following function implements the controller interface for
 * the eventpoll file that enables the insertion/removal/change of
 * file descriptors inside the interest set.  It represents
 * the kernel part of the user space epoll_ctl(2).
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
	struct file *file, *tfile;
	struct eventpoll *ep;
	struct epoll_event epds;
	if (copy_from_user(& epds, event, sizeof(struct epoll_event)))

	file = fget(epfd);
	tfile = fget(fd);
	ep = file->private_data;

	switch (op) {
        case EPOLL_CTL_ADD:
                epds.events |= POLLERR | POLLHUP;
                error = ep_insert(ep, & epds, tfile, fd);
        case EPOLL_CTL_DEL:
        case EPOLL_CTL_MOD:

传入内核的事件究竟以何种结构在内核中保存呢,上面提到传入的读写事件event被复制到内核并传给了ep_insert函数作为event参数。在内核epoll的实现中,实际上是以一个struct epitem结构来保存读写事件信息,这个结构如下所示,其中有着很多的链表入口,表示着这个结构可以作为多种不同链表的节点,这些链表分别包括由eventpoll管理的链表,事件准备好的链表,由文件管理的链表,以及最终传给用户空间的队列。

 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the hash.
struct epitem {
	struct list_head llink;     // 由eventpoll管理
	struct list_head rdllink;   // 准备好了的事件链表
	struct list_head fllink;    // 由epitem对应的文件管理
	struct list_head txlink;    // 用于transfer队列,即最终传给用户空间的队列

	struct list_head pwqlist;   // 用于管理与epitem有关的Linux进程等待队列的管理器 eppoll_entry

	int nwait;              /* Number of active wait queue attached to poll operations */
	struct eventpoll *ep;   /* The "container" of this item */
	int fd;                 /* The file descriptor this item refers to */
	struct file *file;      /* The file this item refers to */
	struct epoll_event event;


/* Wait structure used by the poll hooks */
struct eppoll_entry {
	struct list_head llink;     /* List header used to link this structure to the "struct epitem" */
	void *base;                 /* The "base" pointer is set to the container "struct epitem" */
	wait_queue_t wait;          // 进程等待队列
	wait_queue_head_t *whead;   // 将进程等待队列链接起来的链表入口



 * structures and helpers for f_op->poll implementations
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
	poll_queue_proc qproc;
} poll_table;

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
	if (p & &  wait_address)
		p->qproc(filp, wait_address, p);

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
	pt->qproc = qproc;


static int ep_insert(struct eventpoll *ep, struct epoll_event *event,struct file *tfile, int fd)
	int error, revents, pwake = 0;
	unsigned long flags;
	struct epitem *epi;
	struct ep_pqueue epq;

	// 从高速缓存分配epitem并初始化相关链表入口及成员
    // epi = EPI_MEM_ALLOC()
    // 各个link,及file fd event nwait等

	epq.epi = epi;
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); // 注册epoll的poll机制回调函数 
	revents = tfile->f_op->poll(tfile, &epq.pt);      // poll最终会调用注册的回调函数

	// 将epi加入到文件tfile控制的链表中,对应fllink
	// 将epi加入到eventpoll控制的哈希表中

	/* 如果epi对应的传入的事件已经发生 */
	if ((revents &  event->events) & &  !EP_IS_LINKED(& epi->rdllink)) {
		list_add_tail(& epi->rdllink, & ep->rdllist);

		/* Notify waiting tasks that events are available */
		if (waitqueue_active(& ep->wq))
			wake_up(& ep->wq);
		if (waitqueue_active(& ep->poll_wait))


上面谈到与eventpoll管理器有关的等待队列,这个队列却与ep管理的文件是没有关系的,什么意思呢,当ep管理的文件发生读写时,上述ep->wq队列是不会被唤醒的,那么与文件读写有关的等待队列到底是哪个队列呢。实际的文件等待队列是通过如下的poll回调函数ep_ptable_queue_proc设置的。如下设置中最关键的又涉及到了之前提到的等待队列管理结构struct eppoll_entry,而pwq作为等待队列的管理结构也是从内核高速缓存分配的。

 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
	struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
	struct eppoll_entry *pwq;

	if (epi->nwait >= 0 & &  (pwq = PWQ_MEM_ALLOC())) {
		init_waitqueue_func_entry(& pwq->wait, ep_poll_callback);
		pwq->whead = whead;
		pwq->base = epi;
		add_wait_queue(whead, & pwq->wait);
		list_add_tail(& pwq->llink, & epi->pwqlist);
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;

这里比较重要的一个问题就是等待队列pwq->wait应该加到哪里,还有就是pwq->wait中有些什么内容。先看看等待队列中有些什么内容,这里给等待队列初始化了一个回调函数ep_poll_callback,这个回调函数会在等待队列被唤醒时被调用,但是pwq->wait中关于进程的信息现在其实是没有的。另外一个问题pwq->wait被加到哪里去了,这里传入了一个wait_queue_head_t *whead参数,这个参数追本溯源其实是文件的等待队列,如下为以管道文件pipe为例的调用经过。

static unsigned int pipe_poll(struct file *filp, poll_table *wait)
	struct inode *inode = filp->f_dentry->d_inode;
	poll_wait(filp, PIPE_WAIT(*inode), wait);




asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
			       int maxevents, int timeout)
	file = fget(epfd);
	ep = file->private_data;

	/* Time to fish for events ... */
	error = ep_poll(ep, events, maxevents, timeout);


static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
	wait_queue_t wait;
    // 获取事件jtimeout
	if (list_empty(& ep->rdllist)) { // 没有任何准备好的事件
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		init_waitqueue_entry(& wait, current);
		add_wait_queue(& ep->wq, & wait);

		for (;;) {
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			if (!list_empty(& ep->rdllist) || !jtimeout) break;
			if (signal_pending(current)) { res = -EINTR; break; }
			jtimeout = schedule_timeout(jtimeout); // schedule调度,释放cpu
		remove_wait_queue(& ep->wq, & wait);
    ep_events_transfer(ep, events, maxevents))
	return res;



struct __wait_queue {
	unsigned int flags;
	struct task_struct * task;
	wait_queue_func_t func;
	struct list_head task_list;


int default_wake_function(wait_queue_t *curr, unsigned mode, int sync)
	task_t *p = curr->task;
	return try_to_wake_up(p, mode, sync);

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int sync)
	struct list_head *tmp, *next;

	list_for_each_safe(tmp, next, & q->task_list) {
		wait_queue_t *curr;
		unsigned flags;
		curr = list_entry(tmp, wait_queue_t, task_list);
		flags = curr->flags;
		if (curr->func(curr, mode, sync) & & 
		    (flags &  WQ_FLAG_EXCLUSIVE) & & 




 * This is the callback that is passed to the wait queue wakeup
 * machanism. It is called by the stored file descriptors when they
 * have events to report.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync)
	struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
	struct eventpoll *ep = epi->ep;

	/* If this file is already in the ready list we exit soon */
	if (EP_IS_LINKED(& epi->rdllink))
		goto is_linked;

	list_add_tail(& epi->rdllink, & ep->rdllist);

	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
	 * wait list.
	if (waitqueue_active(& ep->wq))
		wake_up(& ep->wq);
	if (waitqueue_active(& ep->poll_wait))

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(& psw, & ep->poll_wait);

	return 1;


上面知道现在等待进程已经被唤醒,唤醒之后继续在ep_poll函数中执行下一次循环,这次判断ep->rdllist是不为空的,因为在上面唤醒函数ep_poll_callback中已经将准备好的epi加入到了准备好的队列中了,所以这次有数据退出循环,先把当前进程从等待队列中移除,重新设置进程状态为可调度,再之后执行ep_events_transfer才真正地将获取到的事件传送给用户程序,这个events是用户程序的struct epoll_event 指针。

// ep_poll
		for (;;) {
			if (!list_empty(& ep->rdllist) || !jtimeout) break;
			if (signal_pending(current)) { res = -EINTR; break; }
			jtimeout = schedule_timeout(jtimeout); // schedule调度,释放cpu
		remove_wait_queue(& ep->wq, & wait);
    ep_events_transfer(ep, events, maxevents))
	return res;

/* Perform the transfer of events to user space. */
static int ep_events_transfer(struct eventpoll *ep,
			      struct epoll_event __user *events, int maxevents)
	int eventcnt = 0;
	struct list_head txlist;

	INIT_LIST_HEAD(& txlist);

	/* Collect/extract ready items 收集item的时候将epi加入txlist,然后会从rdllist移除 */
	if (ep_collect_ready_items(ep, & txlist, maxevents) > 0) {
		/* Build result set in userspace */
		eventcnt = ep_send_events(ep, & txlist, events);

		/* Reinject ready items into the ready list */
		ep_reinject_items(ep, & txlist);

	return eventcnt;

传送事件简单来说就是首先收集ep中的准备好的链表,然后用txlist链表来汇总,然后遍历txlist来将数据都复制到用户空间指针struct epoll_event *events。




在内核中epoll的两种模式的实现其实比较容易,主要区别就是判断在将事件传给用户程序之后是否继续让该epitem留在准备好的队列中,上面ep_events_transfer中,使用ep_send_events传给用户程序事件之后有一个重新插入的操作,也就是ep_reinject_items函数。这个函数在将epi从传送队列中一个个解下来的同时会判断是否要重新插入rdllist,关键就在于判断的条件,首先判断epi->llink,即判断该epi是否还在eventpoll管理器中,然后判断没有设置EPOLLET,EPOLLET也就是边缘触发标志(Edge Triggering),然后判断返回的事件中包括注册的事件,最后判断这个epi还不在准备队列中,全部满足的话就将其重新加入rdllist中,这一过程说明如果没有在用户程序中主动设置EPOLLET标志的话,使用的是水平触发,epi会重新加入到rdllist中,然后再次执行ep_poll的时候就不会进入for循环了,因为rdllist不空,需要将其中的事件传给用户程序。那么问题来了,水平触发的话什么时候rdllist中才会删除该epi呢,其实还是看这个判断条件,删除的话在前面的收集操作ep_collect_ready_items中会从rdllist中删除epi,现在只要poll操作返回的事件revents中不包含epi注册的事件epi->event.events,epi就不会再次加入rdllist中,也就是说文件描述符不再可读或可写时,rdllist最终会为空。这里的一个问题就是,只要用户程序读取文件描述符的速度追不上其它程序往里面写入的数据的话,epoll机制就会一直不停地通知用户程序文件描述符可读。如果设置成边缘触发的话就会在通知一次之后,等到下次其他程序写入的时候epoll才会通知当前进程可以读了,避免了不停地通知。

 * Walk through the transfer list we collected with ep_collect_ready_items()
 * and, if 1) the item is still "alive" 2) its event set is not empty 3) it's
 * not already linked, links it to the ready list. Same as above, we are holding
 * "sem" so items cannot vanish underneath our nose.
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
	while (!list_empty(txlist)) { // 遍历传输链表
		epi = list_entry(txlist->next, struct epitem, txlink);
		EP_LIST_DEL(& epi->txlink); // 从传输链表中删除

		 * If the item is no more linked to the interest set, we don't
		 * have to push it inside the ready list because the following
		 * ep_release_epitem() is going to drop it. Also, if the current
		 * item is set to have an Edge Triggered behaviour, we don't have
		 * to push it back either.
		if (EP_IS_LINKED(&epi->llink) && !(epi->event.events & EPOLLET) &&
		    (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
			list_add_tail(& epi->rdllink, & ep->rdllist);
	if (ricnt)  
		// 存在再次加入准备好了的队列的epitem,所以rdllist有数据,唤醒 ep->wq 


