Skip to content
Snippets Groups Projects
Commit ecb900d2 authored by Stanislav Bohm's avatar Stanislav Bohm
Browse files

Source blacked

parent cf8f677c
No related branches found
No related tags found
No related merge requests found
...@@ -21,27 +21,25 @@ from quake.client.functions import reset_global_plan, set_global_client # noqa ...@@ -21,27 +21,25 @@ from quake.client.functions import reset_global_plan, set_global_client # noqa
nodes = 3 nodes = 3
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def docker_cluster(): def docker_cluster():
no_shutdown = os.environ.get("QUAKE_TEST_NO_SHUTDOWN") == "1" no_shutdown = os.environ.get("QUAKE_TEST_NO_SHUTDOWN") == "1"
subprocess.check_call( subprocess.check_call(["docker-compose", "up", "-d"], cwd=DOCKER_DIR)
["docker-compose", "up", "-d"], hostnames = tuple(
cwd=DOCKER_DIR) ["mpihead"] + ["mpinode{}".format(i) for i in range(1, nodes + 1)]
hostnames = tuple(["mpihead"] + ["mpinode{}".format(i) for i in range(1, nodes + 1)]) )
yield hostnames yield hostnames
if not no_shutdown: if not no_shutdown:
subprocess.check_call(["docker-compose", "down"], subprocess.check_call(["docker-compose", "down"], cwd=DOCKER_DIR)
cwd=DOCKER_DIR)
cmd_prefix = ["docker-compose", "exec", "-T", "--user", "mpirun", "--privileged"] cmd_prefix = ["docker-compose", "exec", "-T", "--user", "mpirun", "--privileged"]
def make_cmds(cmd): def make_cmds(cmd):
result = [ result = [cmd_prefix + ["mpihead"] + cmd]
cmd_prefix + ["mpihead"] + cmd
]
for i in range(1, nodes + 1): for i in range(1, nodes + 1):
result.append(cmd_prefix + ["mpinode{}".format(i)] + cmd) result.append(cmd_prefix + ["mpinode{}".format(i)] + cmd)
return result return result
...@@ -62,7 +60,14 @@ def wait_for_port(port): ...@@ -62,7 +60,14 @@ def wait_for_port(port):
def popen_helper(cmd, logfile, **kwargs): def popen_helper(cmd, logfile, **kwargs):
with open(logfile, "w") as f: with open(logfile, "w") as f:
return subprocess.Popen(cmd, cwd=DOCKER_DIR, stdin=subprocess.DEVNULL, stderr=subprocess.STDOUT, stdout=f, **kwargs) return subprocess.Popen(
cmd,
cwd=DOCKER_DIR,
stdin=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
stdout=f,
**kwargs
)
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
...@@ -70,8 +75,15 @@ def client(docker_cluster, tmpdir): ...@@ -70,8 +75,15 @@ def client(docker_cluster, tmpdir):
print("Working directory:", tmpdir) print("Working directory:", tmpdir)
ps = [] ps = []
logdir = tmpdir.mkdir("logs") logdir = tmpdir.mkdir("logs")
for i, cmd in enumerate(make_cmds(["/bin/bash", "-c", for i, cmd in enumerate(
"pgrep python3 | xargs kill; sleep 0.1 ; rm -rf /tmp/data ; python3 -m quake.datasrv /tmp/data"])): make_cmds(
[
"/bin/bash",
"-c",
"pgrep python3 | xargs kill; sleep 0.1 ; rm -rf /tmp/data ; python3 -m quake.datasrv /tmp/data",
]
)
):
p = popen_helper(cmd, logfile=logdir.join("cleanup-{}".format(i))) p = popen_helper(cmd, logfile=logdir.join("cleanup-{}".format(i)))
ps.append(p) ps.append(p)
...@@ -79,7 +91,12 @@ def client(docker_cluster, tmpdir): ...@@ -79,7 +91,12 @@ def client(docker_cluster, tmpdir):
hostnames = ",".join(docker_cluster) hostnames = ",".join(docker_cluster)
# cmd = cmd_prefix + ["mpi_head", "/bin/bash", "-c", "kill `pgrep -f quake.server` ; sleep 0.1; echo 'xxx'; python3 -m quake.server --workers={}".format(hostnames)] # cmd = cmd_prefix + ["mpi_head", "/bin/bash", "-c", "kill `pgrep -f quake.server` ; sleep 0.1; echo 'xxx'; python3 -m quake.server --workers={}".format(hostnames)]
cmd = cmd_prefix + ["mpihead", "/bin/bash", "-c", "python3 -m quake.server --debug --workers={}".format(hostnames)] cmd = cmd_prefix + [
"mpihead",
"/bin/bash",
"-c",
"python3 -m quake.server --debug --workers={}".format(hostnames),
]
# print(" ".join(cmd)) # print(" ".join(cmd))
popen_helper(cmd, logfile=logdir.join("server")) popen_helper(cmd, logfile=logdir.join("server"))
ps.append(p) ps.append(p)
......
...@@ -12,12 +12,36 @@ def test_data_service(tmpdir, root_dir): ...@@ -12,12 +12,36 @@ def test_data_service(tmpdir, root_dir):
env = {"PYTHONPATH": root_dir} env = {"PYTHONPATH": root_dir}
ps = [] ps = []
ps.append( ps.append(
subprocess.Popen(["python3", "-m", "quake.datasrv", "--port", str(PORT1), str(tmpdir.join("srv1"))], env=env)) subprocess.Popen(
[
"python3",
"-m",
"quake.datasrv",
"--port",
str(PORT1),
str(tmpdir.join("srv1")),
],
env=env,
)
)
ps.append( ps.append(
subprocess.Popen(["python3", "-m", "quake.datasrv", "--port", str(PORT2), str(tmpdir.join("srv2"))], env=env)) subprocess.Popen(
[
"python3",
"-m",
"quake.datasrv",
"--port",
str(PORT2),
str(tmpdir.join("srv2")),
],
env=env,
)
)
async def main(): async def main():
connection1 = abrpc.Connection(await asyncio.open_connection("localhost", PORT1)) connection1 = abrpc.Connection(
await asyncio.open_connection("localhost", PORT1)
)
asyncio.ensure_future(connection1.serve()) asyncio.ensure_future(connection1.serve())
assert [] == await connection1.call("list_objects") assert [] == await connection1.call("list_objects")
await connection1.call("upload", "x_1", b"123") await connection1.call("upload", "x_1", b"123")
...@@ -26,7 +50,9 @@ def test_data_service(tmpdir, root_dir): ...@@ -26,7 +50,9 @@ def test_data_service(tmpdir, root_dir):
data = await connection1.call("get_data", "x_1") data = await connection1.call("get_data", "x_1")
assert data == b"123" assert data == b"123"
connection2 = abrpc.Connection(await asyncio.open_connection("localhost", PORT2)) connection2 = abrpc.Connection(
await asyncio.open_connection("localhost", PORT2)
)
asyncio.ensure_future(connection2.serve()) asyncio.ensure_future(connection2.serve())
c1 = connection2.call("get_data", "x_1", "localhost", PORT1) c1 = connection2.call("get_data", "x_1", "localhost", PORT1)
c2 = connection2.call("get_data", "x_1", "localhost", PORT1) c2 = connection2.call("get_data", "x_1", "localhost", PORT1)
...@@ -44,32 +70,36 @@ def test_data_service(tmpdir, root_dir): ...@@ -44,32 +70,36 @@ def test_data_service(tmpdir, root_dir):
s1 = await connection1.call("get_stats") s1 = await connection1.call("get_stats")
s2 = await connection2.call("get_stats") s2 = await connection2.call("get_stats")
assert s1 == {'connections': 0, assert s1 == {
'obj_data_provided': 2, "connections": 0,
'obj_fetched': 0, "obj_data_provided": 2,
# 'obj_file_provided': 0 "obj_fetched": 0,
} # 'obj_file_provided': 0
assert s2 == {'connections': 1, }
'obj_data_provided': 2, assert s2 == {
'obj_fetched': 1, "connections": 1,
# 'obj_file_provided': 2 "obj_data_provided": 2,
} "obj_fetched": 1,
# 'obj_file_provided': 2
}
assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2) assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2)
assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2) assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2)
s1 = await connection1.call("get_stats") s1 = await connection1.call("get_stats")
s2 = await connection2.call("get_stats") s2 = await connection2.call("get_stats")
assert s1 == {'connections': 1, assert s1 == {
'obj_data_provided': 4, "connections": 1,
'obj_fetched': 1, "obj_data_provided": 4,
# 'obj_file_provided': 0 "obj_fetched": 1,
} # 'obj_file_provided': 0
assert s2 == {'connections': 1, }
'obj_data_provided': 3, assert s2 == {
'obj_fetched': 1, "connections": 1,
# 'obj_file_provided': 2 "obj_data_provided": 3,
} "obj_fetched": 1,
# 'obj_file_provided': 2
}
try: try:
time.sleep(0.3) time.sleep(0.3)
......
from quake.common.taskinput import Layout from quake.common.taskinput import Layout
...@@ -14,17 +13,29 @@ def test_layouts(): ...@@ -14,17 +13,29 @@ def test_layouts():
] ]
assert check(Layout(1, 0, 0, 1), [11], 4) == [ assert check(Layout(1, 0, 0, 1), [11], 4) == [
[(11, 0)], [(11, 1)], [(11, 2)], [(11, 3)], [(11, 0)],
[(11, 1)],
[(11, 2)],
[(11, 3)],
] ]
assert check(Layout(1, 0, 0, 1), [11], 2) == [ assert check(Layout(1, 0, 0, 1), [11], 2) == [
[(11, 0)], [(11, 1)], [(11, 0)], [(11, 1)], [(11, 0)],
[(11, 1)],
[(11, 0)],
[(11, 1)],
] ]
assert check(Layout(1, 0, 0, 1), [20, 21, 22, 23], 1) == [ assert check(Layout(1, 0, 0, 1), [20, 21, 22, 23], 1) == [
[(20, 0)], [(21, 0)], [(22, 0)], [(23, 0)], [(20, 0)],
[(21, 0)],
[(22, 0)],
[(23, 0)],
] ]
assert check(Layout(2, 0, 0, 2), [20, 21, 22, 23], 2) == [ assert check(Layout(2, 0, 0, 2), [20, 21, 22, 23], 2) == [
[(20, 0), (21, 0)], [(22, 0), (23, 0)], [(20, 1), (21, 1)], [(22, 1), (23, 1)], [(20, 0), (21, 0)],
] [(22, 0), (23, 0)],
\ No newline at end of file [(20, 1), (21, 1)],
[(22, 1), (23, 1)],
]
...@@ -100,7 +100,14 @@ def test_greedy_match(): ...@@ -100,7 +100,14 @@ def test_greedy_match():
t1 = Task(1, 1, 3, None, False, []) t1 = Task(1, 1, 3, None, False, [])
t2 = Task(2, 1, 1, None, False, []) t2 = Task(2, 1, 1, None, False, [])
t3 = Task(3, 1, 2, None, False, []) t3 = Task(3, 1, 2, None, False, [])
t4 = Task(4, 0, 3, None, False, [inp(t1, 0, "scatter"), inp(t2, 0, "all_to_all"), inp(t3, 0, "scatter")]) t4 = Task(
4,
0,
3,
None,
False,
[inp(t1, 0, "scatter"), inp(t2, 0, "all_to_all"), inp(t3, 0, "scatter")],
)
workers = make_workers(4) workers = make_workers(4)
state = State(workers) state = State(workers)
state.add_tasks([t.to_dict() for t in [t1, t2, t3, t4]]) state.add_tasks([t.to_dict() for t in [t1, t2, t3, t4]])
...@@ -113,5 +120,5 @@ def test_greedy_match(): ...@@ -113,5 +120,5 @@ def test_greedy_match():
state._fake_placement(t3, [[{w1, w2}, {w2}]], [[1, 2]]) state._fake_placement(t3, [[{w1, w2}, {w2}]], [[1, 2]])
s = list(state.schedule()) s = list(state.schedule())
assert(len(s) == 1) assert len(s) == 1
assert(s[0][1] == [w2, w3, w1]) assert s[0][1] == [w2, w3, w1]
\ No newline at end of file
...@@ -4,7 +4,13 @@ import pickle ...@@ -4,7 +4,13 @@ import pickle
import pytest import pytest
import quake.job import quake.job
from quake.client.base.task import Task, new_mpirun_task, upload_data, new_py_task, make_input from quake.client.base.task import (
Task,
new_mpirun_task,
upload_data,
new_py_task,
make_input,
)
def test_task_fail(client): def test_task_fail(client):
...@@ -64,4 +70,6 @@ def test_py_job(client): ...@@ -64,4 +70,6 @@ def test_py_job(client):
assert [b"out0", b"out1"] == client.gather(t1, 0) assert [b"out0", b"out1"] == client.gather(t1, 0)
assert [b"out0", b"out1"] == client.gather(t1, 0) # Check the same call again assert [b"out0", b"out1"] == client.gather(t1, 0) # Check the same call again
assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(t2, 0) assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(t2, 0)
assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(t2, 0) # Check the same call again assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(
t2, 0
) # Check the same call again
...@@ -113,4 +113,4 @@ def test_wrapper_error(client): ...@@ -113,4 +113,4 @@ def test_wrapper_error(client):
def test_wrapper_too_many_processes(client): def test_wrapper_too_many_processes(client):
quake.set_global_client(client) quake.set_global_client(client)
with pytest.raises(Exception, match="server has only 4 workers"): with pytest.raises(Exception, match="server has only 4 workers"):
quake.wait(big_task()) quake.wait(big_task())
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment