QEMU下的eventfd机制及源代码分析
代码版本linux-3.16.37-git, qemu-v2.8-git
一. Eventfd在QEMU下的使用
ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); e->rfd = e->wfd = ret;
eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the
kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is
maintained by the kernel. This counter is initialized with the value specified in the argument initval.
而event_notifier_get_fd就是返回EventNotifier fd值而已。
event_notifier_set_handler则是将handler挂到AIO线程上,
aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external, (IOHandler *)handler, NULL, e);
在aio_set_fd_handler下,每个AIO的调度单元是以node形式存在,
node = find_aio_handler(ctx, fd); node->io_read = io_read; node->io_write = io_write; node->opaque = opaque; node->is_external = is_external;
在aio_epoll_update中,AioContext下的epoll_enabled被置1,获取ctl的值,然后将node->pfd.fd即eventfd加入epoll队列:
ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; //fd是在find_aio_handler下进行的设置 epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
然后使用aio_notify通知AIO进行调度,而AioContext的通知功能本质上又是一个eventfd直接在userspace的应用,下面再提:
int event_notifier_set(EventNotifier *e)
{
static const uint64_t value = 1;
ssize_t ret;
do {
//所谓通知就是在ctx->notifier下写入eventfd的值
ret = write(e->wfd, &value, sizeof(value));
} while (ret < 0 && errno == EINTR);
/* EAGAIN is fine, a read must be pending. */
if (ret < 0 && errno != EAGAIN) {
return -errno;
}
return 0;
}
//设置为已发送通知
atomic_mb_set(&ctx->notified, true);
看一下eventfd调用的过程:
在iothread初始化过程中调用了iothread_complete,创建了iothread_run线程
qemu_thread_create(&iothread->thread, thread_name, iothread_run, iothread, QEMU_THREAD_JOINABLE);
只要AIO thread没有被停掉,线程就会一直被epoll
while (!atomic_read(&iothread->stopping)) {
aio_poll(iothread->ctx, true);
}
在aio_poll下,
AioHandler epoll_handler; epoll_handler.pfd.fd = ctx->epollfd; epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; npfd = 0; add_pollfd(&epoll_handler); ret = aio_epoll(ctx, pollfds, npfd, timeout);
在aio_epoll下,完成对event事件的调度
//epoll_wait主要是等待新的event,没有新的event才会等待
ret = epoll_wait(ctx->epollfd, events,
sizeof(events) / sizeof(events[0]),
timeout);
for (i = 0; i < ret; i++) {
int ev = events[i].events;
node = events[i].data.ptr;
node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
(ev & EPOLLOUT ? G_IO_OUT : 0) |
(ev & EPOLLHUP ? G_IO_HUP : 0) |
(ev & EPOLLERR ? G_IO_ERR : 0);
}
此处只是将node进行了设置,但是仍然没有进行真正的调度执行,真正的执行是在aio_dispatch下的
if (!node->deleted &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_read) {
node->io_read(node->opaque);
/* aio_notify() does not count as progress */
if (node->opaque != &ctx->notifier) {
progress = true;
}
}
if (!node->deleted &&
(revents & (G_IO_OUT | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_write) {
node->io_write(node->opaque);
progress = true;
}
回头看一下aio_notify的eventfd的使用,在aio_context_new下,
ret = event_notifier_init(&ctx->notifier, false);
即给eventfd写入了新值。
aio_notify_accept则负责接收它,表示自己已经收到对应的通知并完成处理。
len = read(e->rfd, buffer, sizeof(buffer));
二. Eventfd在kernel下的机制
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)创建eventfd.
在eventfd_file_create中,其中
struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
/*
* Every time that a write(2) is performed on an eventfd, the
* value of the __u64 being written is added to "count" and a
* wakeup is performed on "wqh". A read(2) will return the "count"
* value to userspace, and will reset "count" to zero. The kernel
* side eventfd_signal() also, adds to the "count" counter and
* issue a wakeup.
*/
__u64 count;
unsigned int flags;
};
注释对count的意义说清楚了.
ctx = kmalloc(sizeof(*ctx), GFP_KERNEL); kref_init(&ctx->kref); init_waitqueue_head(&ctx->wqh); ctx->count = count; ctx->flags = flags;
anon_inode_getfile获取匿名fd,重点在file->private_data = priv;另外还有eventfd_fops
static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
.show_fdinfo= eventfd_show_fdinfo,
#endif
.release= eventfd_release,
.poll= eventfd_poll,
.read= eventfd_read,
.write= eventfd_write,
.llseek= noop_llseek,
};
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
ssize_t res;
DECLARE_WAITQUEUE(wait, current);
spin_lock_irq(&ctx->wqh.lock);
*cnt = 0;
res = -EAGAIN;
if (ctx->count > 0)
res = 0;
else if (!no_wait) {
__add_wait_queue(&ctx->wqh, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ctx->count > 0) {
res = 0;
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res == 0)) {
eventfd_ctx_do_read(ctx, cnt);
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLOUT);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt;
DECLARE_WAITQUEUE(wait, current);
if (count < sizeof(ucnt))
return -EINVAL;
if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
return -EFAULT;
if (ucnt == ULLONG_MAX)
return -EINVAL;
spin_lock_irq(&ctx->wqh.lock);
res = -EAGAIN;
if (ULLONG_MAX - ctx->count > ucnt)
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) {
__add_wait_queue(&ctx->wqh, &wait);
for (res = 0;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ULLONG_MAX - ctx->count > ucnt) {
res = sizeof(ucnt);
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res > 0)) {
ctx->count += ucnt;
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLIN);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}
用户态对eventfd的控制是通过上面两个控制的,而kernel对userspace则是eventfd_signal
struct eventfd_ctx *eventfd_ctx_fileget(struct file *file)
{
if (file->f_op != &eventfd_fops)
return ERR_PTR(-EINVAL);
return eventfd_ctx_get(file->private_data);
}
__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
unsigned long flags;
spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ULLONG_MAX - ctx->count < n)
n = ULLONG_MAX - ctx->count;
ctx->count += n;
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLIN);
spin_unlock_irqrestore(&ctx->wqh.lock, flags);
return n;
}
QEMU下的eventfd机制及源代码分析来自于OENHAN
链接为:https://oenhan.com/qemu-eventfd-kvm/
博主你好,非常感谢你的分享,对我有很大启发,发现两个问题想跟你交流下:
1. qemu用了glib的的一些api,在函数aio_set_fd_handler中调用了g_source_add_poll,查看API后发现这个函数是添加GPollFD到GSource中。
Adds a file descriptor to the set of file descriptors polled for this source
2. aio_epoll_update—>epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);只是将事件event和node->pfd.fd关联起来。
以上两点是想跟你确认,我的理解是否正确?
另外:qemu的这种做法,是将glib与epoll结合起来用么?如果单用其中一种是否也可以完成eventfd的机制。
@HEAVY 这两点是这么理解的。eventfd机制上比较容易实现,只要有线程轮询它即可,其中任何一种都可以实现。epoll和eventfd是非常契合的配置,GMainLoop应该是比较容易控制流程。
博主你好,非常感谢你的分享,对我有很大启发,发现两个问题想跟你交流下:
1. qemu用了glib的的一些api,在函数aio_set_fd_handler中调用了g_source_add_poll,查看API后发现这个函数是添加GPollFD到GSource中。
Adds a file descriptor to the set of file descriptors polled for this source
2. aio_epoll_update—>epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);只是将事件event和node->pfd.fd关联起来。
以上两点是想跟你确认,我的理解是否正确?
另外:qemu的这种做法,是将glib与epoll结合起来用么?如果单用其中一种是否也可以完成eventfd的机制。
@HEAVY 这两点是这么理解的。eventfd机制上比较容易实现,只要有线程轮询它即可,其中任何一种都可以实现。epoll和eventfd是非常契合的配置,GMainLoop应该是比较容易控制流程。