Source code for buildtest.executors.poll

import time

from buildtest.defaults import console
from rich.table import Table


[docs]class PollQueue: def __init__(self, builders, interval, buildexecutor): self.builders = builders self.interval = interval self.buildexecutor = buildexecutor self._completed = set() self._cancelled = set() self._pending = [] # only add builders that are batch jobs for builder in self.builders: # returns True if attribute builder.job is an instance of class Job if builder.is_batch_job(): self._pending.append(builder)
[docs] def cancelled(self): """Return a list of cancelled builders""" return self._cancelled
[docs] def completed(self): return self._completed
[docs] def sleep(self): time.sleep(self.interval)
[docs] def poll(self): """Poll all until all jobs are complete. At each poll interval, we poll each builder job state. If job is complete or failed we remove job from pending queue. In each interval we sleep and poll jobs until there is no pending jobs.""" while self._pending: print(f"Polling Jobs in {self.interval} seconds") self.sleep() # store list of cancelled and completed job at each interval cancelled = [] completed = [] for builder in self._pending: # get executor instance for corresponding builder. This would be one of the following: SlurmExecutor, PBSExecutor, LSFExecutor, CobaltExecutor executor = self.buildexecutor.get(builder.executor) # if builder is local executor we shouldn't be polling so we set job to # complete and return executor.poll(builder) if builder.is_complete(): completed.append(builder) elif builder.is_failure(): cancelled.append(builder) # remove completed jobs from pending queue if completed: for builder in completed: self._pending.remove(builder) self._completed.add(builder) # remove cancelled jobs from pending queue if cancelled: for builder in cancelled: self._pending.remove(builder) self._cancelled.add(builder) self.print_pending_jobs()
[docs] def print_pending_jobs(self): """Print pending jobs in table format during each poll step""" table = Table( "[blue]Builder", "[blue]executor", "[blue]JobID", "[blue]JobState", "[blue]runtime", title="Pending Jobs", ) for builder in self._pending: table.add_row( str(builder), builder.executor, builder.job.get(), builder.job.state(), str(builder.timer.duration()), ) console.print(table)
[docs] def print_polled_jobs(self): if not self._completed: return table = Table( "[blue]Builder", "[blue]executor", "[blue]JobID", "[blue]JobState", "[blue]runtime", title="Completed Jobs", ) for builder in self._completed: table.add_row( str(builder), builder.executor, builder.job.get(), builder.job.state(), str(builder.metadata["result"]["runtime"]), ) console.print(table)