Select Git revision
.clang-tidy
AbstractFileSystemManager.cs 10.25 KiB
using System;
using System.Collections.Generic;
using System.Linq;
using HaaSMiddleware.DomainObjects.ClusterInformation;
using HaaSMiddleware.DomainObjects.FileTransfer;
using HaaSMiddleware.DomainObjects.JobManagement;
using HaaSMiddleware.DomainObjects.JobManagement.JobInformation;
using HaaSMiddleware.MiddlewareUtils;
namespace HaaSMiddleware.FileTransferFramework {
public abstract class AbstractFileSystemManager : IRexFileSystemManager {
#region Instance Fields
protected Dictionary<SynchronizableFiles, Dictionary<string, IFileSynchronizer>> _fileSynchronizers;
protected FileTransferMethod _fileSystem;
protected FileSystemFactory _synchronizerFactory;
#endregion
#region Constructors
public AbstractFileSystemManager(FileTransferMethod configuration, FileSystemFactory synchronizerFactory) {
this._fileSystem = configuration;
this._synchronizerFactory = synchronizerFactory;
this._fileSynchronizers = new Dictionary<SynchronizableFiles, Dictionary<string, IFileSynchronizer>>();
}
#endregion
#region Abstract Methods
public abstract byte[] DownloadFileFromCluster(SubmittedJobInfo jobInfo, string relativeFilePath);
public abstract void DeleteSessionFromCluster(SubmittedJobInfo jobInfo);
protected abstract void CopyAll(string source, string target, bool overwrite, DateTime? lastModificationLimit,
string[] excludedFiles, ClusterAuthenticationCredentials credentials);
protected abstract ICollection<string> ListChangedFilesForTask(string taskClusterDirectoryPath, DateTime? jobSubmitTime, ClusterAuthenticationCredentials clusterAuthenticationCredentials);
protected abstract IFileSynchronizer CreateFileSynchronizer(FullFileSpecification fileInfo, ClusterAuthenticationCredentials credentials);
#endregion
#region IRexFileSystemManager Members
public virtual void CopyInputFilesToCluster(SubmittedJobInfo jobInfo, string localJobDirectory) {
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(
_fileSystem.SharedBasepath, jobInfo.Specification);
/*foreach (Task task in jobSpecification.JobSpecification.Tasks)
{
string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(jobClusterDirectoryPath, task.ClusterTaskSubdirectory);
CopyAll(task.WorkDirectory, taskClusterDirectoryPath, false, null);
}*/
// copy the whole folder (data can be referred outside
// of the task's work directory)
CopyAll(localJobDirectory, jobClusterDirectoryPath, false, null, null, jobInfo.Specification.ClusterUser);
}
public virtual ICollection<JobFileContent> CopyStdOutputFilesFromCluster(SubmittedJobInfo jobInfo) {
return PerformSynchronizationForType(jobInfo, SynchronizableFiles.StandardOutputFile);
}
public virtual ICollection<JobFileContent> CopyStdErrorFilesFromCluster(SubmittedJobInfo jobInfo) {
return PerformSynchronizationForType(jobInfo, SynchronizableFiles.StandardErrorFile);
}
public virtual ICollection<JobFileContent> CopyProgressFilesFromCluster(SubmittedJobInfo jobInfo) {
return PerformSynchronizationForType(jobInfo, SynchronizableFiles.ProgressFile);
}
public virtual ICollection<JobFileContent> CopyLogFilesFromCluster(SubmittedJobInfo jobInfo) {
return PerformSynchronizationForType(jobInfo, SynchronizableFiles.LogFile);
}
public ICollection<JobFileContent> DownloadPartOfJobFileFromCluster(SubmittedTaskInfo taskInfo, SynchronizableFiles fileType, long offset) {
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(_fileSystem.SharedBasepath, taskInfo.Specification.JobSpecification); string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(jobClusterDirectoryPath, taskInfo.Specification.ClusterTaskSubdirectory);
FullFileSpecification fileInfo = CreateSynchronizableFileInfoForType(taskInfo.Specification, taskClusterDirectoryPath, fileType);
IFileSynchronizer synchronizer = CreateFileSynchronizer(fileInfo, taskInfo.Specification.JobSpecification.ClusterUser);
synchronizer.Offset = offset;
synchronizer.SyncFileInfo.DestinationDirectory = null;
ICollection<JobFileContent> result = synchronizer.SynchronizeFiles();
if ( result != null ) {
foreach ( JobFileContent content in result ) {
content.FileType = fileType;
content.SubmittedTaskInfoId = taskInfo.Id;
}
}
return result;
}
public virtual void CopyCreatedFilesFromCluster(SubmittedJobInfo jobInfo, DateTime jobSubmitTime) {
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(_fileSystem.SharedBasepath, jobInfo.Specification);
foreach (SubmittedTaskInfo taskInfo in jobInfo.Tasks) {
string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(
jobClusterDirectoryPath, taskInfo.Specification.ClusterTaskSubdirectory);
string[] excludedFiles = {
taskInfo.Specification.LogFile.RelativePath,
taskInfo.Specification.ProgressFile.RelativePath,
taskInfo.Specification.StandardOutputFile,
taskInfo.Specification.StandardErrorFile
};
CopyAll(taskClusterDirectoryPath, taskInfo.Specification.LocalDirectory, true, jobSubmitTime, excludedFiles, jobInfo.Specification.ClusterUser);
}
}
public ICollection<string> ListChangedFilesForJob(SubmittedJobInfo jobInfo, DateTime jobSubmitTime) {
List<string> result = new List<string>();
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(_fileSystem.SharedBasepath, jobInfo.Specification);
foreach ( SubmittedTaskInfo taskInfo in jobInfo.Tasks ) {
string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(jobClusterDirectoryPath, taskInfo.Specification.ClusterTaskSubdirectory);
result.AddRange(ListChangedFilesForTask(taskClusterDirectoryPath, jobSubmitTime, jobInfo.Specification.ClusterUser));
}
return result;
}
#endregion
#region Local Methods
protected virtual ICollection<JobFileContent> PerformSynchronizationForType(SubmittedJobInfo jobInfo, SynchronizableFiles fileType) {
List<JobFileContent> result = new List<JobFileContent>();
if (!_fileSynchronizers.ContainsKey(fileType))
{
_fileSynchronizers[fileType] = new Dictionary<string, IFileSynchronizer>(jobInfo.Tasks.Count);
}
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(_fileSystem.SharedBasepath, jobInfo.Specification);
foreach (SubmittedTaskInfo taskInfo in jobInfo.Tasks)
{
string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(jobClusterDirectoryPath,
taskInfo.Specification.ClusterTaskSubdirectory);
FullFileSpecification fileInfo = CreateSynchronizableFileInfoForType(taskInfo.Specification, taskClusterDirectoryPath, fileType);
string sourceFilePath = FileSystemUtils.ConcatenatePaths(fileInfo.SourceDirectory, fileInfo.RelativePath);
if (!_fileSynchronizers[fileType].ContainsKey(sourceFilePath))
{
_fileSynchronizers[fileType][sourceFilePath] = CreateFileSynchronizer(fileInfo, jobInfo.Specification.ClusterUser);
}
ICollection<JobFileContent> subresult = _fileSynchronizers[fileType][sourceFilePath].SynchronizeFiles();
if ( subresult != null ) {
foreach ( JobFileContent content in subresult ) {
content.FileType = fileType;
content.SubmittedTaskInfoId = taskInfo.Id;
result.Add(content);
}
}
}
return result;
}
protected virtual void CreateSynchronizersForType(JobSpecification jobSpecification, SynchronizableFiles fileType) {
_fileSynchronizers[fileType] =
new Dictionary<string, IFileSynchronizer>(jobSpecification.Tasks.Count);
string jobClusterDirectoryPath = FileSystemUtils.GetJobClusterDirectoryPath(_fileSystem.SharedBasepath, jobSpecification);
foreach (TaskSpecification task in jobSpecification.Tasks) {
string taskClusterDirectoryPath = FileSystemUtils.GetTaskClusterDirectoryPath(jobClusterDirectoryPath,
task.ClusterTaskSubdirectory);
FullFileSpecification fileInfo = CreateSynchronizableFileInfoForType(task, taskClusterDirectoryPath, fileType);
string sourceFilePath = FileSystemUtils.ConcatenatePaths(fileInfo.SourceDirectory, fileInfo.RelativePath);
if (!_fileSynchronizers[fileType].ContainsKey(sourceFilePath)) {
_fileSynchronizers[fileType][sourceFilePath] = CreateFileSynchronizer(fileInfo, jobSpecification.ClusterUser);
}
}
}
protected virtual FullFileSpecification CreateSynchronizableFileInfoForType(TaskSpecification task, string taskClusterDirectoryPath,
SynchronizableFiles fileType) {
FullFileSpecification fileInfo = new FullFileSpecification {
DestinationDirectory = task.LocalDirectory,
SourceDirectory = taskClusterDirectoryPath,
};
CompleteFileInfoForType(fileInfo, task, fileType);
return fileInfo;
}
protected virtual void CompleteFileInfoForType(FileSpecification fileInfo, TaskSpecification task, SynchronizableFiles fileType) {
switch (fileType) {
case SynchronizableFiles.StandardOutputFile:
fileInfo.RelativePath = task.StandardOutputFile;
fileInfo.NameSpecification = FileNameSpecification.FullName;
fileInfo.SynchronizationType = FileSynchronizationType.IncrementalAppend;
break;
case SynchronizableFiles.StandardErrorFile:
fileInfo.RelativePath = task.StandardErrorFile;
fileInfo.NameSpecification = FileNameSpecification.FullName;
fileInfo.SynchronizationType = FileSynchronizationType.IncrementalAppend;
break;
case SynchronizableFiles.LogFile:
fileInfo.RelativePath = task.LogFile.RelativePath;
fileInfo.NameSpecification = task.LogFile.NameSpecification;
fileInfo.SynchronizationType = task.LogFile.SynchronizationType;
break;
case SynchronizableFiles.ProgressFile:
fileInfo.RelativePath = task.ProgressFile.RelativePath;
fileInfo.NameSpecification = task.ProgressFile.NameSpecification;
fileInfo.SynchronizationType = task.ProgressFile.SynchronizationType;
break;
}
}
protected virtual ICollection<JobFileContent> SynchronizeAllFilesOfType(SynchronizableFiles fileType) {
List<JobFileContent> results = new List<JobFileContent>( /*_fileSynchronizers[fileType].Values.Count*/);
foreach (IFileSynchronizer synchronizer in _fileSynchronizers[fileType].Values) {
ICollection<JobFileContent> result = synchronizer.SynchronizeFiles();
if (result != null)
results.AddRange(result);
}
return results;
}
#endregion
}
}