首页 > Virtualization > libvirt的Job队列机制分析

libvirt的Job队列机制分析

Virtualization 2017-10-20

在libvirt下这两个函数配对使用,负责对任务建立队列关系

qemuDomainObjBeginJob(driver, vm, QEMU_JOB_MODIFY)

qemuDomainObjEndJob(driver, vm)

看qemuDomainObjBeginJob的入参,qemuDomainJob:

typedef enum {

    QEMU_JOB_NONE = 0,  /* Always set to 0 for easy if (jobActive) conditions */

    QEMU_JOB_QUERY,         /* Doesn't change any state */

    QEMU_JOB_DESTROY,       /* Destroys the domain (cannot be masked out) */

    QEMU_JOB_SUSPEND,       /* Suspends (stops vCPUs) the domain */

    QEMU_JOB_MODIFY,        /* May change state */

    QEMU_JOB_ABORT,         /* Abort current async job */

    QEMU_JOB_MIGRATION_OP,  /* Operation influencing outgoing migration */

    /* The following two items must always be the last items before JOB_LAST */

    QEMU_JOB_ASYNC,         /* Asynchronous job */

    QEMU_JOB_ASYNC_NESTED,  /* Normal job within an async job */

    QEMU_JOB_LAST

} qemuDomainJob;

继续看qemuDomainObjBeginJob即qemuDomainObjBeginJobInternal(driver, obj, job, QEMU_ASYNC_JOB_NONE)

virTimeMillisNow(&now)获取当前时间,then = now + QEMU_JOB_WAIT_TIME是超时时间,也就是30s后。

如果虚拟机设置的最多等待job个数,且当前等待超过最大值后,新插入job直接失败.

    if (cfg->maxQueuedJobs && priv->jobs_queued > cfg->maxQueuedJobs) 

        goto error;

先插入qemuDomainNestedJobAllowed函数

static bool qemuDomainNestedJobAllowed(qemuDomainObjPrivatePtr priv, qemuDomainJob job)

{

//也就是当前没有异步job或者vm的job mask和新job的mask不重复

//新mask由qemuDomainObjSetAsyncJobMask设置,其他job会设置这个mask防止新的异步job相互冲突

    return !priv->job.asyncJob || (priv->job.mask & JOB_MASK(job)) != 0;

}

那么当新job不是QEMU_JOB_ASYNC_NESTED,且和其他异步job冲突时,新job需要等待完成。

bool nested = job == QEMU_JOB_ASYNC_NESTED;

bool async = job == QEMU_JOB_ASYNC;

while (!nested && !qemuDomainNestedJobAllowed(priv, job)) {

    VIR_DEBUG("Waiting for async job (vm=%p name=%s)", obj, obj->def->name);

    if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)

        goto error;

}

如果当前有正在执行的非异步job,其他任何job都要等待,再次while循环是因为只有同步才会更新priv->job.active

while (priv->job.active) {

   VIR_DEBUG("Waiting for job (vm=%p name=%s)", obj, obj->def->name);

   if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)

       goto error;

}

检查是不是新的异步job已经提前进入队列

if (!nested && !qemuDomainNestedJobAllowed(priv, job))

    goto retry;

//重置同步job信息

qemuDomainObjResetJob(priv);

if (job != QEMU_JOB_ASYNC) {

//非异步job更新

    priv->job.active = job;

    priv->job.owner = virThreadSelfID();

    priv->job.ownerAPI = virThreadJobGet();

    priv->job.started = now;

} else {

//重置异步job信息

    qemuDomainObjResetAsyncJob(priv);

    if (VIR_ALLOC(priv->job.current) < 0)

        goto cleanup;

    priv->job.asyncJob = asyncJob;

    priv->job.asyncOwner = virThreadSelfID();

    priv->job.asyncOwnerAPI = virThreadJobGet();

    priv->job.asyncStarted = now;

    priv->job.current->started = now;

}

后面是error的处理,当前面virCondWaitUntil等待超时以后,就会走向error,一开始主要就是等待时间的计算,重点在:

if (nested || qemuDomainNestedJobAllowed(priv, job))

        blocker = priv->job.ownerAPI;

else

        blocker = priv->job.asyncOwnerAPI;

if (errno == ETIMEDOUT) {

        if (blocker) {

            virReportError(VIR_ERR_OPERATION_TIMEOUT,

                           _("cannot acquire state change lock (held by %s)"), blocker);

        }

可以看到当前的job执行的位置被谁占用了。

调用的栈

(gdb) bt

#0  qemuDomainObjBeginJobInternal (

driver=driver@entry=0x7fcdf01392c0, obj=0x7fcde4013d70,

job=job@entry=QEMU_JOB_QUERY,

asyncJob=asyncJob@entry=QEMU_ASYNC_JOB_NONE)

at qemu/qemu_domain.c:3483

#1  0x00007fcdfb0bffeb in qemuDomainObjBeginJob (

driver=driver@entry=0x7fcdf01392c0, obj=<optimized out>,

job=job@entry=QEMU_JOB_QUERY) at qemu/qemu_domain.c:3640

#2  0x00007fcdfb146298 in qemuDomainGetBlockInfo (

dom=0x7fcde4038bf0, path=0x7fcde4025380 "vda",

info=0x7fce0746eb00, flags=<optimized out>)

at qemu/qemu_driver.c:11495

#3  0x00007fce1865ae74 in virDomainGetBlockInfo (

domain=domain@entry=0x7fcde4038bf0, disk=0x7fcde4025380 "vda",

info=info@entry=0x7fce0746eb00, flags=0)

at libvirt-domain.c:6114

#4  0x0000558b449a4202 in remoteDispatchDomainGetBlockInfo (

server=0x558b46559f70, msg=0x558b465791b0, ret=0x7fcde4038ee0,

args=0x7fcde4038e80, rerr=0x7fce0746ec50,

client=<optimized out>) at remote_dispatch.h:5167

#5  remoteDispatchDomainGetBlockInfoHelper (server=0x558b46559f70,

client=<optimized out>, msg=0x558b465791b0,

rerr=0x7fce0746ec50, args=0x7fcde4038e80, ret=0x7fcde4038ee0)

at remote_dispatch.h:5141

#6  0x00007fce186cb0b2 in virNetServerProgramDispatchCall (

msg=0x558b465791b0, client=0x558b46575b70,

server=0x558b46559f70, prog=0x558b46572310)

at rpc/virnetserverprogram.c:437

#7  virNetServerProgramDispatch (prog=0x558b46572310,

server=server@entry=0x558b46559f70, client=0x558b46575b70,

msg=0x558b465791b0) at rpc/virnetserverprogram.c:307

#8  0x0000558b449c680d in virNetServerProcessMsg (

msg=<optimized out>, prog=<optimized out>,

client=<optimized out>, srv=0x558b46559f70)

at rpc/virnetserver.c:148

#9  virNetServerHandleJob (jobOpaque=<optimized out>,

opaque=0x558b46559f70) at rpc/virnetserver.c:169

#10 0x00007fce185b03b1 in virThreadPoolWorker (

opaque=opaque@entry=0x558b4654ee60) at util/virthreadpool.c:167

#11 0x00007fce185af738 in virThreadHelper (data=<optimized out>)

at util/virthread.c:206

#12 0x00007fce159c6e25 in start_thread ()

from /lib64/libpthread.so.0

#13 0x00007fce156f434d in clone () from /lib64/libc.so.6

对于qemuDomainObjEndJob就比较简单了

void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)

{

    qemuDomainObjPrivatePtr priv = obj->privateData;

    qemuDomainJob job = priv->job.active;

//计数器减一

    priv->jobs_queued--;

//重置job信息

    qemuDomainObjResetJob(priv);

//发信号唤醒其他使用virCondWaitUntil等待的job

    virCondSignal(&priv->job.cond);

}

结论:调用libvirt API发起的request最迟有30s的延迟,甚至失败。


libvirt的Job队列机制分析来自于OenHan,链接为:http://oenhan.com/libvirt-domain-qemu-job
更多阅读