Source code for buildtest.executors.pbs

"""This module implements PBSExecutor class that defines how executors submit
job to PBS Scheduler"""

import logging
import os

from buildtest.defaults import console
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.pbs import PBSJob, TorqueJob
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)


[docs]class PBSExecutor(BaseExecutor): """The PBSExecutor class is responsible for submitting jobs to PBS Scheduler. The class implements the following methods: - load: load PBS executors from configuration file - dispatch: submit PBS job to scheduler - poll: poll PBS job via qstat and retrieve job state - gather: gather job result - cancel: cancel job if it exceeds max pending time """ type = "pbs" def __init__( self, name, settings, site_configs, account=None, maxpendtime=None, timeout=None ): super().__init__( name, settings, site_configs, timeout=timeout, account=account, maxpendtime=maxpendtime, ) self.queue = self._settings.get("queue") self.custom_dirs = None if isinstance(self, PBSExecutor): self.custom_dirs = deep_get(site_configs.target_config, "paths", "pbs") elif isinstance(self, TorqueExecutor): self.custom_dirs = deep_get(site_configs.target_config, "paths", "torque")
[docs] def launcher_command(self, numprocs=None, numnodes=None): batch_cmd = [] self.pbs_cmds = check_binaries( ["qsub", "qstat", "qdel"], custom_dirs=self.custom_dirs ) batch_cmd += [self.pbs_cmds["qsub"]] if self.queue: batch_cmd += [f"-q {self.queue}"] if self.account: batch_cmd += [f"-P {self.account}"] if numprocs: batch_cmd += [f"-l ncpus={numprocs}"] if numnodes: batch_cmd += [f"-l nodes={numnodes}"] if self.launcher_opts: batch_cmd += [" ".join(self.launcher_opts)] return batch_cmd
[docs] def run(self, builder): """This method is responsible for dispatching PBS job, get JobID and start record metadata in builder object. If job failed to submit we check returncode and exit with failure. After we submit job, we start timer and record when job was submitted and poll job once to get job details and store them in builder object. Args: builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type """ os.chdir(builder.stage_dir) cmd = f"{self.shell} {os.path.basename(builder.build_script)}" self.timeout = self.timeout or self._buildtestsettings.target_config.get( "timeout" ) command = builder.run(cmd, timeout=self.timeout) if command.returncode() != 0: builder.failed() return builder out = command.get_output() JobID = " ".join(out).strip() if isinstance(self, TorqueExecutor): builder.job = TorqueJob(JobID, self.pbs_cmds) elif isinstance(self, PBSExecutor): builder.job = PBSJob(JobID, self.pbs_cmds) # store job id builder.metadata["jobid"] = builder.job.get() msg = f"[blue]{builder}[/]: JobID: {builder.metadata['jobid']} dispatched to scheduler" console.print(msg) self.logger.debug(msg) return builder
[docs]class TorqueExecutor(PBSExecutor): """This class is a sub-class of PBSExecutor class and is responsible for Torque Executor"""