Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
HyperLoom
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
Operations
Operations
Incidents
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
ADAS
HyperLoom
Commits
d89ecb91
Commit
d89ecb91
authored
Jul 06, 2016
by
Stanislav Bohm
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ENH: Datatype "Array"
parent
95315818
Changes
27
Hide whitespace changes
Inline
Side-by-side
Showing
27 changed files
with
500 additions
and
101 deletions
+500
-101
src/client/client.py
src/client/client.py
+6
-2
src/client/loomcomm_pb2.py
src/client/loomcomm_pb2.py
+16
-9
src/client/plan.py
src/client/plan.py
+18
-1
src/libloom/CMakeLists.txt
src/libloom/CMakeLists.txt
+2
-0
src/libloom/connection.cpp
src/libloom/connection.cpp
+0
-2
src/libloom/data.cpp
src/libloom/data.cpp
+5
-5
src/libloom/data.h
src/libloom/data.h
+5
-4
src/libloom/data/array.cpp
src/libloom/data/array.cpp
+118
-0
src/libloom/data/array.h
src/libloom/data/array.h
+61
-0
src/libloom/data/rawdata.cpp
src/libloom/data/rawdata.cpp
+15
-13
src/libloom/data/rawdata.h
src/libloom/data/rawdata.h
+1
-0
src/libloom/interconnect.cpp
src/libloom/interconnect.cpp
+1
-1
src/libloom/loomcomm.pb.cc
src/libloom/loomcomm.pb.cc
+50
-17
src/libloom/loomcomm.pb.h
src/libloom/loomcomm.pb.h
+57
-23
src/libloom/taskinstance.cpp
src/libloom/taskinstance.cpp
+3
-3
src/libloom/taskinstance.h
src/libloom/taskinstance.h
+1
-2
src/libloom/worker.cpp
src/libloom/worker.cpp
+1
-1
src/libloom/worker.h
src/libloom/worker.h
+2
-1
src/proto/loomcomm.proto
src/proto/loomcomm.proto
+2
-1
src/worker/CMakeLists.txt
src/worker/CMakeLists.txt
+2
-0
src/worker/arraytasks.cpp
src/worker/arraytasks.cpp
+27
-0
src/worker/arraytasks.h
src/worker/arraytasks.h
+22
-0
src/worker/basictasks.cpp
src/worker/basictasks.cpp
+14
-10
src/worker/main.cpp
src/worker/main.cpp
+10
-1
src/worker/runtask.cpp
src/worker/runtask.cpp
+6
-5
tests/client/array_test.py
tests/client/array_test.py
+55
-0
tests/client/data_test.py
tests/client/data_test.py
+0
-0
No files found.
src/client/client.py
View file @
d89ecb91
...
...
@@ -66,8 +66,12 @@ class Client(object):
def
_receive_data
(
self
):
msg_data
=
Data
()
msg_data
.
ParseFromString
(
self
.
connection
.
receive_message
())
assert
msg_data
.
type_id
==
300
return
self
.
connection
.
read_data
(
msg_data
.
size
)
type_id
=
msg_data
.
type_id
if
type_id
==
300
:
# Data
return
self
.
connection
.
read_data
(
msg_data
.
arg0_u64
)
if
type_id
==
400
:
# Array
return
[
self
.
_receive_data
()
for
i
in
xrange
(
msg_data
.
arg0_u64
)]
assert
0
def
_send_message
(
self
,
message
):
data
=
message
.
SerializeToString
()
...
...
src/client/loomcomm_pb2.py
View file @
d89ecb91
...
...
@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR
=
_descriptor
.
FileDescriptor
(
name
=
'loomcomm.proto'
,
package
=
'loomcomm'
,
serialized_pb
=
_b
(
'
\n\x0e
loomcomm.proto
\x12\x08
loomcomm
\"\xbb\x01\n\x08
Register
\x12\x18\n\x10
protocol_version
\x18\x01
\x02
(
\x05\x12
%
\n\x04
type
\x18\x02
\x02
(
\x0e\x32\x17
.loomcomm.Register.Type
\x12\x0c\n\x04
port
\x18\x03
\x01
(
\x05\x12\x12\n\n
task_types
\x18\x04
\x03
(
\t\x12\x0c\n\x04\x63
pus
\x18\x05
\x01
(
\x05\x12\x0c\n\x04
info
\x18\n
\x01
(
\x08\"
0
\n\x04
Type
\x12\x13\n\x0f
REGISTER_WORKER
\x10\x01\x12\x13\n\x0f
REGISTER_CLIENT
\x10\x02\"
&
\n\r
ServerMessage
\"\x15\n\x04
Type
\x12\r\n\t
START_JOB
\x10\x01\"\xc4\x01\n\r
WorkerCommand
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.WorkerCommand.Type
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\x12\x11\n\t
task_type
\x18\x03
\x01
(
\x05\x12\x13\n\x0b
task_config
\x18\x04
\x01
(
\t\x12\x13\n\x0b
task_inputs
\x18\x05
\x03
(
\x05\x12\x0f\n\x07\x61\x64\x64
ress
\x18\n
\x01
(
\t\x12\x11\n\t
with_size
\x18\x0b
\x01
(
\x08\"\x1a\n\x04
Type
\x12\x08\n\x04
TASK
\x10\x01\x12\x08\n\x04
SEND
\x10\x02\"\x1c\n\x0e
WorkerResponse
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\"\x18\n\x08\x41
nnounce
\x12\x0c\n\x04
port
\x18\x01
\x02
(
\x05\"
-
\n\x0c\x44\x61
taPrologue
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x11\n\t
data_size
\x18\x03
\x01
(
\x04\"
%
\n\x04\x44\x61
ta
\x12\x0f\n\x07
type_id
\x18\x01
\x02
(
\x05\x12\x0c\n\x04
size
\x18\x02
\x01
(
\x04\"\"\n\x04
Info
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x0e\n\x06
worker
\x18\x02
\x02
(
\t\"\x9b\x01\n\r
ClientMessage
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.ClientMessage.Type
\x12
$
\n\x04\x64\x61
ta
\x18\x02
\x01
(
\x0b\x32\x16
.loomcomm.DataPrologue
\x12\x1c\n\x04
info
\x18\x03
\x01
(
\x0b\x32\x0e
.loomcomm.Info
\"\x1a\n\x04
Type
\x12\x08\n\x04\x44\x41
TA
\x10\x01\x12\x08\n\x04
INFO
\x10\x02\x42\x02
H
\x03
'
)
serialized_pb
=
_b
(
'
\n\x0e
loomcomm.proto
\x12\x08
loomcomm
\"\xbb\x01\n\x08
Register
\x12\x18\n\x10
protocol_version
\x18\x01
\x02
(
\x05\x12
%
\n\x04
type
\x18\x02
\x02
(
\x0e\x32\x17
.loomcomm.Register.Type
\x12\x0c\n\x04
port
\x18\x03
\x01
(
\x05\x12\x12\n\n
task_types
\x18\x04
\x03
(
\t\x12\x0c\n\x04\x63
pus
\x18\x05
\x01
(
\x05\x12\x0c\n\x04
info
\x18\n
\x01
(
\x08\"
0
\n\x04
Type
\x12\x13\n\x0f
REGISTER_WORKER
\x10\x01\x12\x13\n\x0f
REGISTER_CLIENT
\x10\x02\"
&
\n\r
ServerMessage
\"\x15\n\x04
Type
\x12\r\n\t
START_JOB
\x10\x01\"\xc4\x01\n\r
WorkerCommand
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.WorkerCommand.Type
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\x12\x11\n\t
task_type
\x18\x03
\x01
(
\x05\x12\x13\n\x0b
task_config
\x18\x04
\x01
(
\t\x12\x13\n\x0b
task_inputs
\x18\x05
\x03
(
\x05\x12\x0f\n\x07\x61\x64\x64
ress
\x18\n
\x01
(
\t\x12\x11\n\t
with_size
\x18\x0b
\x01
(
\x08\"\x1a\n\x04
Type
\x12\x08\n\x04
TASK
\x10\x01\x12\x08\n\x04
SEND
\x10\x02\"\x1c\n\x0e
WorkerResponse
\x12\n\n\x02
id
\x18\x02
\x01
(
\x05\"\x18\n\x08\x41
nnounce
\x12\x0c\n\x04
port
\x18\x01
\x02
(
\x05\"
-
\n\x0c\x44\x61
taPrologue
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x11\n\t
data_size
\x18\x03
\x01
(
\x04\"
;
\n\x04\x44\x61
ta
\x12\x0f\n\x07
type_id
\x18\x01
\x02
(
\x05\x12\x10\n\x08\x61
rg0_u64
\x18\x02
\x01
(
\x04\x12\x10\n\x08\x61
rg1_u64
\x18\x03
\x01
(
\x04\"\"\n\x04
Info
\x12\n\n\x02
id
\x18\x01
\x02
(
\x05\x12\x0e\n\x06
worker
\x18\x02
\x02
(
\t\"\x9b\x01\n\r
ClientMessage
\x12
*
\n\x04
type
\x18\x01
\x02
(
\x0e\x32\x1c
.loomcomm.ClientMessage.Type
\x12
$
\n\x04\x64\x61
ta
\x18\x02
\x01
(
\x0b\x32\x16
.loomcomm.DataPrologue
\x12\x1c\n\x04
info
\x18\x03
\x01
(
\x0b\x32\x0e
.loomcomm.Info
\"\x1a\n\x04
Type
\x12\x08\n\x04\x44\x41
TA
\x10\x01\x12\x08\n\x04
INFO
\x10\x02\x42\x02
H
\x03
'
)
)
_sym_db
.
RegisterFileDescriptor
(
DESCRIPTOR
)
...
...
@@ -103,8 +103,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type
=
None
,
options
=
None
,
serialized_start
=
7
65
,
serialized_end
=
791
,
serialized_start
=
7
87
,
serialized_end
=
813
,
)
_sym_db
.
RegisterEnumDescriptor
(
_CLIENTMESSAGE_TYPE
)
...
...
@@ -384,12 +384,19 @@ _DATA = _descriptor.Descriptor(
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'
size'
,
full_name
=
'loomcomm.Data.size
'
,
index
=
1
,
name
=
'
arg0_u64'
,
full_name
=
'loomcomm.Data.arg0_u64
'
,
index
=
1
,
number
=
2
,
type
=
4
,
cpp_type
=
4
,
label
=
1
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
_descriptor
.
FieldDescriptor
(
name
=
'arg1_u64'
,
full_name
=
'loomcomm.Data.arg1_u64'
,
index
=
2
,
number
=
3
,
type
=
4
,
cpp_type
=
4
,
label
=
1
,
has_default_value
=
False
,
default_value
=
0
,
message_type
=
None
,
enum_type
=
None
,
containing_type
=
None
,
is_extension
=
False
,
extension_scope
=
None
,
options
=
None
),
],
extensions
=
[
],
...
...
@@ -402,7 +409,7 @@ _DATA = _descriptor.Descriptor(
oneofs
=
[
],
serialized_start
=
560
,
serialized_end
=
597
,
serialized_end
=
619
,
)
...
...
@@ -438,8 +445,8 @@ _INFO = _descriptor.Descriptor(
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
599
,
serialized_end
=
6
33
,
serialized_start
=
621
,
serialized_end
=
6
55
,
)
...
...
@@ -483,8 +490,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges
=
[],
oneofs
=
[
],
serialized_start
=
6
36
,
serialized_end
=
791
,
serialized_start
=
6
58
,
serialized_end
=
813
,
)
_REGISTER
.
fields_by_name
[
'type'
].
enum_type
=
_REGISTER_TYPE
...
...
src/client/plan.py
View file @
d89ecb91
...
...
@@ -53,7 +53,7 @@ class OpenTask(Task):
class
SplitLinesTask
(
Task
):
task_type
=
"split_lines"
struct
=
u32
=
struct
.
Struct
(
"<QQ"
)
struct
=
struct
.
Struct
(
"<QQ"
)
def
__init__
(
self
,
input
,
start
,
end
):
self
.
config
=
self
.
struct
.
pack
(
start
,
end
)
...
...
@@ -137,6 +137,10 @@ class RunTask(Task):
class
Plan
(
object
):
TASK_ARRAY_MAKE
=
"array/make"
TASK_ARRAY_GET
=
"array/get"
u64
=
struct
.
Struct
(
"<Q"
)
def
__init__
(
self
):
self
.
tasks
=
[]
self
.
task_types
=
set
()
...
...
@@ -164,6 +168,19 @@ class Plan(object):
return
self
.
add
(
RunTask
(
args
,
stdin
=
stdin
,
stdout
=
stdout
,
variable
=
variable
))
def
task_array_make
(
self
,
inputs
):
task
=
Task
()
task
.
task_type
=
self
.
TASK_ARRAY_MAKE
task
.
inputs
=
inputs
return
self
.
add
(
task
)
def
task_array_get
(
self
,
input
,
index
):
task
=
Task
()
task
.
task_type
=
self
.
TASK_ARRAY_GET
task
.
inputs
=
(
input
,)
task
.
config
=
self
.
u64
.
pack
(
index
)
return
self
.
add
(
task
)
def
create_message
(
self
):
task_types
=
list
(
self
.
task_types
)
task_types
.
sort
()
...
...
src/libloom/CMakeLists.txt
View file @
d89ecb91
...
...
@@ -4,6 +4,8 @@ add_library(libloom
data/externfile.h
data/rawdata.h
data/rawdata.cpp
data/array.h
data/array.cpp
connection.cpp
connection.h
worker.cpp
...
...
src/libloom/connection.cpp
View file @
d89ecb91
...
...
@@ -22,8 +22,6 @@ Connection::~Connection()
assert
(
state
==
ConnectionClosed
);
}
std
::
string
Connection
::
get_peername
()
{
sockaddr_in
addr
;
...
...
src/libloom/data.cpp
View file @
d89ecb91
...
...
@@ -12,16 +12,16 @@ void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &
{
loomcomm
::
Data
msg
;
msg
.
set_type_id
(
get_type_id
());
msg
.
set_size
(
get_size
());
//
init_message(worker, msg);
//
msg.set_size(get_size());
init_message
(
worker
,
msg
);
buffer
.
add
(
msg
);
serialize_data
(
worker
,
buffer
,
data_ptr
);
}
/*void init_message(Worker &worker, loomcomm::Data &msg)
void
Data
::
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
}
*/
}
char
*
Data
::
get_raw_data
(
Worker
&
worker
)
{
...
...
@@ -30,7 +30,7 @@ char *Data::get_raw_data(Worker &worker)
std
::
string
Data
::
get_filename
(
Worker
&
worker
)
const
{
return
""
;
return
""
;
}
DataUnpacker
::~
DataUnpacker
()
...
...
src/libloom/data.h
View file @
d89ecb91
...
...
@@ -26,11 +26,12 @@ public:
virtual
std
::
string
get_info
()
=
0
;
void
serialize
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
//virtual void init_message(Worker &worker, loomcomm::Data &msg)
;
virtual
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
virtual
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
)
=
0
;
virtual
char
*
get_raw_data
(
Worker
&
worker
);
virtual
std
::
string
get_filename
(
Worker
&
worker
)
const
;
};
class
DataUnpacker
...
...
@@ -42,12 +43,12 @@ public:
virtual
void
on_data_chunk
(
const
char
*
data
,
size_t
size
);
virtual
bool
on_data_finish
(
Connection
&
connection
);
std
::
unique_ptr
<
Data
>
release
_data
()
{
return
std
::
move
(
data
)
;
std
::
shared_ptr
<
Data
>&
get
_data
()
{
return
data
;
}
protected:
std
::
unique
_ptr
<
Data
>
data
;
std
::
shared
_ptr
<
Data
>
data
;
};
}
...
...
src/libloom/data/array.cpp
0 → 100644
View file @
d89ecb91
#include "array.h"
#include "../compat.h"
#include "../worker.h"
loom
::
Array
::
Array
(
size_t
length
,
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
)
:
length
(
length
),
items
(
std
::
move
(
items
))
{
}
loom
::
Array
::~
Array
()
{
}
size_t
loom
::
Array
::
get_size
()
{
size_t
size
=
0
;
for
(
size_t
i
=
0
;
i
<
length
;
i
++
)
{
size
+=
items
[
i
]
->
get_size
();
}
return
size
;
}
std
::
string
loom
::
Array
::
get_info
()
{
return
"Array"
;
}
std
::
shared_ptr
<
loom
::
Data
>&
loom
::
Array
::
get_at_index
(
size_t
index
)
{
assert
(
index
<
length
);
return
items
[
index
];
}
void
loom
::
Array
::
serialize_data
(
loom
::
Worker
&
worker
,
loom
::
SendBuffer
&
buffer
,
std
::
shared_ptr
<
loom
::
Data
>
&
data_ptr
)
{
for
(
size_t
i
=
0
;
i
<
length
;
i
++
)
{
items
[
i
]
->
serialize
(
worker
,
buffer
,
items
[
i
]);
}
}
void
loom
::
Array
::
init_message
(
loom
::
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
msg
.
set_arg0_u64
(
length
);
}
loom
::
ArrayUnpacker
::~
ArrayUnpacker
()
{
}
bool
loom
::
ArrayUnpacker
::
init
(
loom
::
Worker
&
worker
,
loom
::
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
)
{
assert
(
msg
.
has_arg0_u64
());
index
=
0
;
length
=
msg
.
arg0_u64
();
items
=
std
::
make_unique
<
std
::
shared_ptr
<
Data
>
[]
>
(
length
);
if
(
length
==
0
)
{
finish
();
return
true
;
}
else
{
this
->
worker
=
&
worker
;
return
false
;
}
}
bool
loom
::
ArrayUnpacker
::
on_message
(
loom
::
Connection
&
connection
,
const
char
*
data
,
size_t
size
)
{
if
(
unpacker
)
{
bool
r
=
unpacker
->
on_message
(
connection
,
data
,
size
);
if
(
r
)
{
return
finish_data
();
}
return
false
;
}
loomcomm
::
Data
msg
;
msg
.
ParseFromArray
(
data
,
size
);
unpacker
=
worker
->
unpack
(
msg
.
type_id
());
if
(
unpacker
->
init
(
*
worker
,
connection
,
msg
))
{
return
finish_data
();
}
else
{
return
false
;
}
}
void
loom
::
ArrayUnpacker
::
on_data_chunk
(
const
char
*
data
,
size_t
size
)
{
unpacker
->
on_data_chunk
(
data
,
size
);;
}
bool
loom
::
ArrayUnpacker
::
on_data_finish
(
loom
::
Connection
&
connection
)
{
bool
r
=
unpacker
->
on_data_finish
(
connection
);
if
(
r
)
{
return
finish_data
();
}
return
false
;
}
void
loom
::
ArrayUnpacker
::
finish
()
{
data
=
std
::
make_shared
<
Array
>
(
length
,
std
::
move
(
items
));
}
bool
loom
::
ArrayUnpacker
::
finish_data
()
{
items
[
index
]
=
unpacker
->
get_data
();
unpacker
.
reset
();
index
++
;
if
(
index
==
length
)
{
finish
();
return
true
;
}
else
{
return
false
;
}
}
src/libloom/data/array.h
0 → 100644
View file @
d89ecb91
#ifndef LIBLOOM_DATA_ARRAY_H
#define LIBLOOM_DATA_ARRAY_H
#include "../data.h"
namespace
loom
{
class
Array
:
public
Data
{
public:
static
const
int
TYPE_ID
=
400
;
Array
(
size_t
length
,
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
);
~
Array
();
int
get_type_id
()
{
return
TYPE_ID
;
}
size_t
get_length
()
const
{
return
length
;
}
size_t
get_size
();
std
::
string
get_info
();
std
::
shared_ptr
<
Data
>&
get_at_index
(
size_t
index
);
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
private:
size_t
length
;
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
;
};
class
ArrayUnpacker
:
public
DataUnpacker
{
public:
~
ArrayUnpacker
();
bool
init
(
Worker
&
worker
,
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
);
bool
on_message
(
Connection
&
connection
,
const
char
*
data
,
size_t
size
);
void
on_data_chunk
(
const
char
*
data
,
size_t
size
);
bool
on_data_finish
(
Connection
&
connection
);
protected:
void
finish
();
bool
finish_data
();
std
::
unique_ptr
<
DataUnpacker
>
unpacker
;
Worker
*
worker
;
size_t
index
;
size_t
length
;
std
::
unique_ptr
<
std
::
shared_ptr
<
Data
>
[]
>
items
;
};
}
#endif // LIBLOOM_DATA_ARRAY_H
src/libloom/data/rawdata.cpp
View file @
d89ecb91
...
...
@@ -61,16 +61,15 @@ char* RawData::init_empty_file(Worker &worker, size_t size)
}
if
(
size
>
0
)
{
if
(
!
lseek
(
fd
,
size
-
1
,
SEEK_SET
)
)
{
if
(
lseek
(
fd
,
size
-
1
,
SEEK_SET
)
==
-
1
)
{
log_errno_abort
(
"lseek"
);
}
if
(
write
(
fd
,
""
,
1
)
!=
1
)
{
log_errno_abort
(
"write"
);
}
map
(
fd
,
true
);
}
map
(
fd
,
true
);
::
close
(
fd
);
return
data
;
}
...
...
@@ -99,8 +98,11 @@ std::string RawData::get_filename(Worker &worker) const
void
RawData
::
open
(
Worker
&
worker
)
{
assert
(
file_id
);
if
(
size
==
0
)
{
return
;
}
assert
(
file_id
);
int
fd
=
::
open
(
get_filename
(
worker
).
c_str
(),
O_RDONLY
,
S_IRUSR
|
S_IWUSR
);
if
(
fd
<
0
)
{
llog
->
critical
(
"Cannot open data {}"
,
get_filename
(
worker
));
...
...
@@ -130,13 +132,13 @@ void RawData::map(int fd, bool write)
std
::
string
RawData
::
get_info
()
{
return
"RawData"
;
return
"RawData"
;
}
/*void RawData::init_message(Worker &worker, loomcomm::Data &msg)
void
RawData
::
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
{
}
*/
msg
.
set_arg0_u64
(
size
);
}
void
RawData
::
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
)
{
...
...
@@ -150,11 +152,11 @@ RawDataUnpacker::~RawDataUnpacker()
bool
RawDataUnpacker
::
init
(
Worker
&
worker
,
Connection
&
connection
,
const
loomcomm
::
Data
&
msg
)
{
auto
data
=
std
::
make_unique
<
RawData
>
();
assert
(
msg
.
has_size
()
);
a
uto
size
=
msg
.
size
(
);
pointer
=
data
->
init_empty_file
(
worker
,
size
);
this
->
data
=
std
::
move
(
data
);
this
->
data
=
std
::
make_shared
<
RawData
>
();
RawData
&
data
=
static_cast
<
RawData
&>
(
*
this
->
data
);
a
ssert
(
msg
.
has_arg0_u64
()
);
auto
size
=
msg
.
arg0_u64
(
);
pointer
=
data
.
init_empty_file
(
worker
,
size
);
if
(
size
==
0
)
{
return
true
;
}
...
...
src/libloom/data/rawdata.h
View file @
d89ecb91
...
...
@@ -29,6 +29,7 @@ public:
}
std
::
string
get_info
();
void
init_message
(
Worker
&
worker
,
loomcomm
::
Data
&
msg
)
const
;
void
serialize_data
(
Worker
&
worker
,
SendBuffer
&
buffer
,
std
::
shared_ptr
<
Data
>
&
data_ptr
);
char
*
init_memonly
(
size_t
size
);
...
...
src/libloom/interconnect.cpp
View file @
d89ecb91
...
...
@@ -44,7 +44,7 @@ void InterConnection::finish_data()
{
llog
->
debug
(
"Data {} sucessfully received"
,
data_id
);
worker
.
publish_data
(
data_id
,
data_unpacker
->
release
_data
());
data_unpacker
->
get
_data
());
data_unpacker
.
reset
();
data_id
=
-
1
;
}
...
...
src/libloom/loomcomm.pb.cc
View file @
d89ecb91
...
...
@@ -1709,7 +1709,8 @@ void DataPrologue::Swap(DataPrologue* other) {
#ifndef _MSC_VER
const
int
Data
::
kTypeIdFieldNumber
;
const
int
Data
::
kSizeFieldNumber
;
const
int
Data
::
kArg0U64FieldNumber
;
const
int
Data
::
kArg1U64FieldNumber
;
#endif // !_MSC_VER
Data
::
Data
()
...
...
@@ -1731,7 +1732,8 @@ Data::Data(const Data& from)
void
Data
::
SharedCtor
()
{
_cached_size_
=
0
;
type_id_
=
0
;
size_
=
GOOGLE_ULONGLONG
(
0
);
arg0_u64_
=
GOOGLE_ULONGLONG
(
0
);
arg1_u64_
=
GOOGLE_ULONGLONG
(
0
);
::
memset
(
_has_bits_
,
0
,
sizeof
(
_has_bits_
));
}
...
...
@@ -1780,7 +1782,7 @@ void Data::Clear() {
::memset(&first, 0, n); \
} while (0)
ZR_
(
size
_
,
type_id_
);
ZR_
(
arg0_u64
_
,
type_id_
);
#undef OFFSET_OF_FIELD_
#undef ZR_
...
...
@@ -1813,18 +1815,33 @@ bool Data::MergePartialFromCodedStream(
}
else
{
goto
handle_unusual
;
}
if
(
input
->
ExpectTag
(
16
))
goto
parse_
size
;
if
(
input
->
ExpectTag
(
16
))
goto
parse_
arg0_u64
;
break
;
}
// optional uint64
size
= 2;
// optional uint64
arg0_u64
= 2;
case
2
:
{
if
(
tag
==
16
)
{
parse_
size
:
parse_
arg0_u64
:
DO_
((
::
google
::
protobuf
::
internal
::
WireFormatLite
::
ReadPrimitive
<
::
google
::
protobuf
::
uint64
,
::
google
::
protobuf
::
internal
::
WireFormatLite
::
TYPE_UINT64
>
(
input
,
&
size_
)));
set_has_size
();
input
,
&
arg0_u64_
)));
set_has_arg0_u64
();
}
else
{
goto
handle_unusual
;
}
if
(
input
->
ExpectTag
(
24
))
goto
parse_arg1_u64
;
break
;
}
// optional uint64 arg1_u64 = 3;
case
3
:
{
if
(
tag
==
24
)
{
parse_arg1_u64:
DO_
((
::
google
::
protobuf
::
internal
::
WireFormatLite
::
ReadPrimitive
<
::
google
::
protobuf
::
uint64
,
::
google
::
protobuf
::
internal
::
WireFormatLite
::
TYPE_UINT64
>
(
input
,
&
arg1_u64_
)));
set_has_arg1_u64
();
}
else
{
goto
handle_unusual
;
}
...
...
@@ -1862,9 +1879,14 @@ void Data::SerializeWithCachedSizes(
::
google
::
protobuf
::
internal
::
WireFormatLite
::
WriteInt32
(
1
,
this
->
type_id
(),
output
);
}
// optional uint64 size = 2;
if
(
has_size
())
{
::
google
::
protobuf
::
internal
::
WireFormatLite
::
WriteUInt64
(
2
,
this
->
size
(),
output
);
// optional uint64 arg0_u64 = 2;
if
(
has_arg0_u64
())
{
::
google
::
protobuf
::
internal
::
WireFormatLite
::
WriteUInt64
(
2
,
this
->
arg0_u64
(),
output
);
}
// optional uint64 arg1_u64 = 3;
if
(
has_arg1_u64
())
{
::
google
::
protobuf
::
internal
::
WireFormatLite
::
WriteUInt64
(
3
,
this
->
arg1_u64
(),
output
);
}
output
->
WriteRaw
(
unknown_fields
().
data
(),
...
...
@@ -1883,11 +1905,18 @@ int Data::ByteSize() const {
this
->
type_id
());
}
// optional uint64 size = 2;
if
(
has_size
())
{
// optional uint64 arg0_u64 = 2;
if
(
has_arg0_u64
())
{
total_size
+=
1
+
::
google
::
protobuf
::
internal
::
WireFormatLite
::
UInt64Size
(
this
->
arg0_u64
());
}
// optional uint64 arg1_u64 = 3;
if
(
has_arg1_u64
())
{
total_size
+=
1
+
::
google
::
protobuf
::
internal
::
WireFormatLite
::
UInt64Size
(
this
->
size
());
this
->
arg1_u64
());
}
}
...
...
@@ -1910,8 +1939,11 @@ void Data::MergeFrom(const Data& from) {
if
(
from
.
has_type_id
())
{
set_type_id
(
from
.
type_id
());
}
if
(
from
.
has_size
())
{
set_size
(
from
.
size
());
if
(
from
.
has_arg0_u64
())
{
set_arg0_u64
(
from
.
arg0_u64
());
}
if
(
from
.
has_arg1_u64
())
{
set_arg1_u64
(
from
.
arg1_u64
());
}
}
mutable_unknown_fields
()
->
append
(
from
.
unknown_fields
());
...
...
@@ -1932,7 +1964,8 @@ bool Data::IsInitialized() const {
void
Data
::
Swap
(
Data
*
other
)
{
if
(
other
!=
this
)
{
std
::
swap
(
type_id_
,
other
->
type_id_
);
std
::
swap
(
size_
,
other
->
size_
);
std
::
swap
(
arg0_u64_
,
other
->
arg0_u64_
);
std
::
swap
(
arg1_u64_
,
other
->
arg1_u64_
);
std
::
swap
(
_has_bits_
[
0
],
other
->
_has_bits_
[
0
]);
_unknown_fields_
.
swap
(
other
->
_unknown_fields_
);
std
::
swap
(
_cached_size_
,
other
->
_cached_size_
);
...
...
src/libloom/loomcomm.pb.h
View file @
d89ecb91
...
...
@@ -861,25 +861,35 @@ class Data : public ::google::protobuf::MessageLite {
inline
::
google
::
protobuf
::
int32
type_id
()
const
;
inline
void
set_type_id
(
::
google
::
protobuf
::
int32
value
);
// optional uint64 size = 2;
inline
bool
has_size
()
const
;
inline
void
clear_size
();
static
const
int
kSizeFieldNumber
=
2
;
inline
::
google
::
protobuf
::
uint64
size
()
const
;
inline
void
set_size
(
::
google
::
protobuf
::
uint64
value
);
// optional uint64 arg0_u64 = 2;
inline
bool
has_arg0_u64
()
const
;
inline
void
clear_arg0_u64
();
static
const
int
kArg0U64FieldNumber
=
2
;
inline
::
google
::
protobuf
::
uint64
arg0_u64
()
const
;
inline
void
set_arg0_u64
(
::
google
::
protobuf
::
uint64
value
);
// optional uint64 arg1_u64 = 3;
inline
bool
has_arg1_u64
()
const
;
inline
void
clear_arg1_u64
();
static
const
int
kArg1U64FieldNumber
=
3
;
inline
::
google
::
protobuf
::
uint64
arg1_u64
()
const
;
inline
void
set_arg1_u64
(
::
google
::
protobuf
::
uint64
value
);
// @@protoc_insertion_point(class_scope:loomcomm.Data)