Newer
Older
Martin Beseda
committed
#include <algorithm>
#include <limits>
#include <vector>
#include "../mpi_wrapper.h"
Martin Beseda
committed
#include <boost/serialization/export.hpp>
Martin Beseda
committed
#include "DataSetSerialization.h"
#include "exceptions.h"
Martin Beseda
committed
BOOST_CLASS_EXPORT_IMPLEMENT(lib4neuro::DataSet);
Martin Beseda
committed
namespace lib4neuro {
Martin Beseda
committed
Martin Beseda
committed
DataSet::DataSet() {
this->n_elements = 0;
this->input_dim = 0;
this->output_dim = 0;
this->normalization_strategy = std::make_shared<DoubleUnitStrategy>(DoubleUnitStrategy());
Martin Beseda
committed
}
void DataSet::MPI_redistribute_data( ){
COUT_INFO("Shuffling DataSet among " << lib4neuro::mpi_nranks << " MPI processes...");
size_t local_len = this->data.size();
std::vector<int> n_entries_per_process( lib4neuro::mpi_nranks );
std::fill(n_entries_per_process.begin(), n_entries_per_process.end(), 0);
n_entries_per_process[ lib4neuro::mpi_rank ] = local_len;
MPI_Allreduce( MPI_IN_PLACE, &n_entries_per_process[0], n_entries_per_process.size(), MPI_INT, MPI_SUM, lib4neuro::mpi_active_comm );
std::vector<int> n_target_entries_per_process( lib4neuro::mpi_nranks );
int total_n_entries = 0;
for( auto el: n_entries_per_process ){
total_n_entries += el;
}
int remainder = total_n_entries % lib4neuro::mpi_nranks;
int base = total_n_entries / lib4neuro::mpi_nranks;
for( int i = 0; i < n_target_entries_per_process.size(); ++i ){
n_target_entries_per_process[ i ] = base;
if( i < remainder ){
n_target_entries_per_process[ i ]++;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
}
std::vector<std::vector<int>> transfer_matrix(lib4neuro::mpi_nranks);
for( int i = 0; i < lib4neuro::mpi_nranks; ++i ){
transfer_matrix[ i ].resize(lib4neuro::mpi_nranks);
std::fill(transfer_matrix[ i ].begin(), transfer_matrix[ i ].end(), 0);
}
for( int sender_proc = 0; sender_proc < lib4neuro::mpi_nranks; ++sender_proc ){
int entries_to_send = n_entries_per_process[ sender_proc ] - n_target_entries_per_process[ sender_proc ];
if( entries_to_send <= 0 ){
continue;
}
for( int recieve_proc = 0; recieve_proc < lib4neuro::mpi_nranks; ++recieve_proc ){
if( recieve_proc == sender_proc ){
continue;
}
int entries_equalized = n_entries_per_process[ recieve_proc ];
if( entries_equalized == n_target_entries_per_process[recieve_proc] ){
//nothing left to do
continue;
}
int missing_entries = n_target_entries_per_process[ recieve_proc ] - entries_equalized;
transfer_matrix[ sender_proc ][ recieve_proc ] = std::min( missing_entries, entries_to_send );
transfer_matrix[ recieve_proc ][ sender_proc ] = -transfer_matrix[ sender_proc ][ recieve_proc ];
entries_to_send -= transfer_matrix[ sender_proc ][ recieve_proc ];
n_entries_per_process[ recieve_proc ] += transfer_matrix[ sender_proc ][ recieve_proc ];
if(entries_to_send == 0){
break;
}
}
}
// if( lib4neuro::mpi_rank == 0 ){
// std::cout << "Transfer matrix" << std::endl;
// for( int sender_proc = 0; sender_proc < lib4neuro::mpi_nranks; ++sender_proc ){
// for( int recieve_proc = 0; recieve_proc < lib4neuro::mpi_nranks; ++recieve_proc ){
// std::cout << "[" << transfer_matrix[ sender_proc ][ recieve_proc ] << "]";
// }
// std::cout << std::endl;
// }
// std::cout << std::endl;
// }
//TODO can be optimized
/* do we send some data? */
size_t item_idx_to_send = n_target_entries_per_process[lib4neuro::mpi_rank];
for( int recieve_proc = 0; recieve_proc < lib4neuro::mpi_nranks; ++recieve_proc ){
if( transfer_matrix[lib4neuro::mpi_rank][recieve_proc] > 0 ){
size_t len = transfer_matrix[lib4neuro::mpi_rank][recieve_proc];
std::vector<double> data_vector;
data_vector.reserve(len * (this->input_dim + this->output_dim));
for( size_t item_idx = item_idx_to_send; item_idx < item_idx_to_send + len; ++item_idx ){
for( auto val: this->data[ item_idx ].first ){
data_vector.push_back( val );
}
for( auto val: this->data[ item_idx ].second ){
data_vector.push_back( val );
}
}
// std::cout << "MPI " << lib4neuro::mpi_rank << " sending " << data_vector.size() << " entries to MPI " << recieve_proc << std::endl;
MPI_Send(&data_vector[0], data_vector.size(), MPI_DOUBLE, recieve_proc, 0, lib4neuro::mpi_active_comm );
item_idx_to_send += len;
}
}
/* do we recieve some data? */
for( int sender_proc = 0; sender_proc < lib4neuro::mpi_nranks; ++sender_proc ){
if( transfer_matrix[lib4neuro::mpi_rank][sender_proc] < 0 ){
size_t len = -transfer_matrix[lib4neuro::mpi_rank][sender_proc];
std::vector<double> data_vector(len * (this->input_dim + this->output_dim));
// std::cout << "MPI " << lib4neuro::mpi_rank << " receiving " << data_vector.size() << " entries from MPI " << sender_proc << std::endl;
MPI_Recv(&data_vector[0], data_vector.size(), MPI_DOUBLE, sender_proc, 0, lib4neuro::mpi_active_comm, MPI_STATUS_IGNORE );
for( size_t i = 0; i < len; ++i ){
std::vector<double> inp(this->input_dim);
std::vector<double> out(this->output_dim);
size_t first_t = i * (this->input_dim + this->output_dim);
for( size_t j = first_t; j < first_t + this->input_dim; ++j ){
inp[j - first_t] = data_vector[ j ];
}
first_t += this->input_dim;
for( size_t j = first_t; j < first_t + this->output_dim; ++j ){
out[j - first_t] = data_vector[ j ];
}
this->data.emplace_back(std::make_pair(inp, out));
}
Martin Beseda
committed
}
Martin Beseda
committed
}
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
this->data.resize(n_target_entries_per_process[lib4neuro::mpi_rank]);
this->n_elements = this->data.size();
COUT_INFO("Shuffling finished!");
std::cout << "MPI[" << lib4neuro::mpi_rank << "] -> " << this->n_elements << " data entries" << std::endl;
}
void DataSet::MPI_gather_data_on_master( ){
size_t local_len = this->data.size();
std::cout << "MPI[" << lib4neuro::mpi_rank << "] -> " << local_len << " data entries" << std::endl;
}
DataSet::DataSet(std::string file_path) {
if( lib4neuro::mpi_rank == 0 ){
std::ifstream ifs(file_path);
if (ifs.is_open()) {
try {
boost::archive::text_iarchive ia(ifs);
ia >> *this;
}
catch (boost::archive::archive_exception& e) {
MPI_INTERRUPT
THROW_RUNTIME_ERROR(
"Serialized archive error: '" + e.what() + "'! Please, check if your file is really "
"the serialized DataSet.");
}
ifs.close();
} else {
MPI_INTERRUPT
THROW_RUNTIME_ERROR("File " + file_path + " couldn't be open!");
}
}
MPI_ERROR_CHECK
this->MPI_redistribute_data( );
Martin Beseda
committed
this->normalization_strategy = std::make_shared<DoubleUnitStrategy>(DoubleUnitStrategy());
Martin Beseda
committed
}
Martin Beseda
committed
DataSet::DataSet(std::vector<std::pair<std::vector<double>, std::vector<double>>>* data_ptr,
NormalizationStrategy* ns) {
this->data.clear();
Martin Beseda
committed
this->n_elements = data_ptr->size();
this->data = *data_ptr;
this->input_dim = this->data[0].first.size();
Martin Beseda
committed
this->output_dim = this->data[0].second.size();
Martin Beseda
committed
std::shared_ptr<NormalizationStrategy> ns_tmp;
ns_tmp.reset(ns);
this->normalization_strategy = ns_tmp;
Martin Beseda
committed
//TODO check the complete data set for input/output dimensions
}
Martin Beseda
committed
DataSet::DataSet(size_t inp_dim, size_t out_dim, NormalizationStrategy* ns) {
this->n_elements = 0;
this->input_dim = inp_dim;
this->output_dim = out_dim;
if (ns) {
std::shared_ptr<NormalizationStrategy> ns_tmp;
ns_tmp.reset(ns);
this->normalization_strategy = ns_tmp;
}
//TODO check the complete data set for input/output dimensions
}
DataSet::DataSet(double lower_bound,
double upper_bound,
unsigned int size,
double output,
NormalizationStrategy* ns) {
Martin Beseda
committed
std::vector<std::pair<std::vector<double>, std::vector<double>>> new_data_vec;
Martin Beseda
committed
this->n_elements = 0;
Martin Beseda
committed
this->output_dim = 1;
std::shared_ptr<NormalizationStrategy> ns_tmp(ns);
this->add_isotropic_data(lower_bound,
upper_bound,
size,
output);
Martin Beseda
committed
}
unsigned int no_elems_in_one_dim,
std::vector<double> (* output_func)(std::vector<double>&),
unsigned int output_dim,
NormalizationStrategy* ns) {
Martin Beseda
committed
std::vector<std::pair<std::vector<double>, std::vector<double>>> new_data_vec;
this->data = new_data_vec;
this->input_dim = bounds.size() / 2;
Martin Beseda
committed
this->output_dim = output_dim;
this->n_elements = 0;
std::shared_ptr<NormalizationStrategy> ns_tmp;
ns_tmp.reset(ns);
this->normalization_strategy = ns_tmp;
this->add_isotropic_data(bounds,
no_elems_in_one_dim,
output_func);
Martin Beseda
committed
}
Martin Beseda
committed
DataSet::~DataSet() {}
void DataSet::shift_outputs_to_zero() {
auto first_elem = this->data.at(0).second;
for(size_t j = 0; j < this->data.size(); ++j){
for(size_t i = 0; i < this->get_output_dim(); ++i){
Martin Beseda
committed
this->data.at(j).second.at(i) -= first_elem.at(i);
}
}
}
void DataSet::add_data_pair(std::vector<double>& inputs,
std::vector<double>& outputs) {
if (this->n_elements == 0 && this->input_dim == 0 && this->output_dim == 0) {
this->output_dim = outputs.size();
}
Martin Beseda
committed
if (inputs.size() != this->input_dim) {
THROW_RUNTIME_ERROR("Bad input dimension.");
Martin Beseda
committed
} else if (outputs.size() != this->output_dim) {
THROW_RUNTIME_ERROR("Bad output dimension.");
Martin Beseda
committed
}
Martin Beseda
committed
this->n_elements++;
this->data.emplace_back(std::make_pair(inputs,
outputs));
Martin Beseda
committed
}
void DataSet::add_isotropic_data(double lower_bound,
double upper_bound,
unsigned int size,
double output) {
Martin Beseda
committed
if (this->input_dim != 1 || this->output_dim != 1) {
THROW_RUNTIME_ERROR("Cannot add data with dimensionality 1:1 when the data set "
"is of different dimensionality!");
Martin Beseda
committed
}
THROW_INVALID_ARGUMENT_ERROR("Size of added data has to be >=1 !");
} else if (size == 1) {
frac = 1;
} else {
frac = (upper_bound - lower_bound) / (size - 1);
}
Martin Beseda
committed
std::vector<double> inp, out;
Martin Beseda
committed
out = {output};
Martin Beseda
committed
for (unsigned int i = 0; i < size; ++i) {
inp = {frac * i};
this->data.emplace_back(std::make_pair(inp,
out));
Martin Beseda
committed
}
Martin Beseda
committed
this->n_elements += size;
void DataSet::add_isotropic_data(std::vector<double>& bounds,
unsigned int no_elems_in_one_dim,
std::vector<double> (* output_func)(std::vector<double>&)) {
Martin Beseda
committed
// TODO add check of dataset dimensions
Martin Beseda
committed
std::vector<std::vector<double>> grid;
std::vector<double> tmp;
double frac;
THROW_INVALID_ARGUMENT_ERROR("Number of elements in one dimension has to be >=1 !");
}
Martin Beseda
committed
for (unsigned int i = 0; i < bounds.size(); i += 2) {
frac = (bounds[i] - bounds[i + 1]) / (no_elems_in_one_dim - 1);
Martin Beseda
committed
tmp.clear();
for (double j = bounds[i]; j <= bounds[i + 1]; j += frac) {
tmp.emplace_back(j);
}
Martin Beseda
committed
grid.emplace_back(tmp);
Martin Beseda
committed
grid = this->cartesian_product(&grid);
for (auto vec : grid) {
this->n_elements++;
this->data.emplace_back(std::make_pair(vec,
output_func(vec)));
Martin Beseda
committed
}
std::vector<std::pair<std::vector<double>, std::vector<double>>>* DataSet::get_data() {
Martin Beseda
committed
return &(this->data);
}
Martin Beseda
committed
size_t DataSet::get_n_elements() {
return this->n_elements;
Martin Beseda
committed
size_t DataSet::get_input_dim() {
return this->input_dim;
}
Martin Beseda
committed
size_t DataSet::get_output_dim() {
return this->output_dim;
}
void DataSet::print_data() {
if (n_elements) {
for( int i = 0; i < lib4neuro::mpi_nranks; ++i ){
if( lib4neuro::mpi_rank == i ){
for (auto p : this->data) {
/* INPUT */
for (auto v : std::get<0>(p)) {
std::cout << v << " ";
}
std::cout << "-> ";
/* OUTPUT */
for (auto v : std::get<1>(p)) {
std::cout << v << " ";
}
std::cout << std::endl;
}
Martin Beseda
committed
}
MPI_Barrier( lib4neuro::mpi_active_comm );
Martin Beseda
committed
}
Martin Beseda
committed
void DataSet::store_text(std::string file_path) {
this->MPI_gather_data_on_master( );
if( lib4neuro::mpi_rank == 0 ){
std::ofstream ofs(file_path);
if (!ofs.is_open()) {
MPI_INTERRUPT
THROW_RUNTIME_ERROR("File " + file_path + " couldn't be opened!");
} else {
boost::archive::text_oarchive oa(ofs);
oa << *this;
ofs.close();
}
void DataSet::store_data_text(std::ofstream* file_path) {
if( lib4neuro::mpi_rank == 0 ){
for (auto e : this->data) {
/* First part of the pair */
for (unsigned int i = 0; i < e.first.size() - 1; i++) {
*file_path << this->get_denormalized_value(e.first.at(i)) << ",";
}
*file_path << this->get_denormalized_value(e.first.back()) << " ";
/* Second part of the pair */
for (unsigned int i = 0; i < e.second.size() - 1; i++) {
*file_path << this->get_denormalized_value(e.second.at(i)) << ",";
}
*file_path << this->get_denormalized_value(e.second.back()) << std::endl;
}
}
}
void DataSet::store_data_text(std::string file_path) {
this->MPI_gather_data_on_master( );
if( lib4neuro::mpi_rank == 0 ){
std::ofstream ofs(file_path);
if (!ofs.is_open()) {
MPI_INTERRUPT
THROW_RUNTIME_ERROR("File " + file_path + " couldn't be opened!");
} else {
this->store_data_text(&ofs);
ofs.close();
}
Martin Beseda
committed
template<class T>
std::vector<std::vector<T>> DataSet::cartesian_product(const std::vector<std::vector<T>>* v) {
Martin Beseda
committed
std::vector<std::vector<double>> v_combined_old, v_combined, v_tmp;
Martin Beseda
committed
tmp = {e};
v_combined.emplace_back(tmp);
}
Martin Beseda
committed
for (unsigned int i = 1; i < v->size(); i++) { // Iterate through remaining vectors of 'v'
v_combined_old = v_combined;
v_combined.clear();
for (const auto& e : v->at(i)) {
for (const auto& vec : v_combined_old) {
Martin Beseda
committed
tmp = vec;
tmp.emplace_back(e);
/* Add only unique elements */
if (std::find(v_combined.begin(),
v_combined.end(),
tmp) == v_combined.end()) {
Martin Beseda
committed
v_combined.emplace_back(tmp);
}
Martin Beseda
committed
return v_combined;
Martin Beseda
committed
void DataSet::normalize() {

Michal Kravcenko
committed
this->normalized = false;
THROW_INVALID_ARGUMENT_ERROR("There is no normalization strategy given for this data set, so it can not be "
"normalized!");
Martin Beseda
committed
/* Find maximum and minimum values */
this->max_min_inp_val.resize( 2 );
this->max_min_inp_val[0] = 0.0;
this->max_min_inp_val[1] = std::numeric_limits<double>::max();
Martin Beseda
committed
/* Finding maximum */
//TODO make more efficiently
tmp = *std::max_element(pair.first.begin(),
pair.first.end());
tmp2 = *std::max_element(pair.second.begin(),
pair.second.end());
Martin Beseda
committed
this->max_min_inp_val.at(0) = std::max(this->max_min_inp_val.at(0), std::max(tmp, tmp2));
Martin Beseda
committed
/* Finding minimum */
tmp = *std::min_element(pair.first.begin(),
pair.first.end());
tmp2 = *std::min_element(pair.second.begin(),
pair.second.end());
Martin Beseda
committed
this->max_min_inp_val.at(0) = std::min(this->max_min_inp_val.at(0), std::min(tmp, tmp2));
Martin Beseda
committed
}
MPI_Allreduce( MPI_IN_PLACE, &this->max_min_inp_val[0], 1, MPI_DOUBLE, MPI_MAX, lib4neuro::mpi_active_comm);
MPI_Allreduce( MPI_IN_PLACE, &this->max_min_inp_val[1], 1, MPI_DOUBLE, MPI_MIN, lib4neuro::mpi_active_comm);
Martin Beseda
committed
/* Normalize every number in the data set */
for (auto& pair : this->data) {
for (auto& v : pair.first) {
v = this->normalization_strategy->normalize(v,
this->max_min_inp_val.at(0),
this->max_min_inp_val.at(1));
Martin Beseda
committed
}
for (auto& v : pair.second) {
v = this->normalization_strategy->normalize(v,
this->max_min_inp_val.at(0),
this->max_min_inp_val.at(1));
Martin Beseda
committed
}
}

Michal Kravcenko
committed
this->normalized = true;
Martin Beseda
committed
}
double DataSet::get_normalized_value(double val) {
if (!this->normalized || !this->normalization_strategy) {

Michal Kravcenko
committed
return val;
}
return this->normalization_strategy->normalize(val,
this->max_min_inp_val.at(0),
this->max_min_inp_val.at(1));

Michal Kravcenko
committed
}
double DataSet::get_denormalized_value(double val) {
if (!this->normalized || !this->normalization_strategy) {
return val;
}
return this->normalization_strategy->de_normalize(val);
void DataSet::get_input(std::vector<double>& d,
size_t idx) {
assert(d.size() == this->data[idx].first.size());
for (size_t j = 0; j < this->data[idx].first.size(); ++j) {
d[j] = this->data[idx].first[j];
}
}
void DataSet::get_output(std::vector<double>& d,
size_t idx) {
assert(d.size() == this->data[idx].second.size());
for (size_t j = 0; j < this->data[idx].second.size(); ++j) {
d[j] = this->data[idx].second[j];
}
}
void DataSet::de_normalize() {
std::vector<double> tmp_inp(this->data.at(0).first.size());
std::vector<double> tmp_out(this->data.at(0).second.size());
for (auto& pair: this->data) {
for (size_t i = 0; i < pair.first.size(); i++) {
tmp_inp.at(i) = this->normalization_strategy->de_normalize(pair.first.at(i));
}
pair.first = tmp_inp;
}
for (auto& pair: this->data) {
for (size_t i = 0; i < pair.second.size(); i++) {
tmp_out.at(i) = this->normalization_strategy->de_normalize(pair.second.at(i));
}
pair.second = tmp_out;
}
Martin Beseda
committed
/* Remove found max and minimal values, because of is_normalized() method */
this->max_min_inp_val.clear();
void DataSet::de_normalize_single(std::vector<double>& d1,
std::vector<double>& d2) {
assert(d1.size() == d2.size());
for (size_t j = 0; j < d1.size(); ++j) {
d2[j] = this->normalization_strategy->de_normalize(d1[j]);
NormalizationStrategy* DataSet::get_normalization_strategy() {
void DataSet::set_normalization_strategy(NormalizationStrategy* ns) {
this->normalization_strategy.reset(ns);
}
}
bool DataSet::is_normalized() {
return !this->max_min_inp_val.empty();
}
double DataSet::get_max_inp_val() {
Martin Beseda
committed
return this->max_min_inp_val.at(0);
}
double DataSet::get_min_inp_val() {
Martin Beseda
committed
return this->max_min_inp_val.at(1);
Martin Beseda
committed
/**
* Method returning random amount of data pairs between 1-max
*/
std::vector<std::pair<std::vector<double>, std::vector<double>>> DataSet::get_random_data_batch(size_t max) {
Martin Beseda
committed
return this->data;
} else {
std::vector<std::pair<std::vector<double>, std::vector<double>>> newData;
size_t n_chosen = rand() % std::min(max,
this->data.size()) + 1;
std::vector<size_t> chosens;
for (size_t i = 0; i < n_chosen; i++) {
auto it = std::find(chosens.begin(),
chosens.end(),
chosen);
Martin Beseda
committed
if (it != chosens.end()) {
i--;
} else {
newData.push_back(this->data.at(chosen));
return newData;
}
}
void DataSet::add_zero_output_columns(size_t n_columns)
{
for (size_t i = 0; i < this->n_elements; i++)
{
for (size_t j = 0; j < n_columns; j++)
{
this->data.at(i).second.push_back(0);
}
}
this->output_dim += n_columns;
}
arma::Mat<double>* DataSet::get_inputs_matrix() {
this->inputs_matrix = new arma::Mat<double>(this->data.size(), this->data.at(0).first.size());
// arma::Mat<double> m(this->data.size(), this->data.at(0).first.size());
for (size_t i = 0; i < this->data.size(); i++) {
this->inputs_matrix->row(i) = arma::Row<double>(this->data.at(i).first);
}
// this->inputs_matrix = &m;
return this->inputs_matrix;
}
arma::Mat<double>* DataSet::get_outputs_matrix() {
this->outputs_matrix = new arma::Mat<double>(this->data.size(), this->data.at(0).second.size());
for(size_t i = 0; i < this->data.size(); i++) {
this->outputs_matrix->row(i) = arma::Row<double>(this->data.at(i).second);
}
// this->outputs_matrix = &m;
return this->outputs_matrix;
}