The eventfd Mechanism in QEMU and Its Source Code Analysis
Code versions: linux-3.16.37-git, qemu-v2.8-git
I. Using eventfd in QEMU
QEMU wraps an eventfd in an EventNotifier; event_notifier_init creates the fd and uses it for both reading and writing:

ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
e->rfd = e->wfd = ret;

The eventfd(2) man page describes its semantics:
eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the
kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is
maintained by the kernel. This counter is initialized with the value specified in the argument initval.
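To make these counter semantics concrete, here is a minimal standalone sketch (my own demo, not QEMU code) that creates the eventfd with the same flags as above, performs two writes and then two reads:

#include <sys/eventfd.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    uint64_t val;
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

    if (efd < 0) {
        perror("eventfd");
        return 1;
    }

    val = 1;
    write(efd, &val, sizeof(val));      /* counter becomes 1 */
    val = 2;
    write(efd, &val, sizeof(val));      /* writes accumulate: counter becomes 3 */

    if (read(efd, &val, sizeof(val)) == sizeof(val)) {
        printf("counter = %llu\n", (unsigned long long)val);   /* prints 3 */
    }

    /* the read reset the counter to 0, so with EFD_NONBLOCK the next read fails with EAGAIN */
    if (read(efd, &val, sizeof(val)) < 0) {
        perror("second read");
    }

    close(efd);
    return 0;
}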
event_notifier_get_fd simply returns the EventNotifier's fd.
event_notifier_set_handler hooks the handler onto the AIO thread:
aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external, (IOHandler *)handler, NULL, e);
Inside aio_set_fd_handler, each AIO scheduling unit exists in the form of a node:
node = find_aio_handler(ctx, fd);
node->io_read = io_read;
node->io_write = io_write;
node->opaque = opaque;
node->is_external = is_external;
In aio_epoll_update, epoll_enabled in the AioContext is set to 1, the ctl operation is chosen, and node->pfd.fd, i.e. the eventfd, is added to the epoll set:
ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
/* the fd was set up in find_aio_handler */
epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
aio_notify is then used to tell the AIO loop to schedule. The AioContext notification facility is itself another direct user-space use of an eventfd, which we will come back to below:
int event_notifier_set(EventNotifier *e)
{
    static const uint64_t value = 1;
    ssize_t ret;

    do {
        /* the "notification" is nothing more than writing a value into ctx->notifier's eventfd */
        ret = write(e->wfd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);

    /* EAGAIN is fine, a read must be pending. */
    if (ret < 0 && errno != EAGAIN) {
        return -errno;
    }
    return 0;
}

/* mark the notification as sent */
atomic_mb_set(&ctx->notified, true);
Now let's look at how this eventfd gets polled and consumed.
During iothread initialization, iothread_complete is called, which creates the iothread_run thread:
qemu_thread_create(&iothread->thread, thread_name, iothread_run, iothread, QEMU_THREAD_JOINABLE);
As long as the AIO thread has not been stopped, it keeps polling in a loop:
while (!atomic_read(&iothread->stopping)) {
    aio_poll(iothread->ctx, true);
}
Inside aio_poll:
AioHandler epoll_handler;

epoll_handler.pfd.fd = ctx->epollfd;
epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
npfd = 0;
add_pollfd(&epoll_handler);
ret = aio_epoll(ctx, pollfds, npfd, timeout);
aio_epoll then harvests the pending events:
/* epoll_wait waits for new events; it only blocks when none are pending */
ret = epoll_wait(ctx->epollfd, events,
                 sizeof(events) / sizeof(events[0]),
                 timeout);
for (i = 0; i < ret; i++) {
    int ev = events[i].events;

    node = events[i].data.ptr;
    node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                        (ev & EPOLLOUT ? G_IO_OUT : 0) |
                        (ev & EPOLLHUP ? G_IO_HUP : 0) |
                        (ev & EPOLLERR ? G_IO_ERR : 0);
}
At this point the node has only been marked; no real dispatch has happened yet. The actual execution takes place in aio_dispatch:
if (!node->deleted &&
    (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
    aio_node_check(ctx, node->is_external) &&
    node->io_read) {
    node->io_read(node->opaque);

    /* aio_notify() does not count as progress */
    if (node->opaque != &ctx->notifier) {
        progress = true;
    }
}
if (!node->deleted &&
    (revents & (G_IO_OUT | G_IO_ERR)) &&
    aio_node_check(ctx, node->is_external) &&
    node->io_write) {
    node->io_write(node->opaque);
    progress = true;
}
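Putting aio_epoll_update, the iothread loop, aio_epoll and aio_dispatch together, the underlying pattern is just an ordinary eventfd-plus-epoll loop. The following standalone sketch (my own demo, not QEMU code; the demo_* names are made up) registers an eventfd with epoll, waits for it, reads it to clear the counter and then runs the handler, mirroring the register/wait/dispatch flow above:

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

static void demo_io_read(int efd)
{
    uint64_t cnt;

    /* like aio_notify_accept: read the eventfd to clear its counter */
    if (read(efd, &cnt, sizeof(cnt)) == sizeof(cnt)) {
        printf("handler ran, counter was %llu\n", (unsigned long long)cnt);
    }
}

int main(void)
{
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = efd };
    struct epoll_event out[8];
    uint64_t one = 1;
    int i, n;

    /* like aio_epoll_update: EPOLL_CTL_ADD ties the eventfd to the epoll set */
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);

    /* like aio_notify / event_notifier_set: another thread would normally do this write */
    write(efd, &one, sizeof(one));

    /* like aio_epoll + aio_dispatch: wait, then invoke the io_read callback */
    n = epoll_wait(epfd, out, 8, -1);
    for (i = 0; i < n; i++) {
        if ((out[i].events & EPOLLIN) && out[i].data.fd == efd) {
            demo_io_read(efd);
        }
    }

    close(epfd);
    close(efd);
    return 0;
}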
Coming back to how aio_notify uses the eventfd: in aio_context_new,
ret = event_notifier_init(&ctx->notifier, false);
initializes ctx->notifier; aio_notify then writes a new value into this eventfd (via event_notifier_set shown above).
aio_notify_accept is responsible for consuming it, i.e. acknowledging that the notification has been received and handled:
len = read(e->rfd, buffer, sizeof(buffer));
II. The eventfd mechanism in the kernel
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags) creates the eventfd.
In eventfd_file_create, the core context structure is:
struct eventfd_ctx {
    struct kref kref;
    wait_queue_head_t wqh;
    /*
     * Every time that a write(2) is performed on an eventfd, the
     * value of the __u64 being written is added to "count" and a
     * wakeup is performed on "wqh". A read(2) will return the "count"
     * value to userspace, and will reset "count" to zero. The kernel
     * side eventfd_signal() also, adds to the "count" counter and
     * issue a wakeup.
     */
    __u64 count;
    unsigned int flags;
};
The comment already explains the meaning of count clearly.
ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh);
ctx->count = count;
ctx->flags = flags;
anon_inode_getfile obtains an anonymous fd; the key point is file->private_data = priv, together with the eventfd_fops file operations:
static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
    .show_fdinfo = eventfd_show_fdinfo,
#endif
    .release     = eventfd_release,
    .poll        = eventfd_poll,
    .read        = eventfd_read,
    .write       = eventfd_write,
    .llseek      = noop_llseek,
};
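Of these operations, eventfd_poll is what makes the epoll registration in part I work: it reports POLLIN whenever the counter is non-zero and POLLOUT as long as another write would still fit. Its logic in this kernel version is roughly the following (paraphrased for illustration, not a verbatim quote of fs/eventfd.c):

static unsigned int eventfd_poll(struct file *file, poll_table *wait)
{
    struct eventfd_ctx *ctx = file->private_data;
    unsigned int events = 0;
    unsigned long flags;

    /* sleep on the same wait queue that read/write/eventfd_signal wake up */
    poll_wait(file, &ctx->wqh, wait);

    spin_lock_irqsave(&ctx->wqh.lock, flags);
    if (ctx->count > 0)
        events |= POLLIN;                 /* readable: counter is non-zero */
    if (ctx->count == ULLONG_MAX)
        events |= POLLERR;
    if (ULLONG_MAX - 1 > ctx->count)
        events |= POLLOUT;                /* writable: the counter still has room */
    spin_unlock_irqrestore(&ctx->wqh.lock, flags);

    return events;
}

The read and write paths then manipulate count under wqh.lock: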
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
    ssize_t res;
    DECLARE_WAITQUEUE(wait, current);

    spin_lock_irq(&ctx->wqh.lock);
    *cnt = 0;
    res = -EAGAIN;
    if (ctx->count > 0)
        res = 0;
    else if (!no_wait) {
        __add_wait_queue(&ctx->wqh, &wait);
        for (;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            if (ctx->count > 0) {
                res = 0;
                break;
            }
            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }
            spin_unlock_irq(&ctx->wqh.lock);
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }
    if (likely(res == 0)) {
        eventfd_ctx_do_read(ctx, cnt);
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLOUT);
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;
}

static ssize_t eventfd_write(struct file *file, const char __user *buf,
                             size_t count, loff_t *ppos)
{
    struct eventfd_ctx *ctx = file->private_data;
    ssize_t res;
    __u64 ucnt;
    DECLARE_WAITQUEUE(wait, current);

    if (count < sizeof(ucnt))
        return -EINVAL;
    if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
        return -EFAULT;
    if (ucnt == ULLONG_MAX)
        return -EINVAL;
    spin_lock_irq(&ctx->wqh.lock);
    res = -EAGAIN;
    if (ULLONG_MAX - ctx->count > ucnt)
        res = sizeof(ucnt);
    else if (!(file->f_flags & O_NONBLOCK)) {
        __add_wait_queue(&ctx->wqh, &wait);
        for (res = 0;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            if (ULLONG_MAX - ctx->count > ucnt) {
                res = sizeof(ucnt);
                break;
            }
            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }
            spin_unlock_irq(&ctx->wqh.lock);
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }
    if (likely(res > 0)) {
        ctx->count += ucnt;
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLIN);
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;
}
User space drives the eventfd through the read and write paths above; in the other direction, the kernel notifies user space via eventfd_signal:
struct eventfd_ctx *eventfd_ctx_fileget(struct file *file)
{
    if (file->f_op != &eventfd_fops)
        return ERR_PTR(-EINVAL);

    return eventfd_ctx_get(file->private_data);
}

__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
    unsigned long flags;

    spin_lock_irqsave(&ctx->wqh.lock, flags);
    if (ULLONG_MAX - ctx->count < n)
        n = ULLONG_MAX - ctx->count;
    ctx->count += n;
    if (waitqueue_active(&ctx->wqh))
        wake_up_locked_poll(&ctx->wqh, POLLIN);
    spin_unlock_irqrestore(&ctx->wqh.lock, flags);

    return n;
}
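A typical in-kernel consumer grabs the eventfd context from an fd handed in by user space and later calls eventfd_signal to wake whoever is polling it; this is, for example, how KVM's ioeventfd notifies user space. A minimal hypothetical sketch (the hypo_* names are made up; eventfd_ctx_fdget is the fd-based counterpart of eventfd_ctx_fileget shown above):

#include <linux/eventfd.h>
#include <linux/err.h>

static struct eventfd_ctx *notify_ctx;      /* hypothetical global, for brevity */

/* e.g. called from an ioctl that receives the fd chosen by user space */
static int hypo_bind_eventfd(int fd)
{
    struct eventfd_ctx *ctx = eventfd_ctx_fdget(fd);

    if (IS_ERR(ctx))
        return PTR_ERR(ctx);
    notify_ctx = ctx;
    return 0;
}

/* e.g. called from an interrupt handler or a work item */
static void hypo_notify_user(void)
{
    /* adds 1 to ctx->count and wakes any POLLIN waiters, as shown above */
    eventfd_signal(notify_ctx, 1);
}

static void hypo_unbind_eventfd(void)
{
    eventfd_ctx_put(notify_ctx);
    notify_ctx = NULL;
}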
"The eventfd Mechanism in QEMU and Its Source Code Analysis" is from OenHan.
Link: https://oenhan.com/qemu-eventfd-kvm
Hello blogger, thank you very much for sharing; it has been very enlightening for me. There are two points I would like to discuss with you:
1. QEMU uses some glib APIs; aio_set_fd_handler calls g_source_add_poll, and according to the API documentation this function adds a GPollFD to a GSource:
Adds a file descriptor to the set of file descriptors polled for this source
2. aio_epoll_update -> epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event) merely associates the event with node->pfd.fd.
I just wanted to confirm these two points with you; is my understanding correct?
Also: is QEMU's approach here a combination of glib and epoll? Could the eventfd mechanism be implemented using only one of the two?
@HEAVY Those two points are understood correctly. The eventfd mechanism itself is easy to drive: as long as some thread polls it, either one of them alone is enough. epoll and eventfd are a very good fit for each other, while GMainLoop probably makes the overall control flow easier to manage.
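To illustrate the reply above: an eventfd can indeed be driven by a GMainLoop alone, with no epoll code in the application at all. A minimal sketch (my own demo, not QEMU code):

#include <glib.h>
#include <sys/eventfd.h>
#include <stdint.h>
#include <unistd.h>

static gboolean on_eventfd_ready(GIOChannel *ch, GIOCondition cond, gpointer data)
{
    uint64_t cnt;
    int efd = g_io_channel_unix_get_fd(ch);

    /* read to clear the counter, then handle the event */
    if (read(efd, &cnt, sizeof(cnt)) == sizeof(cnt)) {
        g_print("eventfd fired, counter = %llu\n", (unsigned long long)cnt);
    }
    g_main_loop_quit((GMainLoop *)data);    /* one-shot for the demo */
    return FALSE;                           /* remove the watch */
}

int main(void)
{
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    GIOChannel *ch = g_io_channel_unix_new(efd);
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);
    uint64_t one = 1;

    g_io_add_watch(ch, G_IO_IN, on_eventfd_ready, loop);

    /* normally another thread would do this write; here we pre-arm the fd once */
    write(efd, &one, sizeof(one));

    g_main_loop_run(loop);                  /* the callback runs from the main loop */

    g_main_loop_unref(loop);
    g_io_channel_unref(ch);
    close(efd);
    return 0;
}

Build with something like: gcc glib_eventfd_demo.c $(pkg-config --cflags --libs glib-2.0). Either epoll or the glib main loop alone is enough to poll an eventfd; as the comment above notes, QEMU combines both in different places.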