Skip to content
Snippets Groups Projects
Commit 5d71c64a authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

fix: problem with counting values

parent 6f2f14a9
Branches
Tags
No related merge requests found
...@@ -81,8 +81,6 @@ public class Job { ...@@ -81,8 +81,6 @@ public class Job {
this(jobManager, haasClientSupplier); this(jobManager, haasClientSupplier);
setJobDirectory(jobDirectory); setJobDirectory(jobDirectory);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME)); propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
resumeUpload();
resumeDownload();
} }
...@@ -129,6 +127,19 @@ public class Job { ...@@ -129,6 +127,19 @@ public class Job {
} }
} }
public synchronized void resumeUpload() {
if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
synchronization.resumeUpload();
}
}
public synchronized void resumeDownload() {
if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) {
synchronization.resumeDownload();
}
}
public boolean canBeDownload() { public boolean canBeDownload() {
return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED)); return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED));
} }
...@@ -295,6 +306,14 @@ public class Job { ...@@ -295,6 +306,14 @@ public class Job {
} }
} }
public void setDownloadNotifier(ProgressNotifier notifier) {
synchronization.setDownloadNotifier(notifier);
}
public void setUploadNotifier(ProgressNotifier notifier) {
synchronization.setUploadNotifier(notifier);
}
private void setJobDirectory(Path jobDirectory) { private void setJobDirectory(Path jobDirectory) {
this.jobDir = jobDirectory; this.jobDir = jobDirectory;
try { try {
...@@ -316,18 +335,7 @@ public class Job { ...@@ -316,18 +335,7 @@ public class Job {
return haasClientSupplier.get().startFileTransfer(getId(), progress); return haasClientSupplier.get().startFileTransfer(getId(), progress);
} }
private synchronized void resumeUpload() {
if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
synchronization.resumeUpload();
}
}
private synchronized void resumeDownload() {
if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) {
synchronization.resumeDownload();
}
}
private void setName(String name) { private void setName(String name) {
setProperty(JOB_NAME, name); setProperty(JOB_NAME, name);
...@@ -366,12 +374,4 @@ public class Job { ...@@ -366,12 +374,4 @@ public class Job {
setProperty(JOB_CAN_BE_DOWNLOADED, b); setProperty(JOB_CAN_BE_DOWNLOADED, b);
} }
public void setDownloadNotifier(ProgressNotifier notifier) {
synchronization.setDownloadNotifier(notifier);
}
public void setUploadNotifier(ProgressNotifier notifier) {
synchronization.setUploadNotifier(notifier);
}
} }
...@@ -12,89 +12,86 @@ import java.util.function.Supplier; ...@@ -12,89 +12,86 @@ import java.util.function.Supplier;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
import cz.it4i.fiji.scpclient.TransferFileProgress;
public abstract class PersitentSynchronizationProcess<T> { public abstract class PersitentSynchronizationProcess<T> {
private boolean startFinished = true; private boolean startFinished = true;
public static final Logger log = LoggerFactory public static final Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class); .getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class);
private static final TransferFileProgress DUMMY_FILE_PROGRESS = new TransferFileProgress() { private static final TransferFileProgressForHaaSClient DUMMY_FILE_PROGRESS = new TransferFileProgressForHaaSClient(
@Override 0, HaaSClient.DUMMY_PROGRESS_NOTIFIER);
public void dataTransfered(long bytesTransfered) {
}
};
private PersistentIndex<T> index; private PersistentIndex<T> index;
private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>(); private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>();
private SimpleThreadRunner runner; private SimpleThreadRunner runner;
private Supplier<HaaSFileTransfer> fileTransferSupplier; private Supplier<HaaSFileTransfer> fileTransferSupplier;
private Runnable processFinishedNotifier; private Runnable processFinishedNotifier;
private String name;
private ProgressNotifier notifier; private ProgressNotifier notifier;
public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException { public PersitentSynchronizationProcess(ExecutorService service,
Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,
Function<String, T> convertor) throws IOException {
runner = new SimpleThreadRunner(service); runner = new SimpleThreadRunner(service);
this.name = name;
this.fileTransferSupplier = fileTransferSupplier; this.fileTransferSupplier = fileTransferSupplier;
this.processFinishedNotifier = processFinishedNotifier; this.processFinishedNotifier = processFinishedNotifier;
this.index = new PersistentIndex<>(indexFile, convertor); this.index = new PersistentIndex<>(indexFile, convertor);
} }
public synchronized void start() throws IOException { public synchronized void start() throws IOException {
startFinished = false; startFinished = false;
index.clear(); index.clear();
try{ try {
for (T item : getItems()) { for (T item : getItems()) {
index.insert(item); index.insert(item);
toProcessQueue.add(item); toProcessQueue.add(item);
} }
runner.runIfNotRunning(this::doProcess); runner.runIfNotRunning(this::doProcess);
} finally { } finally {
startFinished = true; startFinished = true;
index.storeToFile(); index.storeToFile();
} }
} }
public void stop() throws IOException { public void stop() throws IOException {
toProcessQueue.clear(); toProcessQueue.clear();
index.clear(); index.clear();
notifier.setCount(-1, -1);
} }
public void resume() { public void resume() {
index.fillQueue(toProcessQueue); index.fillQueue(toProcessQueue);
runner.runIfNotRunning(this::doProcess); runner.runIfNotRunning(this::doProcess);
} }
abstract protected Iterable<T> getItems() throws IOException; abstract protected Iterable<T> getItems() throws IOException;
abstract protected void processItem(HaaSFileTransfer tr, T p); abstract protected void processItem(HaaSFileTransfer tr, T p);
private void doProcess(AtomicBoolean reRun) { private void doProcess(AtomicBoolean reRun) {
try(HaaSFileTransfer tr = fileTransferSupplier.get()) { try (HaaSFileTransfer tr = fileTransferSupplier.get()) {
tr.setProgress(getTransferFileProgress(tr)); TransferFileProgressForHaaSClient notifier;
tr.setProgress(notifier = getTransferFileProgress(tr));
while (!toProcessQueue.isEmpty()) { while (!toProcessQueue.isEmpty()) {
T p = toProcessQueue.poll(); T p = toProcessQueue.poll();
String item = p.toString();
log.info(name + "ing: " + p); notifier.addItem(item);
processItem(tr, p); processItem(tr, p);
fileUploaded(p); fileUploaded(p);
log.info(name + "ed: " + p); notifier.itemDone(item);
reRun.set(false); reRun.set(false);
} }
notifier.done();
} finally { } finally {
synchronized (this) { synchronized (this) {
if (startFinished) { if (startFinished) {
...@@ -119,12 +116,11 @@ public abstract class PersitentSynchronizationProcess<T> { ...@@ -119,12 +116,11 @@ public abstract class PersitentSynchronizationProcess<T> {
this.notifier = notifier; this.notifier = notifier;
} }
private TransferFileProgress getTransferFileProgress(HaaSFileTransfer tr) { private TransferFileProgressForHaaSClient getTransferFileProgress(HaaSFileTransfer tr) {
if(notifier == null) { if (notifier == null) {
return DUMMY_FILE_PROGRESS; return DUMMY_FILE_PROGRESS;
} }
return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier); return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier);
} }
} }
...@@ -58,7 +58,7 @@ public class Synchronization { ...@@ -58,7 +58,7 @@ public class Synchronization {
} }
public void setDownloadNotifier(ProgressNotifier notifier) { public void setDownloadNotifier(ProgressNotifier notifier) {
// TODO Auto-generated method stub downloadProcess.setNotifier(notifier);
} }
...@@ -94,7 +94,7 @@ public class Synchronization { ...@@ -94,7 +94,7 @@ public class Synchronization {
private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier,
ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { ExecutorService service, Runnable uploadFinishedNotifier) throws IOException {
return new PersitentSynchronizationProcess<Path>("upload", service, fileTransferSupplier, uploadFinishedNotifier, return new PersitentSynchronizationProcess<Path>(service, fileTransferSupplier, uploadFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) { workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) {
@Override @Override
...@@ -138,7 +138,7 @@ public class Synchronization { ...@@ -138,7 +138,7 @@ public class Synchronization {
public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier,
Runnable processFinishedNotifier) throws IOException { Runnable processFinishedNotifier) throws IOException {
super("download",service, fileTransferSupplier, processFinishedNotifier, super(service, fileTransferSupplier, processFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name); workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name);
} }
......
...@@ -57,4 +57,8 @@ public class TransferFileProgressForHaaSClient implements TransferFileProgress { ...@@ -57,4 +57,8 @@ public class TransferFileProgressForHaaSClient implements TransferFileProgress {
} }
return result; return result;
} }
public void done() {
notifier.done();
}
} }
...@@ -73,6 +73,14 @@ public class BenchmarkJobManager { ...@@ -73,6 +73,14 @@ public class BenchmarkJobManager {
computationAccessor = getComputationAccessor(); computationAccessor = getComputationAccessor();
} }
public void setDownloadNotifier(Progress progress) {
job.setDownloadNotifier(convertTo(progress));
}
public void setUploadNotifier(Progress progress) {
job.setUploadNotifier(convertTo(progress));
}
public synchronized void startJob(Progress progress) throws IOException { public synchronized void startJob(Progress progress) throws IOException {
job.uploadFile(Constants.CONFIG_YAML, new P_ProgressNotifierAdapter(progress)); job.uploadFile(Constants.CONFIG_YAML, new P_ProgressNotifierAdapter(progress));
String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML)); String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML));
...@@ -208,11 +216,20 @@ public class BenchmarkJobManager { ...@@ -208,11 +216,20 @@ public class BenchmarkJobManager {
return computationAccessor.getActualOutput(content); return computationAccessor.getActualOutput(content);
} }
public void resumeTransfer() {
job.resumeDownload();
job.resumeUpload();
}
@Override @Override
public String toString() { public String toString() {
return "" + getId(); return "" + getId();
} }
private ProgressNotifier convertTo(Progress progress) {
return progress == null ? null : new P_ProgressNotifierAdapter(progress);
}
private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) { private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
JobState state = job.getState(); JobState state = job.getState();
if (state != JobState.Finished) { if (state != JobState.Finished) {
...@@ -400,23 +417,6 @@ public class BenchmarkJobManager { ...@@ -400,23 +417,6 @@ public class BenchmarkJobManager {
Stream<BenchmarkError> taskSpecificErrors = tasks.stream().flatMap(s -> s.getErrors().stream()); Stream<BenchmarkError> taskSpecificErrors = tasks.stream().flatMap(s -> s.getErrors().stream());
return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList()); return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList());
} }
public void setDownloadNotifier(Progress progress) {
if(progress == null) {
job.setDownloadNotifier(null);
} else {
job.setDownloadNotifier(new P_ProgressNotifierAdapter(progress));
}
}
public void setUploadNotifier(Progress downloadProgress) {
if(downloadProgress == null) {
job.setUploadNotifier(null);
} else {
job.setUploadNotifier(new P_ProgressNotifierAdapter(downloadProgress));
}
}
} }
public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException {
......
...@@ -14,13 +14,25 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob ...@@ -14,13 +14,25 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob
public static final Logger log = LoggerFactory public static final Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class); .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class);
private Progress downloadProgress = new P_DownloadProgress(); private P_DownloadProgress downloadProgress = new P_DownloadProgress();
public interface TransferProgress {
public Long getRemainingSeconds();
public boolean isDownloaded();
public boolean isDonwloadind();
public Float getRemainingPercents();
}
public UpdatableBenchmarkJob(BenchmarkJob wrapped, Function<BenchmarkJob, UpdateStatus> updateFunction, public UpdatableBenchmarkJob(BenchmarkJob wrapped, Function<BenchmarkJob, UpdateStatus> updateFunction,
Function<BenchmarkJob, Object> stateProvider) { Function<BenchmarkJob, Object> stateProvider) {
super(wrapped, updateFunction, stateProvider); super(wrapped, updateFunction, stateProvider);
wrapped.setDownloadNotifier(downloadProgress); wrapped.setDownloadNotifier(downloadProgress);
wrapped.resumeTransfer();
} }
public void removed() { public void removed() {
...@@ -28,47 +40,83 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob ...@@ -28,47 +40,83 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob
} }
private class P_DownloadProgress implements Progress {
private class P_DownloadProgress implements Progress, TransferProgress {
private boolean downloading;
private boolean downloaded;
private long start;
private Long remainingSeconds;
private Float remainingPercents;
@Override @Override
public void setTitle(String title) { public void setTitle(String title) {
// TODO Auto-generated method stub
} }
@Override @Override
public void setCount(int count, int total) { public synchronized void setCount(int count, int total) {
log.info("setCount count=" + count + ", total:" + total);
if(total < -1) {
downloading = false;
remainingSeconds = null;
remainingPercents = null;
} else {
long delta = System.currentTimeMillis() - start;
remainingSeconds = (long) ((double) delta / count * (total - count)) / 1000;
remainingPercents = (((float)total - count) / total * 100);
}
fireValueChangedEvent();
} }
@Override @Override
public void addItem(Object item) { public void addItem(Object item) {
// TODO Auto-generated method stub if (!downloading) {
downloaded = false;
downloading = true;
start = System.currentTimeMillis();
}
fireValueChangedEvent();
}
@Override
public void done() {
if (downloading) {
downloaded = true;
}
downloading = false;
remainingSeconds = 0l;
remainingPercents = 0.f;
fireValueChangedEvent();
} }
@Override @Override
public void setItemCount(int count, int total) { public boolean isDownloaded() {
// TODO Auto-generated method stub return downloaded;
} }
@Override @Override
public void itemDone(Object item) { public boolean isDonwloadind() {
// TODO Auto-generated method stub return downloading;
} }
@Override @Override
public void done() { public Long getRemainingSeconds() {
// TODO Auto-generated method stub return remainingSeconds;
} }
}
@Override
public Float getRemainingPercents() {
return remainingPercents;
}
@Override
public void setItemCount(int count, int total) {
}
@Override
public void itemDone(Object item) {
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment