Skip to content
Snippets Groups Projects
Commit 53e465c6 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Fix: solve JAuth exception during concurrent access to HaaS file transf.

Error occured for same sessionid and jobid.
Solved with a pool of file transfer method
parent 5e147e14
Branches
Tags
1 merge request!14Iss1026
......@@ -163,6 +163,8 @@ public class HaaSClient {
private String projectId;
private Map<Long, P_FileTransferPool> filetransferPoolMap = new HashMap<>();
public static ProgressNotifier DUMMY_NOTIFIER = new ProgressNotifier() {
@Override
......@@ -246,10 +248,7 @@ public class HaaSClient {
public HaaSFileTransfer startFileTransfer(long jobId, ProgressNotifier notifier) {
try {
//TODO it may be usefull reuse fileTransfer for concurrent transfers
//- count using is needed
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
return new HaaSFileTransferImp(ft, getSessionID(), jobId, getFileTransfer(), getScpClient(ft), notifier);
return getFileTransferMethod(jobId, notifier);
} catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) {
throw new HaaSClientException(e);
}
......@@ -306,7 +305,7 @@ public class HaaSClient {
throw new HaaSClientException(e);
}
}
public Collection<String> getChangedFiles(long jobId) {
try {
return Arrays.asList(getFileTransfer().listChangedFilesForJob(jobId, getSessionID()));
......@@ -314,8 +313,7 @@ public class HaaSClient {
throw new HaaSClientException(e);
}
}
public void cancelJob(Long jobId) {
try {
getJobManagement().cancelJob(jobId, getSessionID());
......@@ -324,6 +322,27 @@ public class HaaSClient {
}
}
private HaaSFileTransferImp getFileTransferMethod(long jobId, ProgressNotifier notifier)
throws RemoteException, UnsupportedEncodingException, ServiceException, JSchException {
P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId, id -> new P_FileTransferPool(id));
FileTransferMethodExt ft = pool.obtain();
try {
return new HaaSFileTransferImp(ft, getScpClient(ft), notifier) {
public void close() {
super.close();
try {
pool.release();
} catch (RemoteException | ServiceException e) {
throw new HaaSClientException(e);
}
};
};
} catch (UnsupportedEncodingException | JSchException e) {
pool.release();
throw e;
}
}
private void doSubmitJob(long jobId) throws RemoteException, ServiceException {
getJobManagement().submitJob(jobId, getSessionID());
}
......@@ -471,4 +490,43 @@ public class HaaSClient {
}
}
private interface P_Supplier<T> {
T get() throws RemoteException, ServiceException;
}
private interface P_Consumer<T> {
void accept(T val) throws RemoteException, ServiceException;
}
private class P_FileTransferPool {
private FileTransferMethodExt holded;
private int counter;
private final P_Supplier<FileTransferMethodExt> factory;
private final P_Consumer<FileTransferMethodExt> destroyer;
public P_FileTransferPool(long jobId) {
this.factory = () -> getFileTransfer().getFileTransferMethod(jobId, getSessionID());
this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val, sessionID);
;
}
public synchronized FileTransferMethodExt obtain() throws RemoteException, ServiceException {
if (holded == null) {
holded = factory.get();
}
counter++;
return holded;
}
public synchronized void release() throws RemoteException, ServiceException {
if (--counter == 0) {
destroyer.accept(holded);
holded = null;
}
}
}
}
......@@ -4,7 +4,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.rmi.RemoteException;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
......@@ -18,7 +17,6 @@ import com.jcraft.jsch.JSchException;
import cz.it4i.fiji.haas_java_client.HaaSClient.P_ProgressNotifierDecorator4Size;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferWsSoap;
import cz.it4i.fiji.scpclient.ScpClient;
class HaaSFileTransferImp implements HaaSFileTransfer {
......@@ -28,30 +26,17 @@ class HaaSFileTransferImp implements HaaSFileTransfer {
private FileTransferMethodExt ft;
private ScpClient scpClient;
private FileTransferWsSoap fileTransfer;
private String sessionId;
private long jobId;
private ProgressNotifier notifier;
public HaaSFileTransferImp(FileTransferMethodExt ft, String sessionId, long jobId, FileTransferWsSoap fileTransfer,
ScpClient scpClient, ProgressNotifier notifier) {
public HaaSFileTransferImp(FileTransferMethodExt ft, ScpClient scpClient, ProgressNotifier notifier) {
this.ft = ft;
this.scpClient = scpClient;
this.fileTransfer = fileTransfer;
this.sessionId = sessionId;
this.jobId = jobId;
this.notifier = notifier;
}
@Override
public void close() {
scpClient.close();
try {
fileTransfer.endFileTransfer(jobId, ft, sessionId);
} catch (RemoteException e) {
throw new HaaSClientException(e);
}
}
@Override
......
......@@ -8,20 +8,17 @@ import javax.xml.rpc.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestHaaSJavaClient2 {
public class TestConcurentAccessToHaaSFileTransfer {
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestHaaSJavaClient2.class);
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestConcurentAccessToHaaSFileTransfer.class);
public static void main(String[] args) throws ServiceException, IOException {
HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600, 7l, "OPEN-12-20"));
HaaSFileTransfer tr1 = client.startFileTransfer(250, new DummyProgressNotifier());
HaaSFileTransfer tr2 = client.startFileTransfer(250, new DummyProgressNotifier());
HaaSFileTransfer tr2 = client.startFileTransfer(249, new DummyProgressNotifier());
log.info("config.yaml - size:" + tr1.obtainSize(Arrays.asList("config.yaml")));
log.info("config.yaml - size:" + tr2.obtainSize(Arrays.asList("config.yaml")));
tr1.close();
log.info("config.yaml - size:" + tr2.obtainSize(Arrays.asList("config.yaml")));
tr2.close();
}
......
......@@ -78,15 +78,12 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl
private Timer timer;
private ObservableTaskRegistry registry;
private ExecutorService executorServiceWS;
private ExecutorService executorServiceScp;
private Executor executorFx = new FXFrameExecutorService();
private Window root;
public SPIMPipelineProgressViewController() {
executorServiceWS = Executors.newSingleThreadExecutor();
executorServiceScp = Executors.newSingleThreadExecutor();
init();
}
public SPIMPipelineProgressViewController(BenchmarkJob job) {
......@@ -108,7 +105,6 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl
public void close() {
timer.cancel();
executorServiceWS.shutdown();
executorServiceScp.shutdown();
}
@Override
......@@ -132,7 +128,7 @@ public class SPIMPipelineProgressViewController extends BorderPane implements Cl
}
private void proof(ObservableValue<Task> task, int columnIndex) {
ModalDialogs.doModal(new TaskComputationWindow(root, task.getValue().getComputations().get(columnIndex - 1), executorServiceScp),
ModalDialogs.doModal(new TaskComputationWindow(root, task.getValue().getComputations().get(columnIndex - 1)),
WindowConstants.DISPOSE_ON_CLOSE);
}
......
......@@ -6,9 +6,6 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
......@@ -19,7 +16,6 @@ import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation;
import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation.Log;
import javafx.beans.value.ObservableValue;
import javafx.beans.value.ObservableValueBase;
//TASK: fix occasional auth fails with ssh
//TASK: improve performance
public class TaskComputationAdapter implements Closeable {
......@@ -34,25 +30,15 @@ public class TaskComputationAdapter implements Closeable {
private Timer timer;
private ExecutorService scpExecutor;
public TaskComputationAdapter(TaskComputation computation, ExecutorService scpExecutor) {
public TaskComputationAdapter(TaskComputation computation) {
this.computation = computation;
this.scpExecutor = scpExecutor;
timer = new Timer();
}
public void init() {
Future<?> future = scpExecutor.submit(() -> {
Map<String, Long> sizes = computation.getOutFileSizes();
computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, sizes.get(outputFile)));
computation.getLogs().forEach(log -> logs.add(new ObservableLog(log)));
});
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
}
Map<String, Long> sizes = computation.getOutFileSizes();
computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, sizes.get(outputFile)));
computation.getLogs().forEach(log -> logs.add(new ObservableLog(log)));
synchronized (this) {
if(timer != null) {
timer.schedule(new P_TimerTask(), Constants.HAAS_TIMEOUT, Constants.HAAS_TIMEOUT);
......@@ -172,18 +158,12 @@ public class TaskComputationAdapter implements Closeable {
@Override
public void run() {
try {
scpExecutor.submit(() -> {
Map<String, Long> sizes = computation.getOutFileSizes();
Map<String, Log> logs = computation.getLogs().stream()
.collect(Collectors.<Log, String, Log>toMap((Log log) -> log.getName(), (Log log) -> log));
TaskComputationAdapter.this.logs
.forEach(log -> ((ObservableLog) log).setContentValue(logs.get(log.getName())));
outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get(value.getValue().getName())));
}).get();
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
}
Map<String, Long> sizes = computation.getOutFileSizes();
Map<String, Log> logs = computation.getLogs().stream()
.collect(Collectors.<Log, String, Log>toMap((Log log) -> log.getName(), (Log log) -> log));
TaskComputationAdapter.this.logs
.forEach(log -> ((ObservableLog) log).setContentValue(logs.get(log.getName())));
outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get(value.getValue().getName())));
}
}
......
......@@ -39,12 +39,10 @@ public class TaskComputationControl extends TabPane implements CloseableControl,
private TaskComputation computation;
private ExecutorService scpExecutor;
public TaskComputationControl(TaskComputation computation, ExecutorService scpExecutor) {
public TaskComputationControl(TaskComputation computation) {
JavaFXRoutines.initRootAndController("TaskComputationView.fxml", this);
this.computation = computation;
this.scpExecutor = scpExecutor;
}
@Override
......@@ -53,7 +51,7 @@ public class TaskComputationControl extends TabPane implements CloseableControl,
ProgressDialog dialog = ModalDialogs.doModal(new ProgressDialog(parameter, "Updating infos..."),
WindowConstants.DO_NOTHING_ON_CLOSE);
try {
adapter = new TaskComputationAdapter(computation, scpExecutor);
adapter = new TaskComputationAdapter(computation);
adapter.init();
} finally {
dialog.done();
......
package cz.it4i.fiji.haas_spim_benchmark.ui;
import java.awt.Window;
import java.util.concurrent.ExecutorService;
import cz.it4i.fiji.haas.ui.FXFrame;
import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation;
......@@ -10,8 +9,8 @@ public class TaskComputationWindow extends FXFrame<TaskComputationControl> {
private static final long serialVersionUID = 1L;
public TaskComputationWindow(Window applicationFrame,TaskComputation computation, ExecutorService scpExecutor) {
super(applicationFrame,()->new TaskComputationControl(computation, scpExecutor));
public TaskComputationWindow(Window applicationFrame,TaskComputation computation) {
super(applicationFrame,()->new TaskComputationControl(computation));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment