Commit e135ccdb authored by Tobias Pietzsch

parallelize hdf5 export:

* 1 thread is responsible for writing data blocks to hdf5. It maintains a
  bounded queue of blocks to be written.
* n threads prepare blocks by copying or downsampling from the input image,
  submitting completed blocks to the write queue. Processing of the next input
  image may start before all blocks of the previous image were written.
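
In outline this is a standard bounded producer/consumer arrangement from java.util.concurrent. The sketch below is illustrative only (all names are invented, none of it is code from this commit): the queue's fixed capacity caps how far block preparation can run ahead of the disk, so memory use stays bounded even when the writer is the bottleneck.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the scheme described above: n producers prepare
// blocks, one consumer writes them; the bounded queue makes producers wait
// whenever the writer falls behind.
public class BoundedWriterSketch
{
	public static void main( final String[] args ) throws InterruptedException
	{
		final BlockingQueue< short[] > queue = new ArrayBlockingQueue<>( 100 );
		final short[] poison = new short[ 0 ]; // sentinel: no more blocks

		final Thread writer = new Thread( () -> {
			try
			{
				for ( short[] block = queue.take(); block != poison; block = queue.take() )
					; // write block to HDF5 here
			}
			catch ( final InterruptedException e )
			{
				Thread.currentThread().interrupt();
			}
		} );
		writer.start();

		final Thread[] producers = new Thread[ 4 ];
		for ( int t = 0; t < producers.length; ++t )
		{
			producers[ t ] = new Thread( () -> {
				for ( int b = 0; b < 100; ++b )
				{
					final short[] block = new short[ 32 * 32 * 32 ];
					// fill block by copying / downsampling the input here
					try
					{
						queue.put( block ); // blocks while the queue is full
					}
					catch ( final InterruptedException e )
					{
						Thread.currentThread().interrupt();
						return;
					}
				}
			} );
			producers[ t ].start();
		}
		for ( final Thread p : producers )
			p.join();
		queue.put( poison ); // all producers done; let the writer drain and stop
		writer.join();
	}
}
```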
parent 8136c040
@@ -19,6 +19,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import mpicbg.spim.data.XmlHelpers;
 import mpicbg.spim.data.generic.sequence.AbstractSequenceDescription;
@@ -159,6 +163,9 @@ public class WriteSequenceToHdf5
 	 */
 	public static void writeHdf5PartitionFile( final AbstractSequenceDescription< ?, ?, ? > seq, final Map< Integer, ExportMipmapInfo > perSetupMipmapInfo, final Partition partition, ProgressWriter progressWriter )
 	{
+		final int numThreads = Math.max( 1, Runtime.getRuntime().availableProcessors() - 2 );
+		final int blockWriterQueueLength = 100;
+
 		if ( progressWriter == null )
 			progressWriter = new ProgressWriterConsole();
@@ -203,6 +210,8 @@ public class WriteSequenceToHdf5
 			e.printStackTrace();
 			hdf5Access = new HDF5Access( hdf5Writer );
 		}
+		final HDF5BlockWriterThread writerQueue = new HDF5BlockWriterThread( hdf5Access, blockWriterQueueLength );
+		writerQueue.start();

 		// write Mipmap descriptions
 		for ( final Entry< Integer, Integer > entry : partition.getSetupIdSequenceToPartition().entrySet() )
@@ -216,6 +225,14 @@ public class WriteSequenceToHdf5
 		progressWriter.setProgress( ( double ) numCompletedTasks++ / numTasks );

 		// write image data for all views to the HDF5 file
+		final CellCreatorThread[] cellCreatorThreads = new CellCreatorThread[ numThreads ];
+		for ( int threadNum = 0; threadNum < numThreads; ++threadNum )
+		{
+			cellCreatorThreads[ threadNum ] = new CellCreatorThread();
+			cellCreatorThreads[ threadNum ].setName( "CellCreatorThread " + threadNum );
+			cellCreatorThreads[ threadNum ].start();
+		}
+
 		final int n = 3;
 		final long[] dimensions = new long[ n ];
 		int timepointIndex = 0;
@@ -272,7 +289,6 @@ public class WriteSequenceToHdf5
 					final Interval spanInterval = new FinalInterval( spanDim );
 					final NeighborhoodsAccessible< UnsignedShortType > neighborhoods = new NeighborhoodsAccessible< UnsignedShortType >( img, spanInterval, f );
-					final RandomAccess< Neighborhood< UnsignedShortType > > block = neighborhoods.randomAccess();
 					final long[] minRequiredInput = new long[ n ];
 					final long[] maxRequiredInput = new long[ n ];
@@ -282,16 +298,13 @@ public class WriteSequenceToHdf5
 					final RandomAccessibleInterval< UnsignedShortType > extendedImg = Views.interval( Views.extendBorder( img ), new FinalInterval( minRequiredInput, maxRequiredInput ) );
 					final NeighborhoodsAccessible< UnsignedShortType > extendedNeighborhoods = new NeighborhoodsAccessible< UnsignedShortType >( extendedImg, spanInterval, f );
-					final RandomAccess< Neighborhood< UnsignedShortType > > extendedBlock = extendedNeighborhoods.randomAccess();
-					final RandomAccess< UnsignedShortType > in = img.randomAccess();
 					final int[] cellDimensions = subdivisions[ level ];
 					final ViewId viewIdPartition = new ViewId( timepointIdPartition, setupIdPartition );
 					hdf5Writer.object().createGroup( Util.getGroupPath( viewIdPartition, level ) );
 					final String path = Util.getCellsPath( viewIdPartition, level );
-//					hdf5Writer.int16().createMDArray( path, reorder( dimensions ), reorder( cellDimensions ), HDF5IntStorageFeatures.INT_AUTO_SCALING );
-					hdf5Writer.int16().createMDArray( path, reorder( dimensions ), reorder( cellDimensions ), HDF5IntStorageFeatures.INT_AUTO_SCALING_DEFLATE );
+//					writerQueue.createAndOpenDataset( path, dimensions.clone(), cellDimensions.clone(), HDF5IntStorageFeatures.INT_AUTO_SCALING );
+					writerQueue.createAndOpenDataset( path, dimensions.clone(), cellDimensions.clone(), HDF5IntStorageFeatures.INT_AUTO_SCALING_DEFLATE );
 					final long[] numCells = new long[ n ];
 					final int[] borderSize = new int[ n ];
@@ -301,66 +314,266 @@ public class WriteSequenceToHdf5
 						borderSize[ d ] = ( int ) ( dimensions[ d ] - ( numCells[ d ] - 1 ) * cellDimensions[ d ] );
 					}
 					final LocalizingZeroMinIntervalIterator i = new LocalizingZeroMinIntervalIterator( numCells );
-					final long[] currentCellMin = new long[ n ];
-					final long[] currentCellMax = new long[ n ];
-					final long[] currentCellDim = new long[ n ];
-					final long[] currentCellPos = new long[ n ];
-					final long[] blockMin = new long[ n ];
-					try
-					{
-						hdf5Access.openDataset( path );
-						while ( i.hasNext() )
-						{
-							i.fwd();
-							i.localize( currentCellPos );
-							boolean isBorderCell = false;
-							for ( int d = 0; d < n; ++d )
-							{
-								currentCellMin[ d ] = currentCellPos[ d ] * cellDimensions[ d ];
-								blockMin[ d ] = currentCellMin[ d ] * factor[ d ];
-								final boolean isBorderCellInThisDim = ( currentCellPos[ d ] + 1 == numCells[ d ] );
-								currentCellDim[ d ] = isBorderCellInThisDim ? borderSize[ d ] : cellDimensions[ d ];
-								currentCellMax[ d ] = currentCellMin[ d ] + currentCellDim[ d ] - 1;
-								isBorderCell |= isBorderCellInThisDim;
-							}
-							final ArrayImg< UnsignedShortType, ? > cell = ArrayImgs.unsignedShorts( currentCellDim );
-							final RandomAccess< UnsignedShortType > out = cell.randomAccess();
-							if ( fullResolution )
-							{
-								copyBlock( out, currentCellDim, in, blockMin );
-							}
-							else
-							{
-								boolean requiresExtension = false;
-								if ( isBorderCell )
-									for ( int d = 0; d < n; ++d )
-										if ( ( currentCellMax[ d ] + 1 ) * factor[ d ] > img.dimension( d ) )
-											requiresExtension = true;
-								downsampleBlock( out, currentCellDim, requiresExtension ? extendedBlock : block, blockMin, factor, scale );
-							}
-							hdf5Access.writeShortMDArrayBlockWithOffset( ( ( ShortArray ) cell.update( null ) ).getCurrentStorageArray(), currentCellDim, currentCellMin );
-						}
-					}
-					finally
-					{
-						hdf5Access.closeDataset();
-					}
+					final CountDownLatch doneSignal = new CountDownLatch( numThreads );
+					for ( int threadNum = 0; threadNum < numThreads; ++threadNum )
+					{
+						cellCreatorThreads[ threadNum ].run( new Runnable()
+						{
+							@Override
+							public void run()
+							{
+								final long[] currentCellMin = new long[ n ];
+								final long[] currentCellMax = new long[ n ];
+								final long[] currentCellDim = new long[ n ];
+								final long[] currentCellPos = new long[ n ];
+								final long[] blockMin = new long[ n ];
+								final RandomAccess< Neighborhood< UnsignedShortType > > block = neighborhoods.randomAccess();
+								final RandomAccess< Neighborhood< UnsignedShortType > > extendedBlock = extendedNeighborhoods.randomAccess();
+								final RandomAccess< UnsignedShortType > in = img.randomAccess();
+								while ( true )
+								{
+									synchronized ( i )
+									{
+										if ( !i.hasNext() )
+											break;
+										i.fwd();
+										i.localize( currentCellPos );
+									}
+									boolean isBorderCell = false;
+									for ( int d = 0; d < n; ++d )
+									{
+										currentCellMin[ d ] = currentCellPos[ d ] * cellDimensions[ d ];
+										blockMin[ d ] = currentCellMin[ d ] * factor[ d ];
+										final boolean isBorderCellInThisDim = ( currentCellPos[ d ] + 1 == numCells[ d ] );
+										currentCellDim[ d ] = isBorderCellInThisDim ? borderSize[ d ] : cellDimensions[ d ];
+										currentCellMax[ d ] = currentCellMin[ d ] + currentCellDim[ d ] - 1;
+										isBorderCell |= isBorderCellInThisDim;
+									}
+									final ArrayImg< UnsignedShortType, ? > cell = ArrayImgs.unsignedShorts( currentCellDim );
+									final RandomAccess< UnsignedShortType > out = cell.randomAccess();
+									if ( fullResolution )
+									{
+										copyBlock( out, currentCellDim, in, blockMin );
+									}
+									else
+									{
+										boolean requiresExtension = false;
+										if ( isBorderCell )
+											for ( int d = 0; d < n; ++d )
+												if ( ( currentCellMax[ d ] + 1 ) * factor[ d ] > img.dimension( d ) )
+													requiresExtension = true;
+										downsampleBlock( out, currentCellDim, requiresExtension ? extendedBlock : block, blockMin, factor, scale );
+									}
+									writerQueue.writeBlockWithOffset( ( ( ShortArray ) cell.update( null ) ).getCurrentStorageArray(), currentCellDim.clone(), currentCellMin.clone() );
+								}
+								doneSignal.countDown();
+							}
+						} );
+					}
+					try
+					{
+						doneSignal.await();
+					}
+					catch ( final InterruptedException e )
+					{
+						e.printStackTrace();
+					}
+					writerQueue.closeDataset();
 					progressWriter.setProgress( ( double ) numCompletedTasks++ / numTasks );
 				}
 			}
 		}
+		for ( int threadNum = 0; threadNum < numThreads; ++threadNum )
+			cellCreatorThreads[ threadNum ].interrupt();
+		writerQueue.shutdown();
 		hdf5Writer.close();
 	}
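Two details in the rewritten loop above carry the thread-safety: workers claim cells by advancing the shared iterator `i` inside a synchronized block, so every cell is prepared exactly once, and `currentCellDim`/`currentCellMin` are `clone()`d before being queued because the worker immediately reuses those arrays for its next cell while the writer thread may still be holding the previous task. A lock-free alternative for handing out cells (a sketch, not part of this commit) is an atomic ticket counter over the flat cell index:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch, not from this commit: hand out flat cell indices via an atomic
// ticket counter instead of synchronizing on the shared iterator, then
// unravel each index into an n-dimensional cell position.
class CellTicketSketch
{
	static void worker( final AtomicLong nextCell, final long[] numCells )
	{
		long totalCells = 1;
		for ( final long c : numCells )
			totalCells *= c;
		final long[] cellPos = new long[ numCells.length ];
		for ( long index = nextCell.getAndIncrement(); index < totalCells; index = nextCell.getAndIncrement() )
		{
			long remainder = index;
			for ( int d = 0; d < numCells.length; ++d )
			{
				cellPos[ d ] = remainder % numCells[ d ];
				remainder /= numCells[ d ];
			}
			// derive cell bounds from cellPos and prepare/enqueue the block,
			// as in the Runnable above
		}
	}
}
```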
+	private static class CellCreatorThread extends Thread
+	{
+		private Runnable currentTask = null;
+
+		public synchronized void run( final Runnable task )
+		{
+			currentTask = task;
+			notify();
+		}
+
+		@Override
+		public void run()
+		{
+			while ( !isInterrupted() )
+			{
+				synchronized ( this )
+				{
+					try
+					{
+						if ( currentTask == null )
+							wait();
+						else
+						{
+							currentTask.run();
+							currentTask = null;
+						}
+					}
+					catch ( final InterruptedException e )
+					{
+						break;
+					}
+				}
+			}
+		}
+	}
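CellCreatorThread is a hand-rolled one-slot worker: the overloaded `run( Runnable )` parks a task and `notify()` wakes the thread, which suffices here because each worker receives exactly one long-running task per resolution level and the same threads are reused across all timepoints, setups, and levels. The same lifecycle could be written against the standard executor API; a sketch under that assumption (not the commit's code, `blockProducingTask` is a stand-in for the Runnable from the diff):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a fixed-size pool replaces the CellCreatorThread array; each
// worker runs one block-producing task and the latch signals completion.
class PoolSketch
{
	static void process( final int numThreads, final Runnable blockProducingTask ) throws InterruptedException
	{
		final ExecutorService cellCreators = Executors.newFixedThreadPool( numThreads );
		final CountDownLatch doneSignal = new CountDownLatch( numThreads );
		for ( int t = 0; t < numThreads; ++t )
			cellCreators.submit( () -> {
				blockProducingTask.run();
				doneSignal.countDown();
			} );
		doneSignal.await();
		cellCreators.shutdown();
	}
}
```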
 	private static interface IHDF5Access
 	{
-		public void openDataset( final String datasetPath );
-		public void writeShortMDArrayBlockWithOffset( final short[] data, final long[] dimensions, final long[] min );
+		public void createAndOpenDataset( final String path, long[] dimensions, int[] cellDimensions, HDF5IntStorageFeatures features );
+		public void writeBlockWithOffset( final short[] data, final long[] blockDimensions, final long[] offset );
 		public void closeDataset();
 	}
+	private static class HDF5BlockWriterThread extends Thread implements IHDF5Access
+	{
+		private final IHDF5Access hdf5Access;
+
+		private static interface Hdf5Task
+		{
+			public void run( final IHDF5Access hdf5Access );
+		}
+
+		private final BlockingQueue< Hdf5Task > queue;
+
+		private volatile boolean shutdown;
+
+		public HDF5BlockWriterThread( final IHDF5Access hdf5Access, final int queueLength )
+		{
+			this.hdf5Access = hdf5Access;
+			queue = new ArrayBlockingQueue< Hdf5Task >( queueLength );
+			shutdown = false;
+			setName( "HDF5BlockWriterQueue" );
+		}
+
+		@Override
+		public void run()
+		{
+			while ( ! ( shutdown && queue.isEmpty() ) )
+			{
+				try
+				{
+					final Hdf5Task task = queue.poll( 10, TimeUnit.MILLISECONDS );
+					if ( task != null )
+						task.run( hdf5Access );
+				}
+				catch ( final InterruptedException e )
+				{}
+			}
+		}
+
+		public void shutdown()
+		{
+			shutdown = true;
+			try
+			{
+				join();
+			}
+			catch ( final InterruptedException e )
+			{
+				e.printStackTrace();
+			}
+		}
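The writer's run() loop polls with a 10 ms timeout instead of blocking in take(), so it can observe the volatile shutdown flag, and the `shutdown && queue.isEmpty()` condition makes it drain everything queued before exiting; shutdown() then join()s, so when it returns all blocks are on disk. A common alternative that avoids the periodic wake-ups is a poison-pill sentinel; a sketch (not the commit's code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: poison-pill shutdown for a writer thread. Enqueueing the sentinel
// after all real tasks preserves FIFO order, so everything queued before it
// is still written; take() blocks without a timeout.
class PoisonPillWriter extends Thread
{
	static final Runnable POISON = () -> {};

	private final BlockingQueue< Runnable > queue = new ArrayBlockingQueue<>( 100 );

	@Override
	public void run()
	{
		try
		{
			for ( Runnable task = queue.take(); task != POISON; task = queue.take() )
				task.run();
		}
		catch ( final InterruptedException e )
		{
			Thread.currentThread().interrupt();
		}
	}

	void shutdown() throws InterruptedException
	{
		queue.put( POISON );
		join();
	}
}
```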
+		@Override
+		public void createAndOpenDataset( final String path, final long[] dimensions, final int[] cellDimensions, final HDF5IntStorageFeatures features )
+		{
+			put( new CreateAndOpenDatasetTask( path, dimensions, cellDimensions, features ) );
+		}
+
+		@Override
+		public void writeBlockWithOffset( final short[] data, final long[] blockDimensions, final long[] offset )
+		{
+			put( new WriteBlockWithOffsetTask( data, blockDimensions, offset ) );
+		}
+
+		@Override
+		public void closeDataset()
+		{
+			put( new CloseDatasetTask() );
+		}
+
+		private boolean put( final Hdf5Task task )
+		{
+			try
+			{
+				queue.put( task );
+				return true;
+			}
+			catch ( final InterruptedException e )
+			{
+				return false;
+			}
+		}
+		private static class CreateAndOpenDatasetTask implements Hdf5Task
+		{
+			private final String path;
+			private final long[] dimensions;
+			private final int[] cellDimensions;
+			private final HDF5IntStorageFeatures features;
+
+			public CreateAndOpenDatasetTask( final String path, final long[] dimensions, final int[] cellDimensions, final HDF5IntStorageFeatures features )
+			{
+				this.path = path;
+				this.dimensions = dimensions;
+				this.cellDimensions = cellDimensions;
+				this.features = features;
+			}
+
+			@Override
+			public void run( final IHDF5Access hdf5Access )
+			{
+				hdf5Access.createAndOpenDataset( path, dimensions, cellDimensions, features );
+			}
+		}
+
+		private static class WriteBlockWithOffsetTask implements Hdf5Task
+		{
+			private final short[] data;
+			private final long[] blockDimensions;
+			private final long[] offset;
+
+			public WriteBlockWithOffsetTask( final short[] data, final long[] blockDimensions, final long[] offset )
+			{
+				this.data = data;
+				this.blockDimensions = blockDimensions;
+				this.offset = offset;
+			}
+
+			@Override
+			public void run( final IHDF5Access hdf5Access )
+			{
+				hdf5Access.writeBlockWithOffset( data, blockDimensions, offset );
+			}
+		}
+
+		private static class CloseDatasetTask implements Hdf5Task
+		{
+			@Override
+			public void run( final IHDF5Access hdf5Access )
+			{
+				hdf5Access.closeDataset();
+			}
+		}
+	}
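Each queued operation is a small command object that carries its arguments onto the writer thread, so the underlying JHDF5 writer is only ever touched from one thread. Since Hdf5Task has a single method, on Java 8+ the three task classes could collapse into lambdas; a hypothetical sketch of two of the delegating methods (not the commit's code, which predates that style):

```java
// Hypothetical Java 8 variant: Hdf5Task is a functional interface, so the
// dedicated task classes become lambdas and method references.
@Override
public void writeBlockWithOffset( final short[] data, final long[] blockDimensions, final long[] offset )
{
	put( access -> access.writeBlockWithOffset( data, blockDimensions, offset ) );
}

@Override
public void closeDataset()
{
	put( IHDF5Access::closeDataset );
}
```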
 	private static class HDF5Access implements IHDF5Access
@@ -369,7 +582,7 @@ public class WriteSequenceToHdf5
 		private final long[] reorderedDimensions = new long[ 3 ];
-		private final long[] reorderedMin = new long[ 3 ];
+		private final long[] reorderedOffset = new long[ 3 ];
 		private String datasetPath;
@@ -379,18 +592,19 @@
 		}

 		@Override
-		public void writeShortMDArrayBlockWithOffset( final short[] data, final long[] dimensions, final long[] min )
+		public void createAndOpenDataset( final String path, final long[] dimensions, final int[] cellDimensions, final HDF5IntStorageFeatures features )
 		{
-			reorder( dimensions, reorderedDimensions );
-			reorder( min, reorderedMin );
-			final MDShortArray array = new MDShortArray( data, reorderedDimensions );
-			hdf5Writer.int16().writeMDArrayBlockWithOffset( datasetPath, array, reorderedMin );
+			hdf5Writer.int16().createMDArray( path, reorder( dimensions ), reorder( cellDimensions ), features );
+			this.datasetPath = path;
 		}

 		@Override
-		public void openDataset( final String datasetPath )
+		public void writeBlockWithOffset( final short[] data, final long[] blockDimensions, final long[] offset )
 		{
-			this.datasetPath = datasetPath;
+			reorder( blockDimensions, reorderedDimensions );
+			reorder( offset, reorderedOffset );
+			final MDShortArray array = new MDShortArray( data, reorderedDimensions );
+			hdf5Writer.int16().writeMDArrayBlockWithOffset( datasetPath, array, reorderedOffset );
 		}

 		@Override
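The reorder calls exist because imglib2 dimension arrays are ordered {x, y, z} with x varying fastest, while HDF5 lists dataset dimensions slowest-first, so dimensions and offsets must be reversed on the way in. The helper itself is defined elsewhere in this class; a minimal sketch of what it presumably does:

```java
// Sketch of the reversal helper assumed above: imglib2 uses {x, y, z} with x
// fastest, HDF5 lists dimensions slowest-first, so {x, y, z} -> {z, y, x}.
static void reorder( final long[] in, final long[] out )
{
	final int n = in.length;
	for ( int d = 0; d < n; ++d )
		out[ d ] = in[ n - 1 - d ];
}

static long[] reorder( final long[] in )
{
	final long[] out = new long[ in.length ];
	reorder( in, out );
	return out;
}
```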
@@ -400,11 +614,13 @@
 	private static class HDF5AccessHack implements IHDF5Access
 	{
-		private final int fileId;
+		private final IHDF5Writer hdf5Writer;
+
 		private final long[] reorderedDimensions = new long[ 3 ];
-		private final long[] reorderedMin = new long[ 3 ];
+		private final long[] reorderedOffset = new long[ 3 ];
+
+		private final int fileId;
 		private int dataSetId;
@@ -412,6 +628,8 @@
 		public HDF5AccessHack( final IHDF5Writer hdf5Writer ) throws ClassNotFoundException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException
 		{
+			this.hdf5Writer = hdf5Writer;
+
 			final Class< ? > k = Class.forName( "ch.systemsx.cisd.hdf5.HDF5Writer" );
 			final Field f = k.getDeclaredField( "baseWriter" );
 			f.setAccessible( true );
@@ -424,26 +642,27 @@
 		}

 		@Override
-		public void openDataset( final String datasetPath )
+		public void closeDataset()
 		{
-			dataSetId = H5Dopen( fileId, datasetPath, H5P_DEFAULT );
-			fileSpaceId = H5Dget_space( dataSetId );
+			H5Sclose( fileSpaceId );
+			H5Dclose( dataSetId );
 		}

 		@Override
-		public void closeDataset()
+		public void createAndOpenDataset( final String path, final long[] dimensions, final int[] cellDimensions, final HDF5IntStorageFeatures features )
 		{
-			H5Sclose( fileSpaceId );
-			H5Dclose( dataSetId );
+			hdf5Writer.int16().createMDArray( path, reorder( dimensions ), reorder( cellDimensions ), features );
+			dataSetId = H5Dopen( fileId, path, H5P_DEFAULT );
+			fileSpaceId = H5Dget_space( dataSetId );
 		}

 		@Override
-		public void writeShortMDArrayBlockWithOffset( final short[] data, final long[] dimensions, final long[] min )
+		public void writeBlockWithOffset( final short[] data, final long[] blockDimensions, final long[] offset )
 		{
-			reorder( dimensions, reorderedDimensions );
-			reorder( min, reorderedMin );
+			reorder( blockDimensions, reorderedDimensions );
+			reorder( offset, reorderedOffset );
 			final int memorySpaceId = H5Screate_simple( reorderedDimensions.length, reorderedDimensions, null );
-			H5Sselect_hyperslab( fileSpaceId, H5S_SELECT_SET, reorderedMin, null, reorderedDimensions, null );
+			H5Sselect_hyperslab( fileSpaceId, H5S_SELECT_SET, reorderedOffset, null, reorderedDimensions, null );
 			H5Dwrite( dataSetId, H5T_NATIVE_INT16, memorySpaceId, fileSpaceId, H5P_DEFAULT, data );
 			H5Sclose( memorySpaceId );
 		}
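HDF5AccessHack reaches past JHDF5's convenience layer (via the reflective baseWriter access in the constructor) to issue raw H5D/H5S calls, keeping one dataset id and one file dataspace open per dataset and selecting a hyperslab per block; because every task is executed by the single writer thread, the shared dataSetId and fileSpaceId need no locking. A small worked example of the offset arithmetic feeding that hyperslab selection (values are made up for illustration):

```java
// Worked example with hypothetical values: the cell at grid position
// {2, 1, 0} with cellDimensions {16, 16, 16} starts at imglib2-order offset
// {32, 16, 0}; reversing to HDF5 order gives the hyperslab offset {0, 16, 32}.
public class HyperslabOffsetExample
{
	public static void main( final String[] args )
	{
		final long[] cellPos = { 2, 1, 0 };
		final int[] cellDimensions = { 16, 16, 16 };
		final long[] min = new long[ 3 ];
		final long[] reorderedOffset = new long[ 3 ];
		for ( int d = 0; d < 3; ++d )
			min[ d ] = cellPos[ d ] * cellDimensions[ d ];      // {32, 16, 0}
		for ( int d = 0; d < 3; ++d )
			reorderedOffset[ d ] = min[ 2 - d ];                // {0, 16, 32}
		System.out.println( java.util.Arrays.toString( reorderedOffset ) );
	}
}
```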