Skip to content
Snippets Groups Projects
Commit 82e9f818 authored by Vit Ptosek's avatar Vit Ptosek
Browse files

Merge branch 'db_time_range'

parents 1214fda6 351052fb
No related branches found
No related tags found
No related merge requests found
......@@ -4,8 +4,11 @@ namespace FcdAggregationLauncher.Data {
public class Aggregates {
public long SegmentId { get; set; }
public float SegmentSpeed { get; set; }
public DateTime FcdTime { get; set; }
public float Reliability { get; set; }
public float SegmentSpeed { get; set; }
public DateTime FcdTimeStart { get; set; }
public DateTime FcdTimeFinish { get; set; }
}
}
......@@ -24,9 +24,10 @@ namespace FcdAggregationLauncher.Data {
MapProperty(2, x => x.Reliability);
}
public List<Aggregates> MapRecords(string lines, DateTime dateTime) {
public List<Aggregates> MapRecords(string lines, DateTime startTime, ushort minuteOffset) {
CreateParser();
var finishTime = startTime.AddMinutes(minuteOffset);
var aggregates = _parser.ReadFromString(ReaderOptions, lines)
.Where(x => x.IsValid)
.Select(x => x.Result)
......@@ -35,7 +36,8 @@ namespace FcdAggregationLauncher.Data {
Parallel.ForEach(aggregates, new ParallelOptions() {
MaxDegreeOfParallelism = ParsingOptions.DegreeOfParallelism
}, (point) => {
point.FcdTime = dateTime;
point.FcdTimeStart = startTime;
point.FcdTimeFinish = finishTime;
});
return aggregates;
......
......@@ -19,50 +19,54 @@ namespace FcdAggregationLauncher.Db {
internal int ExportData(string filePath, ushort minuteOffset) {
var content = File.ReadAllText(filePath);
var timestamp = DateTime.UtcNow.ToStartingTimestep(minuteOffset);
var data = new AggregatesMapper().MapRecords(content, timestamp);
var startTime = DateTime.UtcNow.ToStartingTimestep(minuteOffset);
Console.WriteLine($"[{DateTime.Now}]\tData Start Time:\t{startTime.ToShortFormat()}");
GetPartitionTable(out string schemaName, out string tableName, timestamp);
if (GetPartitionTable(out string schemaName, out string tableName, startTime)) {
var data = new AggregatesMapper().MapRecords(content, startTime, minuteOffset);
try {
using (var connection = new NpgsqlConnection(ConnectionString)) {
connection.Open();
DbMapper.MapData(schemaName, tableName).SaveAll(connection, data);
try {
using (var connection = new NpgsqlConnection(ConnectionString)) {
connection.Open();
DbMapper.MapData(schemaName, tableName).SaveAll(connection, data);
}
}
}
catch (Exception) {
Console.WriteLine("Can't store to a partition table");
return 0;
catch (Exception) {
Console.WriteLine("Can't store to a partition table");
}
return data.Count;
}
return data.Count;
return 0;
}
private void GetPartitionTable(out string schemaName, out string tableName, DateTime partitionTime) {
private bool GetPartitionTable(out string schemaName, out string tableName, DateTime partitionTime) {
schemaName = String.Empty;
tableName = String.Empty;
var procedureName = "fcd_prototype.create_partition_trnava_fcd";
try {
using (var connection = new NpgsqlConnection(ConnectionString))
using (var cmd = new NpgsqlCommand(procedureName, connection)) {
connection.Open();
using (var connection = new NpgsqlConnection(ConnectionString))
using (var cmd = new NpgsqlCommand(procedureName, connection)) {
connection.Open();
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandTimeout = 0;
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandTimeout = 0;
cmd.Parameters.AddWithValue("time_value", NpgsqlTypes.NpgsqlDbType.TimestampTZ, partitionTime);
cmd.Parameters.AddWithValue("time_value", NpgsqlTypes.NpgsqlDbType.TimestampTZ, partitionTime);
try {
var result = (string)cmd.ExecuteScalar();
schemaName = result.Split('.')[0];
tableName = result.Split('.')[1];
}
catch (Exception) {
Console.WriteLine("Can't get partition table");
}
}
catch (Exception) {
Console.WriteLine("Can't get partition table");
return false;
}
return true;
}
}
}
......@@ -9,9 +9,10 @@ namespace FcdAggregationLauncher.Db {
internal static PostgreSQLCopyHelper<Aggregates> MapData(string schemaName, string tableName) {
return new PostgreSQLCopyHelper<Aggregates>(schemaName, tableName)
.MapReal("speed", x => x.SegmentSpeed)
.MapTimeStamp("fcd_time", x => x.FcdTime)
.MapReal("reliability", x => x.Reliability)
.MapBigInt("segment_gid", x => (long?)x.SegmentId);
.MapBigInt("segment_gid", x => (long?)x.SegmentId)
.MapTimeStamp("fcd_time_end", x => x.FcdTimeFinish)
.MapTimeStamp("fcd_time_start", x => x.FcdTimeStart);
}
}
}
......@@ -7,6 +7,10 @@ namespace FcdAggregationLauncher.Extensions {
return dateTime.ToString("yyyy-MM-dd'_'HH-mm-ss");
}
public static string ToShortFormat(this DateTime dateTime) {
return dateTime.ToString("HH:mm:ss.fff");
}
public static DateTime ToStartingTimestep(this DateTime dateTime, ushort minuteOffset) {
return dateTime.AddTicks(-(dateTime.Ticks % TimeSpan.FromMinutes(minuteOffset * -1).Ticks));
}
......
......@@ -41,7 +41,7 @@ namespace FcdAggregationLauncher {
int count = Launcher.FireUpMerging(directoryPath);
if (count > 0) {
Console.WriteLine($"[{DateTime.Now}]\tFiles Merged:\t\t{count}");
Console.WriteLine($"{Environment.NewLine}[{DateTime.Now}]\tFiles Merged:\t\t{count}");
}
}
......@@ -51,7 +51,7 @@ namespace FcdAggregationLauncher {
if (count > 0) {
count = Launcher.FireUpDbExport(_dbPassword, outputFilePath);
Console.WriteLine($"[{DateTime.Now}]\tRows Inserted:\t\t{count}");
Console.WriteLine($"[{DateTime.Now}]\tRows Affected:\t\t{count}");
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment