Source code for buildtest.executors.lsf

"""
This module implements the LSFExecutor class responsible for submitting
jobs to LSF Scheduler. This class is called in class BuildExecutor
when initializing the executors.
"""

import logging
import os
import re

from buildtest.defaults import console
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.lsf import LSFJob
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)


[docs]class LSFExecutor(BaseExecutor): """The LSFExecutor class is responsible for submitting jobs to LSF Scheduler. The LSFExecutor performs the following steps - **load**: load lsf configuration from buildtest configuration file - **dispatch**: dispatch job to scheduler and acquire job ID - **poll**: wait for LSF jobs to finish - **gather**: Once job is complete, gather job data """ type = "lsf" def __init__( self, name, settings, site_configs, account=None, maxpendtime=None, timeout=None ): super().__init__( name, settings, site_configs, timeout=timeout, account=account, maxpendtime=maxpendtime, ) self.queue = self._settings.get("queue") self.custom_dirs = deep_get(site_configs.target_config, "paths", "lsf")
[docs] def launcher_command(self, numprocs=None, numnodes=None): """This command returns the launcher command and any options specified in configuration file. This is useful when generating the build script in the BuilderBase class """ self.lsf_cmds = check_binaries( ["bsub", "bjobs", "bkill"], custom_dirs=self.custom_dirs ) cmd = [self.lsf_cmds["bsub"]] if self.queue: cmd += [f"-q {self.queue}"] if self.account: cmd += [f"-P {self.account}"] if numprocs: cmd += [f"-n {numprocs}"] if numnodes: cmd += [f"-nnodes {numnodes}"] if self.launcher_opts: cmd += [" ".join(self.launcher_opts)] return cmd
[docs] def run(self, builder): """This method is responsible for dispatching job to scheduler and extracting job ID by applying a ``re.search`` against output at onset of job submission. If job id is not retrieved due to job failure or unable to match regular expression we mark job incomplete by invoking :func:`buildtest.buildsystem.base.BuilderBase.incomplete`` method and return from method. If we have a valid job ID we invoke :class:`buildtest.executors.lsf.LSFJob` class given the job id to poll job and store this into ``builder.job`` attribute. Args: builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type """ os.chdir(builder.stage_dir) self.logger.debug(f"Changing to stage directory {builder.stage_dir}") cmd = f"{self.shell} {os.path.basename(builder.build_script)}" self.timeout = self.timeout or self._buildtestsettings.target_config.get( "timeout" ) command = builder.run(cmd, self.timeout) if command.returncode() != 0: builder.failed() return builder out = command.get_output() out = " ".join(out) pattern = r"(\d+)" # output in the form: 'Job <58654> is submitted to queue <batch>' and applying regular expression to get job ID regex_match = re.search(pattern, out) self.logger.debug(f"Applying regular expression '{pattern}' to output: '{out}'") # if there is no match we raise error if not regex_match: self.logger.debug(f"Unable to find LSF Job ID in output: '{out}'") builder.failed() return builder try: job_id = int(regex_match.group(0)) except ValueError: self.logger.debug( f"Unable to convert '{regex_match.group(0)}' to int to extract Job ID" ) builder.failed() return builder builder.job = LSFJob(job_id, self.lsf_cmds) builder.metadata["jobid"] = job_id msg = f"[blue]{builder}[/]: JobID: {builder.metadata['jobid']} dispatched to scheduler" self.logger.debug(msg) console.print(msg) return builder