Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
HyperLoom
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
Operations
Operations
Incidents
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
ADAS
HyperLoom
Commits
37ba5dcb
Commit
37ba5dcb
authored
Jul 12, 2016
by
Stanislav Bohm
Browse files
Options
Browse Files
Download
Plain Diff
Merging branches
parents
9be6b0ca
f7a48e01
Changes
37
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
37 changed files
with
984 additions
and
1086 deletions
+984
-1086
src/client/client.py
src/client/client.py
+6
-2
src/client/loomcomm_pb2.py
src/client/loomcomm_pb2.py
+16
-9
src/client/loomrun_pb2.py
src/client/loomrun_pb2.py
+12
-65
src/client/plan.py
src/client/plan.py
+40
-4
src/libloom/CMakeLists.txt
src/libloom/CMakeLists.txt
+2
-0
src/libloom/connection.cpp
src/libloom/connection.cpp
+0
-2
src/libloom/data.cpp
src/libloom/data.cpp
+5
-5
src/libloom/data.h
src/libloom/data.h
+4
-4
src/libloom/data/array.cpp
src/libloom/data/array.cpp
+119
-0
src/libloom/data/array.h
src/libloom/data/array.h
+61
-0
src/libloom/data/rawdata.cpp
src/libloom/data/rawdata.cpp
+14
-13
src/libloom/data/rawdata.h
src/libloom/data/rawdata.h
+1
-0
src/libloom/interconnect.cpp
src/libloom/interconnect.cpp
+1
-1
src/libloom/loomcomm.pb.cc
src/libloom/loomcomm.pb.cc
+50
-17
src/libloom/loomcomm.pb.h
src/libloom/loomcomm.pb.h
+57
-23
src/libloom/taskinstance.cpp
src/libloom/taskinstance.cpp
+3
-3
src/libloom/taskinstance.h
src/libloom/taskinstance.h
+1
-2
src/libloom/worker.cpp
src/libloom/worker.cpp
+6
-1
src/libloom/worker.h
src/libloom/worker.h
+2
-2
src/proto/loomcomm.proto
src/proto/loomcomm.proto
+2
-1
src/proto/loomrun.proto
src/proto/loomrun.proto
+2
-8
src/server/dummyworker.cpp
src/server/dummyworker.cpp
+4
-0
src/server/taskmanager.cpp
src/server/taskmanager.cpp
+16
-2
src/server/tasknode.h
src/server/tasknode.h
+0
-1
src/server/workerconn.cpp
src/server/workerconn.cpp
+1
-0
src/worker/CMakeLists.txt
src/worker/CMakeLists.txt
+2
-0
src/worker/arraytasks.cpp
src/worker/arraytasks.cpp
+27
-0
src/worker/arraytasks.h
src/worker/arraytasks.h
+22
-0
src/worker/basictasks.cpp
src/worker/basictasks.cpp
+15
-11
src/worker/loomrun.pb.cc
src/worker/loomrun.pb.cc
+50
-346
src/worker/loomrun.pb.h
src/worker/loomrun.pb.h
+133
-370
src/worker/main.cpp
src/worker/main.cpp
+10
-1
src/worker/runtask.cpp
src/worker/runtask.cpp
+153
-140
tests/client/array_test.py
tests/client/array_test.py
+56
-0
tests/client/cv_test.py
tests/client/cv_test.py
+6
-8
tests/client/data_test.py
tests/client/data_test.py
+8
-6
tests/server/scheduler.cpp
tests/server/scheduler.cpp
+77
-39
No files found.
src/client/client.py
View file @
37ba5dcb
...
...
@@ -66,8 +66,12 @@ class Client(object):
def
_receive_data
(
self
):
msg_data
=
Data
()
msg_data
.
ParseFromString
(
self
.
connection
.
receive_message
())
assert
msg_data
.
type_id
==
300
return
self
.
connection
.
read_data
(
msg_data
.
size
)
type_id
=
msg_data
.
type_id
if
type_id
==
300
:
# Data
return
self
.
connection
.
read_data
(
msg_data
.
arg0_u64
)
if
type_id
==
400
:
# Array
return
[
self
.
_receive_data
()
for
i
in
xrange
(
msg_data
.
arg0_u64
)]
assert
0
def
_send_message
(
self
,
message
):
data
=
message
.
SerializeToString
()
...
...
src/client/loomcomm_pb2.py
View file @
37ba5dcb
...
...
@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR
=
_descriptor
.
FileDescriptor
(
name
=
'loomcomm.proto'
,
package
=
'loomcomm'
,
serialized_pb
=
_b
(
'
\n\x0e
loomcomm.proto
\x12\x08
loomcomm
\"\xbb\x01\n\x08
Register
\x12\x18\n\x10
protocol_version
\x18\x01
\x02
(
\x05\x12
%
\n\x04
type
\x18\x02
\x02
(
\x0e\x32\x17
.loomcomm.Register.Type
\x12\x0c\n\x04
port
\x18\x03
\x01
(
\x05\x12\x12\n\n
task_types
\x18\x04
\x03
(
\t\x12\x0c\n\x04\x63
pus
\x18\x05
\x01
(
\x05\x12\x0c\n\x04
info
\x18\n
\x01
(
\x08\"
0
\n\x04
Type
\x12\x13\n\x0f
REGISTER_WORKER
\x10\x01\x12\x13\n\x0f
REGISTER_CLIENT
\x10\x02\"
&
\n\r
ServerMessage
\"\x15\n\x04
Type
\x12\r\n\t
START_JOB
\x10\x01\"\xf1\x01\n\r
WorkerCommand
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.WorkerCommand.Type
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\x12\x11\n\t
task_type
\x18\x03
\x01
(
\x05\x12\x13\n\x0b
task_config
\x18\x04
\x01
(
\t\x12\x13\n\x0b
task_inputs
\x18\x05
\x03
(
\x05\x12\x0f\n\x07\x61\x64\x64
ress
\x18\n
\x01
(
\t\x12\x11\n\t
with_size
\x18\x0b
\x01
(
\x08\x12\x0f\n\x07
symbols
\x18\x64
\x03
(
\t\"
6
\n\x04
Type
\x12\x08\n\x04
TASK
\x10\x01\x12\x08\n\x04
SEND
\x10\x02\x12\n\n\x06
REMOVE
\x10\x03\x12\x0e\n\n
DICTIONARY
\x10\x04\"\x1c\n\x0e
WorkerResponse
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\"\x18\n\x08\x41
nnounce
\x12\x0c\n\x04
port
\x18\x01
\x02
(
\x05\"
-
\n\x0c\x44\x61
taPrologue
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x11\n\t
data_size
\x18\x03
\x01
(
\x04\"
%
\n\x04\x44\x61
ta
\x12\x0f\n\x07
type_id
\x18\x01
\x02
(
\x05\x12\x0c\n\x04
size
\x18\x02
\x01
(
\x04\"\"\n\x04
Info
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x0e\n\x06
worker
\x18\x02
\x02
(
\t\"\x9b\x01\n\r
ClientMessage
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.ClientMessage.Type
\x12
$
\n\x04\x64\x61
ta
\x18\x02
\x01
(
\x0b\x32\x16
.loomcomm.DataPrologue
\x12\x1c\n\x04
info
\x18\x03
\x01
(
\x0b\x32\x0e
.loomcomm.Info
\"\x1a\n\x04
Type
\x12\x08\n\x04\x44\x41
TA
\x10\x01\x12\x08\n\x04
INFO
\x10\x02\x42\x02
H
\x03
'
)
serialized_pb
=
_b
(
'
\n\x0e
loomcomm.proto
\x12\x08
loomcomm
\"\xbb\x01\n\x08
Register
\x12\x18\n\x10
protocol_version
\x18\x01
\x02
(
\x05\x12
%
\n\x04
type
\x18\x02
\x02
(
\x0e\x32\x17
.loomcomm.Register.Type
\x12\x0c\n\x04
port
\x18\x03
\x01
(
\x05\x12\x12\n\n
task_types
\x18\x04
\x03
(
\t\x12\x0c\n\x04\x63
pus
\x18\x05
\x01
(
\x05\x12\x0c\n\x04
info
\x18\n
\x01
(
\x08\"
0
\n\x04
Type
\x12\x13\n\x0f
REGISTER_WORKER
\x10\x01\x12\x13\n\x0f
REGISTER_CLIENT
\x10\x02\"
&
\n\r
ServerMessage
\"\x15\n\x04
Type
\x12\r\n\t
START_JOB
\x10\x01\"\xf1\x01\n\r
WorkerCommand
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.WorkerCommand.Type
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\x12\x11\n\t
task_type
\x18\x03
\x01
(
\x05\x12\x13\n\x0b
task_config
\x18\x04
\x01
(
\t\x12\x13\n\x0b
task_inputs
\x18\x05
\x03
(
\x05\x12\x0f\n\x07\x61\x64\x64
ress
\x18\n
\x01
(
\t\x12\x11\n\t
with_size
\x18\x0b
\x01
(
\x08\x12\x0f\n\x07
symbols
\x18\x64
\x03
(
\t\"
6
\n\x04
Type
\x12\x08\n\x04
TASK
\x10\x01\x12\x08\n\x04
SEND
\x10\x02\x12\n\n\x06
REMOVE
\x10\x03\x12\x0e\n\n
DICTIONARY
\x10\x04\"\x1c\n\x0e
WorkerResponse
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\"\x18\n\x08\x41
nnounce
\x12\x0c\n\x04
port
\x18\x01
\x02
(
\x05\"
-
\n\x0c\x44\x61
taPrologue
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x11\n\t
data_size
\x18\x03
\x01
(
\x04\"
;
\n\x04\x44\x61
ta
\x12\x0f\n\x07
type_id
\x18\x01
\x02
(
\x05\x12\x10\n\x08\x61
rg0_u64
\x18\x02
\x01
(
\x04\x12\x10\n\x08\x61
rg1_u64
\x18\x03
\x01
(
\x04\"\"\n\x04
Info
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x0e\n\x06
worker
\x18\x02
\x02
(
\t\"\x9b\x01\n\r
ClientMessage
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.ClientMessage.Type
\x12
$
\n\x04\x64\x61
ta
\x18\x02
\x01
(
\x0b\x32\x16
.loomcomm.DataPrologue
\x12\x1c\n\x04
info
\x18\x03
\x01
(
\x0b\x32\x0e
.loomcomm.Info
\"\x1a\n\x04
Type
\x12\x08\n\x04\x44\x41
TA
\x10\x01\x12\x08\n\x04
INFO
\x10\x02\x42\x02
H
\x03
'
)
)
_sym_db
.
RegisterFileDescriptor
(
DESCRIPTOR
)
...
...
@@ -111,8 +111,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type
=
None
,
options
=
None
,
serialized_start
=
8
10
,
serialized_end
=
8
36
,
serialized_start
=
8
32
,
serialized_end
=
8
58
,
)
_sym_db
.
RegisterEnumDescriptor
(
_CLIENTMESSAGE_TYPE
)
...
...
@@ -399,12 +399,19 @@ _DATA = _descriptor.Descriptor(
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'
size'
,
full_name
=
'loomcomm.Data.size
'
,
index
=
1
,
name
=
'
arg0_u64'
,
full_name
=
'loomcomm.Data.arg0_u64
'
,
index
=
1
,
number
=
2
,
type
=
4
,
cpp_type
=
4
,
label
=
1
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'arg1_u64'
,
full_name
=
'loomcomm.Data.arg1_u64'
,
index
=
2
,
number
=
3
,
type
=
4
,
cpp_type
=
4
,
label
=
1
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
],
extensions
=
[
],
...
...
@@ -417,7 +424,7 @@ _DATA = _descriptor.Descriptor(
oneofs
=
[
],
serialized_start
=
605
,
serialized_end
=
6
42
,
serialized_end
=
6
64
,
)
...
...
@@ -453,8 +460,8 @@ _INFO = _descriptor.Descriptor(
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
6
44
,
serialized_end
=
678
,
serialized_start
=
6
66
,
serialized_end
=
700
,
)
...
...
@@ -498,8 +505,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
681
,
serialized_end
=
8
36
,
serialized_start
=
703
,
serialized_end
=
8
58
,
)
_REGISTER
.
fields_by_name
[
'type'
].
enum_type
=
_REGISTER_TYPE
...
...
src/client/loomrun_pb2.py
View file @
37ba5dcb
...
...
@@ -18,64 +18,13 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR
=
_descriptor
.
FileDescriptor
(
name
=
'loomrun.proto'
,
package
=
'loomrun'
,
serialized_pb
=
_b
(
'
\n\r
loomrun.proto
\x12\x07
loomrun
\"
X
\n\x07
MapFile
\x12\x10\n\x08\x66
ilename
\x18\x01
\x02
(
\t\x12\x13\n\x0b
input_index
\x18\x02
\x02
(
\x05\x12\x14\n\x0c
output_index
\x18\x03
\x02
(
\x05\x12\x10\n\x08
variable
\x18\x04
\x01
(
\t\"
3
\n\x03
Run
\x12\x0c\n\x04\x61
rgs
\x18\x01
\x03
(
\t\x12\x1e\n\x04
maps
\x18\x02
\x03
(
\x0b\x32\x10
.loomrun.MapFile
B
\x02
H
\x03
'
)
serialized_pb
=
_b
(
'
\n\r
loomrun.proto
\x12\x07
loomrun
\"
<
\n\x03
Run
\x12\x0c\n\x04\x61
rgs
\x18\x01
\x03
(
\t\x12\x12\n\n
map_inputs
\x18\x02
\x03
(
\t\x12\x13\n\x0b
map_outputs
\x18\x03
\x03
(
\t
B
\x02
H
\x03
'
)
)
_sym_db
.
RegisterFileDescriptor
(
DESCRIPTOR
)
_MAPFILE
=
_descriptor
.
Descriptor
(
name
=
'MapFile'
,
full_name
=
'loomrun.MapFile'
,
filename
=
None
,
file
=
DESCRIPTOR
,
containing_type
=
None
,
fields
=
[
_descriptor
.
FieldDescriptor
(
name
=
'filename'
,
full_name
=
'loomrun.MapFile.filename'
,
index
=
0
,
number
=
1
,
type
=
9
,
cpp_type
=
9
,
label
=
2
,
has_default_value
=
False
,
default_value
=
_b
(
""
).
decode
(
'utf-8'
),
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'input_index'
,
full_name
=
'loomrun.MapFile.input_index'
,
index
=
1
,
number
=
2
,
type
=
5
,
cpp_type
=
1
,
label
=
2
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'output_index'
,
full_name
=
'loomrun.MapFile.output_index'
,
index
=
2
,
number
=
3
,
type
=
5
,
cpp_type
=
1
,
label
=
2
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'variable'
,
full_name
=
'loomrun.MapFile.variable'
,
index
=
3
,
number
=
4
,
type
=
9
,
cpp_type
=
9
,
label
=
1
,
has_default_value
=
False
,
default_value
=
_b
(
""
).
decode
(
'utf-8'
),
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
],
extensions
=
[
],
nested_types
=
[],
enum_types
=
[
],
options
=
None
,
is_extendable
=
False
,
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
26
,
serialized_end
=
114
,
)
_RUN
=
_descriptor
.
Descriptor
(
name
=
'Run'
,
full_name
=
'loomrun.Run'
,
...
...
@@ -91,8 +40,15 @@ _RUN = _descriptor.Descriptor(
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'maps'
,
full_name
=
'loomrun.Run.maps'
,
index
=
1
,
number
=
2
,
type
=
11
,
cpp_type
=
10
,
label
=
3
,
name
=
'map_inputs'
,
full_name
=
'loomrun.Run.map_inputs'
,
index
=
1
,
number
=
2
,
type
=
9
,
cpp_type
=
9
,
label
=
3
,
has_default_value
=
False
,
default_value
=
[],
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'map_outputs'
,
full_name
=
'loomrun.Run.map_outputs'
,
index
=
2
,
number
=
3
,
type
=
9
,
cpp_type
=
9
,
label
=
3
,
has_default_value
=
False
,
default_value
=
[],
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
...
...
@@ -108,21 +64,12 @@ _RUN = _descriptor.Descriptor(
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
11
6
,
serialized_end
=
167
,
serialized_start
=
2
6
,
serialized_end
=
86
,
)
_RUN
.
fields_by_name
[
'maps'
].
message_type
=
_MAPFILE
DESCRIPTOR
.
message_types_by_name
[
'MapFile'
]
=
_MAPFILE
DESCRIPTOR
.
message_types_by_name
[
'Run'
]
=
_RUN
MapFile
=
_reflection
.
GeneratedProtocolMessageType
(
'MapFile'
,
(
_message
.
Message
,),
dict
(
DESCRIPTOR
=
_MAPFILE
,
__module__
=
'loomrun_pb2'
# @@protoc_insertion_point(class_scope:loomrun.MapFile)
))
_sym_db
.
RegisterMessage
(
MapFile
)
Run
=
_reflection
.
GeneratedProtocolMessageType
(
'Run'
,
(
_message
.
Message
,),
dict
(
DESCRIPTOR
=
_RUN
,
__module__
=
'loomrun_pb2'
...
...
src/client/plan.py
View file @
37ba5dcb
...
...
@@ -53,7 +53,7 @@ class OpenTask(Task):
class
SplitLinesTask
(
Task
):
task_type
=
"split_lines"
struct
=
u32
=
struct
.
Struct
(
"<QQ"
)
struct
=
struct
.
Struct
(
"<QQ"
)
def
__init__
(
self
,
input
,
start
,
end
):
self
.
config
=
self
.
struct
.
pack
(
start
,
end
)
...
...
@@ -137,6 +137,12 @@ class RunTask(Task):
class
Plan
(
object
):
TASK_ARRAY_MAKE
=
"array/make"
TASK_ARRAY_GET
=
"array/get"
TASK_RUN
=
"run"
u64
=
struct
.
Struct
(
"<Q"
)
def
__init__
(
self
):
self
.
tasks
=
[]
self
.
task_types
=
set
()
...
...
@@ -160,9 +166,39 @@ class Plan(object):
def
task_split_lines
(
self
,
input
,
start
,
end
):
return
self
.
add
(
SplitLinesTask
(
input
,
start
,
end
))
def
task_run
(
self
,
args
,
stdin
=
None
,
stdout
=
None
,
variable
=
None
):
return
self
.
add
(
RunTask
(
args
,
stdin
=
stdin
,
stdout
=
stdout
,
variable
=
variable
))
def
task_run
(
self
,
args
,
inputs
=
(),
outputs
=
(
None
,),
stdin
=
None
):
if
isinstance
(
args
,
str
):
args
=
args
.
split
()
if
stdin
is
not
None
:
inputs
=
((
stdin
,
None
),)
+
tuple
(
inputs
)
task
=
Task
()
task
.
task_type
=
self
.
TASK_RUN
task
.
inputs
=
tuple
(
i
for
i
,
fname
in
inputs
)
msg
=
loomrun_pb2
.
Run
()
msg
.
args
.
extend
(
args
)
msg
.
map_inputs
.
extend
(
fname
if
fname
else
"+in"
for
i
,
fname
in
inputs
)
msg
.
map_outputs
.
extend
(
fname
if
fname
else
"+out"
for
fname
in
outputs
)
task
.
config
=
msg
.
SerializeToString
()
return
self
.
add
(
task
)
def
task_array_make
(
self
,
inputs
):
task
=
Task
()
task
.
task_type
=
self
.
TASK_ARRAY_MAKE
task
.
inputs
=
inputs
return
self
.
add
(
task
)
def
task_array_get
(
self
,
input
,
index
):
task
=
Task
()
task
.
task_type
=
self
.
TASK_ARRAY_GET
task
.
inputs
=
(
input
,)
task
.
config
=
self
.
u64
.
pack
(
index
)
return
self
.
add
(
task
)
def
create_message
(
self
):
task_types
=
list
(
self
.
task_types
)
...
...
src/libloom/CMakeLists.txt
View file @
37ba5dcb
...
...
@@ -4,6 +4,8 @@ add_library(libloom
data/externfile.h
data/rawdata.h
data/rawdata.cpp
data/array.h
data/array.cpp
connection.cpp
connection.h
worker.cpp
...
...
src/libloom/connection.cpp
View file @
37ba5dcb
...
...
@@ -22,8 +22,6 @@ Connection::~Connection()
assert
(
state
==
ConnectionClosed
);
}
std
::
string
Connection
::
get_peername
()
{
sockaddr_in
addr
;
...
...
src/libloom/data.cpp
View file @
37ba5dcb
...
...
@@ -12,16 +12,16 @@ void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &
{
loomcomm
::
Data
msg
;
msg
.
set_type_id
(
get_type_id
());
msg
.
set_size
(
get_size
());
//
init_message(worker, msg);
//
msg.set_size(get_size());
init_message
(
worker
,
msg
);
buffer
.
add
(
msg
);
serialize_data
(
worker
,
buffer
,
data_ptr
);
}
/*void init_message(Worker &worker, loomcomm::Data &msg)
void
Data
::
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
}
*/
}
char
*
Data
::
get_raw_data
(
Worker
&
worker
)
{
...
...
@@ -30,7 +30,7 @@ char *Data::get_raw_data(Worker &worker)
std
::
string
Data
::
get_filename
()
const
{
return
""
;
return
""
;
}
DataUnpacker
::~
DataUnpacker
()
...
...
src/libloom/data.h
View file @
37ba5dcb
...
...
@@ -26,7 +26,7 @@ public:
virtual
std
::
string
get_info
()
=
0
;
void
serialize
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
//virtual void init_message(Worker &worker, loomcomm::Data &msg)
;
virtual
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
virtual
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
)
=
0
;
virtual
char
*
get_raw_data
(
Worker
&
worker
);
...
...
@@ -42,12 +42,12 @@ public:
virtual
void
on_data_chunk
(
const
char
*
data
,
size_t
size
);
virtual
bool
on_data_finish
(
Connection
&
connection
);
std
::
unique_ptr
<
Data
>
release
_data
()
{
return
std
::
move
(
data
)
;
std
::
shared_ptr
<
Data
>&
get
_data
()
{
return
data
;
}
protected:
std
::
unique
_ptr
<
Data
>
data
;
std
::
shared
_ptr
<
Data
>
data
;
};
}
...
...
src/libloom/data/array.cpp
0 → 100644
View file @
37ba5dcb
#include "array.h"
#include "../compat.h"
#include "../worker.h"
#include "../log.h"
loom
::
Array
::
Array
(
size_t
length
,
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
)
:
length
(
length
),
items
(
std
::
move
(
items
))
{
}
loom
::
Array
::~
Array
()
{
llog
->
debug
(
"Disposing array"
);
}
size_t
loom
::
Array
::
get_size
()
{
size_t
size
=
0
;
for
(
size_t
i
=
0
;
i
<
length
;
i
++
)
{
size
+=
items
[
i
]
->
get_size
();
}
return
size
;
}
std
::
string
loom
::
Array
::
get_info
()
{
return
"Array"
;
}
std
::
shared_ptr
<
loom
::
Data
>&
loom
::
Array
::
get_at_index
(
size_t
index
)
{
assert
(
index
<
length
);
return
items
[
index
];
}
void
loom
::
Array
::
serialize_data
(
loom
::
Worker
&
worker
,
loom
::
SendBuffer
&
buffer
,
std
::
shared_ptr
<
loom
::
Data
>
&
data_ptr
)
{
for
(
size_t
i
=
0
;
i
<
length
;
i
++
)
{
items
[
i
]
->
serialize
(
worker
,
buffer
,
items
[
i
]);
}
}
void
loom
::
Array
::
init_message
(
loom
::
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
msg
.
set_arg0_u64
(
length
);
}
loom
::
ArrayUnpacker
::~
ArrayUnpacker
()
{
}
bool
loom
::
ArrayUnpacker
::
init
(
loom
::
Worker
&
worker
,
loom
::
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
)
{
assert
(
msg
.
has_arg0_u64
());
index
=
0
;
length
=
msg
.
arg0_u64
();
items
=
std
::
make_unique
<
std
::
shared_ptr
<
Data
>
[]
>
(
length
);
if
(
length
==
0
)
{
finish
();
return
true
;
}
else
{
this
->
worker
=
&
worker
;
return
false
;
}
}
bool
loom
::
ArrayUnpacker
::
on_message
(
loom
::
Connection
&
connection
,
const
char
*
data
,
size_t
size
)
{
if
(
unpacker
)
{
bool
r
=
unpacker
->
on_message
(
connection
,
data
,
size
);
if
(
r
)
{
return
finish_data
();
}
return
false
;
}
loomcomm
::
Data
msg
;
msg
.
ParseFromArray
(
data
,
size
);
unpacker
=
worker
->
unpack
(
msg
.
type_id
());
if
(
unpacker
->
init
(
*
worker
,
connection
,
msg
))
{
return
finish_data
();
}
else
{
return
false
;
}
}
void
loom
::
ArrayUnpacker
::
on_data_chunk
(
const
char
*
data
,
size_t
size
)
{
unpacker
->
on_data_chunk
(
data
,
size
);;
}
bool
loom
::
ArrayUnpacker
::
on_data_finish
(
loom
::
Connection
&
connection
)
{
bool
r
=
unpacker
->
on_data_finish
(
connection
);
if
(
r
)
{
return
finish_data
();
}
return
false
;
}
void
loom
::
ArrayUnpacker
::
finish
()
{
data
=
std
::
make_shared
<
Array
>
(
length
,
std
::
move
(
items
));
}
bool
loom
::
ArrayUnpacker
::
finish_data
()
{
items
[
index
]
=
unpacker
->
get_data
();
unpacker
.
reset
();
index
++
;
if
(
index
==
length
)
{
finish
();
return
true
;
}
else
{
return
false
;
}
}
src/libloom/data/array.h
0 → 100644
View file @
37ba5dcb
#ifndef LIBLOOM_DATA_ARRAY_H
#define LIBLOOM_DATA_ARRAY_H
#include "../data.h"
namespace
loom
{
class
Array
:
public
Data
{
public:
static
const
int
TYPE_ID
=
400
;
Array
(
size_t
length
,
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
);
~
Array
();
int
get_type_id
()
{
return
TYPE_ID
;
}
size_t
get_length
()
const
{
return
length
;
}
size_t
get_size
();
std
::
string
get_info
();
std
::
shared_ptr
<
Data
>&
get_at_index
(
size_t
index
);
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
private:
size_t
length
;
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
;
};
class
ArrayUnpacker
:
public
DataUnpacker
{
public:
~
ArrayUnpacker
();
bool
init
(
Worker
&
worker
,
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
);
bool
on_message
(
Connection
&
connection
,
const
char
*
data
,
size_t
size
);
void
on_data_chunk
(
const
char
*
data
,
size_t
size
);
bool
on_data_finish
(
Connection
&
connection
);
protected:
void
finish
();
bool
finish_data
();
std
::
unique_ptr
<
DataUnpacker
>
unpacker
;
Worker
*
worker
;
size_t
index
;
size_t
length
;
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
;
};
}
#endif // LIBLOOM_DATA_ARRAY_H
src/libloom/data/rawdata.cpp
View file @
37ba5dcb
...
...
@@ -65,16 +65,15 @@ char* RawData::init_empty_file(Worker &worker, size_t size)
}
if
(
size
>
0
)
{
if
(
!
lseek
(
fd
,
size
-
1
,
SEEK_SET
)
)
{
if
(
lseek
(
fd
,
size
-
1
,
SEEK_SET
)
==
-
1
)
{
log_errno_abort
(
"lseek"
);
}
if
(
write
(
fd
,
""
,
1
)
!=
1
)
{
log_errno_abort
(
"write"
);
}
map
(
fd
,
true
);
}
map
(
fd
,
true
);
::
close
(
fd
);
return
data
;
}
...
...
@@ -103,8 +102,10 @@ std::string RawData::get_filename() const
void
RawData
::
open
(
Worker
&
worker
)
{
if
(
size
==
0
)
{
return
;
}
assert
(
!
filename
.
empty
());
int
fd
=
::
open
(
filename
.
c_str
(),
O_RDONLY
,
S_IRUSR
|
S_IWUSR
);
if
(
fd
<
0
)
{
llog
->
critical
(
"Cannot open data {}"
,
filename
);
...
...
@@ -134,13 +135,13 @@ void RawData::map(int fd, bool write)
std
::
string
RawData
::
get_info
()
{
return
"RawData"
;
return
"RawData"
;
}
/*void RawData::init_message(Worker &worker, loomcomm::Data &msg)
void
RawData
::
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
}
*/
msg
.
set_arg0_u64
(
size
);
}
void
RawData
::
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
)
{
...
...
@@ -154,11 +155,11 @@ RawDataUnpacker::~RawDataUnpacker()
bool
RawDataUnpacker
::
init
(
Worker
&
worker
,
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
)
{
auto
data
=
std
::
make_unique
<
RawData
>
();
assert
(
msg
.
has_size
()
);
a
uto
size
=
msg
.
size
(
);
pointer
=
data
->
init_empty_file
(
worker
,
size
);
this
->
data
=
std
::
move
(
data
);
this
->
data
=
std
::
make_shared
<
RawData
>
();
RawData
&
data
=
static_cast
<
RawData
&>
(
*
this
->
data
);
a
ssert
(
msg
.
has_arg0_u64
()
);
auto
size
=
msg
.
arg0_u64
(
);
pointer
=
data
.
init_empty_file
(
worker
,
size
);
if
(
size
==
0
)
{
return
true
;
}
...
...
src/libloom/data/rawdata.h
View file @
37ba5dcb
...
...
@@ -29,6 +29,7 @@ public:
}
std
::
string
get_info
();
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
//char* init_memonly(size_t size);
...
...
src/libloom/interconnect.cpp
View file @
37ba5dcb
...
...
@@ -44,7 +44,7 @@ void InterConnection::finish_data()
{
llog
->
debug
(
"Data {} sucessfully received"
,
data_id
);
worker
.
publish_data
(
data_id
,
data_unpacker
->
release
_data
());
data_unpacker
->
get
_data
());
data_unpacker
.
reset
();
data_id
=
-
1
;
}
...
...
src/libloom/loomcomm.pb.cc
View file @
37ba5dcb
...
...
@@ -1744,7 +1744,8 @@ void DataPrologue::Swap(DataPrologue* other) {
#ifndef _MSC_VER
const
int
Data
::
kTypeIdFieldNumber
;
const
int
Data
::
kSizeFieldNumber
;
const
int
Data
::
kArg0U64FieldNumber
;
const
int
Data
::
kArg1U64FieldNumber
;
#endif // !_MSC_VER
Data
::
Data
()
...
...
@@ -1766,7 +1767,8 @@ Data::Data(const Data& from)
void
Data
::
SharedCtor
()
{
_cached_size_
=
0
;
type_id_
=
0
;
size_
=
GOOGLE_ULONGLONG
(
0
);
arg0_u64_
=
GOOGLE_ULONGLONG
(
0
);
arg1_u64_
=
GOOGLE_ULONGLONG
(
0
);