Commit 918bb699 authored by Stanislav Bohm

Task monitoring
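
Jobs now report per-rank lifecycle events (init/start/end) to their local data service; the service buffers them together with extended transfer counters (objects and bytes fetched, provided, and uploaded) and hands both back through get_stats, which optionally clears the event buffer. A new plotting CLI and EventStream reader turn a recorded event log into HTML timelines of resources, transfers, and per-task execution. Most remaining hunks are mechanical re-wrapping of long lines.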

parent 3153896f
@@ -21,7 +21,9 @@ class Client:
def _connect(self, hostname, port):
async def connect():
connection = abrpc.Connection(await asyncio.open_connection(hostname, port=port))
connection = abrpc.Connection(
await asyncio.open_connection(hostname, port=port)
)
asyncio.ensure_future(connection.serve())
logger.info("Connection to server established")
return connection
@@ -54,7 +56,9 @@ class Client:
logger.debug("Submitting %s tasks", len(tasks))
serialized_tasks = self._prepare_submit(tasks)
if serialized_tasks:
self.loop.run_until_complete(self.connection.call("submit", serialized_tasks))
self.loop.run_until_complete(
self.connection.call("submit", serialized_tasks)
)
def wait(self, task):
logger.debug("Waiting on task id=%s", task.task_id)
@@ -72,4 +76,6 @@ class Client:
if not task.keep:
raise Exception("'keep' flag is not set for a task")
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.connection.call("gather", task.task_id, output_id))
return loop.run_until_complete(
self.connection.call("gather", task.task_id, output_id)
)
class Plan:
def __init__(self):
self.tasks = []
......
@@ -10,7 +10,6 @@ class TaskState(enum.Enum):
class Task:
def __init__(self, task_id, n_outputs, n_workers, config, keep, inputs):
self.task_id = task_id
self.inputs = inputs
@@ -51,11 +50,7 @@ PY_JOB_ARGS = ("python3", "-m", "quake.job", "$TASK_ID", "$RANK", "$DS_PORT")
def new_mpirun_task(n_outputs, n_workers, args, keep=False, task_data=None, inputs=()):
config = {
"type": "mpirun",
"args": args,
"env": DEFAULT_ENV
}
config = {"type": "mpirun", "args": args, "env": DEFAULT_ENV}
if task_data is not None:
assert isinstance(task_data, bytes)
config["data"] = task_data
......
@@ -14,6 +14,7 @@ global_client = None
def mpi_task(*, n_processes, n_outputs=1):
def _builder(fn):
return FunctionWrapper(fn, n_processes, n_outputs, global_plan)
return _builder
@@ -28,6 +29,7 @@ def arg(name, layout="all_to_all"):
fn._quake_args = configs
configs[name] = ArgConfig(layout)
return fn
return _builder
@@ -80,8 +82,10 @@ def ensure_global_client():
if global_client is None:
server = os.environ.get("QUAKE_SERVER")
if server is None:
raise Exception("No global server is defined."
"Set variable QUAKE_SERVER or call quake.client.set_global_client()")
raise Exception(
"No global server is defined. "
"Set variable QUAKE_SERVER or call quake.client.set_global_client()"
)
if ":" in server:
hostname, port = server.rsplit(":", 1)
try:
@@ -93,6 +97,7 @@ def ensure_global_client():
global_client = Client(server)
return global_client
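For context, a minimal sketch of driving this implicit global client through the environment; the address is illustrative and the import path is inferred from the error message above:

import os

from quake.client import ensure_global_client  # path inferred from the message above

os.environ["QUAKE_SERVER"] = "node0:8600"  # "host:port"; a bare "host" is also accepted
client = ensure_global_client()            # created once, then reused by later calls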
# ===== Internals ==============
@@ -106,4 +111,4 @@ def _get_task(obj):
if isinstance(obj, ResultProxy):
return obj.task
else:
raise Exception("ResultProxy expected, got '{}'".format(repr(obj)))
\ No newline at end of file
raise Exception("ResultProxy expected, got '{}'".format(repr(obj)))
@@ -23,7 +23,6 @@ def _load(obj):
class PythonJob:
def __init__(self, pickled_fn, task_args, const_args):
self.pickled_fn = pickled_fn
self.task_args = task_args
@@ -42,7 +41,6 @@ ArgConfig = collections.namedtuple("ArgConfig", "layout")
class FunctionWrapper:
def __init__(self, fn, n_processes, n_outputs, plan):
self.fn = fn
self.signature = inspect.signature(fn)
@@ -103,4 +101,4 @@ class ResultProxy:
__slots__ = ["task"]
def __init__(self, task):
self.task = task
\ No newline at end of file
self.task = task
@@ -14,15 +14,16 @@ class TaskInput:
return {
"task": self.task.task_id,
"output_ids": self.output_ids,
"layout": self.layout.serialize()
"layout": self.layout.serialize(),
}
@staticmethod
def from_dict(data, tasks):
return TaskInput(
tasks[data["task"]],
data["output_ids"],
Layout.deserialize(data["layout"]))
tasks[data["task"]], data["output_ids"], Layout.deserialize(data["layout"])
)
def __repr__(self):
return "<Input task={} o={} l={}>".format(self.task.task_id, self.output_ids, self.layout)
return "<Input task={} o={} l={}>".format(
self.task.task_id, self.output_ids, self.layout
)
@@ -34,7 +34,8 @@ def run_data_service(workdir, port):
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=port))
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=port)
)
loop.run_forever()
......
@@ -15,5 +15,5 @@ def get_resources():
"net-write": 0 if bytes is None else bytes.bytes_sent,
"net-read": 0 if bytes is None else bytes.bytes_recv,
"disk-write": 0 if io is None else io.write_bytes,
"disk-read": 0 if io is None else io.read_bytes
"disk-read": 0 if io is None else io.read_bytes,
}
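The attribute names (bytes_sent, bytes_recv, write_bytes, read_bytes) match psutil's I/O counters, so the bytes and io values above are presumably obtained roughly as in this sketch; the actual lines sit outside the hunk, so this is an assumption:

import psutil

bytes = psutil.net_io_counters()  # .bytes_sent / .bytes_recv (name reused from the hunk)
io = psutil.disk_io_counters()    # .write_bytes / .read_bytes; can be None where unsupported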
@@ -3,7 +3,6 @@ import asyncio
class Object:
def __init__(self, name, data):
self.name = name
self.data = data
......
@@ -15,14 +15,19 @@ def validate_name(name):
class Service:
def __init__(self, workdir):
self.workdir = workdir
self.objects = {}
self.connections = {}
self.events = []
self.stats_obj_fetched = 0
self.stats_bytes_fetched = 0
self.stats_obj_data_provided = 0
self.stats_bytes_provided = 0
self.stats_obj_uploaded = 0
self.stats_bytes_uploaded = 0
async def _serve(self, connection, hostname, port):
await connection.serve()
@@ -51,6 +56,7 @@ class Service:
conn = await self._connect(hostname, port)
data = await conn.call("get_data", name)
self.stats_obj_fetched += 1
self.stats_bytes_fetched += len(data)
obj = Object(name, data)
f.set_result(obj)
return obj
@@ -73,14 +79,17 @@ class Service:
raise Exception("Data is not bytes, but {}".format(type(data)))
obj_f = asyncio.Future()
obj_f.set_result(Object(name, data))
self.stats_obj_uploaded += 1
self.stats_bytes_uploaded += len(data)
self.objects[name] = obj_f
     @expose()
     async def get_data(self, name, hostname=None, port=None):
         validate_name(name)
-        obj = await self._get_object(name, hostname, port)
+        data = (await self._get_object(name, hostname, port)).get_data()
         self.stats_obj_data_provided += 1
-        return obj.get_data()
+        self.stats_bytes_provided += len(data)
+        return data
# if data is None:
# # This can happen in case of racing with .remove()
# raise Exception("Object removed")
@@ -119,14 +128,29 @@ class Service:
return True
     @expose()
-    async def get_stats(self):
-        return {
+    async def add_event(self, event):
+        self.events.append(event)
+
+    @expose()
+    async def get_stats(self, clear_events=True):
+        result = {
             # "obj_file_provided": self.stats_obj_file_provided,
             "service": {
-                "obj_data_provided": self.stats_obj_data_provided,
-                "obj_fetched": self.stats_obj_fetched,
-                "connections": len(self.connections),
+                "obj_provided": self.stats_obj_data_provided,
+                "obj_fetched": self.stats_obj_fetched,
+                "obj_uploaded": self.stats_obj_uploaded,
+                "bytes_fetched": self.stats_bytes_fetched,
+                "bytes_provided": self.stats_bytes_provided,
+                "bytes_uploaded": self.stats_bytes_uploaded,
+                "connections": len(self.connections),
             },
             "resources": get_resources(),
-            "timestamp": datetime.now().isoformat()
-        }
\ No newline at end of file
+            "timestamp": datetime.now().isoformat(),
+        }
+        events = self.events
+        if clear_events:
+            self.events = []
+        if events:
+            result["events"] = events
+        return result
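For orientation, a sketch of the reply get_stats now produces; all values are made up, and the events key appears only when at least one event was buffered (clear_events=True, the default, empties the buffer as a side effect):

stats = {
    "service": {
        "obj_provided": 12, "obj_fetched": 4, "obj_uploaded": 7,
        "bytes_fetched": 4096, "bytes_provided": 16384, "bytes_uploaded": 8192,
        "connections": 2,
    },
    "resources": {"cpu": [10.0, 2.5], "mem": 38.1, "net-write": 0,
                  "net-read": 0, "disk-write": 0, "disk-read": 0},
    "timestamp": "2020-05-01T12:00:00.000000",
    "events": [  # only present when events were recorded since the last call
        {"type": "init", "task": 3, "rank": 0,
         "timestamp": "2020-05-01T11:59:59.500000"},
    ],
}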
class JobConfiguration:
def __init__(self, fn, n_outputs, payload=None):
self.fn = fn
self.n_outputs = n_outputs
......
import asyncio
import json
import logging
# import cloudpickle
import pickle
import random
@@ -9,19 +10,18 @@ import abrpc
from quake.common.layout import Layout
from quake.common.utils import make_data_name
from datetime import datetime
logger = logging.getLogger(__name__)
class JobContext:
def __init__(self, rank, inputs):
self.rank = rank
self.inputs = inputs
class Job:
def __init__(self, task_id, rank, ds_local_port, data_placements):
logger.info("Starting task=%s, rank=%s", task_id, rank)
self.task_id = task_id
@@ -33,7 +33,12 @@ class Job:
async def connect_to_ds(self):
logger.info("Connecting to data service on port %s", self.ds_local_port)
self.ds_connection = abrpc.Connection(await asyncio.open_connection("localhost", self.ds_local_port))
self.ds_connection = abrpc.Connection(
await asyncio.open_connection("localhost", self.ds_local_port)
)
self.ds_connection.set_nr_error_handle(
lambda name, args: logger.error("Call '{}' failed".format(name))
)
self.ds_task = asyncio.ensure_future(self.ds_connection.serve())
logger.info("Connection to data service established")
@@ -63,18 +68,27 @@ class Job:
async def download_input(self, task_id, pairs):
return await asyncio.gather(
*[self.download_object(make_data_name(task_id, output_id, part))
for output_id, part in pairs])
*[
self.download_object(make_data_name(task_id, output_id, part))
for output_id, part in pairs
]
)
async def upload_data(self, output_id, data):
name = make_data_name(self.task_id, output_id, self.rank)
await self.ds_connection.call("upload", name, data)
async def send_event(self, event):
event["timestamp"] = datetime.now().isoformat()
await self.ds_connection.call_no_response("add_event", event)
async def start(self):
rank = self.rank
logger.info("Starting task id=%s on rank=%s", self.task_id, rank)
task_id = self.task_id
logger.info("Starting task id=%s on rank=%s", task_id, rank)
await self.connect_to_ds()
await self.send_event({"type": "init", "task": task_id, "rank": rank})
config = await self.download_config()
pd = await self.download_placement_dict()
self.data_placements = pd["placements"]
@@ -82,11 +96,15 @@ class Job:
fs = []
for inp_dict in inputs:
# TODO: Other layouts
layout = Layout.deserialize(inp_dict["layout"])
fs.append(self.download_input(
inp_dict["task_id"],
layout.iterate(rank, inp_dict["output_ids"], inp_dict["n_parts"])))
fs.append(
self.download_input(
inp_dict["task_id"],
layout.iterate(rank, inp_dict["output_ids"], inp_dict["n_parts"]),
)
)
await self.send_event({"type": "start", "task": task_id, "rank": rank})
input_data = await asyncio.gather(*fs)
jctx = JobContext(rank, input_data)
@@ -95,3 +113,5 @@ class Job:
for i, data in enumerate(output):
await self.upload_data(i, data)
await self.send_event({"type": "end", "task": task_id, "rank": rank})
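Each rank thus emits three events over a task's lifetime; a sketch of the resulting sequence (task and rank values illustrative, timestamps attached by send_event):

emitted = [
    {"type": "init", "task": 7, "rank": 0, "timestamp": "..."},   # data service connected, before config download
    {"type": "start", "task": 7, "rank": 0, "timestamp": "..."},  # config/placements fetched, inputs about to be awaited
    {"type": "end", "task": 7, "rank": 0, "timestamp": "..."},    # user function finished, outputs uploaded
]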
import click
import pandas as pd
import plotly.express as px
from .events import EventStream
def write_fig(fig, name):
print("Creating '{}'".format(name))
fig.write_html(name)
@click.command()
@click.argument("filename")
def cli(filename):
stream = EventStream.load(filename)
resources = stream.resources()
res = resources[["timestamp", "hostname", "cpu", "mem"]].melt(
["timestamp", "hostname"]
)
fig = px.line(
res,
x="timestamp",
y="value",
color="variable",
facet_row="hostname",
render_mode="svg",
)
write_fig(fig, "cpumem.html")
res = resources[
["timestamp", "hostname", "net-write", "net-read", "disk-write", "disk-read"]
].melt(["timestamp", "hostname"])
fig = px.line(
res,
x="timestamp",
y="value",
color="variable",
facet_row="hostname",
render_mode="svg",
)
write_fig(fig, "transfers.html")
res = resources[
["timestamp", "hostname", "obj_fetched", "obj_provided", "obj_uploaded"]
].melt(["timestamp", "hostname"])
fig = px.line(
res,
x="timestamp",
y="value",
color="variable",
facet_row="hostname",
render_mode="svg",
)
write_fig(fig, "services-objs.html")
res = resources[
["timestamp", "hostname", "bytes_fetched", "bytes_provided", "bytes_uploaded"]
].melt(["timestamp", "hostname"])
fig = px.line(
res,
x="timestamp",
y="value",
color="variable",
facet_row="hostname",
render_mode="svg",
)
write_fig(fig, "services-bytes.html")
task_events = stream.task_events()
pivoted = task_events.pivot(index=["task", "rank", "hostname"], columns=["type"])
pivoted.reset_index(inplace=True)
pivoted.columns = [y if y else x for x, y in pivoted.columns]
df1 = pivoted.rename(columns={"init": "Begin", "start": "End"})
df1["type"] = "init"
df2 = pivoted.rename(columns={"start": "Begin", "end": "End"})
df2["type"] = "run"
df = pd.concat([df1, df2])
fig = px.timeline(df, x_start="Begin", x_end="End", y="hostname", color="type")
write_fig(fig, "tasks-per-node.html")
df3 = df.groupby("task").agg(
{"Begin": "min", "End": "max", "hostname": lambda x: ",".join(sorted(set(x)))}
)
df3.reset_index(inplace=True)
print(df3)
fig = px.timeline(df3, x_start="Begin", x_end="End", y="task", color="hostname")
write_fig(fig, "tasks.html")
print(pivoted)
if __name__ == "__main__":
cli()
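The file name of this plotting module is not visible in this view; assuming it is importable (the path below is hypothetical), the dashboards can also be generated from a recorded event log programmatically:

from quake.monitoring.plot import cli  # hypothetical module path

cli(["events.log"], standalone_mode=False)  # writes cpumem.html, transfers.html, ...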
import json
from datetime import datetime
from pandas import DataFrame
import numpy as np
class EventStream:
def __init__(self, events):
self.events = events
def resources(self):
res_names = ["mem", "net-write", "net-read", "disk-write", "disk-read"]
srv_names = [
"obj_fetched",
"bytes_fetched",
"obj_provided",
"bytes_provided",
"obj_uploaded",
"bytes_uploaded",
]
timestamps = []
hostnames = []
cpus = []
resources = [[] for _ in res_names]
services = [[] for _ in srv_names]
for event in self.events:
hostnames.append(event["hostname"])
timestamps.append(event["timestamp"])
res = event["resources"]
cpus.append(np.mean(res["cpu"]))
for i in range(len(res_names)):
resources[i].append(res[res_names[i]])
srv = event["service"]
for i in range(len(srv_names)):
services[i].append(srv[srv_names[i]])
columns = {
"timestamp": timestamps,
"hostname": hostnames,
"cpu": cpus,
}
for name, value in zip(res_names, resources):
columns[name] = value
for name, value in zip(srv_names, services):
columns[name] = value
df = DataFrame(columns)
g = df.groupby("hostname")
for column in ["net-write", "net-read", "disk-write", "disk-read"]:
df[column] = g[column].transform(lambda x: x - x.min())
return df
def task_events(self):
rows = []
for event in self.events:
evs = event.get("events")
hostname = event["hostname"]
if evs:
for ev in evs:
rows.append(
(
hostname,
ev["timestamp"],
ev["type"],
ev["task"],
ev.get("rank"),
)
)
return DataFrame(
rows, columns=["hostname", "timestamp", "type", "task", "rank"]
)
@staticmethod
def load(filename):
events = []
with open(filename, "r") as f:
for line in f:
if line:
event = json.loads(line)
event["timestamp"] = datetime.fromisoformat(event["timestamp"])
evs = event.get("events")
if evs:
for ev in evs:
ev["timestamp"] = datetime.fromisoformat(ev["timestamp"])
events.append(event)
return EventStream(events)
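EventStream.load expects one JSON object per line, each a monitoring snapshot carrying resource/service counters plus any task events collected since the previous snapshot. A minimal round-trip sketch (field values illustrative):

import json

snapshot = {
    "timestamp": "2021-01-01T10:00:00",
    "hostname": "node0",
    "resources": {"cpu": [12.5, 3.0], "mem": 40.0, "net-write": 0,
                  "net-read": 0, "disk-write": 0, "disk-read": 0},
    "service": {"obj_fetched": 1, "bytes_fetched": 128, "obj_provided": 2,
                "bytes_provided": 256, "obj_uploaded": 1, "bytes_uploaded": 64,
                "connections": 1},
    "events": [{"type": "init", "task": 0, "rank": 0,
                "timestamp": "2021-01-01T10:00:00.100000"}],
}
with open("events.log", "w") as f:
    f.write(json.dumps(snapshot) + "\n")

stream = EventStream.load("events.log")
print(stream.resources())    # per-host cpu/mem/net/disk time series
print(stream.task_events())  # columns: hostname, timestamp, type, task, rank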
@@ -43,7 +43,8 @@ def main():
logger.info("Listing for client on port %s", args.port)
loop.run_until_complete(
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=args.port))
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=args.port)
)
loop.run_forever()
......
@@ -20,7 +20,6 @@ logger = logging.getLogger(__file__)
class Process:
def __init__(self, task, workers):
self.task = task
self.workers = workers
@@ -62,7 +61,11 @@ async def _remove_task(task):
for output_id in range(task.n_outputs):
for part in range(task.n_workers):
fs.append(_remove_from_workers(placement[output_id][part], task.make_data_name(output_id, part)))
fs.append(
_remove_from_workers(
placement[output_id][part], task.make_data_name(output_id, part)
)
)
await asyncio.gather(*fs)
logger.debug("All parts of task %s was removed (%s calls)", task, len(fs))
@@ -79,9 +82,16 @@ async def _remove_from_workers(workers, name):
async def _download_sizes(task, workers):
fs = [w.ds_connection.call("get_sizes",
[task.make_data_name(output_id, part_id) for output_id in range(task.n_outputs)])
for part_id, w in enumerate(workers)]
fs = [
w.ds_connection.call(
"get_sizes",
[
task.make_data_name(output_id, part_id)
for output_id in range(task.n_outputs)
],
)
for part_id, w in enumerate(workers)
]
sizes = await asyncio.gather(*fs)
return [
[sizes[part_id][output_id] for part_id in range(task.n_workers)]
@@ -97,7 +107,6 @@ async def _fetch_stats(worker):
class Server:
def __init__(self, worker_hostnames, local_ds_port, monitor_filename):
logger.debug("Starting QUake server")
@@ -121,7 +130,10 @@ class Server:
async def _gather_output(task, output_id):