代码版本linux-3.16.37-git, qemu-v2.8-git

因为eventfd要与epoll配合使用,不清楚epoll的请参考epoll的linux内核工作机制

一. Eventfd在QEMU下的使用

Eventfd在QEMU下的使用以这三个函数为基础:event_notifier_init和event_notifier_get_fd,以及event_notifier_set_handler。
在event_notifier_init中,初始化EventNotifier:
ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
e->rfd = e->wfd = ret;
ret即是此次的fd的值,man手册中有对应的介绍:
eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the
kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is
maintained by the kernel. This counter is initialized with the value specified in the argument initval.

而event_notifier_get_fd就是返回EventNotifier fd值而已。

event_notifier_set_handler则是将handler挂到AIO线程上,

aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external, (IOHandler *)handler, NULL, e);

在aio_set_fd_handler下,每个AIO的调度单元是以node形式存在,

node = find_aio_handler(ctx, fd);

node->io_read = io_read;
node->io_write = io_write;
node->opaque = opaque;
node->is_external = is_external;

在aio_epoll_update中,AioContext下的epoll_enabled被置1,获取ctl的值,然后将node->pfd.fd即eventfd加入epoll队列:

ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
//fd是在find_aio_handler下进行的设置
epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);

然后使用aio_notify通知AIO进行调度,而AioContext的通知功能本质上又是一个eventfd直接在userspace的应用,下面再提:

int event_notifier_set(EventNotifier *e)
{
    static const uint64_t value = 1;
    ssize_t ret;

    do {
        //所谓通知就是在ctx->notifier下写入eventfd的值
        ret = write(e->wfd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);

    /* EAGAIN is fine, a read must be pending. */    if (ret < 0 && errno != EAGAIN) {
        return -errno;
    }
    return 0;
}
//设置为已发送通知
atomic_mb_set(&ctx->notified, true);

看一下eventfd调用的过程:

iothread初始化过程中调用了iothread_complete,创建了iothread_run线程

qemu_thread_create(&iothread->thread, thread_name, iothread_run, iothread, QEMU_THREAD_JOINABLE);

只要AIO thread没有被停掉,线程就会一直被epoll

while (!atomic_read(&iothread->stopping)) {
    aio_poll(iothread->ctx, true);
}

在aio_poll下,

AioHandler epoll_handler;
epoll_handler.pfd.fd = ctx->epollfd;
epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
npfd = 0;
add_pollfd(&epoll_handler);
ret = aio_epoll(ctx, pollfds, npfd, timeout);

在aio_epoll下,完成对event事件的调度

//epoll_wait主要是等待新的event,没有新的event才会等待
 ret = epoll_wait(ctx->epollfd, events,
 sizeof(events) / sizeof(events[0]),
 timeout);

 for (i = 0; i < ret; i++) {
     int ev = events[i].events;
     node = events[i].data.ptr;
     node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
         (ev & EPOLLOUT ? G_IO_OUT : 0) |
         (ev & EPOLLHUP ? G_IO_HUP : 0) |
         (ev & EPOLLERR ? G_IO_ERR : 0);
 }

此处只是将node进行了设置,但是仍然没有进行真正的调度执行,真正的执行是在aio_dispatch下的

if (!node->deleted &&
 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
 aio_node_check(ctx, node->is_external) &&
 node->io_read) {
     node->io_read(node->opaque);

     /* aio_notify() does not count as progress */     if (node->opaque != &ctx->notifier) {
          progress = true;
     }
}
if (!node->deleted &&
 (revents & (G_IO_OUT | G_IO_ERR)) &&
 aio_node_check(ctx, node->is_external) &&
 node->io_write) {
      node->io_write(node->opaque);
      progress = true;
}

回头看一下aio_notify的eventfd的使用,在aio_context_new下,

ret = event_notifier_init(&ctx->notifier, false);

即给eventfd写入了新值。

aio_notify_accept则负责接收它,表示自己已经收到对应的通知并完成处理。

len = read(e->rfd, buffer, sizeof(buffer));

 

二. Eventfd在kernel下的机制

SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)创建eventfd.

在eventfd_file_create中,其中

struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
/*
* Every time that a write(2) is performed on an eventfd, the
* value of the __u64 being written is added to "count" and a
* wakeup is performed on "wqh". A read(2) will return the "count"
* value to userspace, and will reset "count" to zero. The kernel
* side eventfd_signal() also, adds to the "count" counter and
* issue a wakeup.
*/
__u64 count;
unsigned int flags;
};

注释对count的意义说清楚了.

ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh);
ctx->count = count;
ctx->flags = flags;

anon_inode_getfile获取匿名fd,重点在file->private_data = priv;另外还有eventfd_fops

static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
.show_fdinfo= eventfd_show_fdinfo,
#endif
.release= eventfd_release,
.poll= eventfd_poll,
.read= eventfd_read,
.write= eventfd_write,
.llseek= noop_llseek,
};
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)

{
    ssize_t res;
    DECLARE_WAITQUEUE(wait, current);
    spin_lock_irq(&ctx->wqh.lock);
    *cnt = 0;
    res = -EAGAIN;
    if (ctx->count > 0)
        res = 0;
    else if (!no_wait) {
        __add_wait_queue(&ctx->wqh, &wait);
        for (;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            if (ctx->count > 0) {
                res = 0;
                break;
            }

            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }

            spin_unlock_irq(&ctx->wqh.lock);
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }

    if (likely(res == 0)) {
        eventfd_ctx_do_read(ctx, cnt);
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLOUT);
    }

    spin_unlock_irq(&ctx->wqh.lock);
    return res;
}

static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)
{

    struct eventfd_ctx *ctx = file->private_data;
    ssize_t res;
    __u64 ucnt;

    DECLARE_WAITQUEUE(wait, current);
    if (count < sizeof(ucnt))
        return -EINVAL;

    if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
        return -EFAULT;

    if (ucnt == ULLONG_MAX)
        return -EINVAL;

    spin_lock_irq(&ctx->wqh.lock);
    res = -EAGAIN;

    if (ULLONG_MAX - ctx->count > ucnt)
        res = sizeof(ucnt);
    else if (!(file->f_flags & O_NONBLOCK)) {
        __add_wait_queue(&ctx->wqh, &wait);
        for (res = 0;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            if (ULLONG_MAX - ctx->count > ucnt) {
                res = sizeof(ucnt);
                break;
            }

            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }

            spin_unlock_irq(&ctx->wqh.lock);
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }

        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }

    if (likely(res > 0)) {
        ctx->count += ucnt;
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLIN);
    }

    spin_unlock_irq(&ctx->wqh.lock);
    return res;
}

用户态对eventfd的控制是通过上面两个控制的,而kernel对userspace则是eventfd_signal

struct eventfd_ctx *eventfd_ctx_fileget(struct file *file)
{
    if (file->f_op != &eventfd_fops)
        return ERR_PTR(-EINVAL);
    return eventfd_ctx_get(file->private_data);
}

__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
    unsigned long flags;
    spin_lock_irqsave(&ctx->wqh.lock, flags);
    if (ULLONG_MAX - ctx->count < n)
        n = ULLONG_MAX - ctx->count;

    ctx->count += n;
    if (waitqueue_active(&ctx->wqh))
        wake_up_locked_poll(&ctx->wqh, POLLIN);
    spin_unlock_irqrestore(&ctx->wqh.lock, flags);

    return n;
}

QEMU下的eventfd机制及源代码分析来自于OenHan

链接为:http://oenhan.com/qemu-eventfd-kvm

2 thoughts on “QEMU下的eventfd机制及源代码分析”

  1. 博主你好,非常感谢你的分享,对我有很大启发,发现两个问题想跟你交流下:
    1. qemu用了glib的的一些api,在函数aio_set_fd_handler中调用了g_source_add_poll,查看API后发现这个函数是添加GPollFD到GSource中。
    Adds a file descriptor to the set of file descriptors polled for this source
    2. aio_epoll_update—>epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);只是将事件event和node->pfd.fd关联起来。
    以上两点是想跟你确认,我的理解是否正确?

    另外:qemu的这种做法,是将glib与epoll结合起来用么?如果单用其中一种是否也可以完成eventfd的机制。

    1. @HEAVY 这两点是这么理解的。eventfd机制上比较容易实现,只要有线程轮询它即可,其中任何一种都可以实现。epoll和eventfd是非常契合的配置,GMainLoop应该是比较容易控制流程。

发表回复