diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java index e150cda76567db723677d8160c0f3467d7b1c4f0..865c4a03b3ad3d020c124372c26b13649d281510 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java @@ -153,6 +153,9 @@ public abstract class PersistentSynchronizationProcess<T> { fileTransfered(p); } catch (InterruptedIOException | HaaSClientException e) { + if (e instanceof HaaSClientException) { + log.warn("process ", e); + } toProcessQueue.clear(); interrupted = true; } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index 4504b0c775ca64da4db05322c47726875639802f..e0d7f1db1aa38437537ef07e42337a3067d87555 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -4,7 +4,6 @@ package cz.it4i.fiji.haas.data_transfer; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; -import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -18,6 +17,7 @@ import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.slf4j.Logger; @@ -140,10 +140,9 @@ public class Synchronization implements Closeable { @Override protected Collection<Path> getItems() throws IOException { - try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory, - Synchronization.this::canUpload)) { - return StreamSupport.stream(ds.spliterator(), false).filter( - p -> !filesUploaded.contains(p)).collect(Collectors.toList()); + try (Stream<Path> ds = Files.walk(inputDirectory)) { + return ds.filter(p -> !Files.isDirectory(p) && canUpload(p) && + !filesUploaded.contains(p)).collect(Collectors.toList()); } } @@ -152,15 +151,14 @@ public class Synchronization implements Closeable { protected void processItem(final HaaSFileTransfer tr, final Path p) throws InterruptedIOException { - final UploadingFile uf = new UploadingFileImpl(p); + final UploadingFile uf = new UploadingFileImpl(p, inputDirectory); tr.upload(uf); filesUploaded.insert(inputDirectory.resolve(p.toString())); try { filesUploaded.storeToWorkingFile(); - } - catch (final IOException e) { + } catch (final IOException e) { log.error(e.getMessage(), e); - } + } } @Override diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java index ace1f918ff12b9144c186b9a3a3e5128567ec25c..b27c73ec9a8c8bc2dd11024c69e6e96c2d2f02a0 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java @@ -41,8 +41,7 @@ class HaaSFileTransferImp implements HaaSFileTransfer { @Override public void upload(final UploadingFile file) throws InterruptedIOException { - final String destFile = "'" + ft.getSharedBasepath() + "/" + file - .getName() + "'"; + final String destFile = ft.getSharedBasepath() + "/" + file.getName(); try (InputStream is = file.getInputStream()) { if (!scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress)) diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/UploadingFileImpl.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/UploadingFileImpl.java index ad7cff54de090d9d0d13261ebd0a66fb6d10d0dc..62b3356f9fb3893d6f9ca198458c4439a2d8f99c 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/UploadingFileImpl.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/UploadingFileImpl.java @@ -14,8 +14,16 @@ public class UploadingFileImpl implements UploadingFile { private final Path path; - public UploadingFileImpl(Path path) { - this.path = path; + private Path baseDirPath; + + + public UploadingFileImpl(Path p) { + this(p, null); + } + + public UploadingFileImpl(Path p, Path baseDirPath) { + this.path = p; + this.baseDirPath = baseDirPath; } @Override @@ -25,7 +33,10 @@ public class UploadingFileImpl implements UploadingFile { @Override public String getName() { - return path.getFileName().toString(); + if (baseDirPath == null || !path.startsWith(baseDirPath)) { + return path.getFileName().toString(); + } + return baseDirPath.relativize(path).toString(); } @Override diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ObservableBenchmarkJob.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ObservableBenchmarkJob.java index ef8c5323ce35e8cbc2d43bc8c76c4051de7a3441..82f7426a8e49c42c7bf23c83c8815d08afa67891 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ObservableBenchmarkJob.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ObservableBenchmarkJob.java @@ -126,7 +126,7 @@ public class ObservableBenchmarkJob extends @Override public synchronized void setCount(int count, int total) { - if (total < -1) { + if (total <= -1) { clearProgress(); } else if (start != null) { diff --git a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java index 9b605b08c1c614a46c0c808ac112b1f3bc1b0447..82026467f90f840359cbd08d5ae5ba59cb467a63 100644 --- a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java +++ b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java @@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException; import java.lang.reflect.Field; import java.nio.channels.ClosedByInterruptException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; @@ -33,9 +34,11 @@ import cz.it4i.fiji.commons.DoActionEventualy; public class ScpClient implements Closeable { + private static final String NO_SUCH_FILE_OR_DIRECTORY_ERROR_TEXT = "No such file or directory"; + public static final Logger log = LoggerFactory.getLogger( cz.it4i.fiji.scpclient.ScpClient.class); - + private static final int MAX_NUMBER_OF_CONNECTION_ATTEMPTS = 5; private static final long TIMEOUT_BETWEEN_CONNECTION_ATTEMPTS = 500; @@ -162,6 +165,7 @@ public class ScpClient implements Closeable { // System.out.println("filesize="+filesize+", file="+file); // send '\0' + buf[0] = 0; out.write(buf, 0, 1); out.flush(); @@ -223,68 +227,28 @@ public class ScpClient implements Closeable { long lastModified, TransferFileProgress progress) throws JSchException, IOException { - boolean ptimestamp = true; - // exec 'scp -t rfile' remotely - String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + fileName; - Channel channel = getConnectedSession().openChannel("exec"); - ((ChannelExec) channel).setCommand(command); - // get I/O streams for remote scp - try (OutputStream out = channel.getOutputStream(); - InputStream in = channel.getInputStream()) - { - channel.connect(); - if (checkAck(in) != 0) { - return false; - } - - if (ptimestamp) { - command = "T " + (lastModified / 1000) + " 0"; - // The access time should be sent here, - // but it is not accessible with JavaAPI ;-< - command += (" " + (lastModified / 1000) + " 0\n"); - out.write(command.getBytes()); - out.flush(); - if (checkAck(in) != 0) { - return false; + int noSuchFileExceptionThrown = 0; + do { + + try { + return scp2Server(is, fileName, length, lastModified, progress); + } catch (NoSuchFileException e) { + if(noSuchFileExceptionThrown > MAX_NUMBER_OF_CONNECTION_ATTEMPTS) { + break; } + if (noSuchFileExceptionThrown > 0) { + try { + Thread.sleep(TIMEOUT_BETWEEN_CONNECTION_ATTEMPTS); + } + catch (InterruptedException exc) { + } + } + mkdir(e.getFile()); + noSuchFileExceptionThrown++; + continue; } - - // send "C0644 filesize filename", where filename should not include '/' - long filesize = length; - command = "C0644 " + filesize + " "; - command += Paths.get(fileName).getFileName().toString(); - command += "\n"; - out.write(command.getBytes()); - out.flush(); - if (checkAck(in) != 0) { - return false; - } - byte[] buf = new byte[getBufferSize()]; - // send a content of lfile - while (true) { - int len = is.read(buf, 0, buf.length); - if (len <= 0) break; - out.write(buf, 0, len); // out.flush(); - progress.dataTransfered(len); - } - // send '\0' - buf[0] = 0; - out.write(buf, 0, 1); - out.flush(); - if (checkAck(in) != 0) { - return false; - } - out.close(); - - } - catch (ClosedByInterruptException e) { - Thread.interrupted(); - throw new InterruptedIOException(); - } - finally { - channel.disconnect(); - } - return true; + } while(true); + return false; } public long size(String lfile) throws JSchException, IOException { @@ -427,6 +391,98 @@ public class ScpClient implements Closeable { return session; } + private boolean scp2Server(InputStream is, String fileName, long length, + long lastModified, TransferFileProgress progress) throws JSchException, + IOException, InterruptedIOException + { + boolean ptimestamp = true; + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? "-p" : "") + " -t '" + fileName + "'"; + Channel channel = getConnectedSession().openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + // get I/O streams for remote scp + try (OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream()) + { + channel.connect(); + if (checkAck(in) != 0) { + return false; + } + + if (ptimestamp) { + command = "T " + (lastModified / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (lastModified / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + return false; + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = length; + command = "C0644 " + filesize + " "; + command += Paths.get(fileName).getFileName().toString(); + command += "\n"; + out.write(command.getBytes()); + out.flush(); + StringBuilder sb = new StringBuilder(); + int result; + if ((result = checkAck(in, sb)) != 0) { + if (result == 1 && sb.toString().contains(NO_SUCH_FILE_OR_DIRECTORY_ERROR_TEXT) ) { + throw new NoSuchFileException(getParent(fileName)); + } + return false; + } + byte[] buf = new byte[getBufferSize()]; + // send a content of lfile + while (true) { + int len = is.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); // out.flush(); + progress.dataTransfered(len); + } + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + return false; + } + out.close(); + + } + catch (ClosedByInterruptException e) { + Thread.interrupted(); + throw new InterruptedIOException(); + } + finally { + channel.disconnect(); + } + return true; + } + + private int mkdir(String file) throws JSchException { + ChannelExec channel = (ChannelExec) getConnectedSession().openChannel("exec"); + channel.setCommand("mkdir -p '" + file + "'"); + try { + channel.connect(); + return channel.getExitStatus(); + } finally { + channel.disconnect(); + } + } + + private String getParent(String fileName) { + int index = fileName.lastIndexOf('/'); + if (index == -1) { + return null; + } + return fileName.substring(0, index); + } + private class P_UserInfo implements UserInfo { @Override @@ -460,6 +516,14 @@ public class ScpClient implements Closeable { } static int checkAck(InputStream in) throws IOException { + StringBuilder sb = new StringBuilder(); + int result = checkAck(in, sb); + if (result != 0) { + log.warn(sb.toString()); + } + return result; + } + static int checkAck(InputStream in, StringBuilder sb) throws IOException { int b = in.read(); // b may be 0 for success, // 1 for error, @@ -469,19 +533,12 @@ public class ScpClient implements Closeable { if (b == -1) return b; if (b == 1 || b == 2) { - StringBuffer sb = new StringBuffer(); int c; do { c = in.read(); sb.append((char) c); } while (c != '\n'); - if (b == 1) { // error - System.out.print(sb.toString()); - } - if (b == 2) { // fatal error - System.out.print(sb.toString()); - } } return b; }