Commit beb04b95 by Jakub Beránek

initial commit

# Makespan prediction using neural networks
This project contains an experimental model that learns to predict task graph makespans
from a dataset of task graphs.
## Architecture
### Model
The predictor uses graph convolutional layers (`GCNConv`) from [PyTorch Geometric](https://pytorch-geometric.readthedocs.io/en/latest/)
as building blocks. It employs bidirectional edge convolutions (one convolution per edge direction, concatenated)
and global pooling using sum, mean and maximum reductions.
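A condensed sketch of these two pieces (the full model is the `GCN` class in `makespan_prediction.py`; the input size of 2 assumes the two node features produced by `src/conversion.py`):

```python
import torch
from torch_geometric.nn import GCNConv, global_add_pool, global_max_pool, global_mean_pool

conv_fwd = GCNConv(2, 32, flow="source_to_target")   # messages follow edge direction
conv_bwd = GCNConv(2, 32, flow="target_to_source")   # messages go against edge direction

def embed(x, edge_index, batch):
    # Bidirectional edge convolution: run both directions and concatenate
    h = torch.relu(torch.cat([conv_fwd(x, edge_index), conv_bwd(x, edge_index)], dim=-1))
    # Graph-level readout: concatenate sum, mean and max pooling over each graph
    return torch.cat([global_add_pool(h, batch),
                      global_mean_pool(h, batch),
                      global_max_pool(h, batch)], dim=1)
```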
### Dataset
The input task graph data (dependencies between tasks and task durations) are embedded into the
PyTorch Geometric graph data format. Each node's features contain the task duration, normalized
by the maximum task duration of the whole task graph, and the worker count of the example.
The ground-truth makespan labels for each task graph are generated using
[Estee](https://github.com/spirali/estee), a task graph simulation environment.
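For a single example, the conversion looks roughly like this (a sketch built from the repository's own helpers; the full pipeline is `df_to_geometric_data` in `makespan_prediction.py`):

```python
# Sketch: one Estee task graph -> PyTorch Geometric Data object with a
# simulated-makespan label, using helpers shipped in this repository.
import torch
from estee.generators.irw import mapreduce
from torch_geometric.data import Data
from src.conversion import estee_to_pyg
from src.data import TrainExample
from src.generator import simulate_graph

example = TrainExample(graph=mapreduce(10), worker_count=1)
node_features, edge_index = estee_to_pyg(example)

# Normalize the features and the makespan label by the longest task duration
max_duration = max(t.duration for t in example.graph.tasks.values())
makespan = simulate_graph(example) / max_duration
data = Data(x=node_features / max_duration, edge_index=edge_index,
            y=torch.tensor([makespan], dtype=torch.float32),
            normalization_factor=max_duration)
```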
## Installation
```bash
$ python3 -m venv venv
$ source venv/bin/activate
$ pip install -U pip setuptools wheel
$ pip install -r requirements.txt
```
## Usage
```bash
python makespan_prediction.py
```
You can load data from a JSON dataset (`dataset1.json` is included as an example), or modify
the dataset generator in the `generate_dataset_1` function.
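The bundled dataset can also be loaded directly instead of being regenerated; in the script itself this corresponds to swapping the `generate_dataset_1` call for the commented-out `load_dataset` line. A sketch using the script's own helpers:

```python
# Sketch: load the bundled dataset and convert it for training
# (load_dataset / df_to_geometric_data are defined in makespan_prediction.py).
from makespan_prediction import load_dataset, df_to_geometric_data

df = load_dataset("dataset1.json")       # DataFrame with "example" and "makespan" columns
dataset = df_to_geometric_data(df)       # list of torch_geometric.data.Data objects
print(df.head(), len(dataset))
```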
# makespan_prediction.py
import json
import random
from typing import List
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
import torchmetrics
from estee.generators.irw import mapreduce
from estee.serialization.dask_json import deserialize_graph, serialize_graph
from kitt.data import train_test_split
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_add_pool, global_max_pool, global_mean_pool
from torchmetrics import MeanAbsoluteError
from src.conversion import estee_to_pyg
from src.data import TrainExample
from src.generator import merge_neighbours, simulate_graph, triplets
from src.utils import timer
class Denormalizer:
    # Scales normalized predictions/labels back to absolute makespans using the
    # per-graph normalization factor stored on each Data object/batch.
    def denormalize(self, value, batch):
        return value * batch.normalization_factor.unsqueeze(1)
def df_to_geometric_data(df: pd.DataFrame) -> List[Data]:
dataset = []
for row in range(len(df)):
example = df["example"].iloc[row]
graph = example.graph
makespan = df["makespan"].iloc[row]
(node_features, edge_index) = estee_to_pyg(example)
max_duration = max(t.duration for t in graph.tasks.values())
node_features = node_features / max_duration
makespan = makespan / max_duration
data = Data(x=node_features, edge_index=edge_index,
y=torch.tensor([makespan], dtype=torch.float32),
normalization_factor=max_duration)
dataset.append(data)
return dataset
class GCN(torch.nn.Module):
def __init__(self, num_features: int):
super().__init__()
self.conv1 = GCNConv(num_features, 32, flow="target_to_source")
self.conv2 = GCNConv(num_features, 32, flow="source_to_target")
self.conv3 = GCNConv(64, 64)
self.node_head1 = torch.nn.Linear(64, 32)
self.graph_head = torch.nn.Linear(96, 1)
    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        # Bidirectional edge convolution: one GCN pass per edge direction
        x1 = self.conv1(x, edge_index)
        x2 = self.conv2(x, edge_index)
        x = torch.cat([x1, x2], dim=-1)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        x = F.relu(x)
        x = self.node_head1(x)
        x = F.relu(x)
        # Graph-level readout: concatenate sum, mean and max pooling over each graph
        pooled_sum = global_add_pool(x, data.batch)
        pooled_mean = global_mean_pool(x, data.batch)
        pooled_max = global_max_pool(x, data.batch)
        x = torch.cat([pooled_sum, pooled_mean, pooled_max], dim=1)
        x = self.graph_head(x)
        return x
class MakespanPredictor(pl.LightningModule):
def __init__(self, num_features: int, learning_rate: float, denormalizer: Denormalizer):
super().__init__()
self.module = GCN(num_features)
self.learning_rate = learning_rate
self.denormalizer = denormalizer
self.mae = MeanAbsoluteError()
def forward(self, x):
return self.module(x)
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=5e-4)
def training_step(self, train_batch, batch_idx):
return self.step(train_batch, "loss")[2]
def validation_step(self, val_batch, batch_idx):
with torch.inference_mode():
pred, gt, loss = self.step(val_batch, "val_loss")
pred_dn = self.denormalizer.denormalize(pred, val_batch)
gt_dn = self.denormalizer.denormalize(gt, val_batch)
acc = self.mae(pred_dn, gt_dn)
self.log("val_mae", acc, prog_bar=True)
def step(self, batch: Data, loss_name: str):
pred = self.module(batch)
gt = batch.y.unsqueeze(1)
loss = F.mse_loss(pred, gt)
prog_bar = "val" in loss_name
self.log(loss_name, loss, prog_bar=prog_bar)
return pred, gt, loss
def save_dataset(path: str, dataset: pd.DataFrame):
examples = []
for example in dataset["example"]:
graph = serialize_graph(example.graph)
examples.append({
"graph": graph,
"worker_count": example.worker_count
})
makespans = list(dataset["makespan"])
data = {
"examples": examples,
"makespans": makespans
}
with open(path, "w") as f:
f.write(json.dumps(data))
def load_dataset(path: str) -> pd.DataFrame:
with open(path) as f:
data = json.load(f)
examples = []
for example in data["examples"]:
graph = deserialize_graph(example["graph"])
examples.append(TrainExample(graph, example["worker_count"]))
return pd.DataFrame({
"example": examples,
"makespan": data["makespans"]
})
def create_dataframe(examples: List[TrainExample]):
makespans = []
for example in examples:
makespans.append(simulate_graph(example))
return pd.DataFrame({
"example": examples,
"makespan": makespans
})
def generate_dataset_1(count=100) -> pd.DataFrame:
examples = []
for i in range(count):
worker_count = 1
task_count = random.randint(3, 50)
graph = mapreduce(task_count)
# graph = merge_neighbours(task_count)
# graph = triplets(task_count, cpus=worker_count)
example = TrainExample(graph=graph, worker_count=worker_count)
examples.append(example)
return create_dataframe(examples)
if __name__ == "__main__":
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
with timer("generate"):
dataset = generate_dataset_1(50)
save_dataset("dataset1.json", dataset)
# dataset = load_dataset("dataset1.json")
print(dataset.head(5))
# for graph in dataset["graph"]:
# nx_graph = estee_to_nx(graph)
# nx.draw_kamada_kawai(nx_graph)
# plt.show()
# exit()
with timer("convert"):
dataset = df_to_geometric_data(dataset)
train_dataset, val_dataset = train_test_split(dataset, 0.2)
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
if val_dataset:
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
else:
val_loader = None
learning_rate = 0.001
denormalizer = Denormalizer()
model = MakespanPredictor(dataset[0].num_features, learning_rate=learning_rate,
denormalizer=denormalizer)
gpus = None
max_epochs = 1500
trainer = pl.Trainer(gpus=gpus, max_epochs=max_epochs, log_every_n_steps=batch_size)
trainer.fit(model, train_loader, val_loader)
mae = torchmetrics.MeanAbsoluteError()
model.eval()
def eval_dataset(dataset):
errors = []
for (index, batch) in enumerate(dataset):
pred = model(batch)
gt = batch.y.unsqueeze(1)
pred = denormalizer.denormalize(pred, batch)
gt = denormalizer.denormalize(gt, batch)
error = mae(pred, gt).detach().numpy()
errors.append(error)
if index == 0:
print(error, pred, gt)
print(f"Average error: {np.mean(errors)}")
print("Train")
eval_dataset(train_loader)
if val_loader:
print("Val")
eval_dataset(val_loader)
# requirements.txt
torch==1.9.1
torch-geometric==2.0.1
git+https://github.com/it4innovations/estee
pandas==1.3.3
networkx==2.6.3
torch-scatter==2.0.8
torch-sparse==0.6.12
torch-cluster==1.5.9
torch-spline-conv==1.2.1
torchmetrics==0.5.1
tqdm==4.62.3
pytorch-lightning==1.4.7
git+https://github.com/spirali/kitt
# src/conversion.py
from collections import deque
from itertools import chain
import networkx as nx
import torch
from estee.common import TaskGraph
from .data import TrainExample
def estee_to_pyg(example: TrainExample):
"""
Returns (node_features, edge_index)
"""
graph = example.graph
edges_from = []
edges_to = []
features = [None] * len(graph.tasks)
to_visit = deque(graph.source_tasks())
visited = set()
while to_visit:
node = to_visit.popleft()
if node in visited:
continue
visited.add(node)
consumers = list(chain.from_iterable(o.consumers for o in node.outputs))
feats = [node.duration, example.worker_count]
features[node.id] = feats
        for consumer in consumers:
            # Record the dependency edge even if the consumer was already visited;
            # only the queueing needs the visited check.
            edges_from.append(node.id)
            edges_to.append(consumer.id)
            if consumer not in visited:
                to_visit.append(consumer)
assert len(edges_from) == len(edges_to)
edge_index = torch.tensor([edges_from, edges_to], dtype=torch.long)
node_features = torch.tensor(features, dtype=torch.float)
return node_features, edge_index
def estee_to_nx(graph: TaskGraph) -> nx.DiGraph:
nx_graph = nx.DiGraph()
for task in graph.tasks.values():
for output in task.outputs:
nx_graph.add_edge(f"t{task.id}", f"o{output.id}")
for next_task in output.consumers:
nx_graph.add_edge(f"o{output.id}", f"t{next_task.id}")
return nx_graph
# src/data.py
import dataclasses
from estee.common import TaskGraph
@dataclasses.dataclass()
class TrainExample:
graph: TaskGraph
worker_count: int
# src/generator.py
from estee.common import DataObject, TaskGraph
from estee.generators.utils import normal
from estee.schedulers import BlevelGtScheduler
from estee.simulator import Simulator, Worker
from estee.simulator.netmodels import InstantNetModel
from .data import TrainExample
def merge_neighbours(count, normal_center=20):
g = TaskGraph()
tasks1 = [g.new_task("a{}".format(i), duration=normal(normal_center * 1.2, normal_center / 4),
expected_duration=15,
outputs=[DataObject(normal(99, 2.5), 100)])
for i in range(count)]
for i in range(count):
t = g.new_task("b{}".format(i), duration=normal(normal_center, normal_center / 4),
expected_duration=15)
t.add_input(tasks1[i])
t.add_input(tasks1[(i + 1) % count])
return g
def triplets(count, cpus):
g = TaskGraph()
for i in range(count):
t1 = g.new_task("a{}".format(i), duration=normal(5, 1.5), expected_duration=5,
output_size=40)
t2 = g.new_task("b{}".format(i), duration=normal(120, 20), expected_duration=120,
output_size=120, cpus=cpus)
t2.add_input(t1)
t3 = g.new_task("c{}".format(i), duration=normal(32, 3), expected_duration=32)
t3.add_input(t2)
return g
def simulate_graph(example: TrainExample):
    # Simulate the task graph with Estee and return the resulting makespan
    netmodel = InstantNetModel()
scheduler = BlevelGtScheduler()
workers = [Worker(cpus=1) for _ in range(example.worker_count)]
simulator = Simulator(example.graph, workers, scheduler, netmodel)
return simulator.run()
# src/utils.py
import contextlib
import time
@contextlib.contextmanager
def timer(name: str):
start = time.time()
yield
duration = (time.time() - start) * 1000
print(f"{name}: {duration} ms")