import logging
from buildtest.scheduler.job import Job
from buildtest.utils.command import BuildTestCommand
logger = logging.getLogger(__name__)
class CobaltJob(Job):
    """Track a single Cobalt job after it has been submitted.

    The class can poll the job state (``qstat``), gather the full job record,
    and cancel the job (``qdel``). It also exposes predicates that report
    whether the job is pending, running, complete, suspended or cancelled
    based on the most recently recorded state.
    """

    def __init__(self, jobID):
        """Initialize the job and derive the names of its output files.

        Args:
            jobID: Cobalt job identifier. It is forwarded to the ``Job`` base
                class and used as the stem of the ``.output``, ``.error`` and
                ``.cobaltlog`` file names.
        """
        super().__init__(jobID)
        self._outfile = str(jobID) + ".output"
        self._errfile = str(jobID) + ".error"
        self._cobaltlog = str(jobID) + ".cobaltlog"

    def is_pending(self):
        """Return ``True`` if job is pending otherwise returns ``False``. When cobalt receives a job it is
        in ``starting`` followed by ``queued`` state. We check if job is in either state.
        """
        return self._state in ("queued", "starting")

    def is_running(self):
        """Return ``True`` if job is running otherwise returns ``False``. Cobalt job state for a running job
        is marked as ``running``."""
        return self._state == "running"

    def is_complete(self):
        """Return ``True`` if job is complete otherwise returns ``False``. Cobalt job state for a completed
        job is marked as ``exiting``."""
        return self._state == "exiting"

    def is_suspended(self):
        """Return ``True`` if job is suspended otherwise returns ``False``. Cobalt job state for a suspended
        job is marked as ``user_hold``."""
        return self._state == "user_hold"

    def is_cancelled(self):
        """Return ``True`` if job is cancelled otherwise returns ``False``. Job state is ``cancelled`` which
        is set by the ``cancel`` method of this class.
        """
        return self._state == "cancelled"

    def cobalt_log(self):
        """Return job cobalt.log file"""
        return self._cobaltlog

    def output_file(self):
        """Return job output file"""
        return self._outfile

    def error_file(self):
        """Return job error file"""
        return self._errfile

    def exitcode(self):
        """Return job exit code.

        NOTE(review): ``_exitcode`` is not assigned anywhere in this class;
        presumably it is initialized by the ``Job`` base class or set by the
        caller -- confirm.
        """
        return self._exitcode

    def poll(self):
        """Poll job by running ``qstat -l --header State <jobid>`` which retrieves job state.

        The recorded state is only updated when the command output contains a
        non-empty value after the first ``:``; otherwise the previous state is
        kept.
        """
        # get Job State by running 'qstat -l --header State <jobid>'
        query = f"qstat -l --header State {self.jobid}"
        logger.debug(f"Getting Job State for '{self.jobid}' by running: '{query}'")
        cmd = BuildTestCommand(query)
        cmd.execute()
        output = cmd.get_output()
        output = " ".join(output).strip()

        # Output in format State: <state> so we need to get value of state
        job_state = output.partition(":")[2].strip()
        if job_state:
            self._state = job_state

        # The job identifier attribute is 'jobid' (used everywhere else in this
        # class); the previous 'self.job' reference did not match it.
        logger.debug(f"Job ID: '{self.jobid}' Job State: {self._state}")

    def gather(self):
        """Gather Job state by running **qstat -lf <jobid>** which retrieves all fields.

        The output is in text format which is parsed into key/value pairs and stored in a
        dictionary. Each line is split on its first ``:``; lines that contain no ``:`` (for
        example blank lines) are not ``KEY: VALUE`` records and are skipped.

        .. code-block:: console

            $ qstat -lf 347106
            JobID: 347106
                JobName           : hold_job
                User              : shahzebsiddiqui
                WallTime          : 00:10:00
                QueuedTime        : 00:13:14
                RunTime           : N/A
                TimeRemaining     : N/A

        Returns:
            dict: the job record, mapping each stripped field name to its stripped value.
        """
        # 'qstat -lf <jobid>' will get all fields of Job.
        qstat_cmd = f"qstat -lf {self.jobid}"
        logger.debug(f"Executing command: {qstat_cmd}")
        cmd = BuildTestCommand(qstat_cmd)
        cmd.execute()
        output = cmd.get_output()

        job_record = {}
        # The output is in format KEY: VALUE so we store all records in a dictionary
        for line in output:
            key, sep, value = line.partition(":")
            if not sep:
                # No separator on this line, so there is no record to store.
                continue
            job_record[key.strip()] = value.strip()

        return job_record

    def cancel(self):
        """Cancel job by running ``qdel <jobid>``. This method is called if job timer exceeds
        ``maxpendtime`` if job is pending.

        After the command is executed the job state is set to ``cancelled``.
        """
        query = f"qdel {self.jobid}"
        logger.debug(f"Cancelling job {self.jobid} by running: {query}")
        cmd = BuildTestCommand(query)
        cmd.execute()
        self._state = "cancelled"