...
 
Commits (11)
This diff is collapsed.
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/classes" path="target/generated-sources/wsimport">
<attributes>
<attribute name="ignore_optional_problems" value="true"/>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>haas-java-client</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding//target/generated-sources/wsimport=UTF-8
encoding/<project>=UTF-8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cz.it4i.fiji</groupId>
<artifactId>haas-java-client</artifactId>
<version>1.0.0</version>
<name>HaaS library for Java</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<scm>
<url>https://code.it4i.cz/fiji/haas-java-client.git</url>
<connection>scm:git:https://code.it4i.cz/fiji/haas-java-client.git</connection>
</scm>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>create</goal>
</goals>
</execution>
</executions>
<configuration>
<doCheck>false</doCheck>
<doUpdate>false</doUpdate>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
<manifestEntries>
<Implementation-Build>${buildNumber}</Implementation-Build>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>compile</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- usage of jax-ws maven plugin -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxws-maven-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>wsimport-from-jdk</id>
<goals>
<goal>wsimport</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- using wsdl from an url -->
<wsdlUrls>
<wsdlUrl>http://localhost/wsdl/UserAndLimitationManagementWs.wsdl</wsdlUrl>
<wsdlUrl>http://localhost/wsdl/JobManagementWs.wsdl</wsdlUrl>
<wsdlUrl>http://localhost/wsdl/DataTransferWs.wsdl</wsdlUrl>
<wsdlUrl>http://localhost/wsdl/FileTransferWs.wsdl</wsdlUrl>
</wsdlUrls>
<catalog>${project.basedir}/src/main/resources/META-INF/jax-ws-catalog.xml</catalog>
<!-- or using wsdls file directory -->
<!-- <wsdlDirectory>src/wsdl</wsdlDirectory> -->
<!-- which wsdl file -->
<!-- <wsdlFiles> <wsdlFile>UserAndLimitationManagementWs.wsdl</wsdlFile>
<wsdlFile>JobManagementWs.wsdl</wsdlFile> <wsdlFile>DataTransferWs.wsdl</wsdlFile>
<wsdlFile>FileTransferWs.wsdl</wsdlFile> </wsdlFiles> -->
<!-- Keep generated files -->
<!-- Package name -->
<!-- generated source files destination -->
<extension>true</extension>
<keep>true</keep>
<packageName>cz.it4i.fiji.haas_java_client.proxy</packageName>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.jcraft/jsch -->
<dependency>
<groupId>cz.it4i.fiji</groupId>
<artifactId>java-scpclient</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.xml/jaxrpc-api -->
<dependency>
<groupId>javax.xml</groupId>
<artifactId>jaxrpc-api</artifactId>
<version>1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>it4i</id>
<name>IT4I repository</name>
<url>https://artifactory.cs.vsb.cz/it4i/</url>
</repository>
<snapshotRepository>
<id>it4i</id>
<name>II4I repository</name>
<url>https://artifactory.cs.vsb.cz/it4i/</url>
</snapshotRepository>
</distributionManagement>
</project>
package cz.it4i.fiji.commons;
import java.io.Closeable;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiPredicate;
import org.slf4j.Logger;
abstract public class UncaughtExceptionHandlerDecorator implements
UncaughtExceptionHandler, Closeable
{
private final UncaughtExceptionHandler previousHandler;
private UncaughtExceptionHandler decoratedHandler;
private final Collection<BiPredicate<Thread, Throwable>> handlers =
new LinkedList<>();
private boolean closed;
public static UncaughtExceptionHandlerDecorator setDefaultHandler() {
return setDefaultHandler(null);
}
public static UncaughtExceptionHandlerDecorator setDefaultHandler(
final Logger logger)
{
final UncaughtExceptionHandlerDecorator result =
new UncaughtExceptionHandlerDecorator(Thread
.getDefaultUncaughtExceptionHandler(), logger)
{
@Override
protected void setPreviousHandler(
final UncaughtExceptionHandler handler)
{
Thread.setDefaultUncaughtExceptionHandler(handler);
}
@Override
public void activate() {
Thread.setDefaultUncaughtExceptionHandler(this);
}
};
return result;
}
public static UncaughtExceptionHandlerDecorator setHandler(
final Thread thread, final Logger logger)
{
final UncaughtExceptionHandlerDecorator result =
new UncaughtExceptionHandlerDecorator(thread
.getUncaughtExceptionHandler(), logger)
{
@Override
protected void setPreviousHandler(
final UncaughtExceptionHandler handler)
{
thread.setUncaughtExceptionHandler(handler);
}
@Override
public void activate() {
thread.setUncaughtExceptionHandler(this);
}
};
thread.setUncaughtExceptionHandler(result);
return result;
}
@SafeVarargs
public static ThreadFactory createThreadFactory(
final BiPredicate<Thread, Throwable>... handlers)
{
final ThreadFactory result = new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread t = new Thread(r);
final UncaughtExceptionHandlerDecorator uehd = setHandler(t, null);
for (final BiPredicate<Thread, Throwable> handler : handlers) {
uehd.registerHandler(handler);
}
uehd.activate();
return t;
}
};
return result;
}
private UncaughtExceptionHandlerDecorator(
final UncaughtExceptionHandler previousHandler, final Logger logger)
{
this.previousHandler = previousHandler;
if (previousHandler != null) {
this.decoratedHandler = previousHandler;
}
else {
this.decoratedHandler = new UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
if (logger != null) {
logger.error(e.getMessage(), e);
}
else {
e.printStackTrace(System.err);
}
}
};
}
}
public UncaughtExceptionHandlerDecorator registerHandler(
final BiPredicate<Thread, Throwable> handler)
{
handlers.add(handler);
return this;
}
@Override
public void uncaughtException(final Thread t, final Throwable e) {
for (final BiPredicate<Thread, Throwable> handler : handlers) {
if (handler.test(t, e)) {
return;
}
}
decoratedHandler.uncaughtException(t, e);
}
@Override
synchronized public void close() {
if (!closed) {
if (previousHandler != null) {
setPreviousHandler(previousHandler);
}
closed = true;
}
}
abstract public void activate();
abstract protected void setPreviousHandler(UncaughtExceptionHandler handler);
}
package cz.it4i.fiji.commons;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final public class WebRoutines {
private static final Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.commons.WebRoutines.class);
public static boolean doesURLExist(final URL url) {
// We want to check the current URL
HttpURLConnection.setFollowRedirects(false);
// We don't need to get data
try {
final HttpURLConnection httpURLConnection = (HttpURLConnection) url
.openConnection();
httpURLConnection.setRequestMethod("HEAD");
// Some websites don't like programmatic access so pretend to be a browser
httpURLConnection.setRequestProperty("User-Agent",
"Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.1.2) Gecko/20090729 Firefox/3.5.2 (.NET CLR 3.5.30729)");
final int responseCode = httpURLConnection.getResponseCode();
return responseCode == HttpURLConnection.HTTP_OK;
// We only accept response code 200
}
catch (final IOException exc) {
log.error(exc.getMessage(), exc);
return false;
}
}
private WebRoutines() {}
}
/**
*
*/
/**
* @author koz01
*
*/
package cz.it4i.fiji.commons;
package cz.it4i.fiji.haas.data_transfer;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentIndex<T> {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.PersistentIndex.class);
private final Path workingFile;
private final Set<T> indexedFiles = new LinkedHashSet<>();
private final Function<String, T> fromStringConvertor;
public PersistentIndex(Path workingFile,Function<String, T> fromStringConvertor) throws IOException {
this.workingFile = workingFile;
this.fromStringConvertor = fromStringConvertor;
loadFromWorkingFile();
}
public synchronized void storeToWorkingFile() throws IOException {
try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
for (T file : indexedFiles) {
bw.write(file.toString() + "\n");
}
}
}
public synchronized boolean insert(T file) {
return indexedFiles.add(file);
}
public synchronized void remove(T p) {
indexedFiles.remove(p);
}
public synchronized Set<T> getIndexedItems() {
return Collections.unmodifiableSet(indexedFiles);
}
public synchronized void clear() throws IOException {
indexedFiles.clear();
storeToWorkingFile();
}
public synchronized boolean contains(Path file) {
return indexedFiles.contains(file);
}
private void loadFromWorkingFile() throws IOException {
indexedFiles.clear();
if (Files.exists(workingFile)) {
try (BufferedReader br = Files.newBufferedReader(workingFile)) {
String line;
while (null != (line = br.readLine())) {
processLine(line);
}
}
}
}
private void processLine(String line) {
indexedFiles.add(fromStringConvertor.apply(line));
}
}
package cz.it4i.fiji.haas.data_transfer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.commons.DoActionEventualy;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClientException;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
public abstract class PersistentSynchronizationProcess<T> {
public static final String FAILED_ITEM = "Failed item";
private static final Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas.data_transfer.PersistentSynchronizationProcess.class);
private static final TransferFileProgressForHaaSClient DUMMY_FILE_PROGRESS = new TransferFileProgressForHaaSClient(
0, HaaSClient.DUMMY_PROGRESS_NOTIFIER);
private final static String INIT_TRANSFER_ITEM = "init transfer";
private static final long WAIT_FOR_CLOSE_SEESION_TIMEOUT = 500;
private final PersistentIndex<T> index;
private final Queue<T> toProcessQueue = new LinkedBlockingQueue<>();
private final Set<Thread> runningTransferThreads = Collections.synchronizedSet(new HashSet<>());
private final SimpleThreadRunner runner;
private final Supplier<HaaSFileTransfer> fileTransferSupplier;
private final Runnable processFinishedNotifier;
private ProgressNotifier notifier;
private boolean startFinished = true;
private final AtomicInteger runningProcessCounter = new AtomicInteger();
private final Collection<P_HolderOfOpenClosables> openedClosables = new LinkedList<>();
public PersistentSynchronizationProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier,
Runnable processFinishedNotifier, Path indexFile, Function<String, T> convertor) throws IOException {
runner = new SimpleThreadRunner(service);
this.fileTransferSupplier = fileTransferSupplier;
this.processFinishedNotifier = processFinishedNotifier;
this.index = new PersistentIndex<>(indexFile, convertor);
}
public synchronized CompletableFuture<?> start() throws IOException {
startFinished = false;
index.clear();
try {
for (T item : getItems()) {
index.insert(item);
toProcessQueue.add(item);
}
index.storeToWorkingFile();
return runner.runIfNotRunning(this::doProcess);
} finally {
startFinished = true;
index.storeToWorkingFile();
}
}
public void stop() throws IOException {
toProcessQueue.clear();
index.clear();
notifyStop();
runningTransferThreads.forEach(t -> t.interrupt());
}
public void shutdown() {
synchronized (this) {
toProcessQueue.clear();
runningTransferThreads.forEach(t -> t.interrupt());
}
try(DoActionEventualy action = new DoActionEventualy(WAIT_FOR_CLOSE_SEESION_TIMEOUT, this::closeOpennedClosables)) {
waitForFinishAllProcesses();
}
}
public void resume() {
toProcessQueue.addAll(index.getIndexedItems());
runner.runIfNotRunning(this::doProcess);
}
public Set<T> getIndexedItems() {
return index.getIndexedItems();
}
public void setNotifier(ProgressNotifier notifier) {
this.notifier = notifier;
}
public synchronized boolean isWorking() {
return !toProcessQueue.isEmpty();
}
abstract protected Iterable<T> getItems() throws IOException;
abstract protected void processItem(HaaSFileTransfer tr, T p) throws InterruptedIOException;
abstract protected long getTotalSize(Iterable<T> items, HaaSFileTransfer tr) throws InterruptedIOException;
private void doProcess(AtomicBoolean reRun) {
runningProcessCounter.incrementAndGet();
boolean interrupted = false;
this.notifier.addItem(INIT_TRANSFER_ITEM);
runningTransferThreads.add(Thread.currentThread());
TransferFileProgressForHaaSClient actualnotifier = DUMMY_FILE_PROGRESS;
try (P_HolderOfOpenClosables transferHolder = new P_HolderOfOpenClosables(fileTransferSupplier.get())) {
HaaSFileTransfer tr = transferHolder.getTransfer();
try {
tr.setProgress(actualnotifier = getTransferFileProgress(tr));
} catch (InterruptedIOException e1) {
interrupted = true;
}
this.notifier.itemDone(INIT_TRANSFER_ITEM);
this.notifier.done();
do {
synchronized (this) {
synchronized (reRun) {
interrupted |= Thread.interrupted();
if (interrupted || toProcessQueue.isEmpty()) {
reRun.set(false);
break;
}
}
}
T p = toProcessQueue.poll();
String item = p.toString();
actualnotifier.addItem(item);
try {
processItem(tr, p);
fileTransfered(p);
actualnotifier.itemDone(item);
}
catch (InterruptedIOException | HaaSClientException e) {
synchronized (this) {
toProcessQueue.clear();
interrupted = true;
if (e instanceof HaaSClientException) {
log.warn("process ", e);
actualnotifier.addItem(FAILED_ITEM);
}
}
}
} while(true);
} finally {
runningTransferThreads.remove(Thread.currentThread());
synchronized (this) {
if (startFinished) {
if (!interrupted && !Thread.interrupted()) {
processFinishedNotifier.run();
actualnotifier.done();
} else {
notifyStop();
reRun.set(false);
}
}
}
synchronized (runningProcessCounter) {
runningProcessCounter.decrementAndGet();
runningProcessCounter.notifyAll();
}
}
}
private void fileTransfered(T p) {
try {
index.remove(p);
index.storeToWorkingFile();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
private TransferFileProgressForHaaSClient getTransferFileProgress(HaaSFileTransfer tr)
throws InterruptedIOException {
if (notifier == null) {
return DUMMY_FILE_PROGRESS;
}
return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier);
}
private void notifyStop() {
notifier.setCount(-1, -1);
}
private void waitForFinishAllProcesses() {
synchronized (runningProcessCounter) {
while (runningProcessCounter.get() != 0) {
try {
runningProcessCounter.wait();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}
protected void closeOpennedClosables() {
synchronized(openedClosables) {
for(P_HolderOfOpenClosables closeable: openedClosables) {
closeable.close();
}
}
}
private class P_HolderOfOpenClosables implements Closeable{
final private HaaSFileTransfer transfer;
public P_HolderOfOpenClosables(HaaSFileTransfer transfer) {
this.transfer = transfer;
synchronized(openedClosables) {
openedClosables.add(this);
}
}
public HaaSFileTransfer getTransfer() {
return transfer;
}
@Override
public void close() {
synchronized(openedClosables) {
try {
transfer.close();
} finally {
openedClosables.remove(this);
}
}
}
}
}
package cz.it4i.fiji.haas.data_transfer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public class SimpleThreadRunner {
private final ExecutorService service;
private final AtomicBoolean reRun = new AtomicBoolean(false);
private CompletableFuture<?> lastRun;
public SimpleThreadRunner(ExecutorService service) {
this.service = service;
}
synchronized public CompletableFuture<?> runIfNotRunning(Consumer<AtomicBoolean> r) {
synchronized (reRun) {
if (reRun.get()) {
return lastRun;
}
reRun.set(true);
}
return lastRun = CompletableFuture.runAsync(() -> {
do {
r.accept(reRun);
} while (reRun.get());
}, service);
}
}
package cz.it4i.fiji.haas.data_transfer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.FileTransferInfo;
import cz.it4i.fiji.haas_java_client.FileTransferState;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.UploadingFile;
import cz.it4i.fiji.haas_java_client.UploadingFileImpl;
public class Synchronization implements Closeable {
private final static Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.haas.data_transfer.Synchronization.class);
private static final String FILE_INDEX_TO_UPLOAD_FILENAME = ".toUploadFiles";
private static final String FILE_INDEX_UPLOADED_FILENAME = ".uploaded";
private static final String FILE_INDEX_TO_DOWNLOAD_FILENAME =
".toDownloadFiles";
private static final String FILE_INDEX_DOWNLOADED_FILENAME = ".downloaded";
private final Path workingDirectory;
private final Path inputDirectory;
private final Path outputDirectory;
private final PersistentIndex<Path> filesDownloaded;
private final PersistentIndex<Path> filesUploaded;
private final PersistentSynchronizationProcess<Path> uploadProcess;
private final P_PersistentDownloadProcess downloadProcess;
private final ExecutorService service;
private final Predicate<Path> uploadFilter;
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, Path inputDirectory,
Path outputDirectory, Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier, Predicate<Path> uploadFilter)
throws IOException {
this.workingDirectory = workingDirectory;
this.inputDirectory = inputDirectory;
this.outputDirectory = outputDirectory;
this.service = Executors.newFixedThreadPool(2);
this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(
FILE_INDEX_DOWNLOADED_FILENAME), name -> Paths.get(name));
this.filesUploaded = new PersistentIndex<>(workingDirectory.resolve(
FILE_INDEX_UPLOADED_FILENAME), name -> Paths.get(name));
this.uploadProcess = createUploadProcess(fileTransferSupplier, service,
uploadFinishedNotifier);
this.downloadProcess = createDownloadProcess(fileTransferSupplier, service,
downloadFinishedNotifier);
this.uploadFilter = uploadFilter;
}
public synchronized void setUploadNotifier(ProgressNotifier notifier) {
uploadProcess.setNotifier(notifier);
}
public void setDownloadNotifier(ProgressNotifier notifier) {
downloadProcess.setNotifier(notifier);
}
public synchronized void startUpload() throws IOException {
uploadProcess.start();
}
public void stopUpload() throws IOException {
uploadProcess.stop();
}
public void resumeUpload() {
uploadProcess.resume();
}
public boolean isUploading() {
return uploadProcess.isWorking();
}
public synchronized CompletableFuture<?> startDownload(Collection<String> files) throws IOException {
this.downloadProcess.setItems(files);
return this.downloadProcess.start();
}
public synchronized void stopDownload() throws IOException {
this.downloadProcess.stop();
}
public synchronized void resumeDownload() {
this.downloadProcess.resume();
}
public boolean isDownloading() {
return downloadProcess.isWorking();
}
public List<FileTransferInfo> getFileTransferInfo() {
final List<FileTransferInfo> list = new LinkedList<>();
filesUploaded.getIndexedItems().forEach(ii -> {
list.add(new FileTransferInfo(ii, FileTransferState.Finished));
});
uploadProcess.getIndexedItems().forEach(ii -> {
list.add(new FileTransferInfo(ii, FileTransferState.Queuing));
});
return list;
}
@Override
public void close() {
service.shutdown();
uploadProcess.shutdown();
downloadProcess.shutdown();
}
private boolean canUpload(Path file) {
return uploadFilter.test(file) && !file.getFileName().toString().matches("[.][^.]+")
&& !filesDownloaded.contains(file);
}
private PersistentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier,
ExecutorService executorService, Runnable uploadFinishedNotifier) throws IOException {
return new PersistentSynchronizationProcess<Path>(executorService, fileTransferSupplier, uploadFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), name -> inputDirectory.resolve(name)) {
@Override
protected Collection<Path> getItems() throws IOException {
try (Stream<Path> ds = Files.walk(inputDirectory)) {
return ds.filter(p -> !Files.isDirectory(p) && canUpload(p) &&
!filesUploaded.contains(p)).collect(Collectors.toList());
}
}
@Override
protected void processItem(final HaaSFileTransfer tr, final Path p)
throws InterruptedIOException
{
final UploadingFile uf = new UploadingFileImpl(p, inputDirectory);
tr.upload(uf);
filesUploaded.insert(inputDirectory.resolve(p.toString()));
try {
filesUploaded.storeToWorkingFile();
} catch (final IOException e) {
log.error(e.getMessage(), e);
}
}
@Override
protected long getTotalSize(Iterable<Path> items, HaaSFileTransfer tr) {
return StreamSupport.stream(items.spliterator(), false).map(p -> {
try {
return Files.size(p);
} catch (IOException e) {
log.error(e.getMessage(), e);
return 0;
}
}).collect(Collectors.summingLong(val -> val.longValue()));
}
};
}
private P_PersistentDownloadProcess createDownloadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier,
ExecutorService executorService, Runnable uploadFinishedNotifier) throws IOException {
return new P_PersistentDownloadProcess(executorService, fileTransferSupplier, uploadFinishedNotifier);
}
private class P_PersistentDownloadProcess extends PersistentSynchronizationProcess<String> {
private Collection<String> items = Collections.emptyList();
public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier,
Runnable processFinishedNotifier) throws IOException {
super(service, fileTransferSupplier, processFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name);
}
private synchronized void setItems(Collection<String> items) {
this.items = new LinkedList<>(items);
}
@Override
protected synchronized Collection<String> getItems() throws IOException {
return items;
}
@Override
protected void processItem(final HaaSFileTransfer tr, final String file)
throws InterruptedIOException
{
tr.download(file, outputDirectory);
filesDownloaded.insert(outputDirectory.resolve(file));
try {
filesDownloaded.storeToWorkingFile();
}
catch (final IOException e) {
log.error(e.getMessage(), e);
}
}
@Override
protected long getTotalSize(Iterable<String> files, HaaSFileTransfer tr) throws InterruptedIOException {
return tr.obtainSize(StreamSupport.stream(files.spliterator(), false).collect(Collectors.toList())).stream()
.collect(Collectors.summingLong(val -> val));
}
}
}
/**
*
*/
/**
* @author koz01
*
*/
package cz.it4i.fiji.haas.data_transfer;
\ No newline at end of file
package cz.it4i.fiji.haas_java_client;
public class AuthenticationException extends HaaSClientException {
public AuthenticationException() {
}
public AuthenticationException(String message) {
super(message);
}
public AuthenticationException(Throwable cause) {
super(cause);
}
public AuthenticationException(String message, Throwable cause) {
super(message, cause);
}
public AuthenticationException(String message, Throwable cause,
boolean enableSuppression, boolean writableStackTrace)
{
super(message, cause, enableSuppression, writableStackTrace);
}
}
package cz.it4i.fiji.haas_java_client;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Configuration {
private Properties properties;
public Configuration(String configFile) {
try (InputStream is = this.getClass().getClassLoader().getResourceAsStream(configFile)) {
if (is == null) {
throw new IllegalArgumentException("Resource " + configFile + " does not exist. Copy " + configFile
+ ".template and fill it, please!");
}
this.properties = new Properties();
this.properties.load(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
protected String getValue(String key) {
return this.properties.getProperty(key);
}
}
package cz.it4i.fiji.haas_java_client;
class Constants extends Configuration{
public Constants(String configFileName) {
super(configFileName);
}
public String getUserName() {
return getValue("USER_NAME");
}
public String getPhone() {
return getValue("PHONE");
}
public String getPassword() {
return getValue("PASSWORD");
}
public String getEmail() {
return getValue("EMAIL");
}
}
\ No newline at end of file
package cz.it4i.fiji.haas_java_client;
import java.nio.file.Path;
public class FileTransferInfo {
private final Path path;
private final FileTransferState state;
public FileTransferInfo(final Path path, final FileTransferState state) {
this.path = path;
this.state = state;
}
public String getFileNameAsString() {
return path.getFileName().toString();
}
public FileTransferState getState() {
return state;
}
}
package cz.it4i.fiji.haas_java_client;
public enum FileTransferState {
Unknown, Queuing, InProcess, Finished;
}
\ No newline at end of file
package cz.it4i.fiji.haas_java_client;
public class HaaSClientException extends RuntimeException {
private static final long serialVersionUID = 1L;
public HaaSClientException() {
}
public HaaSClientException(String message) {
super(message);
}
public HaaSClientException(Throwable cause) {
super(cause);
}
public HaaSClientException(String message, Throwable cause) {
super(message, cause);
}
public HaaSClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
package cz.it4i.fiji.haas_java_client;
public interface HaaSClientSettings {
String getUserName();
String getPassword();
String getEmail();
String getPhone();
String getProjectId();
}
package cz.it4i.fiji.haas_java_client;
import java.io.Closeable;
public interface HaaSDataTransfer extends Closeable{
void write(byte []buffer);
byte[] read();
void closeConnection();
}
package cz.it4i.fiji.haas_java_client;
import java.io.Closeable;
import java.io.InterruptedIOException;
import java.nio.file.Path;
import java.util.List;
import cz.it4i.fiji.scpclient.TransferFileProgress;
public interface HaaSFileTransfer extends Closeable {
@Override
void close();
void upload(UploadingFile file) throws InterruptedIOException;
void download(String files, Path workDirectory) throws InterruptedIOException;
List<Long> obtainSize(List<String> files) throws InterruptedIOException;
List<String> getContent(List<String> logs);
void setProgress(TransferFileProgress progress);
}
package cz.it4i.fiji.haas_java_client;
import com.jcraft.jsch.JSchException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt;
import cz.it4i.fiji.scpclient.ScpClient;
import cz.it4i.fiji.scpclient.TransferFileProgress;
class HaaSFileTransferImp implements HaaSFileTransfer {
@SuppressWarnings("unused")
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSFileTransferImp.class);
private final FileTransferMethodExt ft;
private final ScpClient scpClient;
private TransferFileProgress progress;
public HaaSFileTransferImp(FileTransferMethodExt ft, ScpClient scpClient, TransferFileProgress progress) {
this.ft = ft;
this.scpClient = scpClient;
this.progress = progress;
}
@Override
public void close() {
scpClient.close();
}
@Override
public void upload(final UploadingFile file) throws InterruptedIOException {
final String destFile = ft.getSharedBasepath() + "/" + file.getName();
try (InputStream is = file.getInputStream()) {
scpClient.upload(is, destFile, file.getLength(), file.getLastTime(),
progress);
}
catch (JSchException | IOException e) {
throw new HaaSClientException("An upload of " + file + " to " + destFile +
" failed: " + e.getMessage(), e);
}
}
@Override
public void download(String fileName, final Path workDirectory)
throws InterruptedIOException
{
try {
fileName = fileName.replaceFirst("/", "");
final Path rFile = workDirectory.resolve(fileName);
final String fileToDownload = ft.getSharedBasepath() + "/" + fileName;
scpClient.download(fileToDownload, rFile, progress);
}
catch (JSchException | IOException e) {
throw new HaaSClientException("A download of " + fileName + " to " +
workDirectory + " failed: " + e.getMessage());
}
}
@Override
public void setProgress(TransferFileProgress progress) {
this.progress = progress;
}