Commit cf70ccf3 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Migration to Python 3

parent 87bfffea
......@@ -16,6 +16,10 @@ mark_as_advanced(LIBUV_INCLUDE_DIR LIBUV_LIBRARY)
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIRS})
# Python
find_package(PythonLibs 3.4 REQUIRED)
include_directories(${PYTHON_INCLUDE_DIRS})
add_subdirectory(src)
add_subdirectory(tests)
......
from client import Client, LoomException, TaskFailed, make_dry_report # noqa
import tasks # noqa
from .client import Client, LoomException, TaskFailed, make_dry_report # noqa
from . import tasks # noqa
from .connection import Connection
from .task import Task
from .plan import Plan
from ..pb.loomcomm_pb2 import Register, Data, ClientMessage, ClientSubmit
from ..pb.loomreport_pb2 import Report
import socket
from connection import Connection
from task import Task
from plan import Plan
LOOM_PROTOCOL_VERSION = 1
......@@ -133,8 +135,8 @@ class Client(object):
if type_id == self.rawdata_id:
return self.connection.read_data(msg_data.size)
if type_id == self.array_id:
return [self._receive_data() for i in xrange(msg_data.length)]
print type_id, self.array_id, self.rawdata_id
return [self._receive_data() for i in range(msg_data.length)]
print(type_id, self.array_id, self.rawdata_id)
assert 0
def _send_message(self, message):
......@@ -165,5 +167,5 @@ def make_dry_report(tasks, report_filename):
def write_report(report_data, report_filename):
with open(report_filename, "w") as f:
with open(report_filename, "wb") as f:
f.write(report_data.SerializeToString())
......@@ -8,7 +8,7 @@ class Connection(object):
def __init__(self, socket):
self.socket = socket
self.data = ""
self.data = bytes()
def receive_message(self):
while True:
......@@ -26,7 +26,7 @@ class Connection(object):
self.data += new_data
def read_data(self, data_size):
result = ""
result = bytes()
while True:
change = min(data_size, len(self.data))
result += self.data[:change]
......
......@@ -43,7 +43,10 @@ class Plan(object):
# Build tasks
for task in tasks:
msg_t = msg.tasks.add()
msg_t.config = task.config
config = task.config
if isinstance(config, str):
config = bytes(config, encoding="utf-8")
msg_t.config = config
msg_t.task_type = symbols[task.task_type]
msg_t.input_ids.extend(self.tasks[t] for t in task.inputs)
msg_t.policy = task.policy
......
......@@ -13,7 +13,7 @@ from google.protobuf import descriptor_pb2
_sym_db = _symbol_database.Default()
import loomplan_pb2
from . import loomplan_pb2
DESCRIPTOR = _descriptor.FileDescriptor(
......
......@@ -13,8 +13,8 @@ from google.protobuf import descriptor_pb2
_sym_db = _symbol_database.Default()
import loomplan_pb2
import loomcomm_pb2
from . import loomplan_pb2
from . import loomcomm_pb2
DESCRIPTOR = _descriptor.FileDescriptor(
......
......@@ -18,16 +18,16 @@ def test_make_array(loom_env):
result_d, result_e0, result_e1, result_e2 = \
loom_env.submit((d, e0, e1, e2))
assert result_d == ["ABC", "123456", ""]
assert result_e0 == "ABC"
assert result_e1 == "123456"
assert result_e2 == ""
assert result_d == [b"ABC", b"123456", b""]
assert result_e0 == b"ABC"
assert result_e1 == b"123456"
assert result_e2 == b""
def test_slice_array(loom_env):
loom_env.start(1)
items = [tasks.const(str(i)) for i in xrange(20)]
items = [tasks.const(str(i)) for i in range(20)]
a = tasks.array_make(items)
e0 = tasks.slice(a, 0, 100)
......@@ -38,11 +38,11 @@ def test_slice_array(loom_env):
r0, r1, r2, r3, r4 = \
loom_env.submit((e0, e1, e2, e3, e4))
assert r0 == list(map(str, range(20)))
assert r0 == [bytes(str(i), "ascii") for i in range(0, 20)]
assert r1 == []
assert r2 == ['2', '3']
assert r2 == [b'2', b'3']
assert r3 == []
assert r4 == list(map(str, range(4, 20)))
assert r4 == [bytes(str(i), "ascii") for i in range(4, 20)]
def test_make_empty_array(loom_env):
......@@ -63,13 +63,13 @@ def test_array_of_array(loom_env):
loom_env.start(1)
result_d, result_e, result_f = loom_env.submit((d, e, f))
assert result_d == ["ABC", "123"]
assert result_e == [["ABC", "123"], "+++"]
assert result_f == ["ABC", "123"]
assert result_d == [b"ABC", b"123"]
assert result_e == [[b"ABC", b"123"], b"+++"]
assert result_f == [b"ABC", b"123"]
def test_array_same_value(loom_env):
a = tasks.const("ABC")
b = tasks.array_make((a, a, a, a))
loom_env.start(1)
assert ["ABC"] * 4 == loom_env.submit(b)
assert [b"ABC"] * 4 == loom_env.submit(b)
......@@ -10,7 +10,7 @@ loom_env # silence flake8
def test_cv_iris(loom_env):
CHUNKS = 15
CHUNK_SIZE = 150 / CHUNKS # There are 150 irises
CHUNK_SIZE = 150 // CHUNKS # There are 150 irises
loom_env.start(4, 4)
loom_env.info = False
......@@ -20,22 +20,22 @@ def test_cv_iris(loom_env):
lines = tasks.split(data)
chunks = [tasks.slice(lines, i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE)
for i in xrange(CHUNKS)]
for i in range(CHUNKS)]
trainsets = [tasks.merge(chunks[:i] + chunks[i + 1:])
for i in xrange(CHUNKS)]
for i in range(CHUNKS)]
models = []
for i, ts in enumerate(trainsets):
model = tasks.run("svm-train data",
[(ts, "data")], ["data.model"])
[(ts, "data")], ["data.model"])
model.label = "svm-train: {}".format(i)
models.append(model)
predict = []
for chunk, model in zip(chunks, models):
task = tasks.run("svm-predict testdata model out",
[(chunk, "testdata"), (model, "model")])
[(chunk, "testdata"), (model, "model")])
task.label = "svm-predict"
predict.append(task)
......@@ -45,4 +45,4 @@ def test_cv_iris(loom_env):
assert len(results) == CHUNKS
for line in results:
assert line.startswith("Accuracy = ")
assert line.startswith(b"Accuracy = ")
......@@ -4,7 +4,6 @@ import loom.client.tasks as tasks # noqa
import struct
from datetime import datetime
import os
from loom import client
FILE1 = os.path.join(LOOM_TEST_DATA_DIR, "file1")
FILE2 = os.path.join(LOOM_TEST_DATA_DIR, "file2")
......@@ -22,11 +21,11 @@ def pytestprog(sleep, op="copy", stamp=False, file_out=None, file_in=None):
args.append("--out=" + file_out)
args.append(op)
args.append(sleep)
return map(str, args)
return list(str(a) for a in args)
def str2datetime(s):
parts = s.split('.')
def str2datetime(data):
parts = data.decode().split(".")
dt = datetime.strptime(parts[0], "%Y-%m-%d %H:%M:%S")
return dt.replace(microsecond=int(parts[1]))
......@@ -36,7 +35,7 @@ def test_single_result(loom_env):
a = tasks.const("ABCDE")
b = tasks.const("123")
c = tasks.merge((a, b))
assert "ABCDE123" == loom_env.submit(c)
assert b"ABCDE123" == loom_env.submit(c)
def test_more_results(loom_env):
......@@ -44,7 +43,7 @@ def test_more_results(loom_env):
a = tasks.const("ABCDE")
b = tasks.const("123")
c = tasks.merge((a, b))
assert ["ABCDE123"] * 2 == loom_env.submit([c, c])
assert [b"ABCDE123"] * 2 == loom_env.submit([c, c])
def test_merge_w3(loom_env):
......@@ -52,21 +51,21 @@ def test_merge_w3(loom_env):
a = tasks.const("ABCDE")
b = tasks.const("123")
c = tasks.merge((a, b))
assert "ABCDE123" == loom_env.submit(c)
assert b"ABCDE123" == loom_env.submit(c)
def test_merge_delimiter(loom_env):
loom_env.start(1)
consts = [tasks.const(str(i)) for i in xrange(10)]
consts = [tasks.const(str(i)) for i in range(10)]
c = tasks.merge(consts, "abc")
expected = "abc".join(str(i) for i in xrange(10))
expected = bytes("abc".join(str(i) for i in range(10)), "ascii")
assert expected == loom_env.submit(c)
def test_merge_empty_with_delimiter(loom_env):
loom_env.start(1)
c = tasks.merge((), "abc")
assert "" == loom_env.submit(c)
assert b"" == loom_env.submit(c)
def test_run_separated_1_cpu(loom_env):
......@@ -78,7 +77,7 @@ def test_run_separated_1_cpu(loom_env):
starts = []
for result in results:
line1, line2 = result.strip().split("\n")
line1, line2 = result.strip().split(b"\n")
starts.append(str2datetime(line1))
for i in range(len(starts)):
......@@ -100,7 +99,7 @@ def test_run_separated_4_cpu(loom_env):
starts = []
for result in results:
line1, line2 = result.strip().split("\n")
line1, line2 = result.strip().split(b"\n")
starts.append(str2datetime(line1))
for i in range(len(starts)):
......@@ -122,7 +121,7 @@ def test_run_separated_4cpu_tasks_4_cpu(loom_env):
starts = []
for result in results:
line1, line2 = result.strip().split("\n")
line1, line2 = result.strip().split(b"\n")
starts.append(str2datetime(line1))
for i in range(len(starts)):
......@@ -148,7 +147,7 @@ def test_run_double_lines(loom_env):
result = tasks.merge((c1, c2, a2))
expect = "chk" * COUNT + "llmmnn" * COUNT + "kkllmm" * COUNT
expect = b"chk" * COUNT + b"llmmnn" * COUNT + b"kkllmm" * COUNT
for i in range(1, 4):
# print "Runnig for {}".format(i)
......@@ -172,7 +171,7 @@ def test_run_files(loom_env):
loom_env.start(1)
result = loom_env.submit(c1)
assert result == "cdef" * 100
assert result == b"cdef" * 100
def test_run_variable2(loom_env):
......@@ -181,7 +180,7 @@ def test_run_variable2(loom_env):
c = tasks.run("/bin/echo $xyz xyz $ab $c", inputs=[(a, "$xyz"), (b, "$c")])
loom_env.start(1)
result = loom_env.submit(c)
assert result == "123 xyz $ab 456\n"
assert result == b"123 xyz $ab 456\n"
def test_open_and_merge(loom_env):
......@@ -190,9 +189,9 @@ def test_open_and_merge(loom_env):
c = tasks.merge((a, b))
loom_env.start(1)
result = loom_env.submit(c)
expect = ("This is file 1\n" +
"\n".join("Line {}".format(i) for i in xrange(1, 13)) +
"\n")
expect = bytes("This is file 1\n" +
"\n".join("Line {}".format(i) for i in range(1, 13)) +
"\n", "ascii")
assert result == expect
......@@ -204,14 +203,14 @@ def test_open_and_splitlines(loom_env):
c2 = tasks.slice(lines, 0, 6)
c3 = tasks.slice(lines, 3, 60)
result1, result2, result3 = loom_env.submit([c1, c2, c3])
expect1 = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result1 == expect1
expect1 = "\n".join("Line {}".format(i) for i in range(3, 7)) + "\n"
assert result1.decode() == expect1
expect2 = "\n".join("Line {}".format(i) for i in xrange(1, 7)) + "\n"
assert result2 == expect2
expect2 = "\n".join("Line {}".format(i) for i in range(1, 7)) + "\n"
assert result2.decode() == expect2
expect3 = "\n".join("Line {}".format(i) for i in xrange(4, 13)) + "\n"
assert result3 == expect3
expect3 = "\n".join("Line {}".format(i) for i in range(4, 13)) + "\n"
assert result3.decode() == expect3
def test_split(loom_env):
......@@ -224,7 +223,7 @@ def test_split(loom_env):
e = tasks.slice(b, 0, 2)
f = tasks.slice(b, 10, 20)
r1, r2, r3, r4 = loom_env.submit((c, d, e, f))
r1, r2, r3, r4 = [r.decode() for r in loom_env.submit((c, d, e, f))]
assert r1 == "Line2\n"
assert r2 == "Line4"
......
......@@ -7,7 +7,7 @@ loom_env # silence flake8
def test_dslice(loom_env):
loom_env.start(2)
consts = []
for i in xrange(16):
for i in range(16):
consts.append(tasks.const("data{}".format(i)))
a = tasks.array_make(consts)
ds = tasks.dslice(a)
......@@ -16,17 +16,19 @@ def test_dslice(loom_env):
result = loom_env.submit(r)
assert len(result) >= 2
assert result == ["data{}".format(i) for i in xrange(0, 16, 2)]
assert result == [bytes("data{}".format(i), "ascii")
for i in range(0, 16, 2)]
def test_dget(loom_env):
loom_env.start(2)
consts = []
for i in xrange(16):
for i in range(16):
consts.append(tasks.const("data{}".format(i)))
a = tasks.array_make(consts)
ds = tasks.dget(a)
f = tasks.run("/bin/cat", stdin=ds)
r = tasks.array_make((f,))
result = loom_env.submit(r)
assert result == ["data{}".format(i) for i in xrange(16)]
assert result == [bytes("data{}".format(i), "ascii")
for i in range(16)]
......@@ -72,7 +72,7 @@ class LoomEnv(Env):
if VALGRIND:
time.sleep(2)
worker_args = valgrind_args + worker_args
for i in xrange(workers_count):
for i in range(workers_count):
w = self.start_process("worker{}".format(i), worker_args)
workers.append(w)
time.sleep(0.1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment