#!/usr/bin/env python # # 20160826 # Wirawan Purwanto # # A tool that dumps every possibly imaginable info I want to get from # a SGE-managed cluster. """ This is a tool that dumps every possible imaginable info I want to get from a SGE-managed cluster. This tool runs at user-level, so can only gather information that an ordinary user can mine from the cluster. Currently the info available for dumping are: - cpufreq - lscpu - lspci - free memory (the `free` command) - uname - dmesg - mount - df Typical workflow ---------------- As a starter, use routine `Gather_all` to gather all information bits from the compute nodes. This is an expensive gather operation; it may take a while to complete. There is a tool called `test_accessible_hosts` to read the list of nodes from `qhost` SGE command, then checks the availability of every node by performing ssh into each one. Analysis: CPU variety --------------------- To summarize the kinds of CPUs available on the compute nodes, as well as listing the nodes that have them, use `summarize_cpu` and `print_summarize_cpu`. This tool requires that the output of `gather_cpuinfo` has been saved to `cluster-info/$HOSTNAME/cpuinfo.txt` files`, where $HOSTNAME stands for the host basename (without domain qualifier) for every compute node. The routine that does them all is `analyze_cpu_composition`. """ import os import re import subprocess import sys def pipe_out(args, split=False, shell=False): """Executes a shell command, piping out the stdout to python for parsing. This is my customary shortcut for backtick operator. The result is either a single string (if split==False) or a list of strings with EOLs removed (if split==True).""" retval = subprocess.Popen(args, stdout=subprocess.PIPE, shell=shell).communicate()[0] if not split: return retval else: return retval.splitlines() class pipe_in(object): """Executes a shell command, piping in the stdin from python for driving. This is the reverse of pipe_out. 
Commands are given through file-like write() or writelines() methods.""" def __init__(self, args, shell=False): self.px = subprocess.Popen(args, stdin=subprocess.PIPE, shell=shell) self.args = args def write(self, line): self.px.stdin.write(line) def writelines(self, lines): for line in lines: self.write(line) def flush(self): self.px.stdin.flush() def close(self): self.px.stdin.close() def errchk(cmd, args, retcode): """Checking for error after the invocation of an external command.""" if retcode == 0: return print >>sys.stderr, "Error executing ", cmd, " ".join(args) if retcode < 0: err = "Command %s was terminated by signal %d" % (cmd, -retcode) else: err = "Command %s returned %d" % (cmd, retcode) raise RuntimeError, err class sh(object): @staticmethod def run(prg, args): retcode = subprocess.call((prg,) + tuple(args)) errchk(prg, args, retcode) return 0 _g = globals() _g.setdefault("NODE_LIST", []) #_g.setdefault("NODE_BAD_LIST", set()) _g.setdefault("NODE_BAD_LIST", []) _g.setdefault("NODE_GOOD_LIST", []) _g.setdefault("ROOT_DIR", "cluster-info") def get_node_list(): """Reads node list from SGE configuration.""" node_list = pipe_out(("qconf", "-sel"), split=True) return node_list def node_list(): global NODE_LIST if not NODE_LIST: NODE_LIST = get_node_list() return NODE_LIST def rhost_pipe_out(host, cmdline, split=False): cmdline_full = ["ssh", "-o", "PreferredAuthentications=publickey", host] \ + (list(cmdline) if not isinstance(cmdline, basestring) else cmdline.split()) rslt = pipe_out(cmdline_full, split=split) return rslt def rhost_run(host, cmdline): cmdline_full = ["ssh", "-o", "PreferredAuthentications=publickey", host] \ + (list(cmdline) if not isinstance(cmdline, basestring) else cmdline.split()) rslt = sh.run(cmdline_full[0], cmdline_full[1:]) return rslt def rhosts_pipe_out(cmdline, filename, hosts=None, rootdir=None): """Executes cmdline on each remote host (the list is given in and """ global ROOT_DIR from os.path import dirname, join, isdir 
path_join = join Verb = 100 if hosts is None: hosts = node_list() if rootdir is None: rootdir = ROOT_DIR for H in hosts: host_base = H.split(".")[0] outfname = path_join(rootdir, host_base, filename) outdir = dirname(outfname) if not isdir(outdir): os.makedirs(outdir) if Verb >= 1: print(" exec: %s %s" % (H, cmdline)) out = rhost_pipe_out(H, cmdline, split=False) with open(outfname, "w") as F: F.write(out) def test_accessible_hosts(hosts=None): """Tests ssh connectivity for all the hosts and return a two-tuple containing lists of good and inaccessible hosts, respectively.""" from os.path import dirname, join, isdir path_join = join Verb = 100 if hosts is None: hosts = node_list() good_hosts = [] bad_hosts = [] for H in hosts: host_base = H.split(".")[0] msg_send = "Success login from host " + host_base msg_recv = rhost_pipe_out(H, ("echo", msg_send)) if msg_send == msg_recv.rstrip(): good_hosts.append(H) else: bad_hosts.append(H) return good_hosts, bad_hosts def cpuinfo_extract_processor_names(fn, ht=False): # REFS: # https://access.redhat.com/discussions/480953 """Extracts the names of processors from /proc/cpuinfo. Returns it as a list of processor names. WARNING: Hyperthreading is detected with a lame methodology, and only half of the number of cores are reported (i.e. only physical cores)""" A = [] siblings_on_socket = None cores_on_socket = None with open(fn, "r") as F: for L in F: if L.startswith("model name"): modelname = L.split(":", 1)[1].strip() A.append(modelname) elif L.startswith("siblings"): siblings_on_socket = int(L.split(":", 1)[1].strip()) elif L.startswith("cpu cores"): cores_on_socket = int(L.split(":", 1)[1].strip()) #print "siblings: ", siblings_on_socket #print "cores: ", cores_on_socket # FIXME: Quick-and-dirty solution for hyperthreading; # see Red Hat site above; not 100% reliable if there are several # kinds of CPU models, which I don't think I'll ever encountered. 
if (not ht) \ and siblings_on_socket is not None \ and cores_on_socket is not None \ and siblings_on_socket != cores_on_socket: assert cores_on_socket*2 == siblings_on_socket # ^^otherwise it's not Hyperthreading, the code has to be fixed! A = A[0:len(A)/2] ### HACK!!! print("Warning: hyperthreading detected in %s" % fn) return A def agg_count_names(namelist): """Aggregates the names in namelist to names->count mapping, as a dict. Useful, e.g. for counting number of unique elements in a list. """ A = {} for C in namelist: try: A[C] = A[C] + 1 except KeyError: A[C] = 1 return A # Below are the main gather tools def gather_cpuinfo(hosts=None): """Gather tool: for cpuinfo""" rhosts_pipe_out(("cat", "/proc/cpuinfo"), "cpuinfo.txt", hosts=hosts) def gather_lscpu(hosts=None): """Gather tool: for lscpu""" rhosts_pipe_out(("lscpu"), "lscpu.txt", hosts=hosts) def gather_lspci(hosts=None): """Gather tool: for lspci""" rhosts_pipe_out(("lspci"), "lspci.txt", hosts=hosts) def gather_free(hosts=None): """Gather tool: for free""" rhosts_pipe_out(("free"), "free.txt", hosts=hosts) def gather_uname_a(hosts=None): """Gather tool: for free""" rhosts_pipe_out(("uname", "-a"), "uname-a.txt", hosts=hosts) def gather_dmesg(hosts=None): """Gather tool: for dmesg""" rhosts_pipe_out(("dmesg",), "dmesg.txt", hosts=hosts) def gather_mount(hosts=None): """Gather tool: for mount points""" rhosts_pipe_out(("mount",), "mount.txt", hosts=hosts) def gather_df(hosts=None): """Gather tool: for disk free""" rhosts_pipe_out(("df",), "df.txt", hosts=hosts) #def dict_str_sorted(d): # return "{" + ", ". 
def summarize_cpu(hosts=None):
    """Groups the hosts by the CPU composition found in their cpuinfo files.

    hosts -- list of host names (with or without domain qualifier);
             defaults to node_list().

    For every host, reads `<ROOT_DIR>/<host basename>/cpuinfo.txt` (as
    saved by gather_cpuinfo) and counts its processors per model name.
    Hosts with identical counts are grouped together.

    Returns a dict keyed by the pretty-printed text of the per-model
    count; each value is a dict with two entries:
      "cpu_count" -- the {model name: processor count} dict,
      "hosts"     -- the list of host basenames having that composition.
    """
    from pprint import pformat

    if hosts is None:
        hosts = node_list()

    px_hosts_by_type = {}
    for H in hosts:
        host_base = H.split(".")[0]
        cpuinfo_fname = os.path.join(ROOT_DIR, host_base, "cpuinfo.txt")
        px_names = cpuinfo_extract_processor_names(cpuinfo_fname)
        px_group = agg_count_names(px_names)
        #print("%s : %s" % (host_base, px_group))
        px_group_key = pformat(px_group)  # use pretty representation
        rec = px_hosts_by_type.setdefault(
            px_group_key, {"cpu_count": px_group, "hosts": []})
        rec["hosts"].append(host_base)

    return px_hosts_by_type


def print_summarize_cpu(summary):
    """Prints a report of the summary produced by summarize_cpu.

    For every CPU composition (in sorted key order): the number of hosts,
    processors per node, total processors, and the sorted host list.
    Ends with the grand totals of processors and nodes.
    """
    nproc_grand_total = 0
    nnode_grand_total = 0
    for T in sorted(summary.keys()):
        rec = summary[T]
        nnode = len(rec["hosts"])
        nproc_per_node = sum(rec["cpu_count"].values())
        nproc = nnode * nproc_per_node
        print("%s:: %d hosts, %d procs/node, total %d procs"
              % (T, nnode, nproc_per_node, nproc))
        print("")
        print(" " + " ".join(sorted(rec["hosts"])))
        print("")
        nproc_grand_total += nproc
        nnode_grand_total += nnode

    print("Grand total %d procs" % nproc_grand_total)
    print("Grand total %d nodes" % nnode_grand_total)


def tally_summarize_cpu(summary):
    """Tallies up the total number of processors.

    summary -- the dict produced by summarize_cpu.

    Returns the processor count summed over all hosts in the summary
    (the same figure print_summarize_cpu reports as "Grand total procs").
    """
    return sum(len(rec["hosts"]) * sum(rec["cpu_count"].values())
               for rec in summary.values())


def analyze_cpu_composition():
    """Performs analysis of the CPU composition of an SGE cluster.
    Automatically queries the up (available) nodes and gathers the cpuinfo,
    if it is necessary.

    Uses and updates the globals NODE_GOOD_LIST and NODE_BAD_LIST.
    Prints the report via print_summarize_cpu; returns nothing.
    """
    global NODE_GOOD_LIST, NODE_BAD_LIST

    if not NODE_GOOD_LIST:
        print("Warning: need to test node accesibility...")
        NODE_GOOD_LIST, NODE_BAD_LIST = test_accessible_hosts()

    if not NODE_GOOD_LIST:
        # Nothing to index or summarize; avoid an IndexError below.
        print("Error: no accessible nodes found; nothing to analyze.")
        return

    # Gather only for the hosts whose cpuinfo has not been saved yet, so
    # that a partially-gathered tree does not make summarize_cpu fail.
    missing = [
        H for H in NODE_GOOD_LIST
        if not os.path.exists(
            os.path.join(ROOT_DIR, H.split(".")[0], "cpuinfo.txt"))
    ]
    if missing:
        print("Warning: need to gather cpuinfo...")
        # Most likely you haven't run gather_cpuinfo then...
        gather_cpuinfo(missing)

    summ = summarize_cpu(NODE_GOOD_LIST)
    print_summarize_cpu(summ)


def Gather_all():
    """Master gathering routine, to gather everything all at once.
    It will take some time to gather every bit of information.

    First tests node accessibility (updating the globals NODE_GOOD_LIST
    and NODE_BAD_LIST), then runs every gather tool on the good nodes.
    """
    global NODE_GOOD_LIST, NODE_BAD_LIST

    print("Testing node accesibility...")
    NODE_GOOD_LIST, NODE_BAD_LIST = test_accessible_hosts()

    # (progress label, gather tool), executed in this order
    gather_steps = (
        ("cpuinfo", gather_cpuinfo),
        ("lscpu", gather_lscpu),
        ("lspci", gather_lspci),
        ("free mem", gather_free),
        ("uname", gather_uname_a),
        ("dmesg", gather_dmesg),
        ("mount", gather_mount),
        ("df", gather_df),
    )
    for label, gather in gather_steps:
        print("\nGathering %s..." % label)
        gather(NODE_GOOD_LIST)