Commit fd26ceb2 authored by Jakub Beránek's avatar Jakub Beránek
Browse files

PBS deploy script

parent 38a55386
......@@ -17,17 +17,15 @@ def parse_args():
return parser.parse_args()
def main():
logging.basicConfig(level=logging.DEBUG)
args = parse_args()
service = Service(args.workdir)
logger.info("Starting data service on port %s", args.port)
def run_data_service(workdir, port):
service = Service(workdir)
logger.info("Starting data service on port %s", port)
logger.info("Working directory is: %s", args.workdir)
os.makedirs(args.workdir)
logger.info("Working directory is: %s", workdir)
os.makedirs(workdir, exist_ok=True)
if os.listdir(args.workdir):
raise Exception("Working directory '{}' is not empty".format(args.workdir))
if os.listdir(workdir):
raise Exception("Working directory '{}' is not empty".format(workdir))
async def handle(conn):
logger.info("New connection %s", conn)
......@@ -36,9 +34,15 @@ def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.start_server(on_connection(handle), port=args.port))
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=port))
loop.run_forever()
def main():
logging.basicConfig(level=logging.DEBUG)
args = parse_args()
run_data_service(args.workdir, args.port)
if __name__ == "__main__":
main()
......@@ -37,7 +37,7 @@ def main():
loop.run_until_complete(server.connect_to_ds())
loop.run_until_complete(
asyncio.start_server(on_connection(handle), port=args.port))
asyncio.start_server(on_connection(handle), host="0.0.0.0", port=args.port))
loop.run_forever()
......
cloudpickle
uvloop
mpi4py
click
git+https://github.com/spirali/abrpc@master
import logging
import os
import pathlib
import subprocess
import click as click
# Directory containing this script, and the repository root above it
# (ROOT_DIR is prepended to PYTHONPATH for the launched processes).
CURRENT_DIR = pathlib.Path(__file__).absolute().parent
ROOT_DIR = CURRENT_DIR.parent
# TCP port each per-node data service listens on.
DATASRV_PORT = 8602
def prepare_directory(path):
    """Ensure *path* exists, creating any missing parent directories."""
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)
def start_process(commands, host=None, workdir=None, name=None, env=None, init_cmd=""):
    """Start a detached background process, locally or on *host* via ssh.

    The command is wrapped in a small bash script that cd's into *workdir*,
    runs *init_cmd* (aborting on failure), lifts the core-dump limit, launches
    the process in the background with stdout/stderr redirected to
    ``<workdir>/<name>.out`` / ``.err``, and prints the process-group id of
    the launched job, which this function captures from the script's stdout.

    :param commands: command and arguments; each element is str()-converted
    :param host: if given, run the script through ``ssh host``; otherwise run
        locally under ``setsid`` (detaches from this process's session)
    :param workdir: working directory (defaults to the current directory)
    :param name: base name for the .out/.err log files (defaults to "process")
    :param env: mapping of extra environment variables, passed via ``env``
    :param init_cmd: shell command executed before the process starts
    :return: tuple ``(pid, command)`` — the captured id (NOTE(review):
        ``ps -ho pgid`` prints the process *group* id, not the pid) and the
        generated shell script text
    :raises Exception: when no id could be read back (startup failed)
    """
    if not workdir:
        workdir = os.getcwd()
    workdir = os.path.abspath(workdir)
    if init_cmd:
        # Abort the whole launch script if the init command fails.
        init_cmd = f"{init_cmd} || exit 1"
    args = []
    if env:
        # Prefix with env(1) so KEY=VAL pairs apply to the launched process.
        args += ["env"]
        for (key, val) in env.items():
            args += [f"{key}={val}"]
    args += [str(cmd) for cmd in commands]
    if not name:
        name = "process"
    output = os.path.join(workdir, name)
    logging.info(f"Running {' '.join(str(c) for c in commands)} on {host}")
    stdout_file = f"{output}.out"
    stderr_file = f"{output}.err"
    # Shell script executed by bash on the target machine; '$!' is the pid of
    # the backgrounded job, whose pgid is echoed back on stdout.
    command = f"""
cd {workdir} || exit 1
{init_cmd}
ulimit -c unlimited
{' '.join(args)} > {stdout_file} 2> {stderr_file} &
ps -ho pgid $!
""".strip()
    cmd_args = []
    if host:
        cmd_args += ["ssh", host]
    else:
        # setsid detaches the local process from this script's session.
        cmd_args += ["setsid"]
    cmd_args += ["/bin/bash"]
    process = subprocess.Popen(cmd_args, cwd=workdir, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    # Feed the script to bash's stdin and wait for it to finish.
    out, err = process.communicate(command.encode())
    pid = out.strip()
    if not pid:
        # Nothing echoed back => the background job never started; surface
        # whatever diagnostics are available before raising.
        logging.error(
            f"Process startup failed with status: {process.returncode}, stderr: {err.decode()}, stdout: {out.decode()}")
        if os.path.isfile(stderr_file):
            with open(stderr_file) as f:
                logging.error("".join(f.readlines()))
        raise Exception(f"Process startup failed on {host if host else 'localhost'}: {command}")
    pid = int(pid)
    logging.info(f"PID: {pid}")
    return (pid, command)
def is_inside_pbs():
    """Return True when running inside a PBS job (PBS_NODEFILE is set)."""
    return os.environ.get("PBS_NODEFILE") is not None
def get_pbs_nodes():
    """Return the hostnames listed in $PBS_NODEFILE, one per line, stripped."""
    assert is_inside_pbs()
    nodefile = os.environ["PBS_NODEFILE"]
    with open(nodefile) as handle:
        return list(map(str.strip, handle))
def start_datasrv(node, workdir, env, init_cmd):
    """Launch a quake data service on *node*, with its state under *workdir*."""
    service_dir = workdir / f"{node}-datasrv"
    prepare_directory(service_dir)
    data_dir = service_dir / "data"
    prepare_directory(data_dir)
    cmd = ["python", "-m", "quake.datasrv", str(data_dir), "--port", DATASRV_PORT]
    start_process(cmd, host=node, workdir=str(service_dir), name="datasrv",
                  env=env, init_cmd=init_cmd)
def start_server(workers, workdir, env, init_cmd):
    """Launch the quake server locally, pointing it at the given *workers*."""
    server_dir = workdir / "server"
    prepare_directory(server_dir)
    cmd = ["python", "-m", "quake.server",
           "--ds-port", DATASRV_PORT,
           "--workers", ",".join(workers)]
    start_process(cmd, workdir=str(server_dir), name="server",
                  env=env, init_cmd=init_cmd)
@click.command()
@click.argument("workdir")
@click.option("--init-cmd", default="",
              help="Shell command executed before each remote process starts.")
def pbs_deploy(workdir, init_cmd):
    """Deploy quake across the current PBS allocation.

    Starts one data service per node listed in $PBS_NODEFILE and then a
    single server process (locally) that connects to all of them. All logs
    and state are placed under WORKDIR.
    """
    nodes = get_pbs_nodes()
    workdir = pathlib.Path(workdir).absolute()
    prepare_directory(workdir)
    # BUG FIX: the original read PYTHONPATH from the freshly created empty
    # `env` dict (env.get(...)), which always yielded "" and dropped the
    # caller's PYTHONPATH. Read the inherited value from os.environ instead,
    # still prepending the repository root.
    env = {"PYTHONPATH": f'{ROOT_DIR}:{os.environ.get("PYTHONPATH", "")}'}
    for node in nodes:
        start_datasrv(node, workdir, env, init_cmd)
    start_server(nodes, workdir, env, init_cmd)
if __name__ == "__main__":
    # Configure root logging before click dispatches to pbs_deploy().
    logging.basicConfig(level=logging.INFO)
    pbs_deploy()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment