Analysis of libvirt's Job Queue Mechanism
In libvirt, the following two functions are used as a pair and are responsible for serializing operations on a domain by queueing them as jobs:
qemuDomainObjBeginJob(driver, vm, QEMU_JOB_MODIFY)
qemuDomainObjEndJob(driver, vm)
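A minimal sketch of the usual bracket in a driver API (the function name here is hypothetical; the shape follows real callers such as qemuDomainGetBlockInfo):

/* Hypothetical API handler: acquire the job, do the work,
 * and always release the job on the way out. */
static int
exampleDomainModify(virQEMUDriverPtr driver, virDomainObjPtr vm)
{
    int ret = -1;

    if (qemuDomainObjBeginJob(driver, vm, QEMU_JOB_MODIFY) < 0)
        return -1;

    /* ... the actual state-changing work goes here ... */
    ret = 0;

    qemuDomainObjEndJob(driver, vm);
    return ret;
}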
Look first at qemuDomainObjBeginJob's job parameter, of type qemuDomainJob:
typedef enum {
    QEMU_JOB_NONE = 0,      /* Always set to 0 for easy if (jobActive) conditions */
    QEMU_JOB_QUERY,         /* Doesn't change any state */
    QEMU_JOB_DESTROY,       /* Destroys the domain (cannot be masked out) */
    QEMU_JOB_SUSPEND,       /* Suspends (stops vCPUs) the domain */
    QEMU_JOB_MODIFY,        /* May change state */
    QEMU_JOB_ABORT,         /* Abort current async job */
    QEMU_JOB_MIGRATION_OP,  /* Operation influencing outgoing migration */

    /* The following two items must always be the last items before JOB_LAST */
    QEMU_JOB_ASYNC,         /* Asynchronous job */
    QEMU_JOB_ASYNC_NESTED,  /* Normal job within an async job */

    QEMU_JOB_LAST
} qemuDomainJob;
Continuing into qemuDomainObjBeginJob, which simply calls qemuDomainObjBeginJobInternal(driver, obj, job, QEMU_ASYNC_JOB_NONE):
virTimeMillisNow(&now) fetches the current time, and then = now + QEMU_JOB_WAIT_TIME is the deadline for waiting, i.e. 30s from now.
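For reference, the 30s comes from this constant (defined in qemu_domain.c as of the version discussed here):

/* Give up waiting for mutex after 30 seconds */
#define QEMU_JOB_WAIT_TIME (1000ull * 30)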
If a maximum number of queued jobs is configured for the domain and the current queue already exceeds that maximum, inserting a new job fails immediately:
if (cfg->maxQueuedJobs &&
    priv->jobs_queued > cfg->maxQueuedJobs)
    goto error;
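cfg->maxQueuedJobs is populated from the max_queued setting in /etc/libvirt/qemu.conf, which to my knowledge defaults to 0, i.e. unlimited; the value below is purely illustrative:

# /etc/libvirt/qemu.conf
max_queued = 10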
First, a look at the qemuDomainNestedJobAllowed helper:
static bool
qemuDomainNestedJobAllowed(qemuDomainObjPrivatePtr priv, qemuDomainJob job)
{
    /* Allowed when no async job is currently running, or when the new
     * job's bit is set in the running async job's mask.  The mask is set
     * via qemuDomainObjSetAsyncJobMask(); an async job uses it to declare
     * which regular jobs may run alongside it without conflicting. */
    return !priv->job.asyncJob || (priv->job.mask & JOB_MASK(job)) != 0;
}
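JOB_MASK maps each job to a single bit, with QEMU_JOB_NONE mapping to no bits; its definition in qemu_domain.h of this era is simply:

# define JOB_MASK(job) (job == 0 ? 0 : 1 << (job - 1))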
So when the new job is not QEMU_JOB_ASYNC_NESTED and conflicts with a running async job, it has to wait until it is allowed:
bool nested = job == QEMU_JOB_ASYNC_NESTED;
bool async = job == QEMU_JOB_ASYNC;

while (!nested && !qemuDomainNestedJobAllowed(priv, job)) {
    VIR_DEBUG("Waiting for async job (vm=%p name=%s)", obj, obj->def->name);
    if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)
        goto error;
}
If a non-async job is currently executing, every other job must wait. A second while loop is needed because only synchronous jobs set priv->job.active:
while (priv->job.active) {
    VIR_DEBUG("Waiting for job (vm=%p name=%s)", obj, obj->def->name);
    if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)
        goto error;
}
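virCondWaitUntil is essentially a thin wrapper around pthread_cond_timedwait; here is a minimal sketch of the same deadline-based wait in plain pthreads (illustrative, not libvirt's exact implementation):

#include <errno.h>
#include <pthread.h>

/* Wait on cond until an absolute deadline given in milliseconds.
 * Returns 0 when signalled; returns -1 with errno set on failure,
 * which is why the caller above can check errno == ETIMEDOUT. */
static int
wait_until(pthread_cond_t *cond, pthread_mutex_t *lock,
           unsigned long long whenms)
{
    struct timespec ts = {
        .tv_sec  = whenms / 1000,
        .tv_nsec = (whenms % 1000) * 1000000,
    };
    int rc = pthread_cond_timedwait(cond, lock, &ts);

    if (rc != 0) {
        errno = rc;   /* e.g. ETIMEDOUT */
        return -1;
    }
    return 0;
}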
Next, check whether a new async job slipped into the queue while the object was unlocked; if so, go back and wait again:
if (!nested && !qemuDomainNestedJobAllowed(priv, job))
    goto retry;

/* Reset the (synchronous) job info */
qemuDomainObjResetJob(priv);

if (job != QEMU_JOB_ASYNC) {
    /* Record the new synchronous job as the active one */
    priv->job.active = job;
    priv->job.owner = virThreadSelfID();
    priv->job.ownerAPI = virThreadJobGet();
    priv->job.started = now;
} else {
    /* Reset and record the async job info */
    qemuDomainObjResetAsyncJob(priv);
    if (VIR_ALLOC(priv->job.current) < 0)
        goto cleanup;
    priv->job.asyncJob = asyncJob;
    priv->job.asyncOwner = virThreadSelfID();
    priv->job.asyncOwnerAPI = virThreadJobGet();
    priv->job.asyncStarted = now;
    priv->job.current->started = now;
}
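For completeness: once an async job starts, its owner narrows the mask of jobs allowed to interleave with it. A hedged sketch modelled on what the migration code does (the exact mask differs between versions and call sites):

/* An async job declaring which regular jobs may run alongside it;
 * the particular flags here are illustrative. */
qemuDomainObjSetAsyncJobMask(vm,
                             QEMU_JOB_DEFAULT_MASK |
                             JOB_MASK(QEMU_JOB_SUSPEND) |
                             JOB_MASK(QEMU_JOB_MIGRATION_OP));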
What follows is the error handling: when virCondWaitUntil above times out, execution jumps to error. It begins with some bookkeeping of how long the job waited; the key part is:
if (nested || qemuDomainNestedJobAllowed(priv, job))
    blocker = priv->job.ownerAPI;
else
    blocker = priv->job.asyncOwnerAPI;

if (errno == ETIMEDOUT) {
    if (blocker) {
        virReportError(VIR_ERR_OPERATION_TIMEOUT,
                       _("cannot acquire state change lock (held by %s)"),
                       blocker);
    }
So the error tells you exactly which API is holding the lock that the current job is blocked on.
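On the client side this surfaces as the familiar timeout error built from the format string above; with a long-running migration holding the job, for example, it would read roughly like this (the holder name is illustrative):

error: Timed out during operation: cannot acquire state change lock (held by remoteDispatchDomainMigratePerform)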
A sample call stack, captured inside qemuDomainObjBeginJobInternal:
(gdb) bt
#0 qemuDomainObjBeginJobInternal (
driver=driver@entry=0x7fcdf01392c0, obj=0x7fcde4013d70,
job=job@entry=QEMU_JOB_QUERY,
asyncJob=asyncJob@entry=QEMU_ASYNC_JOB_NONE)
at qemu/qemu_domain.c:3483
#1 0x00007fcdfb0bffeb in qemuDomainObjBeginJob (
driver=driver@entry=0x7fcdf01392c0, obj=<optimized out>,
job=job@entry=QEMU_JOB_QUERY) at qemu/qemu_domain.c:3640
#2 0x00007fcdfb146298 in qemuDomainGetBlockInfo (
dom=0x7fcde4038bf0, path=0x7fcde4025380 "vda",
info=0x7fce0746eb00, flags=<optimized out>)
at qemu/qemu_driver.c:11495
#3 0x00007fce1865ae74 in virDomainGetBlockInfo (
domain=domain@entry=0x7fcde4038bf0, disk=0x7fcde4025380 "vda",
info=info@entry=0x7fce0746eb00, flags=0)
at libvirt-domain.c:6114
#4 0x0000558b449a4202 in remoteDispatchDomainGetBlockInfo (
server=0x558b46559f70, msg=0x558b465791b0, ret=0x7fcde4038ee0,
args=0x7fcde4038e80, rerr=0x7fce0746ec50,
client=<optimized out>) at remote_dispatch.h:5167
#5 remoteDispatchDomainGetBlockInfoHelper (server=0x558b46559f70,
client=<optimized out>, msg=0x558b465791b0,
rerr=0x7fce0746ec50, args=0x7fcde4038e80, ret=0x7fcde4038ee0)
at remote_dispatch.h:5141
#6 0x00007fce186cb0b2 in virNetServerProgramDispatchCall (
msg=0x558b465791b0, client=0x558b46575b70,
server=0x558b46559f70, prog=0x558b46572310)
at rpc/virnetserverprogram.c:437
#7 virNetServerProgramDispatch (prog=0x558b46572310,
server=server@entry=0x558b46559f70, client=0x558b46575b70,
msg=0x558b465791b0) at rpc/virnetserverprogram.c:307
#8 0x0000558b449c680d in virNetServerProcessMsg (
msg=<optimized out>, prog=<optimized out>,
client=<optimized out>, srv=0x558b46559f70)
at rpc/virnetserver.c:148
#9 virNetServerHandleJob (jobOpaque=<optimized out>,
opaque=0x558b46559f70) at rpc/virnetserver.c:169
#10 0x00007fce185b03b1 in virThreadPoolWorker (
opaque=opaque@entry=0x558b4654ee60) at util/virthreadpool.c:167
#11 0x00007fce185af738 in virThreadHelper (data=<optimized out>)
at util/virthread.c:206
#12 0x00007fce159c6e25 in start_thread ()
from /lib64/libpthread.so.0
#13 0x00007fce156f434d in clone () from /lib64/libc.so.6
qemuDomainObjEndJob, by contrast, is straightforward:
void
qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;
    qemuDomainJob job = priv->job.active;

    /* Decrement the queued-jobs counter */
    priv->jobs_queued--;

    /* Reset the job info */
    qemuDomainObjResetJob(priv);

    /* Signal a job blocked in virCondWaitUntil() to wake up */
    virCondSignal(&priv->job.cond);
}
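The async counterpart, qemuDomainObjEndAsyncJob, does the equivalent for the async state; from my reading of the same era of the code (details may differ slightly), it broadcasts rather than signals, since every waiter in the first while loop of BeginJobInternal may now be runnable:

void
qemuDomainObjEndAsyncJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;

    priv->jobs_queued--;
    /* Reset the async job info and wake up ALL waiters on asyncCond */
    qemuDomainObjResetAsyncJob(priv);
    virCondBroadcast(&priv->job.asyncCond);
}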
Conclusion: a request made through the libvirt API can be delayed by up to 30s at worst, or even fail outright.
Analysis of libvirt's Job Queue Mechanism originally appeared on OenHan.
Link: https://oenhan.com/libvirt-domain-qemu-job
I've been reading this code recently too, and some of the mechanics are unclear to me. Outside of the job machinery, the VM object is already locked, which should be enough to serialize tasks. Why does the job layer add its own locking on top?
@月闻缘 What exactly do you mean by "the VM object is already locked outside of the job machinery"? Can you paste a code example?
@月闻缘 Two considerations:
1. Concurrent operations on the same VM. If every API guaranteed data consistency by taking the big VM lock, the responsiveness of some APIs (query-type operations, say) would suffer badly: some lifecycle operations take a long time, and a concurrent query issued during one of them would simply hang.
That is why libvirt introduced the job lock: operations on a VM are mainly serialized through it, which is also where the nested-job concept comes from. Query-type jobs are nestable by default:
# define QEMU_JOB_DEFAULT_MASK \
(JOB_MASK(QEMU_JOB_QUERY) | \
JOB_MASK(QEMU_JOB_DESTROY) | \
JOB_MASK(QEMU_JOB_ABORT))
2. Most APIs release the VM lock while they run; the typical case is interacting with qemu through the qemu monitor. Every qmp/hmp command issued is wrapped in something like qemuDomainObjEnterMonitor and qemuDomainObjExitMonitor, and those functions drop and reacquire the VM lock. It is the job lock that preserves the integrity of the API across that window.
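A hedged sketch of that bracket inside an API handler (the monitor call in the middle is hypothetical):

/* EnterMonitor drops the VM lock while libvirt talks to qemu;
 * ExitMonitor reacquires it and reports whether the domain died
 * in the meantime.  The job lock protects the API across the gap. */
qemuDomainObjEnterMonitor(driver, vm);
rc = qemuMonitorDoSomething(priv->mon);  /* hypothetical qmp call */
if (qemuDomainObjExitMonitor(driver, vm) < 0)
    rc = -1;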
@ZWL9 Yes, I reached that code later on. Within these jobs the VM lock is indeed released temporarily.