import json
import logging
import os
import re
import time
from buildtest.exceptions import JobSchedulerError
from buildtest.scheduler.job import Job
from buildtest.utils.command import BuildTestCommand
# Module-level logger used by the job classes in this module to record the scheduler
# commands they run and the values they extract from the command output.
logger = logging.getLogger(__name__)
class PBSJob(Job):
    """The PBSJob models a PBS Job with helper methods to retrieve job state, check if job is
    running/pending/suspended. We have methods to poll job state, gather job results upon
    completion and cancel job.
    """

    def __init__(self, jobID, sched_cmds):
        """Create a handle for a submitted PBS job.

        Args:
            jobID (str): Job ID of the submitted job.
            sched_cmds (dict): Scheduler commands. ``sched_cmds["qdel"]`` is used to cancel the
                job and ``sched_cmds["qstat"]`` is used to poll it.
        """
        self._outfile = None
        self._errfile = None
        self.qdel_cmd = sched_cmds["qdel"]
        # ``-x`` makes PBS Professional's qstat report finished jobs too, so the job record
        # (state, exit status, output paths) can still be queried after completion.
        self.poll_command = f"{sched_cmds['qstat']} -xf"
        super().__init__(jobID)

    def is_pending(self):
        """Return ``True`` if job is pending. A pending job is in state ``Q``."""
        return self._state == "Q"

    def is_running(self):
        """Return ``True`` if job is running. A running job is in state ``R``."""
        return self._state == "R"

    def is_complete(self):
        """Return ``True`` if job is complete.

        PBS Professional reports a finished job in state ``F`` (visible through ``qstat -x``)
        while Torque (see :class:`TorqueJob`) reports a completed job in state ``C``; both
        states are treated as complete.
        """
        return self._state in ("F", "C")

    def is_suspended(self):
        """Return ``True`` if job is suspended which would be in one of these states ``H``, ``U``, ``S``."""
        return self._state in ["H", "U", "S"]

    def success(self):
        """Return ``True`` if the job completed successfully, i.e. its exit code is 0.

        According to https://help.altair.com/2021.1.3/PBS%20Professional/PBSAdminGuide2021.1.3.pdf
        section 14.9 Job Exit Status Codes we have the following:

        - Exit Code: X < 0 - Job could not be executed
        - Exit Code: 0 <= X < 128 - Exit value of Shell or top-level process
        - Exit Code: X >= 128 - Job was killed by signal
        - Exit Code: X == 0 - Job executed successfully
        """
        return self._exitcode == 0

    def fail(self):
        """Return ``True`` if there is a job failure, i.e. the exit code is not 0."""
        return not self.success()

    @staticmethod
    def _parse_path_attribute(output, attribute):
        """Return the file path stored in ``attribute`` of ``qstat -f`` output, or ``None``.

        qstat prints the value as ``<host>:<path>`` and wraps long values onto indented
        continuation lines. The value is taken up to the next ``name = value`` attribute line
        (or end of output), the continuation lines are re-joined, and the ``<host>:`` prefix is
        removed.

        Args:
            output (str): Output of ``qstat -f``.
            attribute (str): Attribute name, e.g. ``Output_Path`` or ``Error_Path``.
        """
        pattern = rf"{attribute}\s*=\s*(?P<value>.*?)(?=\n\s*[\w.]+\s*=|\Z)"
        match = re.search(pattern, output, re.DOTALL)
        if not match:
            return None
        # Re-join the wrapped value: continuation lines carry leading indentation.
        value = "".join(line.strip() for line in match.group("value").splitlines())
        # Split only on the first ':' so a ':' inside the path itself is preserved.
        host, sep, path = value.partition(":")
        result = path if sep else host
        return result or None

    def _set_output_error_files(self, output):
        """Record the output/error file paths found in ``qstat -f`` ``output``.

        A path that cannot be found leaves the previously recorded value untouched.
        """
        outfile = self._parse_path_attribute(output, "Output_Path")
        if outfile:
            self._outfile = outfile
            logger.debug(f"Output file for Job: {self.jobid} is {self._outfile}")
        errfile = self._parse_path_attribute(output, "Error_Path")
        if errfile:
            self._errfile = errfile
            logger.debug(f"Error file for Job: {self.jobid} is {self._errfile}")

    def get_output_error_files(self):
        """Fetch output and error files right after job submission by querying qstat."""
        query = f"{self.poll_command} {self.jobid}"
        cmd = BuildTestCommand(query)
        cmd.execute()
        output = " ".join(cmd.get_output())
        self._set_output_error_files(output)

    def is_output_ready(self):
        """Return ``True`` if both the output and error file exist.

        Queries qstat for the file paths if they are not yet known. Returns ``False`` when the
        paths still cannot be determined.
        """
        if not self._outfile or not self._errfile:
            self.get_output_error_files()
        if not self._outfile or not self._errfile:
            return False
        return os.path.exists(self._outfile) and os.path.exists(self._errfile)

    def poll(self):
        """Poll the job by running ``<qstat> -xf <jobid>`` (``<qstat> -f <jobid>`` for
        :class:`TorqueJob`) and extract the job ID, job state, exit code, output and error file.

        The exit code is read from ``Exit_status`` (PBS Professional) or ``exit_status``
        (Torque) and may be negative. If the job state cannot be found in the qstat output, a
        warning is logged and this poll is skipped, leaving the previously recorded state
        unchanged.

        Raises:
            JobSchedulerError: If the Job ID cannot be extracted from the qstat output.

        A typical output for a (Torque) job looks something like this; long values are wrapped
        onto indented continuation lines:

        .. code-block:: console

            (buildtest) adaptive50@e4spro-cluster:~/Documents/buildtest/aws_oddc$ qstat -f 40680075.e4spro-cluster
            Job Id: 40680075.e4spro-cluster
                Job_Name = hostname_test
                Job_Owner = adaptive50@server.nodus.com
                resources_used.cput = 00:00:00
                resources_used.vmem = 0kb
                resources_used.walltime = 00:00:05
                resources_used.mem = 0kb
                resources_used.energy_used = 0
                job_state = C
                queue = e4spro-cluster
                server = e4spro-cluster
                Checkpoint = u
                ctime = Mon Mar 25 17:42:02 2024
                Error_Path = e4spro-cluster:/home/adaptive50/Documents/buildtest/var/tests
                    /generic.torque.e4spro/sleep/hostname_test/b10fea47/stage/hostname_tes
                    t.e
                exec_host = ac-d160-0-0/0
                Hold_Types = n
                Join_Path = n
                Keep_Files = n
                Mail_Points = a
                mtime = Mon Mar 25 17:42:38 2024
                Output_Path = e4spro-cluster:/home/adaptive50/Documents/buildtest/var/test
                    s/generic.torque.e4spro/sleep/hostname_test/b10fea47/stage/hostname_te
                    st.o
                Priority = 0
                qtime = Mon Mar 25 17:42:02 2024
                Rerunable = True
                Resource_List.nodes = 1
                Resource_List.nodect = 1
                Resource_List.walltime = 24:00:00
                session_id = 1806
                Variable_List = PBS_O_QUEUE=e4spro-cluster,PBS_O_HOME=/home/adaptive50,
                    PBS_O_LOGNAME=adaptive50,
                    PBS_O_PATH=/home/adaptive50/Documents/buildtest/bin:...(truncated)...,
                    PBS_O_MAIL=/var/mail/adaptive50,
                    PBS_O_SHELL=/usr/bin/bash,PBS_O_LANG=C.UTF-8,
                    PBS_O_WORKDIR=/home/adaptive50/Documents/buildtest/var/tests/generic.
                    torque.e4spro/sleep/hostname_test/b10fea47/stage,
                    PBS_O_HOST=e4spro-cluster,PBS_O_SERVER=e4spro-cluster
                euser = adaptive50
                egroup = adaptive50
                queue_type = E
                etime = Mon Mar 25 17:42:02 2024
                exit_status = 0
                submit_args = -q e4spro-cluster /home/adaptive50/Documents/buildtest/var/t
                    ests/generic.torque.e4spro/sleep/hostname_test/b10fea47/stage/hostname
                    _test.sh
                start_time = Mon Mar 25 17:42:32 2024
                start_count = 1
                fault_tolerant = False
                comp_time = Mon Mar 25 17:42:38 2024
                job_radix = 0
                total_runtime = 6.235349
                submit_host = e4spro-cluster
                init_work_dir = /home/adaptive50/Documents/buildtest/var/tests/generic.tor
                    que.e4spro/sleep/hostname_test/b10fea47/stage
                request_version = 1
                req_information.task_count.0 = 1
                req_information.lprocs.0 = 1
                req_information.thread_usage_policy.0 = allowthreads
                req_information.hostlist.0 = ac-d160-0-0:ppn=1
                req_information.task_usage.0.task.0.cpu_list = 0
                req_information.task_usage.0.task.0.mem_list = 0
                req_information.task_usage.0.task.0.cores = 0
                req_information.task_usage.0.task.0.threads = 1
                req_information.task_usage.0.task.0.host = ac-d160-0-0
                copy_on_rerun = False
        """
        query = f"{self.poll_command} {self.jobid}"
        logger.debug(f"Polling job by running: {query}")
        cmd = BuildTestCommand(query)
        cmd.execute()
        output = " ".join(cmd.get_output())

        pattern = r"^Job Id:\s*(?P<jobid>\S+)"
        jobid_match = re.search(pattern, output, re.MULTILINE)
        logger.debug(
            f"Extracting Job ID from output of command: {query} by applying regular expression pattern: '{pattern}'. The return value is {jobid_match}"
        )
        if not jobid_match:
            raise JobSchedulerError(
                f"Unable to extract Job ID from output of qstat command: {query}"
            )
        self.jobid = jobid_match.group("jobid")

        pattern = r"^\s*job_state\s*=\s*(?P<state>[A-Z])"
        state_match = re.search(pattern, output, re.MULTILINE)
        if not state_match:
            # Best effort: skip this poll and keep the previous state; the next poll retries.
            logger.warning(
                f"Job state not found for Job: {self.jobid} (job info follows):\n{output}"
            )
            return
        self._state = state_match.group("state")

        # PBS Professional prints ``Exit_status`` whereas Torque prints ``exit_status``; the
        # value is negative when the job could not be executed (see ``success``).
        pattern = r"^\s*[Ee]xit_status\s*=\s*(?P<code>-?\d+)"
        exitcode_match = re.search(pattern, output, re.MULTILINE)
        logger.debug(
            f"Retrieving exitcode for Job: {self.jobid} by applying regular expression pattern: '{pattern}'. The return value is {exitcode_match}"
        )
        if exitcode_match:
            self._exitcode = int(exitcode_match.group("code"))
            logger.debug(f"Retrieve exitcode: {self._exitcode} for Job: {self.jobid}")

        # ---- Get output and error files ----
        # Reuse the output already fetched instead of running qstat a second time.
        if not self._outfile or not self._errfile:
            self._set_output_error_files(output)

        # if job is running and the start time is not recorded then we record the start time
        if self.is_running() and not self.starttime:
            self.starttime = time.time()

    def retrieve_jobdata(self):
        """This method is called once job is complete. We will gather record of job by running
        ``qstat -x -f -F json <jobid>`` and return the json object as a dict.

        Returns:
            dict: Job record parsed from the JSON output of qstat.
        """
        query = f"{self.poll_command} -F json {self.jobid}"
        logger.debug(f"Retrieving job data by running: {query}")
        cmd = BuildTestCommand(query)
        cmd.execute()
        output = " ".join(cmd.get_output())
        job_data = json.loads(output)
        logger.debug(f"Retrieved job data for job: {self.jobid}:\n{job_data}")
        return job_data

    def cancel(self):
        """Cancel PBS job by running ``qdel <jobid>``."""
        query = f"{self.qdel_cmd} {self.jobid}"
        logger.debug(f"Cancelling job {self.jobid} by running: {query}")
        cmd = BuildTestCommand(query)
        cmd.execute()
class TorqueJob(PBSJob):
    """Torque flavour of :class:`PBSJob`.

    Torque jobs are queried with ``qstat -f`` instead of PBS's ``qstat -xf``, and the job
    record is returned as the raw text output of that command.
    """

    def __init__(self, jobID, sched_cmds):
        """Create a handle for a submitted Torque job.

        Args:
            jobID (str): Job ID of the submitted job.
            sched_cmds (dict): Scheduler commands; ``qdel`` and ``qstat`` keys are used.
        """
        # PBSJob already records the qdel command and resets the output/error file paths.
        super().__init__(jobID, sched_cmds)
        # PBSJob configured ``qstat -xf``; Torque is polled with plain ``qstat -f``.
        self.poll_command = f"{sched_cmds['qstat']} -f"

    def retrieve_jobdata(self):
        """Return the job record as the text printed by ``qstat -f <jobid>``.

        Called once the job is complete.
        """
        qstat_query = f"{self.poll_command} {self.jobid}"
        logger.debug(f"Retrieving job data by running: {qstat_query}")
        qstat_cmd = BuildTestCommand(qstat_query)
        qstat_cmd.execute()
        job_record = " ".join(qstat_cmd.get_output())
        logger.debug(f"Retrieved job data for job: {self.jobid}:\n{job_record}")
        return job_record