
The eventfd mechanism in QEMU and its source code analysis

Virtualization 2016-09-28

Code versions: linux-3.16.37-git, qemu-v2.8-git

Since eventfd is used together with epoll, readers who are not familiar with epoll should first look at how epoll works inside the Linux kernel.

1. How eventfd is used in QEMU

eventfd usage in QEMU is built on three functions: event_notifier_init, event_notifier_get_fd, and event_notifier_set_handler.
In event_notifier_init, the EventNotifier is initialized:

ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

e->rfd = e->wfd = ret;

ret is the resulting file descriptor; the man page describes it as follows:
eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the
kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is
maintained by the kernel. This counter is initialized with the value specified in the argument initval.
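To make these counter semantics concrete, here is a tiny standalone demo (my own illustration, not QEMU code): every write adds to the 64-bit counter, and a read returns the accumulated value and resets it to zero.

/* eventfd_demo.c: write adds to the counter, read returns it and resets it to 0 */
#include <sys/eventfd.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    uint64_t v = 3, out;

    write(efd, &v, sizeof(v));   /* counter = 3 */
    v = 4;
    write(efd, &v, sizeof(v));   /* counter = 7 */

    if (read(efd, &out, sizeof(out)) == sizeof(out))
        printf("counter was %llu\n", (unsigned long long)out);   /* prints 7 */

    /* the counter is now 0, so a second non-blocking read fails with EAGAIN */
    if (read(efd, &out, sizeof(out)) < 0)
        perror("second read");

    close(efd);
    return 0;
}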

event_notifier_get_fd, for its part, simply returns the EventNotifier's fd.

event_notifier_set_handler, in turn, attaches the handler to the AIO thread:

 aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external,
 (IOHandler *)handler, NULL, e);

Inside aio_set_fd_handler, each AIO scheduling unit exists in the form of a node:

 node = find_aio_handler(ctx, fd);

 node->io_read = io_read;
 node->io_write = io_write;
 node->opaque = opaque;
 node->is_external = is_external;

In aio_epoll_update, with epoll enabled on the AioContext (epoll_enabled set to 1), the ctl operation is chosen and node->pfd.fd, i.e. the eventfd, is added to the epoll set:

ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// the fd comes from the node set up via find_aio_handler
epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
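Outside of QEMU, the same eventfd-plus-epoll pairing that aio_epoll_update sets up can be reproduced in a few lines (again a standalone sketch of mine, not QEMU code):

/* register an eventfd with epoll, kick it, and collect the event */
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int efd = eventfd(0, EFD_NONBLOCK);
    int epfd = epoll_create1(0);

    struct epoll_event event = { .events = EPOLLIN, .data.fd = efd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event);     /* what aio_epoll_update does */

    uint64_t v = 1;
    write(efd, &v, sizeof(v));                       /* what event_notifier_set does */

    struct epoll_event ready[8];
    int n = epoll_wait(epfd, ready, 8, -1);          /* what aio_epoll does */
    for (int i = 0; i < n; i++) {
        if (ready[i].data.fd == efd && (ready[i].events & EPOLLIN)) {
            read(efd, &v, sizeof(v));                /* drain it, like aio_notify_accept */
            printf("notified, value %llu\n", (unsigned long long)v);
        }
    }

    close(epfd);
    close(efd);
    return 0;
}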

aio_notify is then used to tell the AIO context to run its scheduling, and the AioContext notification itself is essentially another direct userspace use of an eventfd, which we come back to below:

int event_notifier_set(EventNotifier *e)
{
    static const uint64_t value = 1;
    ssize_t ret;

    do {
        /* "notifying" just means writing a value into ctx->notifier's eventfd */
        ret = write(e->wfd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);

    /* EAGAIN is fine, a read must be pending. */
    if (ret < 0 && errno != EAGAIN) {
        return -errno;
    }
    return 0;
}
// back in aio_notify, mark the notification as sent
atomic_mb_set(&ctx->notified, true);
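For context, both the write above and the atomic_mb_set live in aio_notify; in this QEMU version the function is roughly the following (paraphrased from async.c, so details such as the barrier comments may differ slightly):

void aio_notify(AioContext *ctx)
{
    /* only kick the event loop if some thread is (about to be) blocked
     * in aio_poll / the glib main loop; otherwise the write is skipped
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);    /* write 1 into the eventfd */
        atomic_mb_set(&ctx->notified, true);   /* remember a notification is pending */
    }
}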

Now let's walk through the call path that consumes the eventfd:

During iothread initialization, iothread_complete is called, which creates the iothread_run thread:

 qemu_thread_create(&iothread->thread, thread_name, iothread_run,
 iothread, QEMU_THREAD_JOINABLE);

As long as the AIO thread has not been stopped, it keeps going back into aio_poll (and hence epoll):

 while (!atomic_read(&iothread->stopping)) {
 aio_poll(iothread->ctx, true);
 }

Inside aio_poll:

AioHandler epoll_handler;
epoll_handler.pfd.fd = ctx->epollfd;
epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
npfd = 0;
add_pollfd(&epoll_handler);
ret = aio_epoll(ctx, pollfds, npfd, timeout);

aio_epoll is where the ready events are harvested:

 // epoll_wait waits for new events and only blocks when none are pending
 ret = epoll_wait(ctx->epollfd, events,
 sizeof(events) / sizeof(events[0]),
 timeout);

 for (i = 0; i < ret; i++) {
 int ev = events[i].events;
 node = events[i].data.ptr;
 node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
 (ev & EPOLLOUT ? G_IO_OUT : 0) |
 (ev & EPOLLHUP ? G_IO_HUP : 0) |
 (ev & EPOLLERR ? G_IO_ERR : 0);
 }

At this point only the node's revents have been set; nothing has actually run yet. The real execution happens in aio_dispatch:

if (!node->deleted &&
 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
 aio_node_check(ctx, node->is_external) &&
 node->io_read) {
 node->io_read(node->opaque);

/* aio_notify() does not count as progress */
 if (node->opaque != &ctx->notifier) {
 progress = true;
 }
 }
 if (!node->deleted &&
 (revents & (G_IO_OUT | G_IO_ERR)) &&
 aio_node_check(ctx, node->is_external) &&
 node->io_write) {
 node->io_write(node->opaque);
 progress = true;
 }

Coming back to how aio_notify uses the eventfd: the ctx->notifier is created in aio_context_new,

ret = event_notifier_init(&ctx->notifier, false);

and aio_notify (through event_notifier_set, shown above) then writes a new value into this eventfd.

aio_notify_accept is the receiving side: it consumes the notification to indicate that it has been received and handled:

len = read(e->rfd, buffer, sizeof(buffer));
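The read above happens inside event_notifier_test_and_clear, which drains the counter; together with aio_notify_accept the pair looks roughly like this (paraphrased from async.c and util/event_notifier-posix.c, so minor details may differ):

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

int event_notifier_test_and_clear(EventNotifier *e)
{
    int value;
    ssize_t len;
    char buffer[512];

    /* drain the notify pipe; for an eventfd a single 8-byte read
     * resets the counter to zero
     */
    value = 0;
    do {
        len = read(e->rfd, buffer, sizeof(buffer));
        value |= (len > 0);
    } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));

    return value;
}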

 

2. The eventfd mechanism in the kernel

The eventfd is created by SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags).

The work is done in eventfd_file_create; its central data structure is:

struct eventfd_ctx {
	struct kref kref;
	wait_queue_head_t wqh;
	/*
	 * Every time that a write(2) is performed on an eventfd, the
	 * value of the __u64 being written is added to "count" and a
	 * wakeup is performed on "wqh". A read(2) will return the "count"
	 * value to userspace, and will reset "count" to zero. The kernel
	 * side eventfd_signal() also, adds to the "count" counter and
	 * issue a wakeup.
	 */
	__u64 count;
	unsigned int flags;
};

The comment spells out exactly what count means. The context is then filled in:

ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh);
ctx->count = count;
ctx->flags = flags;

anon_inode_getfile then allocates an anonymous fd; the key point is file->private_data = priv (priv being the ctx above), together with the eventfd_fops file operations:

static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
	.show_fdinfo	= eventfd_show_fdinfo,
#endif
	.release	= eventfd_release,
	.poll		= eventfd_poll,
	.read		= eventfd_read,
	.write		= eventfd_write,
	.llseek		= noop_llseek,
};
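Putting the pieces above together, eventfd_file_create is roughly the following (paraphrased from fs/eventfd.c of this kernel version, with the flag validation trimmed, so details may differ):

struct file *eventfd_file_create(unsigned int count, int flags)
{
	struct file *file;
	struct eventfd_ctx *ctx;

	ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	kref_init(&ctx->kref);
	init_waitqueue_head(&ctx->wqh);
	ctx->count = count;
	ctx->flags = flags;

	/* ctx ends up in file->private_data, where read/write/poll find it again */
	file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx,
				  O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
	if (IS_ERR(file))
		eventfd_free_ctx(ctx);

	return file;
}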
eventfd_ctx_read implements the read side: if count is zero it either returns -EAGAIN (non-blocking) or sleeps on the wait queue until a writer bumps the counter, then hands the value back and wakes up any writers blocked on a full counter:

ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
	ssize_t res;
	DECLARE_WAITQUEUE(wait, current);

	spin_lock_irq(&ctx->wqh.lock);
	*cnt = 0;
	res = -EAGAIN;
	if (ctx->count > 0)
		res = 0;
	else if (!no_wait) {
		__add_wait_queue(&ctx->wqh, &wait);
		for (;;) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (ctx->count > 0) {
				res = 0;
				break;
			}
			if (signal_pending(current)) {
				res = -ERESTARTSYS;
				break;
			}
			spin_unlock_irq(&ctx->wqh.lock);
			schedule();
			spin_lock_irq(&ctx->wqh.lock);
		}
		__remove_wait_queue(&ctx->wqh, &wait);
		__set_current_state(TASK_RUNNING);
	}
	if (likely(res == 0)) {
		eventfd_ctx_do_read(ctx, cnt);
		if (waitqueue_active(&ctx->wqh))
			wake_up_locked_poll(&ctx->wqh, POLLOUT);
	}
	spin_unlock_irq(&ctx->wqh.lock);

	return res;
}
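The .read file operation registered in eventfd_fops is only a thin wrapper that calls eventfd_ctx_read and copies the value back to userspace, roughly (again paraphrased from fs/eventfd.c):

static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
			    loff_t *ppos)
{
	struct eventfd_ctx *ctx = file->private_data;
	ssize_t res;
	__u64 cnt;

	if (count < sizeof(cnt))
		return -EINVAL;
	res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt);
	if (res < 0)
		return res;

	/* hand the (now reset) counter value to the caller */
	return put_user(cnt, (__u64 __user *)buf) ? -EFAULT : sizeof(cnt);
}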

eventfd_write is the mirror image: the written value is added to count, blocking (or returning -EAGAIN) if that would push the counter past ULLONG_MAX - 1, and any blocked readers are then woken up:

static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
			     loff_t *ppos)
{
	struct eventfd_ctx *ctx = file->private_data;
	ssize_t res;
	__u64 ucnt;
	DECLARE_WAITQUEUE(wait, current);

	if (count < sizeof(ucnt))
		return -EINVAL;
	if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
		return -EFAULT;
	if (ucnt == ULLONG_MAX)
		return -EINVAL;
	spin_lock_irq(&ctx->wqh.lock);
	res = -EAGAIN;
	if (ULLONG_MAX - ctx->count > ucnt)
		res = sizeof(ucnt);
	else if (!(file->f_flags & O_NONBLOCK)) {
		__add_wait_queue(&ctx->wqh, &wait);
		for (res = 0;;) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (ULLONG_MAX - ctx->count > ucnt) {
				res = sizeof(ucnt);
				break;
			}
			if (signal_pending(current)) {
				res = -ERESTARTSYS;
				break;
			}
			spin_unlock_irq(&ctx->wqh.lock);
			schedule();
			spin_lock_irq(&ctx->wqh.lock);
		}
		__remove_wait_queue(&ctx->wqh, &wait);
		__set_current_state(TASK_RUNNING);
	}
	if (likely(res > 0)) {
		ctx->count += ucnt;
		if (waitqueue_active(&ctx->wqh))
			wake_up_locked_poll(&ctx->wqh, POLLIN);
	}
	spin_unlock_irq(&ctx->wqh.lock);

	return res;
}
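The overflow check in eventfd_write is easy to observe from userspace (an illustrative test of mine, not from the article): the counter can hold at most ULLONG_MAX - 1, so once it is full a further non-blocking write fails with EAGAIN.

/* fill the counter to its maximum, then watch the next write get EAGAIN */
#include <sys/eventfd.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int efd = eventfd(0, EFD_NONBLOCK);
    uint64_t v = UINT64_MAX - 1;          /* the largest value the counter can hold */

    if (write(efd, &v, sizeof(v)) == sizeof(v))
        printf("counter filled to UINT64_MAX - 1\n");

    v = 1;                                /* this would overflow the counter */
    if (write(efd, &v, sizeof(v)) < 0 && errno == EAGAIN)
        printf("further write rejected with EAGAIN, matching eventfd_write above\n");

    close(efd);
    return 0;
}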

Userspace drives the eventfd through the two functions above, while the kernel-to-userspace direction goes through eventfd_signal; kernel code first resolves the eventfd into a context with eventfd_ctx_fileget (or eventfd_ctx_fdget):

struct eventfd_ctx *eventfd_ctx_fileget(struct file *file)
{
	if (file->f_op != &eventfd_fops)
		return ERR_PTR(-EINVAL);

	return eventfd_ctx_get(file->private_data);
}

__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
	unsigned long flags;

	spin_lock_irqsave(&ctx->wqh.lock, flags);
	if (ULLONG_MAX - ctx->count < n)
		n = ULLONG_MAX - ctx->count;
	ctx->count += n;
	if (waitqueue_active(&ctx->wqh))
		wake_up_locked_poll(&ctx->wqh, POLLIN);
	spin_unlock_irqrestore(&ctx->wqh.lock, flags);

	return n;
}
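To close the loop, a kernel user (in the KVM case, the irqfd/ioeventfd code) typically resolves the fd it received from QEMU into an eventfd_ctx and later signals it. A hypothetical skeleton using only the exported eventfd helpers (the example_* names are mine, purely illustrative):

#include <linux/eventfd.h>
#include <linux/err.h>

/* illustrative only: hold on to the context of an fd passed in from userspace */
static struct eventfd_ctx *example_ctx;

static int example_bind(int fd)
{
	struct eventfd_ctx *ctx;

	ctx = eventfd_ctx_fdget(fd);   /* fget() + eventfd_ctx_fileget() + fput() */
	if (IS_ERR(ctx))
		return PTR_ERR(ctx);

	example_ctx = ctx;
	return 0;
}

static void example_notify(void)
{
	/* adds 1 to count and wakes anyone blocked in eventfd read/poll,
	 * e.g. QEMU's iothread sitting in epoll_wait on the other end
	 */
	eventfd_signal(example_ctx, 1);
}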

 


"The eventfd mechanism in QEMU and its source code analysis" was written by OenHan; the original link is http://oenhan.com/qemu-eventfd-kvm
2 comments
  • Heavy

    2017-06-12 20:40

    Hello, thanks a lot for sharing; it has been very enlightening for me. I noticed two points I'd like to check with you:
    1. QEMU uses a number of glib APIs; aio_set_fd_handler calls g_source_add_poll, and the API docs say this function adds a GPollFD to a GSource
    ("Adds a file descriptor to the set of file descriptors polled for this source").
    2. aio_epoll_update ---> epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event); merely associates the event with node->pfd.fd.
    Could you confirm whether my understanding of these two points is correct?

    Also: is QEMU's approach here a combination of glib and epoll? Could the eventfd mechanism also be implemented with just one of the two?

    1. oenhan

      2017-06-15 21:23

      Yes, that is how I understand those two points as well. The eventfd mechanism itself is easy to drive: all it needs is a thread polling it, so either approach alone would work. epoll and eventfd are a very natural fit, while GMainLoop is probably the easier way to control the overall flow.