Source code for buildtest.config

import json
import logging
import re

from buildtest.defaults import (
    DEFAULT_SETTINGS_FILE,
    DEFAULT_SETTINGS_SCHEMA,
    USER_SETTINGS_FILE,
)
from buildtest.exceptions import BuildTestError, ConfigurationError
from buildtest.schemas.defaults import custom_validator
from buildtest.schemas.utils import load_recipe, load_schema
from buildtest.system import LSF, PBS, Cobalt, Slurm
from buildtest.utils.command import BuildTestCommand
from buildtest.utils.file import resolve_path
from buildtest.utils.shell import Shell
from buildtest.utils.tools import deep_get

logger = logging.getLogger(__name__)


class SiteConfiguration:
    """This class is an interface to buildtest configuration"""

    def __init__(self, settings_file=None):
        """The initializer will declare class variables in their initial state and resolve the path
        to the configuration file. Once the file is resolved we load the configuration using :func:`load`.

        Args:
            settings_file (str, optional): path to buildtest configuration file
        """

        self._file = settings_file
        self.config = None
        self._name = None
        # self.target_config stores the value for the target system. The configuration may define multiple systems,
        # but only one system can be active depending on which host buildtest is run on.
        self.target_config = None

        self.disabled_executors = []
        self.invalid_executors = []
        self.valid_executors = {
            "local": {},
            "slurm": {},
            "lsf": {},
            "pbs": {},
            "cobalt": {},
        }

        self.resolve()
        self.load()
    def load(self):
        """Loads configuration file"""
        self.config = load_recipe(self._file)
    @property
    def file(self):
        return self._file

    @file.setter
    def file(self, path):
        self._file = path
    def resolve(self):
        """This method will resolve path to configuration file. The order of precedence is as follows:

        1. command line argument via ``buildtest --config <path>``
        2. User Configuration: **$HOME/.buildtest/config.yml**
        3. Default Configuration: **$BUILDTEST_ROOT/buildtest/settings/config.yml**
        """
        self._file = (
            resolve_path(self._file)
            or resolve_path(USER_SETTINGS_FILE)
            or DEFAULT_SETTINGS_FILE
        )
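    # A minimal sketch of the resolution order above, assuming the hypothetical path
    # /tmp/config.yml exists. Only the first candidate that resolves is used:
    #
    #   SiteConfiguration("/tmp/config.yml")  # 1. explicit path (e.g. buildtest --config)
    #   SiteConfiguration()                   # 2. $HOME/.buildtest/config.yml if present,
    #                                         #    otherwise the default settings file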
    def name(self):
        """Return name of matched system from configuration file"""
        return self._name
    def detect_system(self):
        """This method detects which system configuration to use by checking the target hostname against the list
        of hostname entries defined in the ``hostnames`` property. If there is a match we set ``self._name`` to the
        matching system name and load the target configuration by setting ``self.target_config`` to the desired
        system configuration. If no system is found we raise an exception.

        Raises:
            ConfigurationError: If there is no matching system
        """

        self.systems = list(self.config["system"].keys())

        host_lookup = {}

        # get hostname fqdn
        cmd = BuildTestCommand("hostname -f")
        cmd.execute()
        hostname = " ".join(cmd.get_output())

        # for every system record we lookup 'hostnames' entry and apply re.match against current hostname. If found we break from loop
        for name in self.systems:
            host_lookup[name] = self.config["system"][name]["hostnames"]

            for host_entry in self.config["system"][name]["hostnames"]:
                if re.match(host_entry, hostname):
                    self.target_config = self.config["system"][name]
                    self._name = name
                    break

        if not self.target_config:
            raise ConfigurationError(
                self.config,
                self.file,
                f"Based on current system hostname: {hostname} we cannot find a matching system {list(self.systems)} based on current hostnames: {host_lookup} ",
            )
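    # A minimal sketch of the hostname matching above, assuming a system entry whose
    # 'hostnames' list contains the pattern r"login\d+". Since re.match is used, the
    # pattern only needs to match the start of the `hostname -f` output:
    #
    #   re.match(r"login\d+", "login01.example.org")    # match -> this system is selected
    #   re.match(r"login\d+", "compute01.example.org")  # None  -> keep searching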
# self.localexecutors = list(self.target_config["executors"]["local"].keys())
    def validate(self, validate_executors=True, moduletool=None):
        """This method validates the site configuration with the schema and checks executor settings.

        Args:
            validate_executors (bool): Check executor settings. This is the default behavior but can be disabled
            moduletool (str, optional): Check whether module system (Lmod, environment-modules) matches what is specified in configuration file. Valid options are ``Lmod``, ``environment-modules``
        """

        logger.debug(f"Loading default settings schema: {DEFAULT_SETTINGS_SCHEMA}")
        config_schema = load_schema(DEFAULT_SETTINGS_SCHEMA)

        logger.debug(
            f"Validating configuration file with schema: {DEFAULT_SETTINGS_SCHEMA}"
        )
        custom_validator(recipe=self.config, schema=config_schema)
        logger.debug("Validation was successful")

        if validate_executors:
            self._executor_check()

        if (
            self.target_config.get("moduletool") != "N/A"
            and self.target_config.get("moduletool") != moduletool
        ):
            raise ConfigurationError(
                self.config,
                self.file,
                f"There is a module tool mismatch, we have detected '{moduletool}' but configuration property 'moduletool' specifies '{self.target_config['moduletool']}'",
            )
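    # A minimal usage sketch for validation, assuming Lmod is the module system detected
    # on the host. detect_system() must run first so self.target_config is populated:
    #
    #   config = SiteConfiguration()
    #   config.detect_system()
    #   config.validate(moduletool="Lmod")  # raises ConfigurationError on a schema
    #                                       # failure or a moduletool mismatch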
    def _executor_check(self):
        """Validate executors"""
        self._validate_local_executors()
        self._validate_slurm_executors()
        self._validate_lsf_executors()
        self._validate_cobalt_executors()
        self._validate_pbs_executors()
    def is_executor_disabled(self, executor):
        """Return ``True`` if the executor definition has the ``disable`` property set, otherwise ``False``"""
        if executor.get("disable"):
            return True

        return False
    def _validate_local_executors(self):
        """Check local executors by verifying the 'shell' types are valid"""
        executor_type = "local"
        local_executors = deep_get(self.target_config, "executors", "local")
        if not local_executors:
            return

        # loop over every 'shell' property and check each shell type is a valid path and a supported shell.
        # Invalid shells are logged and the executor is marked invalid
        for executor in local_executors:
            name = local_executors[executor]["shell"]
            executor_name = f"{self.name()}.{executor_type}.{executor}"

            if self.is_executor_disabled(local_executors[executor]):
                self.disabled_executors.append(executor_name)
                continue

            try:
                Shell(name)
            except BuildTestError as err:
                self.invalid_executors.append(executor_name)
                logger.error(err)
                continue

            self.valid_executors[executor_type][executor_name] = {
                "setting": local_executors[executor]
            }
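    # A sketch of the shell check above: Shell() raises BuildTestError when the shell
    # cannot be resolved on the host, which marks the executor invalid. For example
    # (the second shell name is hypothetical):
    #
    #   Shell("bash")        # ok if 'bash' is found on this host
    #   Shell("notashell")   # raises BuildTestError -> executor marked invalid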
    def _validate_lsf_executors(self):
        """This method validates all LSF executors. We check if the queue is available and in ``Open:Active`` state."""

        lsf_executors = deep_get(self.target_config, "executors", "lsf")
        if not lsf_executors:
            return

        executor_type = "lsf"

        lsf = LSF()
        if not lsf.active():
            return

        queue_list = []
        valid_queue_state = "Open:Active"
        record = lsf.queues["RECORDS"]

        # retrieve all queues from json record
        for name in record:
            queue_list.append(name["QUEUE_NAME"])

        # check all executors have defined valid queues and check queue state.
        for executor in lsf_executors:
            executor_name = f"{self.name()}.{executor_type}.{executor}"

            if self.is_executor_disabled(lsf_executors[executor]):
                self.disabled_executors.append(executor_name)
                continue

            queue = lsf_executors[executor].get("queue")

            # if queue field is defined check if it is a valid queue
            if queue:
                if queue not in queue_list:
                    self.invalid_executors.append(executor_name)
                    logger.error(
                        f"'{queue}' is invalid LSF queue. Please select one of the following queues: {queue_list}"
                    )
                    continue

                # check queue record for Status
                for name in record:

                    # skip record until we find matching queue
                    if name["QUEUE_NAME"] != queue:
                        continue

                    queue_state = name["STATUS"]
                    # if state is not Open:Active we mark the executor invalid
                    if not queue_state == valid_queue_state:
                        self.invalid_executors.append(executor_name)
                        logger.error(
                            f"'{queue}' is in state: {queue_state}. It must be in {valid_queue_state} state in order to accept jobs"
                        )
                    break

            self.valid_executors[executor_type][executor_name] = {
                "setting": lsf_executors[executor]
            }
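    # A sketch of the queue record structure iterated above, assuming lsf.queues holds
    # the JSON produced by the LSF queue query; only the fields read here are shown:
    #
    #   {"RECORDS": [{"QUEUE_NAME": "normal", "STATUS": "Open:Active"}, ...]}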
    def _validate_slurm_executors(self):
        """This method will validate slurm executors, we check if partition, qos, and cluster fields are valid
        values by retrieving details from slurm configuration. These checks are performed on fields ``partition``,
        ``qos`` or ``cluster`` if specified in executor section.
        """

        slurm_executor = deep_get(self.target_config, "executors", "slurm")
        if not slurm_executor:
            return

        executor_type = "slurm"

        slurm = Slurm()
        if not slurm.active():
            return

        for executor in slurm_executor:
            executor_name = f"{self.name()}.{executor_type}.{executor}"

            if self.is_executor_disabled(slurm_executor[executor]):
                self.disabled_executors.append(executor_name)
                continue

            # if 'partition' key defined check if it is a valid partition
            if slurm_executor[executor].get("partition"):

                if slurm_executor[executor]["partition"] not in slurm.partitions:
                    self.invalid_executors.append(executor_name)
                    logger.error(
                        f"executor - {executor} has invalid partition name '{slurm_executor[executor]['partition']}'. Please select one of the following partitions: {slurm.partitions}"
                    )
                    continue

                query = (
                    f"sinfo -p {slurm_executor[executor]['partition']} -h -O available"
                )
                cmd = BuildTestCommand(query)
                cmd.execute()
                part_state = "".join(cmd.get_output())
                part_state = part_state.rstrip()

                # check if partition is in 'up' state. If not we mark the executor invalid.
                if part_state != "up":
                    self.invalid_executors.append(executor_name)
                    logger.error(
                        f"partition - {slurm_executor[executor]['partition']} is in state: {part_state}. It must be in 'up' state in order to accept jobs"
                    )
                    continue

            """
            disable qos check for now. Issue with 'regular' qos at Cori where it maps to 'regular_hsw' partition
            while 'regular' is the valid qos name

            # check if 'qos' key is valid qos
            if (
                slurm_executor[executor].get("qos")
                and slurm_executor[executor].get("qos") not in slurm.qos
            ):
                raise ConfigurationError(
                    self.config,
                    self.file,
                    f"{slurm_executor[executor]['qos']} not a valid qos! Please select one of the following qos: {slurm.qos}",
                )
            """

            # check if 'cluster' key is a valid slurm cluster
            if (
                slurm_executor[executor].get("cluster")
                and slurm_executor[executor].get("cluster") not in slurm.clusters
            ):
                self.invalid_executors.append(executor_name)
                logger.error(
                    f"executor - {executor} has invalid slurm cluster - {slurm_executor[executor]['cluster']}. Please select one of the following slurm clusters: {slurm.clusters}"
                )
                continue

            self.valid_executors[executor_type][executor_name] = {
                "setting": slurm_executor[executor]
            }
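    # A sketch of the partition state check above, assuming a partition named 'debug'.
    # The query prints the partition availability, which must be 'up' to accept jobs:
    #
    #   $ sinfo -p debug -h -O available
    #   up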
    def _validate_cobalt_executors(self):
        """Validate cobalt queue property by running ``qstat -Ql <queue>``. If it returns a non-zero exit code then
        the queue doesn't exist, otherwise it is a valid queue.
        """

        cobalt_executor = deep_get(self.target_config, "executors", "cobalt")
        if not cobalt_executor:
            return

        executor_type = "cobalt"

        cobalt = Cobalt()
        if not cobalt.active():
            return

        for executor in cobalt_executor:
            executor_name = f"{self.name()}.{executor_type}.{executor}"

            if self.is_executor_disabled(cobalt_executor[executor]):
                self.disabled_executors.append(executor_name)
                continue

            queue = cobalt_executor[executor].get("queue")
            # if queue property is defined in cobalt executor, check if it is an existing queue
            if queue not in cobalt.queues:
                logger.error(
                    f"Cobalt queue '{queue}' does not exist. Available Cobalt queues: {cobalt.queues} "
                )
                continue

            self.valid_executors[executor_type][executor_name] = {
                "setting": cobalt_executor[executor]
            }
    def _validate_pbs_executors(self):
        """Validate pbs queue property by checking if the queue is found and if the queue is 'enabled' and 'started',
        which are two properties found in pbs queue configuration that can be retrieved using ``qstat -Q -f -F json``.
        The output is in the following format

        .. code-block:: console

            $ qstat -Q -f -F json
            {
                "timestamp":1615924938,
                "pbs_version":"19.0.0",
                "pbs_server":"pbs",
                "Queue":{
                    "workq":{
                        "queue_type":"Execution",
                        "total_jobs":0,
                        "state_count":"Transit:0 Queued:0 Held:0 Waiting:0 Running:0 Exiting:0 Begun:0 ",
                        "resources_assigned":{
                            "mem":"0kb",
                            "ncpus":0,
                            "nodect":0
                        },
                        "hasnodes":"True",
                        "enabled":"True",
                        "started":"True"
                    }
                }
            }

        """

        pbs_executor = deep_get(self.target_config, "executors", "pbs")
        if not pbs_executor:
            return

        executor_type = "pbs"

        pbs = PBS()
        if not pbs.active():
            return

        for executor in pbs_executor:
            executor_name = f"{self.name()}.{executor_type}.{executor}"

            if self.is_executor_disabled(pbs_executor[executor]):
                self.disabled_executors.append(executor_name)
                continue

            queue = pbs_executor[executor].get("queue")
            if queue not in pbs.queues:
                self.invalid_executors.append(executor_name)
                logger.error(
                    f"PBS queue - '{queue}' not in list of available queues: {pbs.queues} "
                )
                continue

            if (
                pbs.queue_summary["Queue"][queue]["enabled"] != "True"
                or pbs.queue_summary["Queue"][queue]["started"] != "True"
            ):
                self.invalid_executors.append(executor_name)
                logger.info("Queue configuration")
                logger.info(json.dumps(pbs.queue_summary, indent=2))
                logger.error(
                    f"[{self.file}]: '{queue}' not 'enabled' or 'started' properly."
                )
                continue

            self.valid_executors[executor_type][executor_name] = {
                "setting": pbs_executor[executor]
            }
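# A minimal end-to-end sketch of how this class is typically driven, assuming a valid
# configuration file that matches the current host:
#
#   config = SiteConfiguration()      # resolve() and load() run in __init__
#   config.detect_system()            # pick the matching 'system' entry
#   config.validate()                 # schema validation + executor checks
#   config.valid_executors["local"]   # e.g. {"generic.local.bash": {"setting": {...}}}
#   config.invalid_executors          # executor names that failed validation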