Skip to content
Snippets Groups Projects
Commit 73c92f16 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

modify interface

parent 6e2b0a13
No related branches found
No related tags found
No related merge requests found
......@@ -41,7 +41,7 @@ public class BenchmarkJobManager {
public void startJob(long jobId) throws IOException {
JobInfo jobInfo = jobs.get(jobId);
jobInfo.uploadFilesByName(() -> Arrays.asList(CONFIG_YAML).stream());
jobInfo.uploadFilesByName(Arrays.asList(CONFIG_YAML));
String outputName = getOutputName(jobInfo.openLocalFile(CONFIG_YAML));
jobInfo.submit();
jobInfo.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, outputName);
......
......@@ -13,7 +13,7 @@ import java.util.Properties;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.scijava.log.LogService;
import org.scijava.plugin.Parameter;
......@@ -98,13 +98,15 @@ public class Job {
updateState();
}
public void uploadFiles(Supplier<Stream<UploadingFile>> files) {
public void uploadFiles(Iterable<UploadingFile> files) {
HaaSClient client = this.haasClientSupplier.get();
client.uploadFiles(jobId, files, notifier);
}
public void uploadFilesByName(Supplier<Stream<String>> files) {
uploadFiles(() -> files.get().map(name -> HaaSClient.getUploadingFile(jobDir.resolve(name))));
public void uploadFilesByName(Iterable<String> files) {
Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false)
.map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList());
uploadFiles(uploadingFiles);
}
public void submit() {
......@@ -137,7 +139,7 @@ public class Job {
}
public void download() {
download(x->true, dummy);
download(x -> true, dummy);
}
public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException {
......@@ -180,7 +182,7 @@ public class Job {
public Iterable<String> getOutput(Iterable<JobSynchronizableFile> output) {
HaaSClient.SynchronizableFiles taskFileOffset = new HaaSClient.SynchronizableFiles();
long taskId = (Long) getJobInfo().getTasks().toArray()[0];
output.forEach(file->taskFileOffset.addFile(taskId, file.getType(), file.getOffset()));
output.forEach(file -> taskFileOffset.addFile(taskId, file.getType(), file.getOffset()));
return haasClientSupplier.get().downloadPartsOfJobFiles(jobId, taskFileOffset).stream().map(f -> f.getContent())
.collect(Collectors.toList());
}
......@@ -198,7 +200,7 @@ public class Job {
public String getProperty(String name) throws IOException {
return loadPropertiesIfExists().getProperty(name);
}
private synchronized void saveJobinfo() throws IOException {
Properties prop = loadPropertiesIfExists();
if (needsDownload != null) {
......@@ -225,7 +227,7 @@ public class Job {
private Properties loadPropertiesIfExists() throws IOException {
Properties prop = new Properties();
if(Files.exists(jobDir.resolve(JOB_INFO_FILE))) {
if (Files.exists(jobDir.resolve(JOB_INFO_FILE))) {
try (InputStream is = Files.newInputStream(jobDir.resolve(JOB_INFO_FILE))) {
prop.load(is);
}
......@@ -292,6 +294,4 @@ public class Job {
}
}
......@@ -9,9 +9,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -60,7 +58,7 @@ public class JobManager {
};
}
public JobInfo startJob(Supplier<Stream<UploadingFile>> files, Progress progress) throws IOException {
public JobInfo startJob(Iterable<UploadingFile> files, Progress progress) throws IOException {
JobInfo result = createJob(progress);
result.uploadFiles(files);
result.submit();
......@@ -127,11 +125,11 @@ public class JobManager {
this.job = job;
}
public void uploadFiles(Supplier<Stream<UploadingFile>> files) {
public void uploadFiles(Iterable<UploadingFile> files) {
job.uploadFiles(files);
}
public void uploadFilesByName(Supplier<Stream<String>> files) {
public void uploadFilesByName(Iterable<String> files) {
job.uploadFilesByName(files);
}
......
......@@ -51,13 +51,9 @@ public class RunWithHaaS implements Command {
public void run() {
try {
jobManager = new JobManager(getWorkingDirectoryPath(), TestingConstants.getSettings());
jobManager.startJob(() -> {
try {
return getContent(dataDirectory).stream().map(HaaSClient::getUploadingFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, ModalDialogs.doModal(new ProgressDialog(getFrame())));
jobManager.startJob(
getContent(dataDirectory).stream().map(HaaSClient::getUploadingFile).collect(Collectors.toList()),
ModalDialogs.doModal(new ProgressDialog(getFrame())));
} catch (IOException e) {
log.error(e);
}
......
......@@ -16,9 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.xml.rpc.ServiceException;
......@@ -164,10 +162,11 @@ public class HaaSClient {
}
public long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters) {
return start(files, name, templateParameters, dummyNotifier);
Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false).map(HaaSClient::getUploadingFile).collect(Collectors.toList());
return start(uploadingFiles, name, templateParameters, dummyNotifier);
}
public long start(Supplier<Stream<UploadingFile>> files, String name,
public long start(Iterable<UploadingFile> files, String name,
Collection<Entry<String, String>> templateParameters, ProgressNotifier notifier) {
notifier.setTitle("Starting job");
try {
......@@ -190,7 +189,7 @@ public class HaaSClient {
}
}
public void uploadFiles(long jobId, Supplier<Stream<UploadingFile>> files, ProgressNotifier notifier) {
public void uploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier) {
try {
doUploadFiles(jobId, files, notifier);
} catch (ServiceException | JSchException | IOException e) {
......@@ -342,16 +341,16 @@ public class HaaSClient {
notifier.done();
}
private void doUploadFiles(long jobId, Supplier<Stream<UploadingFile>> files, ProgressNotifier notifier)
private void doUploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier)
throws RemoteException, ServiceException, JSchException, IOException, UnsupportedEncodingException {
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
List<Long> totalSizes = StreamSupport.stream(files.get().spliterator(), false).map(f -> f.getLength())
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (UploadingFile file : (Iterable<UploadingFile>) files.get()::iterator) {
for (UploadingFile file : files) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
......@@ -382,14 +381,7 @@ public class HaaSClient {
return String.format("Created job: %d\n", jobId);
}
private long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
Supplier<Stream<UploadingFile>> fileStream = () -> StreamSupport.stream(files.spliterator(), false)
.map(file -> getUploadingFile(file));
return start(fileStream, name, templateParameters, notifier);
}
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException {
List<Long> result = new LinkedList<>();
......
package cz.it4i.fiji.haas_java_client;
import java.util.stream.Stream;
import java.util.Arrays;
public class Run {
public static void main(String[] args) {
Stream<String> stream = Arrays.asList("").stream();
stream.iterator();
stream.iterator();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment