Source code for buildtest.scheduler.detection

import json
import logging
import re

from buildtest.exceptions import BuildTestError
from buildtest.utils.command import BuildTestCommand
from buildtest.utils.tools import check_binaries


class Scheduler:
    """Base class with the detection logic shared by all schedulers.

    A subclass lists the executables it needs in ``binaries`` and implements
    :meth:`get_queues` with the scheduler specific query.
    """

    logger = logging.getLogger(__name__)

    # names of the scheduler executables to look up; overridden by subclasses
    binaries = []

    # class-level default so that queues() can be called safely even when the
    # scheduler binaries were not found and __init__ never queried the queues
    _queues = None

    def __init__(self, custom_dirs=None):
        """Locate the scheduler binaries and, if found, query the queues.

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.
        """
        self.logger = logging.getLogger(__name__)

        # used below both as a truth value and, in subclasses, as a mapping
        # from binary name to the command to run (e.g. sched_cmds['sinfo'])
        self.sched_cmds = check_binaries(self.binaries, custom_dirs=custom_dirs)

        if self.sched_cmds:
            self._queues = self.get_queues()

    def queues(self):
        """Return the queues retrieved at construction time, or ``None`` if
        the scheduler was not detected."""
        return self._queues

    def active(self):
        """Return the result of the binary lookup, which is truthy when the
        scheduler binaries were found and falsy otherwise."""
        return self.sched_cmds

    def get_queues(self):
        """This method is implemented by subclass to return the queues for a
        given scheduler.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError
class Slurm(Scheduler):
    """The Slurm class implements common functions to query a Slurm cluster
    including partitions, qos and clusters. The Slurm binaries are looked up
    first and the cluster is only queried when they are found.
    """

    # specify a set of Slurm commands to check for file existence
    binaries = ["sbatch", "sacct", "sacctmgr", "sinfo", "scancel", "scontrol"]

    def __init__(self, custom_dirs=None):
        """Locate the Slurm binaries and, if found, query the cluster.

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.
        """
        self.logger = logging.getLogger(__name__)
        self.sched_cmds = check_binaries(self.binaries, custom_dirs=custom_dirs)

        # defaults keep the accessors and validate_* methods usable when
        # Slurm is not detected (every lookup then simply fails validation)
        self._partitions = []
        self._clusters = []
        self._qos = []

        # retrieve slurm partitions, qos, and cluster only if slurm is detected.
        if self.sched_cmds:
            self._partitions = self._get_partitions()
            self._clusters = self._get_clusters()
            self._qos = self._get_qos()

    def partitions(self):
        """Return the list of partition names collected at construction."""
        return self._partitions

    def clusters(self):
        """Return the list of cluster names collected at construction."""
        return self._clusters

    def qos(self):
        """Return the list of qos names collected at construction."""
        return self._qos

    def run_command(self, query):
        """Run a command and return output as list of lines.

        Args:
            query (str): The command to execute.

        Returns:
            list: One entry per output line with trailing whitespace removed.
        """
        cmd = BuildTestCommand(query)
        cmd.execute()
        out = cmd.get_output()
        self.logger.debug(f"Running command: {query}")
        return [item.rstrip() for item in out]

    def _get_partitions(self):
        """Get list of all slurm partitions using ``sinfo -a -h -O partitionname``.
        The output is a list of queue names

        .. code-block:: console

            $ sinfo -a -h -O partitionname
            system
            system_shared
            debug_hsw
            debug_knl
            jupyter
        """
        return self.run_command(f"{self.sched_cmds['sinfo']} -a -h -O partitionname")

    def _get_clusters(self):
        """Get list of slurm clusters by running ``sacctmgr list cluster -P -n format=Cluster``.
        The output is a list of slurm clusters something as follows

        .. code-block:: console

            $ sacctmgr list cluster -P -n format=Cluster
            cori
            escori
        """
        return self.run_command(
            f"{self.sched_cmds['sacctmgr']} list cluster -P -n format=Cluster"
        )

    def _get_qos(self):
        """Retrieve a list of all slurm qos by running ``sacctmgr list qos -P -n format=Name``.
        The output is a list of qos. Shown below is an example output

        .. code-block:: console

            $ sacctmgr list qos -P -n format=Name
            normal
            premium
            low
            serialize
            scavenger
        """
        return self.run_command(
            f"{self.sched_cmds['sacctmgr']} list qos -P -n format=Name"
        )

    def validate_partition(self, slurm_executor):
        """Validate the partition for a given executor.

        Args:
            slurm_executor (dict): The configuration of the executor. It must
                contain the ``partition`` key.

        Returns:
            bool: True if the partition is valid and in 'up' state, False otherwise.
        """
        partition = slurm_executor["partition"]

        if partition not in self._partitions:
            self.logger.error(
                f"Executor Configuration: {json.dumps(slurm_executor, indent=2)}"
            )
            self.logger.error(
                f"executor - '{partition}' is not a valid partition. Please select one of the following partitions: {self._partitions}"
            )
            return False

        self.logger.debug(f"Slurm partition: {partition} is found.")

        # run_command returns a list of lines, so the reported state has to be
        # extracted from the list before it can be compared with 'up'
        output = self.run_command(
            f"{self.sched_cmds['sinfo']} -p {partition} -h -O available"
        )
        states = {line.strip() for line in output if line.strip()}

        # every reported state must be 'up'; no output at all is a failure too
        if states != {"up"}:
            part_state = ", ".join(sorted(states)) or "unknown"
            self.logger.error(
                f"partition - {partition} is in state: {part_state}. It must be in 'up' state in order to accept jobs"
            )
            return False

        return True

    def validate_cluster(self, executor, slurm_executor):
        """This method will validate a cluster for a given executor. If the
        'cluster' key is defined in the slurm executor configuration we check
        that it names a known cluster.

        Args:
            executor (str): The name of the executor.
            slurm_executor (dict): The configuration of the executor.

        Returns:
            bool: False if a cluster is defined but unknown, True otherwise.
        """
        cluster = slurm_executor.get("cluster")

        # nothing to validate when the executor does not request a cluster
        if cluster is None:
            return True

        if cluster not in self._clusters:
            self.logger.error(
                f"Executor Configuration: {json.dumps(slurm_executor, indent=2)}"
            )
            self.logger.error(
                f"executor - {executor} has invalid slurm cluster - {cluster}. Please select one of the following slurm clusters: {self._clusters}"
            )
            return False

        self.logger.debug(
            f"Slurm cluster: {cluster} is found in list of slurm clusters."
        )
        return True

    def validate_qos(self, executor, slurm_executor):
        """This method will validate a qos for a given executor. If the 'qos'
        key is defined in the slurm executor configuration we check that it
        names a known qos.

        Args:
            executor (str): The name of the executor.
            slurm_executor (dict): The configuration of the executor.

        Returns:
            bool: False if a qos is defined but unknown, True otherwise.
        """
        # 'qos' is optional, same as 'cluster', so a missing key must not raise
        qos = slurm_executor.get("qos")

        if qos is None:
            return True

        if qos not in self._qos:
            self.logger.error(
                f"executor - {executor} has invalid slurm qos - {qos}. Please select one of the following slurm qos: {self._qos}"
            )
            return False

        self.logger.debug(f"Slurm qos: {qos} is found in list of slurm qos.")
        return True
class LSF(Scheduler):
    """The LSF class checks for LSF binaries and returns the LSF queues"""

    # specify a set of LSF commands to check for file existence
    binaries = ["bsub", "bqueues", "bkill", "bjobs"]

    def get_queues(self):
        """Return json dictionary of available LSF Queues and their queue states.

        The command we run is the following: ``bqueues -o 'queue_name status' -json``
        which returns a JSON record of all queue details.

        .. code-block:: console

            $ bqueues -o 'queue_name status' -json
            {
              "COMMAND":"bqueues",
              "QUEUES":2,
              "RECORDS":[
                {
                  "QUEUE_NAME":"batch",
                  "STATUS":"Open:Active"
                },
                {
                  "QUEUE_NAME":"test",
                  "STATUS":"Open:Active"
                }
              ]
            }

        Returns:
            dict or None: The decoded JSON document, or ``None`` when the
            command produced no output.

        Raises:
            BuildTestError: If the command output is not valid JSON.
        """
        query = f"{self.sched_cmds['bqueues']} -o 'queue_name status' -json"
        cmd = BuildTestCommand(query)
        cmd.execute()
        out = cmd.get_output()

        self.logger.debug(f"Get all LSF Queues by running {query}")

        # nothing to decode; return explicitly instead of falling through to
        # a name that was never bound
        if not out:
            return None

        content = "".join(out).rstrip()
        try:
            return json.loads(content)
        except json.JSONDecodeError as err:
            raise BuildTestError(
                f"Unable to process LSF Queues when running: {query}"
            ) from err

    def validate_queue(self, executor):
        """This method will validate a LSF queue. We check if queue is available
        and in 'Open:Active' state.

        Args:
            executor (dict): The dictionary containing the LSF executor
                configuration. It must contain the ``queue`` key.

        Returns:
            bool: True if queue is found and in 'Open:Active' state, False otherwise.
        """
        queue_name = executor["queue"]
        queue_active_state = "Open:Active"

        # tolerate a scheduler that was not detected or returned no queue data
        records = (getattr(self, "_queues", None) or {}).get("RECORDS", [])

        queue_states = [
            record["STATUS"]
            for record in records
            if record["QUEUE_NAME"] == queue_name
        ]

        if not queue_states:
            queue_list = [record["QUEUE_NAME"] for record in records]
            self.logger.error(
                f"LSF queue - '{queue_name}' not in list of available queues: {queue_list}"
            )
            return False

        for queue_state in queue_states:
            if queue_state != queue_active_state:
                self.logger.error(
                    f"'{queue_name}' is in state: {queue_state}. It must be in {queue_active_state} state in order to accept jobs"
                )
                return False

        return True
class Cobalt(Scheduler):
    """Detect the Cobalt scheduler through its binaries and collect the names
    of the Cobalt queues."""

    # Cobalt commands whose presence is checked
    binaries = ["qsub", "qstat", "qdel", "nodelist", "showres", "partlist"]

    def get_queues(self):
        """Return the Cobalt queue names reported by ``qstat -Ql``.

        Every non-empty output line that starts with ``Name`` contributes the
        text following its first ``:``, stripped of surrounding whitespace.

        Returns:
            list: The queue names in the order they appear in the output.
        """
        query = f"{self.sched_cmds['qstat']} -Ql"

        command = BuildTestCommand(query)
        command.execute()
        lines = command.get_output()

        self.logger.debug(f"Get all Cobalt Queues by running {query}")

        # falsy entries (None, empty strings) are skipped before parsing
        return [
            entry.partition(":")[2].strip()
            for entry in lines
            if entry and entry.startswith("Name")
        ]
class PBS(Scheduler):
    """The PBS class checks for PBS binaries and gets the available queues"""

    # specify a set of PBS commands to check for file existence
    binaries = ["qsub", "qstat", "qdel", "qhold", "qmgr"]

    def __init__(self, custom_dirs=None):
        """Detect the OpenPBS scheduler and, if detected, query its queues.

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.
        """
        self.logger = logging.getLogger(__name__)
        self.sched_cmds = None

        # default keeps queues() and validate_queue() usable when the
        # detection below fails
        self._queues = None

        self._state = self.check(custom_dirs=custom_dirs)
        if self._state:
            self._queues = self.get_queues()

    def active(self):
        """Return True if PBS Scheduler is detected otherwise return False"""
        return bool(self._state and self._queues)

    def check(self, custom_dirs=None):
        """Check if binaries exist and run ``qsub --version`` to determine if
        it is the OpenPBS scheduler.

        Output of ``qsub --version`` from OpenPBS scheduler would be as
        follows, we check that the output starts with ``pbs_version``

        .. code-block:: console

            [pbsuser@pbs tmp]$ qsub --version
            pbs_version = 19.0.0

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.

        Returns:
            bool: True if the check has passed, False otherwise.
        """
        self.sched_cmds = check_binaries(self.binaries, custom_dirs=custom_dirs)

        if not self.sched_cmds:
            return False

        qsub_version = f"{self.sched_cmds['qsub']} --version"

        cmd = BuildTestCommand(qsub_version)
        cmd.execute()
        out = " ".join(cmd.get_output())

        self.logger.debug(f"Check PBS version by running {qsub_version} command")
        self.logger.debug(f"Output of {qsub_version}: {out}")

        if not out.startswith("pbs_version"):
            self.logger.debug(
                f"Cannot find 'pbs_version' in output of {qsub_version}, this is not a OpenPBS Scheduler"
            )
            return False

        return True

    def get_queues(self):
        """Get queue configuration using ``qstat -Q -f -F json``.

        Shown below is an example output of ``qstat -Q -f -F json``

        .. code-block:: console

            $ qstat -Q -f -F json
            {
                "timestamp":1615924938,
                "pbs_version":"19.0.0",
                "pbs_server":"pbs",
                "Queue":{
                    "workq":{
                        "queue_type":"Execution",
                        "total_jobs":0,
                        "state_count":"Transit:0 Queued:0 Held:0 Waiting:0 Running:0 Exiting:0 Begun:0 ",
                        "resources_assigned":{
                            "mem":"0kb",
                            "ncpus":0,
                            "nodect":0
                        },
                        "hasnodes":"True",
                        "enabled":"True",
                        "started":"True"
                    }
                }
            }

        Returns:
            dict: The complete decoded JSON document (not only the queue names).

        Raises:
            BuildTestError: If the command output is not valid JSON.
        """
        query = f"{self.sched_cmds['qstat']} -Q -f -F json"
        cmd = BuildTestCommand(query)
        cmd.execute()
        content = " ".join(cmd.get_output())

        self.logger.debug(f"Get PBS Queues details by running '{query}'")

        try:
            queue_summary = json.loads(content)
        except json.JSONDecodeError as err:
            raise BuildTestError(
                f"Unable to process PBS Queues when running: '{query}'"
            ) from err

        self.logger.debug("PBS Queue Configuration")
        self.logger.debug(json.dumps(queue_summary, indent=2))

        # a document without a 'Queue' section simply has no queues
        queues = list(queue_summary.get("Queue", {}))
        self.logger.debug(
            f"The following queues: {queues} are available in PBS Scheduler."
        )

        return queue_summary

    def validate_queue(self, queue_name):
        """Validate a PBS queue.

        Args:
            queue_name (str): The name of the queue to validate.

        Returns:
            bool: True if queue exists and is enabled and started, False otherwise.
        """
        # tolerate a scheduler that was not detected or reported no queues
        queue_config = (self._queues or {}).get("Queue", {})
        avail_queues = list(queue_config)

        if queue_name not in avail_queues:
            self.logger.error(
                f"PBS queue - '{queue_name}' not in list of available queues: {avail_queues} "
            )
            return False

        self.logger.debug(
            f"PBS queue: {queue_name} is found in list of available queues."
        )

        settings = queue_config[queue_name]

        # a missing attribute is treated the same as one that is not "True"
        if settings.get("enabled") != "True" or settings.get("started") != "True":
            self.logger.info(f"PBS Queue Configuration: {queue_name}")
            self.logger.info(json.dumps(settings, indent=2))
            self.logger.error(f"'{queue_name}' not 'enabled' or 'started' properly.")
            return False

        return True
class Torque(Scheduler):
    """The Torque class for detecting Torque Scheduler and getting its queues."""

    # specify a set of Torque commands to check for file existence
    binaries = ["qsub", "qstat", "qdel", "qhold", "qmgr"]

    def __init__(self, custom_dirs=None):
        """Detect the Torque scheduler and, if detected, query its queues.

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.
        """
        self.logger = logging.getLogger(__name__)
        self.sched_cmds = None

        # default keeps queues() and validate_queue() usable when the
        # detection below fails
        self._queues = None

        self._state = self.check(custom_dirs=custom_dirs)
        if self._state:
            self._queues = self.get_queues()

    def active(self):
        """Return True if Torque Scheduler is detected otherwise return False"""
        return bool(self._state and self._queues)

    def check(self, custom_dirs=None):
        """Check if binaries exist and run ``qsub --version`` to determine if
        it is the Torque scheduler.

        Output from ``qsub --version`` from Torque scheduler would be as
        follows, we search for ``Commit:`` in the output to distinguish
        Torque from OpenPBS

        .. code-block:: console

            $ qsub --version
            Version: 7.0.1
            Commit: b405f8c22d41d29cbf9b9016bc1146bf4559e895

        Args:
            custom_dirs (list, optional): Passed through to ``check_binaries``
                as ``custom_dirs``.

        Returns:
            bool: True if the check has passed, False otherwise.
        """
        self.sched_cmds = check_binaries(self.binaries, custom_dirs=custom_dirs)

        if not self.sched_cmds:
            return False

        qsub_version = f"{self.sched_cmds['qsub']} --version"

        cmd = BuildTestCommand(qsub_version)
        cmd.execute()
        # output goes to error stream
        content = " ".join(cmd.get_error())

        self.logger.debug(f"Check Torque version by running {qsub_version} command")
        self.logger.debug(f"Output of {qsub_version}: %s", content)
        self.logger.debug(f"Check if 'Commit:' exists in output of {qsub_version}")

        match = re.search(r"Commit:\s*(.*)$", content, re.MULTILINE)
        if match:
            self.logger.debug(
                f"Found 'Commit:' in output of {qsub_version}, this must be a Torque Scheduler"
            )
            return True

        return False

    def get_queues(self):
        """Get queue configuration using 'qstat -Qf' and parse the output into
        a dictionary. The output of this command will be as follows

        .. code-block:: console

            $ qstat -Qf
            Queue: lbl-cluster
                queue_type = Execution
                total_jobs = 0
                state_count = Transit:0 Queued:0 Held:0 Waiting:0 Running:0 Exiting:0 Complete:0
                resources_default.nodes = 1
                resources_default.walltime = 24:00:00
                mtime = 1711400391
                enabled = True
                started = True

        Returns:
            dict: Maps each queue name to a dict of its ``key = value``
            attributes (all values are strings). Empty if no queue was found.
        """
        query = f"{self.sched_cmds['qstat']} -Qf"
        cmd = BuildTestCommand(query)
        cmd.execute()

        self.logger.debug(f"Get Torque Queues details by running '{query}'")

        output = " ".join(cmd.get_output())

        self.logger.debug(f"Output of {query}: {output}")
        self.logger.debug(f"Parse output of {query} to get queue details")

        queues = {}
        current_queue = None

        for raw_line in output.splitlines():
            # joining the lines with a space prefixes every line after the
            # first with whitespace, so strip before looking for the header;
            # otherwise only the very first 'Queue:' line would be recognised
            line = raw_line.strip()

            if line.startswith("Queue:"):
                current_queue = line.partition(":")[2].strip()
                queues[current_queue] = {}
                continue

            # ignore attribute lines that appear before any queue header
            if "=" in line and current_queue is not None:
                key, value = [x.strip() for x in line.split("=", 1)]
                queues[current_queue][key] = value

        if queues:
            self.logger.debug("Torque Queue Configuration")
            self.logger.debug(json.dumps(queues, indent=2))
            self.logger.debug(f"List of available Queues: {list(queues.keys())}")

        return queues

    def validate_queue(self, torque_executor):
        """This method will validate queue for a given executor. We check if
        the queue is available and check the queue configuration to see if the
        queue is enabled and started properly.

        Args:
            torque_executor (dict): The configuration of the executor. It must
                contain the ``queue`` key.

        Returns:
            bool: True if queue exists and is enabled and started, False otherwise.
        """
        queue = torque_executor["queue"]

        # tolerate a scheduler that was not detected or reported no queues
        available = self._queues or {}

        if queue not in available:
            self.logger.error(
                f"Torque queue - '{queue}' not in list of available queues: {list(available)} "
            )
            return False

        self.logger.debug(
            f"Torque queue: {queue} is found in list of available queues."
        )

        settings = available[queue]

        # a missing attribute is treated the same as one that is not "True"
        if settings.get("enabled") != "True" or settings.get("started") != "True":
            self.logger.debug(f"Torque Queue Configuration: {queue}")
            self.logger.debug(json.dumps(settings, indent=2))
            self.logger.error(f"Queue '{queue}' not 'enabled' or 'started' properly.")
            return False

        return True