Skip to content
Snippets Groups Projects
Commit a91f5ab1 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

ISS-1207: directory creation

parent dcfcfa6c
No related branches found
No related tags found
No related merge requests found
...@@ -153,6 +153,9 @@ public abstract class PersistentSynchronizationProcess<T> { ...@@ -153,6 +153,9 @@ public abstract class PersistentSynchronizationProcess<T> {
fileTransfered(p); fileTransfered(p);
} }
catch (InterruptedIOException | HaaSClientException e) { catch (InterruptedIOException | HaaSClientException e) {
if (e instanceof HaaSClientException) {
log.warn("process ", e);
}
toProcessQueue.clear(); toProcessQueue.clear();
interrupted = true; interrupted = true;
} }
......
...@@ -4,7 +4,6 @@ package cz.it4i.fiji.haas.data_transfer; ...@@ -4,7 +4,6 @@ package cz.it4i.fiji.haas.data_transfer;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
...@@ -18,6 +17,7 @@ import java.util.concurrent.Executors; ...@@ -18,6 +17,7 @@ import java.util.concurrent.Executors;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -140,10 +140,9 @@ public class Synchronization implements Closeable { ...@@ -140,10 +140,9 @@ public class Synchronization implements Closeable {
@Override @Override
protected Collection<Path> getItems() throws IOException { protected Collection<Path> getItems() throws IOException {
try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory, try (Stream<Path> ds = Files.walk(inputDirectory)) {
Synchronization.this::canUpload)) { return ds.filter(p -> !Files.isDirectory(p) && canUpload(p) &&
return StreamSupport.stream(ds.spliterator(), false).filter( !filesUploaded.contains(p)).collect(Collectors.toList());
p -> !filesUploaded.contains(p)).collect(Collectors.toList());
} }
} }
...@@ -152,15 +151,14 @@ public class Synchronization implements Closeable { ...@@ -152,15 +151,14 @@ public class Synchronization implements Closeable {
protected void processItem(final HaaSFileTransfer tr, final Path p) protected void processItem(final HaaSFileTransfer tr, final Path p)
throws InterruptedIOException throws InterruptedIOException
{ {
final UploadingFile uf = new UploadingFileImpl(p); final UploadingFile uf = new UploadingFileImpl(p, inputDirectory);
tr.upload(uf); tr.upload(uf);
filesUploaded.insert(inputDirectory.resolve(p.toString())); filesUploaded.insert(inputDirectory.resolve(p.toString()));
try { try {
filesUploaded.storeToWorkingFile(); filesUploaded.storeToWorkingFile();
} } catch (final IOException e) {
catch (final IOException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
} }
@Override @Override
......
...@@ -41,8 +41,7 @@ class HaaSFileTransferImp implements HaaSFileTransfer { ...@@ -41,8 +41,7 @@ class HaaSFileTransferImp implements HaaSFileTransfer {
@Override @Override
public void upload(final UploadingFile file) throws InterruptedIOException { public void upload(final UploadingFile file) throws InterruptedIOException {
final String destFile = "'" + ft.getSharedBasepath() + "/" + file final String destFile = ft.getSharedBasepath() + "/" + file.getName();
.getName() + "'";
try (InputStream is = file.getInputStream()) { try (InputStream is = file.getInputStream()) {
if (!scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), if (!scpClient.upload(is, destFile, file.getLength(), file.getLastTime(),
progress)) progress))
......
...@@ -14,8 +14,16 @@ public class UploadingFileImpl implements UploadingFile { ...@@ -14,8 +14,16 @@ public class UploadingFileImpl implements UploadingFile {
private final Path path; private final Path path;
public UploadingFileImpl(Path path) { private Path baseDirPath;
this.path = path;
public UploadingFileImpl(Path p) {
this(p, null);
}
public UploadingFileImpl(Path p, Path baseDirPath) {
this.path = p;
this.baseDirPath = baseDirPath;
} }
@Override @Override
...@@ -25,7 +33,10 @@ public class UploadingFileImpl implements UploadingFile { ...@@ -25,7 +33,10 @@ public class UploadingFileImpl implements UploadingFile {
@Override @Override
public String getName() { public String getName() {
return path.getFileName().toString(); if (baseDirPath == null || !path.startsWith(baseDirPath)) {
return path.getFileName().toString();
}
return baseDirPath.relativize(path).toString();
} }
@Override @Override
......
...@@ -126,7 +126,7 @@ public class ObservableBenchmarkJob extends ...@@ -126,7 +126,7 @@ public class ObservableBenchmarkJob extends
@Override @Override
public synchronized void setCount(int count, int total) { public synchronized void setCount(int count, int total) {
if (total < -1) { if (total <= -1) {
clearProgress(); clearProgress();
} }
else if (start != null) { else if (start != null) {
......
...@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException; ...@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.List; import java.util.List;
...@@ -33,9 +34,11 @@ import cz.it4i.fiji.commons.DoActionEventualy; ...@@ -33,9 +34,11 @@ import cz.it4i.fiji.commons.DoActionEventualy;
public class ScpClient implements Closeable { public class ScpClient implements Closeable {
private static final String NO_SUCH_FILE_OR_DIRECTORY_ERROR_TEXT = "No such file or directory";
public static final Logger log = LoggerFactory.getLogger( public static final Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.scpclient.ScpClient.class); cz.it4i.fiji.scpclient.ScpClient.class);
private static final int MAX_NUMBER_OF_CONNECTION_ATTEMPTS = 5; private static final int MAX_NUMBER_OF_CONNECTION_ATTEMPTS = 5;
private static final long TIMEOUT_BETWEEN_CONNECTION_ATTEMPTS = 500; private static final long TIMEOUT_BETWEEN_CONNECTION_ATTEMPTS = 500;
...@@ -162,6 +165,7 @@ public class ScpClient implements Closeable { ...@@ -162,6 +165,7 @@ public class ScpClient implements Closeable {
// System.out.println("filesize="+filesize+", file="+file); // System.out.println("filesize="+filesize+", file="+file);
// send '\0' // send '\0'
buf[0] = 0; buf[0] = 0;
out.write(buf, 0, 1); out.write(buf, 0, 1);
out.flush(); out.flush();
...@@ -223,68 +227,28 @@ public class ScpClient implements Closeable { ...@@ -223,68 +227,28 @@ public class ScpClient implements Closeable {
long lastModified, TransferFileProgress progress) throws JSchException, long lastModified, TransferFileProgress progress) throws JSchException,
IOException IOException
{ {
boolean ptimestamp = true; int noSuchFileExceptionThrown = 0;
// exec 'scp -t rfile' remotely do {
String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + fileName;
Channel channel = getConnectedSession().openChannel("exec"); try {
((ChannelExec) channel).setCommand(command); return scp2Server(is, fileName, length, lastModified, progress);
// get I/O streams for remote scp } catch (NoSuchFileException e) {
try (OutputStream out = channel.getOutputStream(); if(noSuchFileExceptionThrown > MAX_NUMBER_OF_CONNECTION_ATTEMPTS) {
InputStream in = channel.getInputStream()) break;
{
channel.connect();
if (checkAck(in) != 0) {
return false;
}
if (ptimestamp) {
command = "T " + (lastModified / 1000) + " 0";
// The access time should be sent here,
// but it is not accessible with JavaAPI ;-<
command += (" " + (lastModified / 1000) + " 0\n");
out.write(command.getBytes());
out.flush();
if (checkAck(in) != 0) {
return false;
} }
if (noSuchFileExceptionThrown > 0) {
try {
Thread.sleep(TIMEOUT_BETWEEN_CONNECTION_ATTEMPTS);
}
catch (InterruptedException exc) {
}
}
mkdir(e.getFile());
noSuchFileExceptionThrown++;
continue;
} }
} while(true);
// send "C0644 filesize filename", where filename should not include '/' return false;
long filesize = length;
command = "C0644 " + filesize + " ";
command += Paths.get(fileName).getFileName().toString();
command += "\n";
out.write(command.getBytes());
out.flush();
if (checkAck(in) != 0) {
return false;
}
byte[] buf = new byte[getBufferSize()];
// send a content of lfile
while (true) {
int len = is.read(buf, 0, buf.length);
if (len <= 0) break;
out.write(buf, 0, len); // out.flush();
progress.dataTransfered(len);
}
// send '\0'
buf[0] = 0;
out.write(buf, 0, 1);
out.flush();
if (checkAck(in) != 0) {
return false;
}
out.close();
}
catch (ClosedByInterruptException e) {
Thread.interrupted();
throw new InterruptedIOException();
}
finally {
channel.disconnect();
}
return true;
} }
public long size(String lfile) throws JSchException, IOException { public long size(String lfile) throws JSchException, IOException {
...@@ -427,6 +391,98 @@ public class ScpClient implements Closeable { ...@@ -427,6 +391,98 @@ public class ScpClient implements Closeable {
return session; return session;
} }
private boolean scp2Server(InputStream is, String fileName, long length,
long lastModified, TransferFileProgress progress) throws JSchException,
IOException, InterruptedIOException
{
boolean ptimestamp = true;
// exec 'scp -t rfile' remotely
String command = "scp " + (ptimestamp ? "-p" : "") + " -t '" + fileName + "'";
Channel channel = getConnectedSession().openChannel("exec");
((ChannelExec) channel).setCommand(command);
// get I/O streams for remote scp
try (OutputStream out = channel.getOutputStream();
InputStream in = channel.getInputStream())
{
channel.connect();
if (checkAck(in) != 0) {
return false;
}
if (ptimestamp) {
command = "T " + (lastModified / 1000) + " 0";
// The access time should be sent here,
// but it is not accessible with JavaAPI ;-<
command += (" " + (lastModified / 1000) + " 0\n");
out.write(command.getBytes());
out.flush();
if (checkAck(in) != 0) {
return false;
}
}
// send "C0644 filesize filename", where filename should not include '/'
long filesize = length;
command = "C0644 " + filesize + " ";
command += Paths.get(fileName).getFileName().toString();
command += "\n";
out.write(command.getBytes());
out.flush();
StringBuilder sb = new StringBuilder();
int result;
if ((result = checkAck(in, sb)) != 0) {
if (result == 1 && sb.toString().contains(NO_SUCH_FILE_OR_DIRECTORY_ERROR_TEXT) ) {
throw new NoSuchFileException(getParent(fileName));
}
return false;
}
byte[] buf = new byte[getBufferSize()];
// send a content of lfile
while (true) {
int len = is.read(buf, 0, buf.length);
if (len <= 0) break;
out.write(buf, 0, len); // out.flush();
progress.dataTransfered(len);
}
// send '\0'
buf[0] = 0;
out.write(buf, 0, 1);
out.flush();
if (checkAck(in) != 0) {
return false;
}
out.close();
}
catch (ClosedByInterruptException e) {
Thread.interrupted();
throw new InterruptedIOException();
}
finally {
channel.disconnect();
}
return true;
}
private int mkdir(String file) throws JSchException {
ChannelExec channel = (ChannelExec) getConnectedSession().openChannel("exec");
channel.setCommand("mkdir -p '" + file + "'");
try {
channel.connect();
return channel.getExitStatus();
} finally {
channel.disconnect();
}
}
private String getParent(String fileName) {
int index = fileName.lastIndexOf('/');
if (index == -1) {
return null;
}
return fileName.substring(0, index);
}
private class P_UserInfo implements UserInfo { private class P_UserInfo implements UserInfo {
@Override @Override
...@@ -460,6 +516,14 @@ public class ScpClient implements Closeable { ...@@ -460,6 +516,14 @@ public class ScpClient implements Closeable {
} }
static int checkAck(InputStream in) throws IOException { static int checkAck(InputStream in) throws IOException {
StringBuilder sb = new StringBuilder();
int result = checkAck(in, sb);
if (result != 0) {
log.warn(sb.toString());
}
return result;
}
static int checkAck(InputStream in, StringBuilder sb) throws IOException {
int b = in.read(); int b = in.read();
// b may be 0 for success, // b may be 0 for success,
// 1 for error, // 1 for error,
...@@ -469,19 +533,12 @@ public class ScpClient implements Closeable { ...@@ -469,19 +533,12 @@ public class ScpClient implements Closeable {
if (b == -1) return b; if (b == -1) return b;
if (b == 1 || b == 2) { if (b == 1 || b == 2) {
StringBuffer sb = new StringBuffer();
int c; int c;
do { do {
c = in.read(); c = in.read();
sb.append((char) c); sb.append((char) c);
} }
while (c != '\n'); while (c != '\n');
if (b == 1) { // error
System.out.print(sb.toString());
}
if (b == 2) { // fatal error
System.out.print(sb.toString());
}
} }
return b; return b;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment