一. epoll用户态使用规范

epoll有2种工作方式:LT和ET。

LT(level triggered,水平触发)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET (edge-triggered,边缘触发)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

epoll相关的系统调用有3个:epoll_create, epoll_ctl和epoll_wait。在头文件<sys/epoll.h>

1. int epoll_create(int size)

创建一个epoll句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll的事件注册函数,注册要监听的事件类型,比如某个被监控的fd被读了,触发epoll_wait.

OP的值有:

#define EPOLL_CTL_ADD 1

#define EPOLL_CTL_DEL 2

#define EPOLL_CTL_MOD 3

epoll_event结构体则是:

typedef union epoll_data {

     void *ptr;

     int fd;

     __uint32_t u32;

     __uint64_t u64;

   } epoll_data_t; 

struct epoll_event {

     __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */   

};

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

用于轮询I/O事件的发生,其中epfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait函数操作成功之后,events里面将储存所有的读写事件。maxevents是当前需要监听的所有socket句柄数。最后一个timeout参 数指示 epoll_wait的超时条件,为0时表示马上返回;为-1时表示函数会一直等下去直到有事件返回;为任意正整数时表示等这么长的时间,如果一直没有事 件,则会返回。一般情况下如果网络主循环是单线程的话,可以用-1来等待,这样可以保证一些效率,如果是和主循环在同一个线程的话,则可以用0来保证主循 环的效率.epoll_wait返回之后,应该进入一个循环,以便遍历所有的事件。

二. epool在内核中的代码分析

代码版本:linux-3.16.37-git.

1. epoll_create

SYSCALL_DEFINE1(epoll_create, int, size)和SYSCALL_DEFINE1(epoll_create1, int, flags)没什么区别,epoll_create的入参的size值在内核就直接被废掉了,变成了epoll_create1(0)。在epoll_create1下,引入了一些结构体eventpoll和epitem:

struct eventpoll {

spinlock_t lock;

/* This mutex is used to ensure that files are not removed

* while epoll is using them. This is held during the event

* collection loop, the file cleanup path, the epoll file exit

* code and the ctl operations.

        */
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

/* List of ready file descriptors */
struct list_head rdllist;

/* RB tree root used to store monitored fd structs */
struct rb_root rbr;

/*

* This is a single linked list that chains all the "struct epitem" that

* happened while transferring ready events to userspace w/out

* holding ->lock.

*/
struct epitem *ovflist;

/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;

/* The user that created the eventpoll descriptor */
struct user_struct *user;

struct file *file;

/* used to optimize loop detection check */
int visited;

struct list_head visited_list_link;

};

struct epitem {

union {

/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;

/* Used to free the struct epitem */
struct rcu_head rcu;

};

/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink;

/*

* Works together "struct eventpoll"->ovflist in keeping the

* single linked chain of items.

*/
struct epitem *next;

/* The file descriptor information this item refers to */
struct epoll_filefd ffd;

/* Number of active wait queue attached to poll operations */
int nwait;

/* List containing poll wait queues */
struct list_head pwqlist;

/* The "container" of this item */
struct eventpoll *ep;

/* List header used to link this item to the "struct file" items list */
struct list_head fllink;

/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;

/* The structure that describe the interested events and the source fd */
struct epoll_event event;

};

先不管这些结构体,先看eventpoll结构体的初始化

error = ep_alloc(&ep);


//获取一个fd使用权

fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

//获取file并填充该结构体

file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,

O_RDWR | (flags & O_CLOEXEC));

ep->file = file;

//合并fd和file到一体

fd_install(fd, file);

上面仅仅提到fd的获取而已,需要注意的是eventpoll_fops

static const struct file_operations eventpoll_fops = {

.release= ep_eventpoll_release,

.poll= ep_eventpoll_poll,

.llseek= noop_llseek,

};

2. epoll_ctl

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event)

//copy到epds中

copy_from_user(&epds, event, sizeof(struct epoll_event))

f = fdget(epfd);

tf = fdget(fd);

//此处表现出来的是对于epool而言,存储在file下的private_data中

ep = f.file->private_data;

if (op == EPOLL_CTL_ADD)

//将目标fd链接到tfile_check_list列表中

    list_add(&tf.file->f_tfile_llink, &tfile_check_list);

/*

* Try to lookup the file inside our RB tree, Since we grabbed "mtx"

* above, we can be sure to be able to use the item looked up by

* ep_find() till we release the mutex.

*/
epi = ep_find(ep, tf.file, fd);

switch (op) {

case EPOLL_CTL_ADD:

if (!epi) {

epds.events |= POLLERR | POLLHUP;

error = ep_insert(ep, &epds, tf.file, fd, full_check);

} 

if (full_check)

clear_tfile_check_list();

break;

case EPOLL_CTL_DEL:

if (epi)

error = ep_remove(ep, epi);

break;

case EPOLL_CTL_MOD:

if (epi)  {

epds.events |= POLLERR | POLLHUP;

error = ep_modify(ep, epi, &epds);

                }

break;

先看ep_insert函数:

epi->ep = ep;

//将target file和target fd填入到epi下的ffd中

ep_set_ffd(&epi->ffd, tfile, fd);

epi->event = *event;

epi->nwait = 0;

epi->next = EP_UNACTIVE_PTR;

ep_create_wakeup_source注册了一个唤醒源...

太复杂了,看不下去了。

3.epoll_wait

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,int, maxevents, int, timeout)

在ep_poll下,

//初始化一个等待entry,并将其加入到ep下

init_waitqueue_entry(&wait, current);

__add_wait_queue_exclusive(&ep->wq, &wait);

//下面的for循环就是epoll等待的结点了

for (;;) {

/*

* We don't want to sleep if the ep_poll_callback() sends us

 * a wakeup in between. That's why we set the task state

 * to TASK_INTERRUPTIBLE before doing the checks.

 */
set_current_state(TASK_INTERRUPTIBLE);

//收到新的ep event了或者超时

if (ep_events_available(ep) || timed_out)

break;

if (signal_pending(current)) {

res = -EINTR;

break;

}

spin_unlock_irqrestore(&ep->lock, flags);


//下面的函数负责进行sleep,当前task睡到超时,如果没有超时,

//则一般是为ep_poll_callback给中断进行了唤醒,继续进行循环处理。

if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))

timed_out = 1;

spin_lock_irqsave(&ep->lock, flags);


}

4.ep_poll_callback的通知

在看一下ep_pqueue队列

struct ep_pqueue {

poll_table pt;

struct epitem *epi;

};

typedef struct poll_table_struct {

poll_queue_proc _qproc;

unsigned long _key;

} poll_table;

在ep_insert中


epq.epi = epi;

init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

revents = ep_item_poll(epi, &epq.pt);

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)

{

pt->_key = epi->event.events;

//poll最终指向还是eventpoll_fops下的ep_eventpoll_poll

return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;

}

在ep_eventpoll_poll下,调用了poll_wait(file, &ep->poll_wait, wait),即是:

if (p && p->_qproc && wait_address)

p->_qproc(filp, wait_address, p);

而_qproc则为ep_ptable_queue_proc,其下的wait_queue_head_t就来自于&ep->poll_wait,将ep_poll_callback加到队列里面

init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

等ep_poll_callback被调度后,通过唤醒源将ep_poll唤醒。

 


epoll的linux内核工作机制来自于OenHan

链接为:https://oenhan.com/epoll-linux-kernel

发表回复