Commit 3b8aff1b authored by Jakub Beránek

initial commit

Pipeline #22587 failed
LICENSE 0 → 100644
MIT License
Copyright (c) 2021, Jakub Beránek
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
README.md 0 → 100644
# Cluster
Utilities for managing and monitoring processes on clusters.
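A minimal usage sketch (the import path is an assumption; this commit does not show the exact module layout under `src/`):

```python
from src.cluster import Cluster, start_process  # assumed module path

cluster = Cluster(workdir="/tmp/experiment")
pid, cmd = start_process(["sleep", "1000"], host="node1")
cluster.add("node1", pid, cmd, key="sleep")
cluster.kill()  # sends SIGTERM to every recorded process group
```

Monitoring can be started through the `cluster-monitor` console script defined in `setup.py`.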
requirements.txt 0 → 100644
dacite==1.6.0
dataclasses==0.6
click==8.0.1
psutil==5.8.0
pandas==1.3.3
bokeh==2.4.0
tornado==6.1
setup.py 0 → 100644
import os

from setuptools import find_packages, setup

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))


def read(fname):
    with open(os.path.join(ROOT_DIR, fname)) as f:
        return f.read()


with open("requirements.txt") as reqs:
    requirements = [line.strip() for line in reqs.readlines()]

setup(
    name="cluster",
    version="0.0.1",
    author="Jakub Beránek",
    description="Cluster utilities",
    license="MIT",
    packages=find_packages(),
    install_requires=requirements,
    long_description=read("README.md"),
    entry_points={
        "console_scripts": [
            "cluster-monitor = src.scripts.monitor:main",
        ],
    },
)
src/cluster.py 0 → 100644
import dataclasses
import json
import logging
import os
import socket
import subprocess
from multiprocessing import Pool
from typing import Dict, Iterator, List, Optional, Tuple

import dacite

HOSTNAME = socket.gethostname()


@dataclasses.dataclass
class Process:
    # Command used to launch the process
    cmd: str
    pid: int
    key: Optional[str] = None
    attrs: Dict = dataclasses.field(default_factory=lambda: {})


@dataclasses.dataclass
class Node:
    processes: List[Process]


def default_kill_fn(node, process):
    # Module-level default for `Cluster.kill`: unlike a lambda, a plain
    # function can be pickled and sent to the multiprocessing pool workers.
    kill_process(node, process.pid)


class Cluster:
    """
    Stores information about processes running on a cluster.

    The information about the cluster can be serialized and deserialized, and the
    processes may later be killed (even when running on a remote node, as long as
    it is accessible via SSH).
    """
    @staticmethod
    def deserialize(file) -> "Cluster":
        data = json.load(file)
        nodes = {k: dacite.from_dict(Node, v) for (k, v) in data["nodes"].items()}
        return Cluster(data["workdir"], nodes)

    def __init__(self, workdir: str, nodes: Optional[Dict[str, Node]] = None):
        if nodes is None:
            nodes = {}
        self.workdir = workdir
        self.nodes = nodes

    def add(self, node: str, pid: int, cmd: str, key: Optional[str] = None, **kwargs) -> Process:
        process = Process(
            cmd=cmd,
            pid=pid,
            key=key,
            attrs=kwargs
        )
        if node not in self.nodes:
            self.nodes[node] = Node(processes=[])
        self.nodes[node].processes.append(process)
        return process

    def processes(self) -> Iterator[Tuple[str, Process]]:
        for (address, node) in self.nodes.items():
            for process in node.processes:
                yield (address, process)

    def kill(self, kill_fn=None):
        if kill_fn is None:
            kill_fn = default_kill_fn
        with Pool() as pool:
            pool.map(kill_process_pool,
                     [(kill_fn, node, process) for (node, process) in self.processes()])

    def get_processes(self, key: Optional[str] = None, node: Optional[str] = None):
        for (address, process) in self.processes():
            if key is not None and process.key != key:
                continue
            if node is not None and address != node:
                continue
            yield (address, process)

    def serialize(self, file):
        json.dump({
            "workdir": self.workdir,
            "nodes": {address: dataclasses.asdict(node) for (address, node) in
                      self.nodes.items()}
        }, file, indent=2)

    def __repr__(self):
        out = f"Workdir: {self.workdir}\n"
        out += "Nodes:\n"
        for (address, node) in self.nodes.items():
            out += f"{address}: {node}\n"
        return out
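# Usage sketch (illustrative only; file names and values are hypothetical):
#
#   cluster = Cluster(workdir="/tmp/experiment")
#   cluster.add(node="node1", pid=1234, cmd="worker --port 9000", key="worker")
#   with open("cluster.json", "w") as f:
#       cluster.serialize(f)
#   ...
#   with open("cluster.json") as f:
#       restored = Cluster.deserialize(f)
#   restored.kill()  # sends SIGTERM to every recorded process group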
def is_local(host: str) -> bool:
    """
    Returns true if the given `host` is the local computer.
    """
    return host == HOSTNAME or host == "localhost" or host == socket.gethostbyname(HOSTNAME)
def start_process(
        commands: List[str],
        host: Optional[str] = None,
        workdir: Optional[str] = None,
        modules: Optional[List[str]] = None,
        name: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        pyenv: Optional[str] = None,
        init_cmd: Optional[List[str]] = None
):
    """
    Start a process on the given `host`.

    :param commands: List of commands to run.
    :param host: Hostname where to start the process.
    :param workdir: Working directory of the process.
    :param modules: LMOD modules to load on the host.
    :param name: Name (used for stdout/stderr files in the working directory).
    :param env: Environment variables passed to the process.
    :param pyenv: Python virtual environment that should be sourced by the process.
    :param init_cmd: Initialization commands performed at the start of the process.
    """
    if not workdir:
        workdir = os.getcwd()
    workdir = os.path.abspath(workdir)
    init_cmd = init_cmd if init_cmd is not None else []
    init_cmd = list(init_cmd)
    if modules is not None:
        init_cmd += [f"ml {' '.join(modules)}"]
    if pyenv:
        assert os.path.isabs(pyenv)
        init_cmd += [f"source {pyenv}/bin/activate"]

    args = []
    if env:
        args += ["env"]
        for (key, val) in env.items():
            args += [f"{key}={val}"]
    args += [str(cmd) for cmd in commands]

    if not name:
        name = "process"
    output = os.path.join(workdir, name)
    stdout_file = f"{output}.out"
    stderr_file = f"{output}.err"

    # The final `ps` prints the process group ID of the launched background job;
    # it is read back below and returned to the caller.
    command = f"""
cd {workdir} || exit 1
{' && '.join(f"{{ {cmd} || exit 1; }}" for cmd in init_cmd)}
ulimit -c unlimited
{' '.join(args)} > {stdout_file} 2> {stderr_file} &
ps -ho pgid $!
""".strip()

    cmd_args = []
    if host:
        cmd_args += ["ssh", host]
    else:
        # Run in a new session locally so the whole group can be killed later.
        cmd_args += ["setsid"]
    cmd_args += ["/bin/bash"]

    process = subprocess.Popen(cmd_args, cwd=workdir, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    out, err = process.communicate(command.encode())
    pid = out.strip()
    if not pid:
        logging.error(
            f"Process startup failed with status: {process.returncode}, "
            f"stderr: {err.decode()}, stdout: {out.decode()}")
        if os.path.isfile(stderr_file):
            with open(stderr_file) as f:
                logging.error("".join(f.readlines()))
        raise Exception(f"Process startup failed on {host if host else 'localhost'}: {command}")
    return (int(pid), command)
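# Usage sketch (all values hypothetical): start a worker remotely, then record
# it in a Cluster so it can be killed later.
#
#   pid, cmd = start_process(
#       ["python3", "worker.py", "--port", "9000"],
#       host="node1",
#       workdir="/home/user/experiment",
#       name="worker-0",
#       env={"OMP_NUM_THREADS": "4"},
#   )
#   cluster.add("node1", pid, cmd, key="worker")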
def kill_process_pool(args):
    # Helper that unpacks arguments for use with `multiprocessing.Pool.map`.
    kill_fn, node, process = args
    kill_fn(node, process)


def kill_process(host: str, pid: int, signal="TERM"):
    """
    Kill a process with the given `pid` on the specified `host`.

    :param host: Hostname where the process is located.
    :param pid: PID of the process to kill.
    :param signal: Signal used to kill the process. One of "TERM", "KILL" or "INT".
    """
    assert signal in ("TERM", "KILL", "INT")
    logging.debug(f"Killing PGID {pid} on {host}")

    args = ["kill", f"-{signal}", f"-{pid}"]
    if not is_local(host):
        args = ["ssh", host, "--"] + args
    res = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if res.returncode != 0:
        logging.error(
            f"error: {res.returncode} {res.stdout.decode().strip()} {res.stderr.decode().strip()}")
        return False
    return True
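# Example (hypothetical values): kill_process("node1", 1234, signal="KILL")
# sends SIGKILL to process group 1234 on node1. Note the negative PID passed
# to `kill`, which targets the whole group created by `start_process`.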
src/io.py 0 → 100644
import json
import logging
import signal
import time
from typing import List


def dump_records(path: str, records: List):
    # Append records to `path` in JSON Lines format and clear the list in place.
    if records:
        with open(path, "a") as f:
            for record in records:
                json.dump(record, f)
                f.write("\n")
        records.clear()
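# Example (file name illustrative): dump_records("trace.jsonl", [{"a": 1}])
# appends one JSON line to trace.jsonl and empties the passed list in place.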
def measure_and_store(capture_interval: int, dump_interval: int, file: str, capture_fn, finish_fn):
    """
    Runs an endless loop that will repeatedly call `capture_fn` and store its outputs into the
    given `file`.

    The process will end upon receiving SIGINT.

    :param capture_interval: How often to call `capture_fn` (in seconds).
    :param dump_interval: How often to dump data to disk (in seconds).
    :param file: Path where data will be collected.
    :param capture_fn: Function that will be called to measure some data. It should return a
    JSON-serializable item (a return value of `None` is skipped).
    :param finish_fn: Function that will be called when the measurement process ends.
    """
    record_count = 0
    records = []
    last_update = time.time()

    def capture():
        now = time.time()
        record = capture_fn(now)
        if record is not None:
            records.append(record)
        return now

    def finish(sig, frame):
        nonlocal record_count
        capture()
        record_count += len(records)
        dump_records(file, records)
        logging.info(f"Interrupting monitoring, wrote {record_count} records")
        finish_fn()

    signal.signal(signal.SIGINT, finish)

    while True:
        now = capture()
        if now - last_update > dump_interval:
            last_update = now
            record_count += len(records)
            dump_records(file, records)
        time.sleep(capture_interval)
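# Usage sketch (illustrative): sample the current time once per second, flush
# to disk every 10 seconds, and stop cleanly on SIGINT.
#
#   measure_and_store(
#       capture_interval=1,
#       dump_interval=10,
#       file="records.jsonl",
#       capture_fn=lambda now: {"timestamp": now},
#       finish_fn=lambda: None,
#   )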
src/scripts/monitor.py 0 → 100644
import logging
import os
import shutil
import sys
import time

import click
import psutil

from ..io import measure_and_store

logging.basicConfig(level=logging.INFO)


def get_resources():
    cpus = psutil.cpu_percent(percpu=True)
    mem = psutil.virtual_memory().percent
    connections = sum(1 if c.status == "ESTABLISHED" else 0 for c in psutil.net_connections())
    net = psutil.net_io_counters()
    io = psutil.disk_io_counters()
    return {
        "cpu": cpus,
        "mem": mem,
        "connections": connections,
        "net-write": 0 if net is None else net.bytes_sent,
        "net-read": 0 if net is None else net.bytes_recv,
        "disk-write": 0 if io is None else io.write_bytes,
        "disk-read": 0 if io is None else io.read_bytes
    }


def generate_record(timestamp):
    resources = get_resources()
    return {
        "timestamp": timestamp,
        "resources": resources,
    }
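# A single record looks like this (values illustrative; the net-* and disk-*
# fields are cumulative byte counters as reported by psutil):
#   {"timestamp": 1633024800.0,
#    "resources": {"cpu": [12.0, 3.5], "mem": 41.2, "connections": 17,
#                  "net-write": 123456, "net-read": 654321,
#                  "disk-write": 789012, "disk-read": 345678}}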
@click.command()
@click.argument("output")
@click.option("--capture-interval", default=1)
@click.option("--dump-interval", default=10)
def main(output: str, capture_interval: int, dump_interval: int):
    def capture(timestamp):
        try:
            return generate_record(timestamp)
        except Exception as e:
            logging.error("Error while capturing resources: {}".format(e))
            return None

    def finish():
        logging.info(f"Copying trace from {tmp_output} to {output}")
        shutil.copyfile(tmp_output, output)
        sys.exit()

    tmp_output = f"/tmp/{os.path.basename(output)}-{int(time.time())}"
    # Create (truncate) the temporary output file up front
    with open(tmp_output, "w"):
        pass

    measure_and_store(capture_interval, dump_interval, tmp_output, capture, finish)


if __name__ == "__main__":
    main()
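# Invocation sketch (via the console script declared in setup.py):
#   cluster-monitor trace.jsonl --capture-interval 1 --dump-interval 10
# The trace is buffered in a temporary file under /tmp and copied to the given
# output path when the monitor receives SIGINT.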