Source code for buildtest.tools.cpu

import contextlib
import glob
import os
import platform
import re
import subprocess

import archspec.cpu
import psutil


def _bits_from_str(mask_s):
    """Return the positions of the set bits in a bit-mask string.

    :param mask_s: Integer literal understood by ``int(mask_s, 0)``, for
        example ``"0x5"``, ``"0b101"`` or ``"5"``.
    :returns: Zero-based positions of the set bits, in ascending order.
    :raises ValueError: If ``mask_s`` is not a valid integer literal or
        denotes a negative number.
    """
    mask = int(mask_s, 0)
    if mask < 0:
        # Right-shifting a negative int converges to -1, never to 0, so a
        # shift-until-zero scan would not terminate; reject it up front.
        raise ValueError(f"bit mask must not be negative: {mask_s!r}")

    return [pos for pos in range(mask.bit_length()) if (mask >> pos) & 1]
def _str_from_bits(bits):
    """Build the hexadecimal mask string whose set bits are ``bits``.

    Every element of ``bits`` is a zero-based bit position; positions that
    occur more than once count once. The result carries a ``0x`` prefix and
    lower-case digits, e.g. ``[0, 2]`` gives ``"0x5"`` and an empty iterable
    gives ``"0x0"``.
    """
    mask = 0
    for position in bits:
        mask = mask | (1 << position)

    return format(mask, "#x")
def _read_sysfs_text(path):
    """Return the text stored in ``path``, or ``None`` if it cannot be read."""
    try:
        with open(path) as fp:
            return fp.read()
    except OSError:
        return None


def _read_sysfs_int(path, default=0):
    """Return the integer stored in ``path``, or ``default`` if unavailable."""
    text = _read_sysfs_text(path)
    if text is None:
        return default

    try:
        return int(text)
    except ValueError:
        return default


def _read_sysfs_cpumask(path):
    """Return the CPU mask stored in ``path`` as a ``0x``-prefixed hex string.

    Commas and whitespace are stripped from the file content and the digits
    are lower-cased. ``None`` is returned when the file cannot be read or
    when what remains is not a hexadecimal number.
    """
    text = _read_sysfs_text(path)
    if text is None:
        return None

    digits = re.sub(r"[\s,]", "", text).lower()
    if not re.fullmatch(r"[0-9a-f]+", digits):
        return None

    return f"0x{digits}"


def _collect_sysfs_cpumasks(cpu_dirs, candidates):
    """Return the distinct masks found in the ``topology`` dir of each CPU.

    For every CPU directory the first file of ``candidates`` that exists is
    used; a CPU providing none of them is skipped.
    """
    masks = set()
    for cpu in cpu_dirs:
        for name in candidates:
            path = os.path.join(cpu, "topology", name)
            if not os.path.exists(path):
                continue

            mask = _read_sysfs_cpumask(path)
            if mask is not None:
                masks.add(mask)

            # Only the first existing candidate is consulted
            break

    return masks


def _parse_sysfs_cache_size(text):
    """Convert a cache size such as ``"32K"`` to bytes (0 if unparsable)."""
    units = {"K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024}

    # The unit is optional and must not be a digit, so that a plain number
    # like "4096" is not split into value "409" and unit "6".
    m = re.match(r"\s*(?P<val>\d+)(?P<unit>[^\s\d])?", text)
    if m is None:
        return 0

    return int(m.group("val")) * units.get(m.group("unit"), 1)


def _read_sysfs_cache(cache_dir):
    """Describe the cache found in one ``cache/index*`` directory.

    :returns: A ``(key, cpuset)`` pair where ``key`` is the tuple
        ``(level, size, linesize, associativity, num_cpus)``, or ``None``
        when the entry is an L1 instruction cache or its ``shared_cpu_map``
        is not available.
    """
    level = _read_sysfs_int(os.path.join(cache_dir, "level"))
    if level == 1:
        # Skip L1 instruction cache
        kind = _read_sysfs_text(os.path.join(cache_dir, "type"))
        if kind is not None and kind.strip() == "Instruction":
            return None

    size_text = _read_sysfs_text(os.path.join(cache_dir, "size"))
    size = 0 if size_text is None else _parse_sysfs_cache_size(size_text)
    linesize = _read_sysfs_int(os.path.join(cache_dir, "coherency_line_size"))

    # Don't take the associativity directly from
    # "ways_of_associativity" file because some archs (ia64, ppc)
    # put 0 there when fully-associative, while others (x86)
    # put something like -1.
    num_sets = _read_sysfs_int(os.path.join(cache_dir, "number_of_sets"))
    partition = _read_sysfs_int(
        os.path.join(cache_dir, "physical_line_partition")
    )
    associativity = 0
    if linesize and partition and num_sets:
        associativity = size // linesize // partition // num_sets

    cpuset = _read_sysfs_cpumask(os.path.join(cache_dir, "shared_cpu_map"))
    if cpuset is None:
        # Without the sharing mask the cache cannot be attributed to CPUs
        return None

    num_cpus = bin(int(cpuset, 16)).count("1")
    return (level, size, linesize, associativity, num_cpus), cpuset


def _sysfs_topo(sysfs_root="/sys"):
    """Collect the processor topology from a sysfs tree.

    The function reads ``devices/system/cpu/cpu<N>`` and
    ``devices/system/node/node<N>`` below ``sysfs_root``. Every piece of
    information is best effort: files that are missing or unreadable are
    skipped and the corresponding values stay empty or zero.

    :param sysfs_root: Directory where sysfs is mounted.
    :returns: A dictionary with the keys ``num_cpus``,
        ``num_cpus_per_core``, ``num_cpus_per_socket``, ``num_sockets`` and
        ``topology``. ``topology`` holds the sorted hexadecimal CPU masks of
        the ``numa_nodes``, ``sockets`` and ``cores`` and the list
        ``caches``; each cache is a dictionary with the keys ``type``,
        ``size``, ``linesize``, ``associativity``, ``num_cpus`` and
        ``cpusets``, ordered by level and then by the remaining properties.
    """
    system_dir = os.path.join(glob.escape(sysfs_root), "devices", "system")
    cpu_dirs = glob.glob(os.path.join(system_dir, "cpu", "cpu[0-9]*"))
    node_dirs = glob.glob(os.path.join(system_dir, "node", "node[0-9]*"))

    # Each pair lists the preferred file name first and its fallback second
    cores = _collect_sysfs_cpumasks(cpu_dirs, ("core_cpus", "thread_siblings"))
    sockets = _collect_sysfs_cpumasks(
        cpu_dirs, ("package_cpus", "core_siblings")
    )

    numa_nodes = []
    for node in node_dirs:
        cpumap = _read_sysfs_cpumask(os.path.join(node, "cpumap"))
        if cpumap is not None:
            numa_nodes.append(cpumap)

    numa_nodes.sort()

    # Caches with identical properties are grouped; the value collects the
    # distinct CPU sets that share one such cache.
    caches = {}
    for cpu in cpu_dirs:
        pattern = os.path.join(glob.escape(cpu), "cache", "index[0-9]*")
        for cache_dir in glob.glob(pattern):
            entry = _read_sysfs_cache(cache_dir)
            if entry is None:
                continue

            key, cpuset = entry
            caches.setdefault(key, set()).add(cpuset)

    num_cpus = len(cpu_dirs)
    num_cores = len(cores)
    num_sockets = len(sockets)

    # Fill in the cpuinfo
    return {
        "topology": {
            "numa_nodes": numa_nodes,
            "sockets": sorted(sockets),
            "cores": sorted(cores),
            "caches": [
                {
                    "type": f"L{level}",
                    "size": size,
                    "linesize": linesize,
                    "associativity": associativity,
                    "num_cpus": shared_by,
                    "cpusets": sorted(cpusets),
                }
                for (
                    level,
                    size,
                    linesize,
                    associativity,
                    shared_by,
                ), cpusets in sorted(caches.items(), key=lambda item: item[0])
            ],
        },
        "num_cpus": num_cpus,
        "num_cpus_per_core": num_cpus // num_cores if num_cores else 0,
        "num_cpus_per_socket": num_cpus // num_sockets if num_sockets else 0,
        "num_sockets": num_sockets,
    }
def _sysctl_int(output, key):
    """Return the integer printed for ``key`` in ``output``, or 0 if absent."""
    match = re.search(rf"{re.escape(key)}: (\d+)", output)
    return int(match.group(1)) if match else 0


def _sysctl_int_list(output, key):
    """Return the integers printed for ``key`` in ``output`` ([] if absent)."""
    match = re.search(rf"{re.escape(key)}:((?:[ \t]+\d+)*)", output)
    if match is None:
        return []

    return [int(token) for token in match.group(1).split()]


def _cpu_range_mask(start, width):
    """Return the hex mask with ``width`` consecutive bits set from ``start``."""
    return hex(((1 << width) - 1) << start)


def _partition_cpu_masks(num_cpus, group_size):
    """Return the masks of consecutive groups of ``group_size`` CPUs.

    The groups start at CPU 0 and are produced until ``num_cpus`` is
    covered. A non-positive ``group_size`` yields no groups.
    """
    if group_size <= 0:
        return []

    return [
        _cpu_range_mask(start, group_size)
        for start in range(0, num_cpus, group_size)
    ]


def _sysctl_topo(sysctl_output=None):
    """Derive the processor topology from ``sysctl hw machdep.cpu`` output.

    The CPU, core and socket counts come from ``hw.ncpu``,
    ``hw.physicalcpu`` and ``hw.packages``; the caches come from
    ``hw.cacheconfig``, ``hw.cachesize``, ``hw.cachelinesize`` and
    ``machdep.cpu.cache.L<N>_associativity``. The output carries no CPU
    masks, so they are synthesized under the assumption that the CPUs of a
    core, socket or cache are numbered consecutively. Values that are
    missing from the output are reported as 0 or as empty lists.

    NOTE(review): the keys above look like the ones macOS exposes -- confirm
    before relying on this function on other systems.

    :param sysctl_output: Text to parse. When ``None``, the ``sysctl``
        command is executed; if it cannot be started the output is treated
        as empty.
    :returns: A dictionary with the keys ``num_sockets``, ``num_cpus``,
        ``num_cpus_per_socket``, ``num_cpus_per_core`` and ``topology``;
        ``topology`` holds ``numa_nodes``, ``sockets``, ``cores`` and
        ``caches``.
    """
    if sysctl_output is None:
        try:
            completed = subprocess.run(
                ["sysctl", "hw", "machdep.cpu"], capture_output=True
            )
        except OSError:
            # sysctl is not installed or not executable
            sysctl_output = ""
        else:
            sysctl_output = completed.stdout.decode(errors="replace")

    num_cpus = _sysctl_int(sysctl_output, "hw.ncpu")
    num_cores = _sysctl_int(sysctl_output, "hw.physicalcpu")
    num_sockets = _sysctl_int(sysctl_output, "hw.packages")
    cacheconfig = _sysctl_int_list(sysctl_output, "hw.cacheconfig")
    cachesize = _sysctl_int_list(sysctl_output, "hw.cachesize")
    linesize = _sysctl_int(sysctl_output, "hw.cachelinesize")

    num_cpus_per_socket = num_cpus // num_sockets if num_sockets else 0
    num_cpus_per_core = num_cpus // num_cores if num_cores else 0

    caches = []
    # index 0 is referring to memory
    for level in range(1, len(cachesize)):
        if cachesize[level] == 0:
            # The first zero size marks the end of the cache levels
            break

        shared_by = cacheconfig[level] if level < len(cacheconfig) else 0
        caches.append(
            {
                "type": f"L{level}",
                "size": cachesize[level],
                "linesize": linesize,
                "associativity": _sysctl_int(
                    sysctl_output,
                    f"machdep.cpu.cache.L{level}_associativity",
                ),
                "num_cpus": shared_by,
                "cpusets": _partition_cpu_masks(num_cpus, shared_by),
            }
        )

    # Fill in the cpuinfo
    return {
        "topology": {
            "numa_nodes": [_cpu_range_mask(0, num_cpus)] if num_cpus else [],
            "sockets": _partition_cpu_masks(num_cpus, num_cpus_per_socket),
            "cores": _partition_cpu_masks(num_cpus, num_cpus_per_core),
            "caches": caches,
        },
        "num_sockets": num_sockets,
        "num_cpus": num_cpus,
        "num_cpus_per_socket": num_cpus_per_socket,
        "num_cpus_per_core": num_cpus_per_core,
    }
def cpuinfo():
    """Return CPU information using archspec and psutil library

    The result combines the host description with the processor topology:

    - ``arch`` and ``vendor`` from ``archspec.cpu.host()``, ``model`` from
      ``archspec.cpu.brand_string()`` and ``platform`` from
      ``platform.machine()``.
    - ``cpu`` and ``vcpu``: the physical and logical CPU counts reported by
      ``psutil.cpu_count``.
    - ``virtualmemory``: the ``used``, ``total``, ``available`` and ``free``
      figures of ``psutil.virtual_memory()``, each divided by 1024 * 1024
      and rounded to two decimals.
    - every key returned by ``_sysfs_topo()`` when ``/sys`` is a directory,
      or by ``_sysctl_topo()`` otherwise.

    :returns: A dictionary holding the values listed above.
    """
    host = archspec.cpu.host()

    # Take a single snapshot so that the four memory figures describe the
    # same instant instead of four slightly different ones.
    memory = psutil.virtual_memory()
    mebibyte = 1024 * 1024

    cpu_details = {
        "arch": host.name,
        "vendor": host.vendor,
        "model": archspec.cpu.brand_string(),
        "platform": platform.machine(),
        "cpu": psutil.cpu_count(logical=False),
        "vcpu": psutil.cpu_count(logical=True),
        "virtualmemory": {
            "used": round(memory.used / mebibyte, 2),
            "total": round(memory.total / mebibyte, 2),
            "available": round(memory.available / mebibyte, 2),
            "free": round(memory.free / mebibyte, 2),
        },
    }

    # Try first to get information from the filesystem
    topology = _sysfs_topo() if os.path.isdir("/sys") else _sysctl_topo()
    cpu_details.update(topology)
    return cpu_details