Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ struct OPENPMDAPI_EXPORT
}

// in parameters
bool queryOnly = false; // query if the backend supports this
Offset offset;
Extent extent;
Datatype dtype = Datatype::UNDEFINED;
Expand Down
68 changes: 44 additions & 24 deletions include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,6 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
{
verifyChunk<T>(o, e);

/*
* The openPMD backend might not yet know about this dataset.
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush_impl</* flush_entire_series = */ false>(
{FlushLevel::SkeletonOnly});

size_t size = 1;
for (auto ext : e)
{
Expand All @@ -102,33 +94,61 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
* Flushing the skeleton does not create datasets,
* so we might need to do it now.
*/
if (!written())
auto &rc = get();
if (!rc.m_dataset.has_value())
{
auto &rc = get();
if (!rc.m_dataset.has_value())
{
throw error::WrongAPIUsage(
"[RecordComponent] Must specify dataset type and extent before "
"using storeChunk() (see RecordComponent::resetDataset()).");
}
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
IOHandler()->enqueue(IOTask(this, dCreate));
throw error::WrongAPIUsage(
"[RecordComponent] Must specify dataset type and extent before "
"using storeChunk() (see RecordComponent::resetDataset()).");
}

Parameter<Operation::GET_BUFFER_VIEW> query;
query.queryOnly = true;
IOHandler()->enqueue(IOTask(this, query));
IOHandler()->flush(internal::defaultFlushParams);

Parameter<Operation::GET_BUFFER_VIEW> getBufferView;
getBufferView.offset = o;
getBufferView.extent = e;
getBufferView.dtype = getDatatype();
IOHandler()->enqueue(IOTask(this, getBufferView));
IOHandler()->flush(internal::defaultFlushParams);
auto &out = *getBufferView.out;
if (!out.backendManagedBuffer)

if (query.out->backendManagedBuffer)
{
// Need to initialize the dataset for the Span API.
// However, this is a non-collective call, while initializing the dataset
// is collective in HDF5. So we do this only in backends that actually
// support the Span API (i.e. ADIOS2), which do not share this
// restriction.
// TODO: Add some form of collective ::commitDefinitions() call to
// RecordComponents to be called by users before the Span API
if (!written())
{
/*
* The openPMD backend might not yet know about this dataset.
* Flush the openPMD hierarchy to the backend without flushing any
* actual data yet.
*/
seriesFlush_impl</* flush_entire_series = */ false>(
{FlushLevel::SkeletonOnly});
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
IOHandler()->enqueue(IOTask(this, dCreate));

setWritten(true, EnqueueAsynchronously::OnlyAsync);
}

IOHandler()->enqueue(IOTask(this, getBufferView));
IOHandler()->flush(internal::defaultFlushParams);
}

// The backend might still refuse the operation even if backend-managed
// buffers are generally supported, so check again.
if (!getBufferView.out->backendManagedBuffer)
{
// note that data might have either
// type shared_ptr<T> or shared_ptr<T[]>
auto data = std::forward<F>(createBuffer)(size);
out.ptr = static_cast<void *>(data.get());
getBufferView.out->ptr = static_cast<void *>(data.get());
if (size > 0)
{
storeChunk(std::move(data), std::move(o), std::move(e));
Expand Down
5 changes: 3 additions & 2 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,10 @@ OPENPMD_protected
{
return writable().written;
}
enum class EnqueueAsynchronously : bool
enum class EnqueueAsynchronously : uint8_t
{
Yes,
OnlyAsync,
Both,
No
};
/*
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,12 @@ void ADIOS2IOHandlerImpl::getBufferView(
parameters.out->backendManagedBuffer = false;
return;
}
else if (parameters.queryOnly)
{
parameters.out->backendManagedBuffer = true;
return;
}

setAndGetFilePosition(writable);
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError);
Expand Down
6 changes: 3 additions & 3 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void Iteration::flushFileBased(
* If it was written before, then in the context of another iteration.
*/
auto &attr = s.get().m_rankTable.m_attributable;
attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes);
attr.setWritten(false, Attributable::EnqueueAsynchronously::Both);
s.get()
.m_rankTable.m_attributable.get()
.m_writable.abstractFilePosition.reset();
Expand Down Expand Up @@ -853,7 +853,7 @@ auto Iteration::beginStep(
{
bool previous = series.iterations.written();
series.iterations.setWritten(
false, Attributable::EnqueueAsynchronously::Yes);
false, Attributable::EnqueueAsynchronously::Both);
auto oldStatus = IOHandl->m_seriesStatus;
IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing;
try
Expand All @@ -870,7 +870,7 @@ auto Iteration::beginStep(
}
IOHandl->m_seriesStatus = oldStatus;
series.iterations.setWritten(
previous, Attributable::EnqueueAsynchronously::Yes);
previous, Attributable::EnqueueAsynchronously::Both);
}
else if (thisObject.has_value())
{
Expand Down
6 changes: 3 additions & 3 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1503,9 +1503,9 @@ void Series::flushFileBased(
* current iteration by the backend)
*/
this->setWritten(
false, Attributable::EnqueueAsynchronously::Yes);
false, Attributable::EnqueueAsynchronously::Both);
series.iterations.setWritten(
false, Attributable::EnqueueAsynchronously::Yes);
false, Attributable::EnqueueAsynchronously::Both);

setDirty(dirty() || it->second.dirty());
std::string filename = iterationFilename(it->first);
Expand Down Expand Up @@ -1996,7 +1996,7 @@ void Series::readOneIterationFileBased(std::string const &filePath)
setWritten(false, Attributable::EnqueueAsynchronously::No);
setIterationEncoding_internal(
encoding_out, internal::default_or_explicit::explicit_);
setWritten(old_written, Attributable::EnqueueAsynchronously::Yes);
setWritten(old_written, Attributable::EnqueueAsynchronously::Both);
}
else
throw std::runtime_error(
Expand Down
10 changes: 8 additions & 2 deletions src/backend/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,12 +524,18 @@ void Attributable::setWritten(bool val, EnqueueAsynchronously ea)
switch (ea)
{

case EnqueueAsynchronously::Yes: {
case EnqueueAsynchronously::OnlyAsync: {
Parameter<Operation::SET_WRITTEN> param;
param.target_status = val;
IOHandler()->enqueue(IOTask(this, param));
return;
}
case EnqueueAsynchronously::Both: {
Parameter<Operation::SET_WRITTEN> param;
param.target_status = val;
IOHandler()->enqueue(IOTask(this, param));
break;
}
break;
case EnqueueAsynchronously::No:
break;
}
Expand Down
Loading