Forked from
Ada Böhm / quake
25 commits behind the upstream repository.
-
Stanislav Bohm authoredStanislav Bohm authored
deploy_pbs.py 2.44 KiB
import logging
import os
import pathlib
import click as click
from cluster import CLUSTER_FILENAME, Cluster, HOSTNAME, start_process
CURRENT_DIR = pathlib.Path(__file__).absolute().parent
ROOT_DIR = CURRENT_DIR.parent
DATASRV_PORT = 8602
def prepare_directory(path):
os.makedirs(path, exist_ok=True)
def is_inside_pbs():
return "PBS_NODEFILE" in os.environ
def get_pbs_nodes():
assert is_inside_pbs()
with open(os.environ["PBS_NODEFILE"]) as f:
return [line.strip() for line in f]
def start_datasrv(cluster, node, workdir, env, init_cmd):
datasrv_dir = workdir / f"{node}-datasrv"
prepare_directory(datasrv_dir)
datasrv_data_dir = datasrv_dir / "data"
prepare_directory(datasrv_data_dir)
name = "datasrv"
commands = ["python", "-m", "quake.datasrv", str(datasrv_data_dir), "--port", DATASRV_PORT]
pid, cmd = start_process(commands, host=node, workdir=str(datasrv_dir), name=name, env=env,
init_cmd=init_cmd)
cluster.add(node, pid, cmd, key="datasrv")
def start_server(cluster, workers, workdir, env, init_cmd):
workdir = workdir / "server"
prepare_directory(workdir)
commands = ["python", "-m", "quake.server", "--ds-port", DATASRV_PORT, "--workers",
",".join(workers)]
pid, cmd = start_process(commands, workdir=str(workdir), name="server", env=env,
init_cmd=init_cmd)
cluster.add(HOSTNAME, pid, cmd, key="server")
@click.command()
@click.argument("workdir")
@click.option("--init-cmd", default="")
def up(workdir, init_cmd):
nodes = get_pbs_nodes()
workdir = pathlib.Path(workdir).absolute()
prepare_directory(workdir)
env = {}
env["PYTHONPATH"] = f'{ROOT_DIR}:{env.get("PYTHONPATH", "")}'
cluster = Cluster(str(workdir))
for node in nodes:
start_datasrv(cluster, node, workdir, env, init_cmd)
start_server(cluster, nodes, workdir, env, init_cmd)
cluster_path = workdir / CLUSTER_FILENAME
logging.info(f"Writing cluster into {cluster_path}")
with open(cluster_path, "w") as f:
cluster.serialize(f)
@click.command()
@click.argument("workdir")
def down(workdir):
with open(os.path.join(workdir, CLUSTER_FILENAME)) as f:
cluster = Cluster.deserialize(f)
cluster.kill()
@click.group()
def cli():
pass
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
cli.add_command(up)
cli.add_command(down)
cli()