Source code for buildtest.executors.lsf

This module implements the LSFExecutor class responsible for submitting
jobs to LSF Scheduler. This class is called in class BuildExecutor
when initializing the executors.
import logging
import os
import re

from buildtest.defaults import console
from buildtest.exceptions import RuntimeFailure
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.lsf import LSFJob

logger = logging.getLogger(__name__)

[docs]class LSFExecutor(BaseExecutor): """The LSFExecutor class is responsible for submitting jobs to LSF Scheduler. The LSFExecutor performs the following steps - **load**: load lsf configuration from buildtest configuration file - **dispatch**: dispatch job to scheduler and acquire job ID - **poll**: wait for LSF jobs to finish - **gather**: Once job is complete, gather job data """ type = "lsf" def __init__( self, name, settings, site_configs, account=None, maxpendtime=None, timeout=None ): self.account = account self.maxpendtime = maxpendtime super().__init__(name, settings, site_configs, timeout=None) self.queue = self._settings.get("queue")
[docs] def launcher_command(self, numprocs=None, numnodes=None): """This command returns the launcher command and any options specified in configuration file. This is useful when generating the build script in the BuilderBase class """ cmd = ["bsub"] if self.queue: cmd += [f"-q {self.queue}"] if self.account: cmd += [f"-P {self.account}"] if numprocs: cmd += [f"-n {numprocs}"] if numnodes: cmd += [f"-nnodes {numnodes}"] if self.launcher_opts: cmd += [" ".join(self.launcher_opts)] return cmd
[docs] def run(self, builder): """This method is responsible for dispatching job to scheduler and extracting job ID by applying a ```` against output at onset of job submission. If job id is not retrieved due to job failure or unable to match regular expression we mark job incomplete by invoking :func:`buildtest.buildsystem.base.BuilderBase.incomplete`` method and return from method. If we have a valid job ID we invoke :class:`buildtest.executors.lsf.LSFJob` class given the job id to poll job and store this into ``builder.job`` attribute. Args: builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type """ os.chdir(builder.stage_dir) self.logger.debug(f"Changing to stage directory {builder.stage_dir}") cmd = f"bash {self._bashopts} {os.path.basename(builder.build_script)}" timeout = self.timeout or self._buildtestsettings.target_config.get("timeout") try: command =, timeout) except RuntimeFailure as err: self.logger.error(err) return out = command.get_output() out = " ".join(out) pattern = r"(\d+)" # output in the form: 'Job <58654> is submitted to queue <batch>' and applying regular expression to get job ID m =, out) self.logger.debug(f"Applying regular expression '{pattern}' to output: '{out}'") # if there is no match we raise error if not m: self.logger.debug(f"Unable to find LSF Job ID in output: '{out}'") builder.failed() return try: job_id = int( except ValueError: self.logger.debug( f"Unable to convert '{}' to int to extract Job ID" ) builder.failed() return builder.job = LSFJob(job_id) builder.metadata["jobid"] = job_id msg = f"[blue]{builder}[/]: JobID: {builder.metadata['jobid']} dispatched to scheduler" self.logger.debug(msg) console.print(msg) return builder
[docs] def poll(self, builder): """Given a builder object we poll the job by invoking builder method ``builder.job.poll()`` return state of job. If job is suspended or pending we stop timer and check if timer exceeds maxpendtime value which could be defined in configuration file or passed via command line ``--max-pend-time`` Args: builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type """ builder.job.poll() # if job is complete gather job data if builder.job.is_complete(): self.gather(builder) return builder.stop() if builder.job.is_suspended() or builder.job.is_pending(): self.logger.debug(f"Time Duration: {builder.duration}") self.logger.debug(f"Max Pend Time: {self.maxpendtime}") # if timer time is more than requested pend time then cancel job if int(builder.timer.duration()) > self.maxpendtime: builder.job.cancel() builder.failed() console.print( f"[blue]{builder}[/]: [red]Cancelling Job {builder.job.get()} because job exceeds max pend time of {self.maxpendtime} sec with current pend time of {builder.timer.duration()} sec[/red] " ) return builder.start()
[docs] def gather(self, builder): """Gather Job detail after completion of job by invoking the builder method ``builder.job.gather()``. We retrieve exit code, output file, error file and update builder metadata. Args: builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type """ builder.record_endtime() builder.metadata["job"] = builder.job.gather() builder.metadata["result"]["returncode"] = builder.job.exitcode() self.logger.debug( f"[{}] returncode: {builder.metadata['result']['returncode']}" ) builder.metadata["outfile"] = os.path.join( builder.stage_dir, builder.job.output_file() ) builder.metadata["errfile"] = os.path.join( builder.stage_dir, builder.job.error_file() ) console.print(f"[blue]{builder}[/]: Job {builder.job.get()} is complete! ") builder.post_run_steps()