# Source code for buildtest.executors.pbs

"""This module implements PBSExecutor class that defines how executors submit
job to PBS Scheduler"""
import logging
import os

from buildtest.defaults import console
from buildtest.exceptions import RuntimeFailure
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.pbs import PBSJob

logger = logging.getLogger(__name__)


class PBSExecutor(BaseExecutor):
    """Executor that submits jobs to the PBS scheduler.

    The class implements the following methods:

    - launcher_command: build the ``qsub`` command used to submit a job
    - run: dispatch the job by running the builder's build script and record the job ID
    - poll: poll the job state and cancel the job if it exceeds the max pend time
    - gather: collect the job result once the job is complete
    """

    type = "pbs"

    def __init__(
        self, name, settings, site_configs, account=None, maxpendtime=None, timeout=None
    ):
        """Store PBS specific options and initialize the base executor.

        Args:
            name: Forwarded unchanged to ``BaseExecutor.__init__``
            settings: Forwarded unchanged to ``BaseExecutor.__init__``
            site_configs: Forwarded unchanged to ``BaseExecutor.__init__``
            account (str, optional): Value passed to ``qsub -P`` when set
            maxpendtime (int, optional): Maximum time in seconds a job may stay pending or
                suspended before :meth:`poll` cancels it. When ``None`` jobs are never
                cancelled for pending too long
            timeout (optional): Forwarded to ``BaseExecutor.__init__`` as keyword argument
        """
        # These attributes are assigned before the base class initializer runs, matching
        # the original statement order.
        self.maxpendtime = maxpendtime
        self.account = account
        super().__init__(name, settings, site_configs, timeout=timeout)

        # NOTE(review): ``self._settings`` is presumably populated by BaseExecutor.__init__
        self.queue = self._settings.get("queue")

    def launcher_command(self, numprocs=None, numnodes=None):
        """Build the ``qsub`` command as a list of string fragments.

        Each option is added only when the corresponding value is truthy.

        Args:
            numprocs (int, optional): Added as ``-l ncpus=<numprocs>``
            numnodes (int, optional): Added as ``-l nodes=<numnodes>``

        Returns:
            list: Command fragments starting with ``qsub``
        """
        batch_cmd = ["qsub"]

        if self.queue:
            batch_cmd += [f"-q {self.queue}"]

        if self.account:
            batch_cmd += [f"-P {self.account}"]

        if numprocs:
            batch_cmd += [f"-l ncpus={numprocs}"]

        if numnodes:
            batch_cmd += [f"-l nodes={numnodes}"]

        # extra launcher options are collapsed into a single space separated fragment
        if self.launcher_opts:
            batch_cmd += [" ".join(self.launcher_opts)]

        return batch_cmd

    def run(self, builder):
        """Dispatch a PBS job by running the build script and record its job ID.

        The build script is executed with ``bash`` from the builder's stage directory. The
        output of the command is treated as the job ID, which is used to create a
        ``PBSJob`` stored in ``builder.job``. The job ID is also stored in
        ``builder.metadata["jobid"]``.

        Args:
            builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type

        Returns:
            The ``builder`` object when the job was dispatched, or ``None`` when running the
            build script raised ``RuntimeFailure`` (the builder is marked as failed).
        """
        self.load()

        os.chdir(builder.stage_dir)

        cmd = f"bash {self._bashopts} {os.path.basename(builder.build_script)}"

        # the executor timeout takes precedence over the "timeout" key of the target configuration
        timeout = self.timeout or self._buildtestsettings.target_config.get("timeout")

        try:
            command = builder.run(cmd, timeout=timeout)
        except RuntimeFailure as err:
            builder.failed()
            self.logger.error(err)
            return

        out = command.get_output()
        job_id = " ".join(out).strip()

        builder.metadata["jobid"] = job_id

        builder.job = PBSJob(job_id)

        # store job id as reported by the job object
        builder.metadata["jobid"] = builder.job.get()

        msg = f"[blue]{builder}[/]: JobID: {builder.metadata['jobid']} dispatched to scheduler"
        console.print(msg)
        self.logger.debug(msg)

        return builder

    def poll(self, builder):
        """Poll the PBS job and react to its current state.

        If the job is complete the job result is gathered. If the job is pending or
        suspended and a max pend time is configured, the job is cancelled and the builder
        is marked as failed once the timer duration exceeds the max pend time.

        Args:
            builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type
        """
        builder.job.poll()

        # if job is complete gather job data
        if builder.job.is_complete():
            self.gather(builder)
            return

        # NOTE(review): stop()/start() presumably pause and resume the builder timer so
        # that duration() can be read below -- confirm against BuilderBase
        builder.stop()

        # if job in pending or suspended, check if it exceeds maxpendtime if so cancel job
        if builder.job.is_pending() or builder.job.is_suspended():
            self.logger.debug(f"Time Duration: {builder.timer.duration()}")
            self.logger.debug(f"Max Pend Time: {self.maxpendtime}")

            # maxpendtime defaults to None; comparing an int against None raises TypeError,
            # so the pend time limit is only enforced when one was configured
            if (
                self.maxpendtime is not None
                and int(builder.timer.duration()) > self.maxpendtime
            ):
                builder.job.cancel()
                builder.failed()
                console.print(
                    f"[blue]{builder}[/]: [red]Cancelling Job {builder.job.get()} because job exceeds max pend time of {self.maxpendtime} sec with current pend time of {builder.timer.duration()} sec[/red] "
                )
                console.print(
                    f"{builder} in job state: {builder.job.state()} and {builder._state}"
                )
                return

        builder.start()

    def gather(self, builder):
        """Gather the result of a completed job and store it in the builder metadata.

        The job record, the job exit code (stored as the test returncode) and the paths
        reported for the output and error file are written to ``builder.metadata`` before
        the builder's post run steps are invoked.

        Args:
            builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type
        """
        builder.record_endtime()

        builder.metadata["job"] = builder.job.gather()
        builder.metadata["result"]["returncode"] = builder.job.exitcode()

        builder.metadata["outfile"] = builder.job.output_file()
        builder.metadata["errfile"] = builder.job.error_file()

        console.print(f"[blue]{builder}[/]: Job {builder.job.get()} is complete! ")

        builder.post_run_steps()