diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index 0f3763a970c728c2bdd31edadf66008d5c6d98cf..c2aa23c2afad67cbec975cf18d453a7c4bdff3f0 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -163,6 +163,8 @@ public class HaaSClient { private String projectId; + private Map<Long, P_FileTransferPool> filetransferPoolMap = new HashMap<>(); + public static ProgressNotifier DUMMY_NOTIFIER = new ProgressNotifier() { @Override @@ -246,10 +248,7 @@ public class HaaSClient { public HaaSFileTransfer startFileTransfer(long jobId, ProgressNotifier notifier) { try { - //TODO it may be usefull reuse fileTransfer for concurrent transfers - //- count using is needed - FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); - return new HaaSFileTransferImp(ft, getSessionID(), jobId, getFileTransfer(), getScpClient(ft), notifier); + return getFileTransferMethod(jobId, notifier); } catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) { throw new HaaSClientException(e); } @@ -306,7 +305,7 @@ public class HaaSClient { throw new HaaSClientException(e); } } - + public Collection<String> getChangedFiles(long jobId) { try { return Arrays.asList(getFileTransfer().listChangedFilesForJob(jobId, getSessionID())); @@ -314,8 +313,7 @@ public class HaaSClient { throw new HaaSClientException(e); } } - - + public void cancelJob(Long jobId) { try { getJobManagement().cancelJob(jobId, getSessionID()); @@ -324,6 +322,27 @@ public class HaaSClient { } } + private HaaSFileTransferImp getFileTransferMethod(long jobId, ProgressNotifier notifier) + throws RemoteException, UnsupportedEncodingException, ServiceException, JSchException { + P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId, id -> new P_FileTransferPool(id)); + FileTransferMethodExt ft = pool.obtain(); + try { + return new HaaSFileTransferImp(ft, getScpClient(ft), notifier) { + public void close() { + super.close(); + try { + pool.release(); + } catch (RemoteException | ServiceException e) { + throw new HaaSClientException(e); + } + }; + }; + } catch (UnsupportedEncodingException | JSchException e) { + pool.release(); + throw e; + } + } + private void doSubmitJob(long jobId) throws RemoteException, ServiceException { getJobManagement().submitJob(jobId, getSessionID()); } @@ -471,4 +490,43 @@ public class HaaSClient { } } + private interface P_Supplier<T> { + + T get() throws RemoteException, ServiceException; + } + + private interface P_Consumer<T> { + + void accept(T val) throws RemoteException, ServiceException; + } + + private class P_FileTransferPool { + private FileTransferMethodExt holded; + private int counter; + private final P_Supplier<FileTransferMethodExt> factory; + private final P_Consumer<FileTransferMethodExt> destroyer; + + public P_FileTransferPool(long jobId) { + this.factory = () -> getFileTransfer().getFileTransferMethod(jobId, getSessionID()); + this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val, sessionID); + ; + } + + public synchronized FileTransferMethodExt obtain() throws RemoteException, ServiceException { + if (holded == null) { + holded = factory.get(); + } + counter++; + return holded; + } + + public synchronized void release() throws RemoteException, ServiceException { + if (--counter == 0) { + destroyer.accept(holded); + holded = null; + } + } + + } + } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java index 14146fbb3d24f1e9b7bfa4415db3a32cdf70d0fd..c48e2e04997072c829637e73ea714104aea68682 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java @@ -4,7 +4,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; -import java.rmi.RemoteException; import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; @@ -18,7 +17,6 @@ import com.jcraft.jsch.JSchException; import cz.it4i.fiji.haas_java_client.HaaSClient.P_ProgressNotifierDecorator4Size; import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt; -import cz.it4i.fiji.haas_java_client.proxy.FileTransferWsSoap; import cz.it4i.fiji.scpclient.ScpClient; class HaaSFileTransferImp implements HaaSFileTransfer { @@ -28,30 +26,17 @@ class HaaSFileTransferImp implements HaaSFileTransfer { private FileTransferMethodExt ft; private ScpClient scpClient; - private FileTransferWsSoap fileTransfer; - private String sessionId; - private long jobId; private ProgressNotifier notifier; - public HaaSFileTransferImp(FileTransferMethodExt ft, String sessionId, long jobId, FileTransferWsSoap fileTransfer, - ScpClient scpClient, ProgressNotifier notifier) { + public HaaSFileTransferImp(FileTransferMethodExt ft, ScpClient scpClient, ProgressNotifier notifier) { this.ft = ft; this.scpClient = scpClient; - this.fileTransfer = fileTransfer; - this.sessionId = sessionId; - this.jobId = jobId; this.notifier = notifier; } @Override public void close() { scpClient.close(); - try { - fileTransfer.endFileTransfer(jobId, ft, sessionId); - } catch (RemoteException e) { - throw new HaaSClientException(e); - } - } @Override diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient2.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestConcurentAccessToHaaSFileTransfer.java similarity index 80% rename from haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient2.java rename to haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestConcurentAccessToHaaSFileTransfer.java index 8667667a4074772fd2eacdf8e942b138cb3677e3..cdeedef7f7bcb4a58bdd249520d82c5dfa6eae2b 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient2.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestConcurentAccessToHaaSFileTransfer.java @@ -8,20 +8,17 @@ import javax.xml.rpc.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestHaaSJavaClient2 { +public class TestConcurentAccessToHaaSFileTransfer { - private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestHaaSJavaClient2.class); + private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestConcurentAccessToHaaSFileTransfer.class); public static void main(String[] args) throws ServiceException, IOException { HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600, 7l, "OPEN-12-20")); HaaSFileTransfer tr1 = client.startFileTransfer(250, new DummyProgressNotifier()); - HaaSFileTransfer tr2 = client.startFileTransfer(250, new DummyProgressNotifier()); - + HaaSFileTransfer tr2 = client.startFileTransfer(249, new DummyProgressNotifier()); log.info("config.yaml - size:" + tr1.obtainSize(Arrays.asList("config.yaml"))); - log.info("config.yaml - size:" + tr2.obtainSize(Arrays.asList("config.yaml"))); - tr1.close(); - + log.info("config.yaml - size:" + tr2.obtainSize(Arrays.asList("config.yaml"))); tr2.close(); } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java index 2786964c7ac8c2a9ea9de2e9fb684a7496acfed2..2b3de110b2065a43dbcf1581996bd65bd2bd797e 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java @@ -78,15 +78,12 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl private Timer timer; private ObservableTaskRegistry registry; private ExecutorService executorServiceWS; - private ExecutorService executorServiceScp; private Executor executorFx = new FXFrameExecutorService(); private Window root; public SPIMPipelineProgressViewController() { executorServiceWS = Executors.newSingleThreadExecutor(); - executorServiceScp = Executors.newSingleThreadExecutor(); init(); - } public SPIMPipelineProgressViewController(BenchmarkJob job) { @@ -108,7 +105,6 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl public void close() { timer.cancel(); executorServiceWS.shutdown(); - executorServiceScp.shutdown(); } @Override @@ -132,7 +128,7 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl } private void proof(ObservableValue<Task> task, int columnIndex) { - ModalDialogs.doModal(new TaskComputationWindow(root, task.getValue().getComputations().get(columnIndex - 1), executorServiceScp), + ModalDialogs.doModal(new TaskComputationWindow(root, task.getValue().getComputations().get(columnIndex - 1)), WindowConstants.DISPOSE_ON_CLOSE); } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java index d7127acdb322381a970b08b9fe0c5ac9d0d3a0e2..5b2d5efc55fb6cd9d054613b873ef6b546e730f1 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java @@ -6,9 +6,6 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -19,7 +16,6 @@ import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation; import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation.Log; import javafx.beans.value.ObservableValue; import javafx.beans.value.ObservableValueBase; -//TASK: fix occasional auth fails with ssh //TASK: improve performance public class TaskComputationAdapter implements Closeable { @@ -34,25 +30,15 @@ public class TaskComputationAdapter implements Closeable { private Timer timer; - private ExecutorService scpExecutor; - - public TaskComputationAdapter(TaskComputation computation, ExecutorService scpExecutor) { + public TaskComputationAdapter(TaskComputation computation) { this.computation = computation; - this.scpExecutor = scpExecutor; timer = new Timer(); } public void init() { - Future<?> future = scpExecutor.submit(() -> { - Map<String, Long> sizes = computation.getOutFileSizes(); - computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, sizes.get(outputFile))); - computation.getLogs().forEach(log -> logs.add(new ObservableLog(log))); - }); - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - log.error(e.getMessage(), e); - } + Map<String, Long> sizes = computation.getOutFileSizes(); + computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, sizes.get(outputFile))); + computation.getLogs().forEach(log -> logs.add(new ObservableLog(log))); synchronized (this) { if(timer != null) { timer.schedule(new P_TimerTask(), Constants.HAAS_TIMEOUT, Constants.HAAS_TIMEOUT); @@ -172,18 +158,12 @@ public class TaskComputationAdapter implements Closeable { @Override public void run() { - try { - scpExecutor.submit(() -> { - Map<String, Long> sizes = computation.getOutFileSizes(); - Map<String, Log> logs = computation.getLogs().stream() - .collect(Collectors.<Log, String, Log>toMap((Log log) -> log.getName(), (Log log) -> log)); - TaskComputationAdapter.this.logs - .forEach(log -> ((ObservableLog) log).setContentValue(logs.get(log.getName()))); - outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get(value.getValue().getName()))); - }).get(); - } catch (InterruptedException | ExecutionException e) { - log.error(e.getMessage(), e); - } + Map<String, Long> sizes = computation.getOutFileSizes(); + Map<String, Log> logs = computation.getLogs().stream() + .collect(Collectors.<Log, String, Log>toMap((Log log) -> log.getName(), (Log log) -> log)); + TaskComputationAdapter.this.logs + .forEach(log -> ((ObservableLog) log).setContentValue(logs.get(log.getName()))); + outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get(value.getValue().getName()))); } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationControl.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationControl.java index 7d0891e6b27c9f36d11edca3b4263b969d814ce5..c354adf04b1badb986ea9aa9d159c1fa0bc0e27a 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationControl.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationControl.java @@ -39,12 +39,10 @@ public class TaskComputationControl extends TabPane implements CloseableControl, private TaskComputation computation; - private ExecutorService scpExecutor; - public TaskComputationControl(TaskComputation computation, ExecutorService scpExecutor) { + public TaskComputationControl(TaskComputation computation) { JavaFXRoutines.initRootAndController("TaskComputationView.fxml", this); this.computation = computation; - this.scpExecutor = scpExecutor; } @Override @@ -53,7 +51,7 @@ public class TaskComputationControl extends TabPane implements CloseableControl, ProgressDialog dialog = ModalDialogs.doModal(new ProgressDialog(parameter, "Updating infos..."), WindowConstants.DO_NOTHING_ON_CLOSE); try { - adapter = new TaskComputationAdapter(computation, scpExecutor); + adapter = new TaskComputationAdapter(computation); adapter.init(); } finally { dialog.done(); diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationWindow.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationWindow.java index e5226cfa286134c7d7ada35c8433d85d3db5a6f7..98f8f2aed4ea8afcefc63d18b253940befd16961 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationWindow.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationWindow.java @@ -1,7 +1,6 @@ package cz.it4i.fiji.haas_spim_benchmark.ui; import java.awt.Window; -import java.util.concurrent.ExecutorService; import cz.it4i.fiji.haas.ui.FXFrame; import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation; @@ -10,8 +9,8 @@ public class TaskComputationWindow extends FXFrame<TaskComputationControl> { private static final long serialVersionUID = 1L; - public TaskComputationWindow(Window applicationFrame,TaskComputation computation, ExecutorService scpExecutor) { - super(applicationFrame,()->new TaskComputationControl(computation, scpExecutor)); + public TaskComputationWindow(Window applicationFrame,TaskComputation computation) { + super(applicationFrame,()->new TaskComputationControl(computation)); }