#include "Uncertainity.h" #include "UncertainityOptions.h" #include "AbstractParam.h" #include "AbstractRandom.h" #include "PrecipitationUncertainity.h" #include "CSVWriter.h" #include "UniformRandom.h" #include "NormalRandom.h" #include "ManningUncertainity.h" #include "CnUncertainity.h" #include "easylogging++.h" #include <iostream> #include <algorithm> #include <cmath> #include <memory> #include <vector> #include <mpi.h> #include <omp.h> #define _ELPP_THREAD_SAFE namespace math1d_cl { Uncertainity::Uncertainity(std::string config_xml, std::shared_ptr<math1d_cl::MatData> matData, uint32_t mcCount) { MPI_Comm_size(MPI_COMM_WORLD, &m_numProc); MPI_Comm_rank(MPI_COMM_WORLD, &m_rank); m_options = SetOptions(config_xml); // overwrite number of Monte Carlo samples to be simulated m_options.simulationCount = mcCount; m_matData = matData; // Determine chunks of iterations per process int modulo = m_options.simulationCount % m_numProc; int size = (int) std::floor(m_options.simulationCount / m_numProc); for (int p = 0; p < m_numProc; ++p) { int sizePerProcess = size; if( modulo > 0 ) { sizePerProcess++; modulo--; } m_options.chunkSizes.push_back(sizePerProcess); if(m_rank == 0) { CLOG(INFO, "montecarlo") << "[MPI Rank " << p << "]: Chunk distribution "<< m_options.chunkSizes[p] << "/" << m_options.simulationCount; } } if(m_rank == 0) { // Copy original hydrographs for (size_t i = 0; i < m_matData->getChannels().size(); ++i) { m_origHydrographs.push_back(m_matData->getChannels()[i]->getHydrograph()); } } // Check for any parameter selected if((m_options.simParameter.precipitations || m_options.simParameter.manning || m_options.simParameter.cn) == false) { CLOG(FATAL,"montecarlo") << "ERROR: No parameter selected for Monte Carlo simulation."; std::exit(-1); } /* Precipitations */ if(m_options.simParameter.precipitations == true) { std::shared_ptr<AbstractRandom> precipRandom; // Create selected random function switch(m_options.precipRandType) { case RandomType::KERNEL_DENSITY : precipRandom = std::make_shared<KernelDensity>(m_options); break; default: CLOG(FATAL,"montecarlo") << "ERROR: Unknown random function, check precipitation random function options."; std::exit(-1); break; } // Create precipitation uncertainity and save it precipRandom->seed(); m_uncertainParameters.push_back(std::make_shared<math1d_cl::PrecipitationUncertainity>(precipRandom, m_options, m_matData)); } /* Manning's coefficient */ if(m_options.simParameter.manning == true) { std::shared_ptr<AbstractRandom> manningRandom; // Create selected random function switch(m_options.nRandType) { case RandomType::UNIFORM : manningRandom = std::make_shared<UniformRandom>(m_options); break; case RandomType::NORMAL : manningRandom = std::make_shared<NormalRandom>(m_options); break; default: CLOG(FATAL,"montecarlo") << "ERROR: Unknown random function, check Mannings random function options."; std::exit(-1); break; } manningRandom->seed(); m_uncertainParameters.push_back(std::make_shared<math1d_cl::ManningUncertainity>(manningRandom, m_options, m_matData)); } /* CN Curve number */ if(m_options.simParameter.cn == true) { std::shared_ptr<AbstractRandom> cnRandom; // Create selected random function switch(m_options.cnRandType) { case RandomType::UNIFORM : cnRandom = std::make_shared<UniformRandom>(m_options); break; case RandomType::NORMAL : cnRandom = std::make_shared<NormalRandom>(m_options); break; default: CLOG(FATAL,"montecarlo") << "ERROR: Unknown random function, check CN random function options."; std::exit(-1); break; } cnRandom->seed(); m_uncertainParameters.push_back(std::make_shared<math1d_cl::CnUncertainity>(cnRandom, m_options, m_matData)); } // Compute timestep count m_timeSteps = 1 + (m_options.daysMeasured + m_options.daysPredicted) * m_options.valuesPerDay; // Get station count m_stationsCount = m_matData->getPrecipitations().front().second.size(); // Check if timeSteps data are correct if(m_timeSteps != m_matData->getPrecipitations().size()) { CLOG(FATAL,"montecarlo") << "ERROR: Timestep count of input precipitations is incorrect, check Measured/Predicted days."; std::exit(-1); } } void Uncertainity::Initialize() { CLOG(INFO,"montecarlo") << "Initializing Monte carlo simulation"; for (size_t p = 0; p < m_options.chunkSizes.size(); ++p) { m_qChunkSizes.push_back(m_options.chunkSizes[p] * m_timeSteps * m_matData->getChannels().size()); m_qChunkDispl.push_back(p * m_options.chunkSizes[p] * m_timeSteps * m_matData->getChannels().size()); //m_hChunkSizes.push_back(m_options.chunkSizes[p] * m_timeSteps * m_stationsCount); //m_hChunkDispl.push_back(p * m_options.chunkSizes[p] * m_timeSteps * m_stationsCount); } /* Generate values for monte carlo run */ for(std::vector<std::shared_ptr<math1d_cl::AbstractParam>>::const_iterator it = m_uncertainParameters.begin(); it != m_uncertainParameters.end(); ++it) { (*it)->generateValues(); } } /* Make a copy of model for each thread */ void Uncertainity::CreateModels(size_t modelsNumber) { int modelsCreated = 0; for (size_t i = m_models.size(); i < modelsNumber; ++i) { math1d_cl::MatData model(*(m_matData.get())); m_models.push_back(model); } CLOG(INFO, "montecarlo") << modelsCreated << " models created"; } void Uncertainity::RunMC(size_t threadsNumber) { CLOG(INFO,"montecarlo") << "Monte carlo simulation"; m_localQ.reserve(m_qChunkSizes[m_rank]); // Resize to fit all local hydrographs omp_set_num_threads(threadsNumber); int max_threads = omp_get_max_threads(); CLOG(INFO, "montecarlo") << "[OpenMP] Max " << max_threads << " threads available"; /* Run monte carlo loop Every process runs its corresponding number of iterations (including root rank) */ CLOG(INFO, "montecarlo") << "Running Monte Carlo on rank " << m_rank << " ..."; #pragma omp parallel { #pragma omp single CLOG(INFO, "montecarlo") << "[OpenMP] " << omp_get_num_threads() << " threads used"; #pragma omp for schedule(dynamic) for (size_t i = 0; i < threadsNumber; ++i) //m_options.chunkSizes[m_rank]; { int tid = omp_get_thread_num(); //CLOG(DEBUG, "montecarlo") << "[Rank " << m_rank << " | Thread " << tid << "] MC Run no. " << (i+1) << "/" << m_options.chunkSizes[rank]; std::cout << "." << std::flush; #pragma omp critical { // Get randomized value for each selected uncertain parameter for(std::vector<std::shared_ptr<math1d_cl::AbstractParam>>::const_iterator it = m_uncertainParameters.begin(); it != m_uncertainParameters.end(); ++it) { (*it)->setParam(m_models[tid]); } } // Run Math1D model m_models[tid].rainfallRunoffModel(); // Collect data to intermediate array #pragma omp critical { for (size_t ch = 0; ch < m_matData->getChannels().size(); ++ch) { m_localQ.insert(m_localQ.end(), m_models[tid].getChannels()[ch]->getHydrograph().getQOut().begin(), m_models[tid].getChannels()[ch]->getHydrograph().getQOut().begin() + m_timeSteps); } } }// For iterations }// OMP Parallel std::cout << std::endl; CLOG(INFO, "montecarlo") << "[Rank " << m_rank << "] DONE."; } void Uncertainity::CollectResults() { /* MPI Perform gathering of all results into array allocated on root rank */ // Gathered data std::vector<double> gatheredQ; //std::vector<double> gatheredH; if(m_rank == 0) { CLOG(INFO, "montecarlo") << "[MPI]Gathering results..."; // Resize to fit all gathered Q values gatheredQ.resize(m_options.simulationCount * m_timeSteps * m_matData->getChannels().size()); } //MPI_Gather(intermediateHydrographs, intermediateHydrographsSize, MPI_DOUBLE, gatheredHydrographs, intermediateHydrographsSize, MPI_DOUBLE, 0, MPI_COMM_WORLD); MPI_Gatherv(&m_localQ.front(), m_qChunkSizes[m_rank], MPI_DOUBLE, &gatheredQ.front(), &m_qChunkSizes.front(), &m_qChunkDispl.front(), MPI_DOUBLE, 0, MPI_COMM_WORLD); if(m_rank == 0) { CLOG(INFO, "montecarlo") << "[MPI]Gathering DONE."; } MPI_Finalize(); CLOG(INFO, "montecarlo") << "Collect quantiles..."; std::vector<std::vector<math1d_cl::Hydrograph>> hydrographs(m_options.quantiles.size()); // Holds hydrographs for each quantile and each channel for(size_t ch = 0; ch < m_matData->getChannels().size(); ++ch) { CLOG(DEBUG, "montecarlo") << "Channel " << ch; std::vector<std::vector<double>> channelHydrographQuantiles; // Holds hydrographs for one channels and all quantiles channelHydrographQuantiles.resize(m_options.quantiles.size()); for(size_t tm = 0; tm < m_timeSteps; ++tm) { std::vector<double> tmResult; // Holds values for all iterations, one channel, one timestep for(size_t it = 0; it < (size_t)m_options.simulationCount; ++it) { double *valptr = gatheredQ.data() + tm + (m_timeSteps * ch) + (m_timeSteps * m_matData->getChannels().size() * it); tmResult.push_back((*valptr)); } // Get quantiles std::vector<double> quantilesPerTimestep = GetQuantile(tmResult, m_options.quantiles); tmResult.clear(); // Append quantiles to output hydrographs for (size_t q = 0; q < quantilesPerTimestep.size(); ++q) { channelHydrographQuantiles[q].push_back(quantilesPerTimestep[q]); } } // Timesteps // Insert resulting quantiles for one channel for (size_t chq = 0; chq < channelHydrographQuantiles.size(); ++chq) { math1d_cl::Hydrograph hydrograph; hydrograph.setQOut(channelHydrographQuantiles[chq]); hydrographs[chq].push_back(hydrograph); } channelHydrographQuantiles.clear(); } // Channels // Save results math1d_cl::CSVWriter csvwriter; for (size_t q = 0; q < m_options.quantiles.size(); ++q) { CLOG(INFO, "montecarlo") << "Saving " << m_options.quantiles[q] << "pct quantile..."; std::string qFileName = m_options.resultsDir + "/Q_" + std::to_string((int)m_options.quantiles[q]) + "_quantile.csv"; std::string hFileName = m_options.resultsDir + "/H_" + std::to_string((int)m_options.quantiles[q]) + "_quantile.csv"; csvwriter.saveMCResult(m_matData,hydrographs[q], m_timeSteps, qFileName, hFileName); CLOG(INFO, "montecarlo") << "OK"; } } std::vector<double> Uncertainity::GetQuantile(std::vector<double> &input, std::vector<double> quantiles) { std::vector<double> result; result.reserve(quantiles.size()); std::sort(input.begin(),input.end(),std::less<double>()); // Sort the vector for (size_t i = 0; i < quantiles.size(); ++i) { // Get quantile int qIdx = floor(input.size()*(quantiles[i]/100)); double value = 0.0; if(input.size() % 2 == 0 && qIdx+1 < (int)input.size()) // Check vector size { // Even element count value = (input[qIdx]+input[qIdx+1])/2; } else { // Odd element count value = input[qIdx]; } result.push_back(value); } // Return quantiles return result; } math1d_cl::UncertainityOptions Uncertainity::SetOptions(std::string fileName) { pugi::xml_document doc; pugi::xml_parse_result result = doc.load_file(fileName.c_str()); if (result.status != pugi::status_ok) { // Config file load unsuccesfull CLOG(FATAL, "montecarlo") << "Configuration load result: " << std::string(result.description()); std::exit(EXIT_FAILURE); } pugi::xml_node params = doc.child("conf").child("uncertainity").child("params"); // Set options math1d_cl::UncertainityOptions options; options.simulationCount = params.child("mcCount").text().as_int(); pugi::xml_node quantiles = params.child("quantiles"); for (pugi::xml_node q = quantiles.first_child(); q; q = q.next_sibling()) { options.quantiles.push_back(q.text().as_double()); } std::string precRandType = params.child("precRandType").text().as_string(); if (precRandType == "KERNEL_DENSITY") { options.precipRandType = math1d_cl::RandomType::KERNEL_DENSITY; } options.daysMeasured = params.child("daysMeasured").text().as_int(); options.daysPredicted = params.child("daysPredicted").text().as_int(); options.valuesPerDay = params.child("valuesPerDay").text().as_int(); options.resultsDir = doc.child("conf").child("resourcesPath").text().as_string(); options.precipDeviation = params.child("precDeviation").text().as_double(); pugi::xml_node limits = params.child("precLimits"); for (pugi::xml_node l = limits.first_child(); l; l = l.next_sibling()) { options.limits.push_back({ l.child("value").text().as_double(), l.child("file").text().as_string() }); }// Add precip. limits // Set parameters to simulate options.simParameter.precipitations = params.child("precEnabled").text().as_bool(); // Manning's N options.simParameter.manning = params.child("nEnabled").text().as_bool(); options.nDeviation = params.child("nDeviation").text().as_double(); options.nLimits.lower = params.child("nLimitLower").text().as_double(); options.nLimits.upper = params.child("nLimitUpper").text().as_double(); // CN Curve number options.simParameter.cn = params.child("cnEnabled").text().as_bool(); options.cnDeviation = params.child("cnDeviation").text().as_double(); options.cnLimits.lower = params.child("cnLimitLower").text().as_double(); options.cnLimits.upper = params.child("cnLimitUpper").text().as_double(); options.chunkSizes.reserve(1); // Chunk sizes should be initialized before copying*/ return options; } /* math1d_cl::Hydrograph Uncertainity::GetQuantile(std::vector<math1d_cl::Hydrograph> hydrographs, double quantile, size_t currentChannel) { math1d_cl::Hydrograph result; std::vector<double> qOut; /// Qout for one timestep and all simulations qOut.reserve(m_timeSteps); std::vector<double> qOutRes; /// Qout with computed quantile values qOutRes.reserve(m_timeSteps); // size_t predictStart = (m_options.daysMeasured > 0) ? (m_options.daysMeasured * m_options.valuesPerDay) + 1 : 0; for(size_t tm = 0; tm < m_timeSteps; ++tm) { if (tm < predictStart) { // Copy value made from (earlier) measured precipitations // Measured values are equal across all montecarlo simulations //qOutRes.push_back(hydrographs.front().getQOut()[tm]); qOutRes.push_back(m_origHydrographs[currentChannel].getQOut()[tm]); } else { } // Get values from predicted values for each iteration and for one timestep for (size_t i = 0; i < hydrographs.size(); ++i) { qOut.push_back(hydrographs[i].getQOut()[tm]); } // Sort them asc std::sort(qOut.begin(),qOut.end(),std::less<double>()); // Get quantile int qIdx = floor(qOut.size()*(quantile/100)); double value = 0.0; if(qOut.size() % 2 == 0 && qIdx+1 < (int)qOut.size()) // Check vector size { // Even element count value = (qOut[qIdx]+qOut[qIdx+1])/2; } else { // Odd element count value = qOut[qIdx]; } qOutRes.push_back(value); qOut.clear(); } result.setQOut(qOutRes); return result; } */ }