import json
import logging
import time
from buildtest.scheduler.job import Job
from buildtest.utils.command import BuildTestCommand
logger = logging.getLogger(__name__)
[docs]
class LSFJob(Job):
    """Job wrapper that queries and controls a single LSF job through the
    ``bjobs`` and ``bkill`` commands.

    State gathered from LSF is stored on the instance (``_state``, ``_exitcode``,
    ``_outfile``, ``_errfile``, ``_jobdata``, ``starttime``).
    """

    def __init__(self, jobID, lsf_cmds):
        """Initialize the job.

        Args:
            jobID: Job identifier, forwarded to the ``Job`` base class initializer.
            lsf_cmds: Mapping of LSF command name to the command to run. This class
                looks up the keys ``bjobs`` and ``bkill``.
        """
        super().__init__(jobID)
        self.lsf_cmds = lsf_cmds

    def is_pending(self):
        """Check if Job is pending which is reported by LSF as ``PEND``. Return ``True`` if there is a match otherwise returns ``False``"""
        return self._state == "PEND"

    def is_running(self):
        """Check if Job is running which is reported by LSF as ``RUN``. Return ``True`` if there is a match otherwise returns ``False``"""
        return self._state == "RUN"

    def is_complete(self):
        """Check if Job is complete which is in ``DONE`` state. Return ``True`` if there is a match otherwise return ``False``"""
        return self._state == "DONE"

    def is_suspended(self):
        """Check if Job is in suspended state which could be in any of the following states: [``PSUSP``, ``USUSP``, ``SSUSP``].
        We return ``True`` if job is in one of the states otherwise return ``False``
        """
        return self._state in ("PSUSP", "USUSP", "SSUSP")

    def is_failed(self):
        """Check if Job failed. We return ``True`` if job is in ``EXIT`` state otherwise return ``False``"""
        return self._state == "EXIT"

    def _query_bjobs_field(self, field, label):
        """Run ``bjobs -noheader -o '<field>' <JOBID>`` and return its output.

        Args:
            field: Name of the ``bjobs`` output field to request (e.g. ``stat``).
            label: Human readable name of the field, only used in the debug log.

        Returns:
            The command output joined into one string with trailing whitespace removed.
        """
        query = f"{self.lsf_cmds['bjobs']} -noheader -o '{field}' {self.jobid}"
        logger.debug(
            f"Extracting {label} for job: {self.jobid} by running '{query}'"
        )

        cmd = BuildTestCommand(query)
        cmd.execute()
        return "".join(cmd.get_output()).rstrip()

    def poll(self):
        """Poll the LSF job and refresh its job state and exit code.

        We run the following commands to retrieve following states

        - Job State: ``bjobs -noheader -o 'stat' <JOBID>``
        - Exit Code: ``bjobs -noheader -o 'EXIT_CODE' <JOBID>``

        The results are stored in ``self._state`` and ``self._exitcode``. The first
        time the job is seen in running state, ``self.starttime`` is set to the
        current time.
        """
        self._state = self._query_bjobs_field("stat", "Job State")
        logger.debug(f"Job State: {self._state}")

        output = self._query_bjobs_field("EXIT_CODE", "EXIT CODE")

        # for 0 or negative exit code output is in form "-" otherwise set value retrieved by bjobs.
        # Any output that is not an integer is therefore recorded as exit code 0.
        try:
            self._exitcode = int(output)
        except ValueError:
            self._exitcode = 0
        logger.debug(f"Exit Code: {self._exitcode}")

        # if job is running and the start time is not recorded then we record the start time
        if self.is_running() and not self.starttime:
            self.starttime = time.time()

    def get_output_and_error_files(self):
        """This method will extract output and error file for a given jobID by running the following commands:
        ``bjobs -noheader -o 'output_file' <JOBID>`` and ``bjobs -noheader -o 'error_file' <JOBID>``

        .. code-block:: console

            $ bjobs -noheader -o 'output_file' 70910
            hold_job.out

        .. code-block:: console

            $ bjobs -noheader -o 'error_file' 70910
            hold_job.err

        The values are stored in ``self._outfile`` and ``self._errfile``.
        """
        # get path to output file
        self._outfile = self._query_bjobs_field("output_file", "OUTPUT FILE")
        logger.debug(f"Output File: {self._outfile}")

        # get path to error file
        self._errfile = self._query_bjobs_field("error_file", "ERROR FILE")
        logger.debug(f"Error File: {self._errfile}")

    def retrieve_jobdata(self):
        """We will gather job record at onset of job completion by running ``bjobs -o '<format1> <format2>' <jobid> -json``.
        Shown below is the output format and we retrieve the first job record defined in **RECORDS** property

        .. code-block:: console

            $ bjobs -o 'job_name stat user user_group queue proj_name pids exit_code from_host exec_host submit_time start_time finish_time nthreads exec_home exec_cwd output_file error_file' 58652 -json
            {
              "COMMAND":"bjobs",
              "JOBS":1,
              "RECORDS":[
                {
                  "JOB_NAME":"hold_job",
                  "STAT":"PSUSP",
                  "USER":"shahzebsiddiqui",
                  "USER_GROUP":"GEN014ECPCI",
                  "QUEUE":"batch",
                  "PROJ_NAME":"GEN014ECPCI",
                  "PIDS":"",
                  "EXIT_CODE":"",
                  "FROM_HOST":"login1",
                  "EXEC_HOST":"",
                  "SUBMIT_TIME":"May 28 12:45",
                  "START_TIME":"",
                  "FINISH_TIME":"",
                  "NTHREADS":"",
                  "EXEC_HOME":"",
                  "EXEC_CWD":"",
                  "OUTPUT_FILE":"hold_job.out",
                  "ERROR_FILE":"hold_job.err"
                }
              ]
            }

        The output and error file paths are refreshed first via
        ``get_output_and_error_files``. The record is stored as a dictionary in
        ``self._jobdata``.

        Raises:
            json.JSONDecodeError: if the command output is not valid JSON.
            KeyError, IndexError: if the output has no ``RECORDS`` entry to read.
        """
        self.get_output_and_error_files()

        format_fields = [
            "job_name",
            "stat",
            "user",
            "user_group",
            "queue",
            "proj_name",
            "pids",
            "exit_code",
            "from_host",
            "exec_host",
            "submit_time",
            "start_time",
            "finish_time",
            "nthreads",
            "exec_home",
            "exec_cwd",
            "output_file",
            "error_file",
        ]
        query = f"{self.lsf_cmds['bjobs']} -o '{' '.join(format_fields)}' {self.jobid} -json"
        logger.debug(f"Gather LSF job: {self.jobid} data by running: {query}")

        cmd = BuildTestCommand(query)
        cmd.execute()
        out = json.loads("".join(cmd.get_output()).rstrip())
        logger.debug(json.dumps(out, indent=2))

        # Only the first record is kept; store a copy so the parsed output is not shared.
        self._jobdata = dict(out["RECORDS"][0])

    def cancel(self):
        """Cancel LSF Job by running ``bkill <jobid>``. This method is called if job pending time exceeds
        `maxpendtime` limit during poll stage."""
        query = f"{self.lsf_cmds['bkill']} {self.jobid}"
        logger.debug(f"Cancelling job {self.jobid} by running: {query}")

        cmd = BuildTestCommand(query)
        cmd.execute()