diff --git a/tests/conftest.py b/tests/conftest.py index 5d2a37bf07468c647d4550bf8b54933a25adeba8..4f556d93156f4d4a87dfe1b132e428410e7750f3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,27 +21,25 @@ from quake.client.functions import reset_global_plan, set_global_client # noqa nodes = 3 + @pytest.fixture(scope="session") def docker_cluster(): no_shutdown = os.environ.get("QUAKE_TEST_NO_SHUTDOWN") == "1" - subprocess.check_call( - ["docker-compose", "up", "-d"], - cwd=DOCKER_DIR) - hostnames = tuple(["mpihead"] + ["mpinode{}".format(i) for i in range(1, nodes + 1)]) + subprocess.check_call(["docker-compose", "up", "-d"], cwd=DOCKER_DIR) + hostnames = tuple( + ["mpihead"] + ["mpinode{}".format(i) for i in range(1, nodes + 1)] + ) yield hostnames if not no_shutdown: - subprocess.check_call(["docker-compose", "down"], - cwd=DOCKER_DIR) + subprocess.check_call(["docker-compose", "down"], cwd=DOCKER_DIR) cmd_prefix = ["docker-compose", "exec", "-T", "--user", "mpirun", "--privileged"] def make_cmds(cmd): - result = [ - cmd_prefix + ["mpihead"] + cmd - ] + result = [cmd_prefix + ["mpihead"] + cmd] for i in range(1, nodes + 1): result.append(cmd_prefix + ["mpinode{}".format(i)] + cmd) return result @@ -62,7 +60,14 @@ def wait_for_port(port): def popen_helper(cmd, logfile, **kwargs): with open(logfile, "w") as f: - return subprocess.Popen(cmd, cwd=DOCKER_DIR, stdin=subprocess.DEVNULL, stderr=subprocess.STDOUT, stdout=f, **kwargs) + return subprocess.Popen( + cmd, + cwd=DOCKER_DIR, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + stdout=f, + **kwargs + ) @pytest.fixture(scope="function") @@ -70,8 +75,15 @@ def client(docker_cluster, tmpdir): print("Working directory:", tmpdir) ps = [] logdir = tmpdir.mkdir("logs") - for i, cmd in enumerate(make_cmds(["/bin/bash", "-c", - "pgrep python3 | xargs kill; sleep 0.1 ; rm -rf /tmp/data ; python3 -m quake.datasrv /tmp/data"])): + for i, cmd in enumerate( + make_cmds( + [ + "/bin/bash", + "-c", + "pgrep python3 | xargs kill; sleep 0.1 ; rm -rf /tmp/data ; python3 -m quake.datasrv /tmp/data", + ] + ) + ): p = popen_helper(cmd, logfile=logdir.join("cleanup-{}".format(i))) ps.append(p) @@ -79,7 +91,12 @@ def client(docker_cluster, tmpdir): hostnames = ",".join(docker_cluster) # cmd = cmd_prefix + ["mpi_head", "/bin/bash", "-c", "kill `pgrep -f quake.server` ; sleep 0.1; echo 'xxx'; python3 -m quake.server --workers={}".format(hostnames)] - cmd = cmd_prefix + ["mpihead", "/bin/bash", "-c", "python3 -m quake.server --debug --workers={}".format(hostnames)] + cmd = cmd_prefix + [ + "mpihead", + "/bin/bash", + "-c", + "python3 -m quake.server --debug --workers={}".format(hostnames), + ] # print(" ".join(cmd)) popen_helper(cmd, logfile=logdir.join("server")) ps.append(p) diff --git a/tests/test_datasrv.py b/tests/test_datasrv.py index 91940bdd3c647fd94701576545180d3db2f6ae44..fe67930121f5a7ca656955f6fa60f503bda13254 100644 --- a/tests/test_datasrv.py +++ b/tests/test_datasrv.py @@ -12,12 +12,36 @@ def test_data_service(tmpdir, root_dir): env = {"PYTHONPATH": root_dir} ps = [] ps.append( - subprocess.Popen(["python3", "-m", "quake.datasrv", "--port", str(PORT1), str(tmpdir.join("srv1"))], env=env)) + subprocess.Popen( + [ + "python3", + "-m", + "quake.datasrv", + "--port", + str(PORT1), + str(tmpdir.join("srv1")), + ], + env=env, + ) + ) ps.append( - subprocess.Popen(["python3", "-m", "quake.datasrv", "--port", str(PORT2), str(tmpdir.join("srv2"))], env=env)) + subprocess.Popen( + [ + "python3", + "-m", + "quake.datasrv", + "--port", + str(PORT2), + str(tmpdir.join("srv2")), + ], + env=env, + ) + ) async def main(): - connection1 = abrpc.Connection(await asyncio.open_connection("localhost", PORT1)) + connection1 = abrpc.Connection( + await asyncio.open_connection("localhost", PORT1) + ) asyncio.ensure_future(connection1.serve()) assert [] == await connection1.call("list_objects") await connection1.call("upload", "x_1", b"123") @@ -26,7 +50,9 @@ def test_data_service(tmpdir, root_dir): data = await connection1.call("get_data", "x_1") assert data == b"123" - connection2 = abrpc.Connection(await asyncio.open_connection("localhost", PORT2)) + connection2 = abrpc.Connection( + await asyncio.open_connection("localhost", PORT2) + ) asyncio.ensure_future(connection2.serve()) c1 = connection2.call("get_data", "x_1", "localhost", PORT1) c2 = connection2.call("get_data", "x_1", "localhost", PORT1) @@ -44,32 +70,36 @@ def test_data_service(tmpdir, root_dir): s1 = await connection1.call("get_stats") s2 = await connection2.call("get_stats") - assert s1 == {'connections': 0, - 'obj_data_provided': 2, - 'obj_fetched': 0, - # 'obj_file_provided': 0 - } - assert s2 == {'connections': 1, - 'obj_data_provided': 2, - 'obj_fetched': 1, - # 'obj_file_provided': 2 - } + assert s1 == { + "connections": 0, + "obj_data_provided": 2, + "obj_fetched": 0, + # 'obj_file_provided': 0 + } + assert s2 == { + "connections": 1, + "obj_data_provided": 2, + "obj_fetched": 1, + # 'obj_file_provided': 2 + } assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2) assert b"123" == await connection1.call("get_data", "x_1", "localhost", PORT2) s1 = await connection1.call("get_stats") s2 = await connection2.call("get_stats") - assert s1 == {'connections': 1, - 'obj_data_provided': 4, - 'obj_fetched': 1, - # 'obj_file_provided': 0 - } - assert s2 == {'connections': 1, - 'obj_data_provided': 3, - 'obj_fetched': 1, - # 'obj_file_provided': 2 - } + assert s1 == { + "connections": 1, + "obj_data_provided": 4, + "obj_fetched": 1, + # 'obj_file_provided': 0 + } + assert s2 == { + "connections": 1, + "obj_data_provided": 3, + "obj_fetched": 1, + # 'obj_file_provided': 2 + } try: time.sleep(0.3) diff --git a/tests/test_layout.py b/tests/test_layout.py index 45853d028a31f5fee052de7827c09ccc36a86066..665a721f14dadbd9048c08cbb6b7693a17ff89de 100644 --- a/tests/test_layout.py +++ b/tests/test_layout.py @@ -1,4 +1,3 @@ - from quake.common.taskinput import Layout @@ -14,17 +13,29 @@ def test_layouts(): ] assert check(Layout(1, 0, 0, 1), [11], 4) == [ - [(11, 0)], [(11, 1)], [(11, 2)], [(11, 3)], + [(11, 0)], + [(11, 1)], + [(11, 2)], + [(11, 3)], ] assert check(Layout(1, 0, 0, 1), [11], 2) == [ - [(11, 0)], [(11, 1)], [(11, 0)], [(11, 1)], + [(11, 0)], + [(11, 1)], + [(11, 0)], + [(11, 1)], ] assert check(Layout(1, 0, 0, 1), [20, 21, 22, 23], 1) == [ - [(20, 0)], [(21, 0)], [(22, 0)], [(23, 0)], + [(20, 0)], + [(21, 0)], + [(22, 0)], + [(23, 0)], ] assert check(Layout(2, 0, 0, 2), [20, 21, 22, 23], 2) == [ - [(20, 0), (21, 0)], [(22, 0), (23, 0)], [(20, 1), (21, 1)], [(22, 1), (23, 1)], - ] \ No newline at end of file + [(20, 0), (21, 0)], + [(22, 0), (23, 0)], + [(20, 1), (21, 1)], + [(22, 1), (23, 1)], + ] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 825fc59e0258beb540905998838302df0714b5b2..18e02963f4900a040dc1f79411322fddefa4617c 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -100,7 +100,14 @@ def test_greedy_match(): t1 = Task(1, 1, 3, None, False, []) t2 = Task(2, 1, 1, None, False, []) t3 = Task(3, 1, 2, None, False, []) - t4 = Task(4, 0, 3, None, False, [inp(t1, 0, "scatter"), inp(t2, 0, "all_to_all"), inp(t3, 0, "scatter")]) + t4 = Task( + 4, + 0, + 3, + None, + False, + [inp(t1, 0, "scatter"), inp(t2, 0, "all_to_all"), inp(t3, 0, "scatter")], + ) workers = make_workers(4) state = State(workers) state.add_tasks([t.to_dict() for t in [t1, t2, t3, t4]]) @@ -113,5 +120,5 @@ def test_greedy_match(): state._fake_placement(t3, [[{w1, w2}, {w2}]], [[1, 2]]) s = list(state.schedule()) - assert(len(s) == 1) - assert(s[0][1] == [w2, w3, w1]) \ No newline at end of file + assert len(s) == 1 + assert s[0][1] == [w2, w3, w1] diff --git a/tests/test_task.py b/tests/test_task.py index 00bded674715507e3ef76ba72110806cc271b85b..70a2efa8f62a2371139a4e15c1f814bf26b72a61 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -4,7 +4,13 @@ import pickle import pytest import quake.job -from quake.client.base.task import Task, new_mpirun_task, upload_data, new_py_task, make_input +from quake.client.base.task import ( + Task, + new_mpirun_task, + upload_data, + new_py_task, + make_input, +) def test_task_fail(client): @@ -64,4 +70,6 @@ def test_py_job(client): assert [b"out0", b"out1"] == client.gather(t1, 0) assert [b"out0", b"out1"] == client.gather(t1, 0) # Check the same call again assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(t2, 0) - assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather(t2, 0) # Check the same call again + assert [b"0out0out1", b"1out0out1", b"2out0out1"] == client.gather( + t2, 0 + ) # Check the same call again diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index be54a12514229c14d7d7478e5dcf9be3b7103f6e..3b4fef0b582620ef9f5eeead191dca1e2ec39c77 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -113,4 +113,4 @@ def test_wrapper_error(client): def test_wrapper_too_many_processes(client): quake.set_global_client(client) with pytest.raises(Exception, match="server has only 4 workers"): - quake.wait(big_task()) \ No newline at end of file + quake.wait(big_task())