diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 1ee7248b32..25e0d6ad54 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -558,6 +558,7 @@ struct OPENPMDAPI_EXPORT } // in parameters + bool queryOnly = false; // query if the backend supports this Offset offset; Extent extent; Datatype dtype = Datatype::UNDEFINED; diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 9d1d8332b4..b796ab1a93 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -85,14 +85,6 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) { verifyChunk(o, e); - /* - * The openPMD backend might not yet know about this dataset. - * Flush the openPMD hierarchy to the backend without flushing any actual - * data yet. - */ - seriesFlush_impl( - {FlushLevel::SkeletonOnly}); - size_t size = 1; for (auto ext : e) { @@ -102,33 +94,61 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) * Flushing the skeleton does not create datasets, * so we might need to do it now. */ - if (!written()) + auto &rc = get(); + if (!rc.m_dataset.has_value()) { - auto &rc = get(); - if (!rc.m_dataset.has_value()) - { - throw error::WrongAPIUsage( - "[RecordComponent] Must specify dataset type and extent before " - "using storeChunk() (see RecordComponent::resetDataset())."); - } - Parameter dCreate(rc.m_dataset.value()); - dCreate.name = Attributable::get().m_writable.ownKeyWithinParent; - IOHandler()->enqueue(IOTask(this, dCreate)); + throw error::WrongAPIUsage( + "[RecordComponent] Must specify dataset type and extent before " + "using storeChunk() (see RecordComponent::resetDataset())."); } + Parameter query; + query.queryOnly = true; + IOHandler()->enqueue(IOTask(this, query)); + IOHandler()->flush(internal::defaultFlushParams); + Parameter getBufferView; getBufferView.offset = o; getBufferView.extent = e; getBufferView.dtype = getDatatype(); - IOHandler()->enqueue(IOTask(this, getBufferView)); - IOHandler()->flush(internal::defaultFlushParams); - auto &out = *getBufferView.out; - if (!out.backendManagedBuffer) + + if (query.out->backendManagedBuffer) + { + // Need to initialize the dataset for the Span API + // But this is a non-collective call and initializing the dataset is + // collective in HDF5 So we do this only in backends that actually + // support the Span API (i.e. ADIOS2) which do not share this + // restriction + // TODO: Add some form of collective ::commitDefinitions() call to + // RecordComponents to be called by users before the Span API + if (!written()) + { + /* + * The openPMD backend might not yet know about this dataset. + * Flush the openPMD hierarchy to the backend without flushing any + * actual data yet. + */ + seriesFlush_impl( + {FlushLevel::SkeletonOnly}); + Parameter dCreate(rc.m_dataset.value()); + dCreate.name = Attributable::get().m_writable.ownKeyWithinParent; + IOHandler()->enqueue(IOTask(this, dCreate)); + + setWritten(true, EnqueueAsynchronously::OnlyAsync); + } + + IOHandler()->enqueue(IOTask(this, getBufferView)); + IOHandler()->flush(internal::defaultFlushParams); + } + + // The backend might still refuse the operation even if backend managed + // buffers are generally supported, so check again + if (!getBufferView.out->backendManagedBuffer) { // note that data might have either // type shared_ptr or shared_ptr auto data = std::forward(createBuffer)(size); - out.ptr = static_cast(data.get()); + getBufferView.out->ptr = static_cast(data.get()); if (size > 0) { storeChunk(std::move(data), std::move(o), std::move(e)); diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 1c1d3db4a5..705080be84 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -589,9 +589,10 @@ OPENPMD_protected { return writable().written; } - enum class EnqueueAsynchronously : bool + enum class EnqueueAsynchronously : uint8_t { - Yes, + OnlyAsync, + Both, No }; /* diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 2f5a698289..e9e5c7e53f 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -1316,6 +1316,12 @@ void ADIOS2IOHandlerImpl::getBufferView( parameters.out->backendManagedBuffer = false; return; } + else if (parameters.queryOnly) + { + parameters.out->backendManagedBuffer = true; + return; + } + setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 83c4c3818b..085495db93 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -256,7 +256,7 @@ void Iteration::flushFileBased( * If it was written before, then in the context of another iteration. */ auto &attr = s.get().m_rankTable.m_attributable; - attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes); + attr.setWritten(false, Attributable::EnqueueAsynchronously::Both); s.get() .m_rankTable.m_attributable.get() .m_writable.abstractFilePosition.reset(); @@ -853,7 +853,7 @@ auto Iteration::beginStep( { bool previous = series.iterations.written(); series.iterations.setWritten( - false, Attributable::EnqueueAsynchronously::Yes); + false, Attributable::EnqueueAsynchronously::Both); auto oldStatus = IOHandl->m_seriesStatus; IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing; try @@ -870,7 +870,7 @@ auto Iteration::beginStep( } IOHandl->m_seriesStatus = oldStatus; series.iterations.setWritten( - previous, Attributable::EnqueueAsynchronously::Yes); + previous, Attributable::EnqueueAsynchronously::Both); } else if (thisObject.has_value()) { diff --git a/src/Series.cpp b/src/Series.cpp index 32595f42bc..e01e95ffb4 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1503,9 +1503,9 @@ void Series::flushFileBased( * current iteration by the backend) */ this->setWritten( - false, Attributable::EnqueueAsynchronously::Yes); + false, Attributable::EnqueueAsynchronously::Both); series.iterations.setWritten( - false, Attributable::EnqueueAsynchronously::Yes); + false, Attributable::EnqueueAsynchronously::Both); setDirty(dirty() || it->second.dirty()); std::string filename = iterationFilename(it->first); @@ -1996,7 +1996,7 @@ void Series::readOneIterationFileBased(std::string const &filePath) setWritten(false, Attributable::EnqueueAsynchronously::No); setIterationEncoding_internal( encoding_out, internal::default_or_explicit::explicit_); - setWritten(old_written, Attributable::EnqueueAsynchronously::Yes); + setWritten(old_written, Attributable::EnqueueAsynchronously::Both); } else throw std::runtime_error( diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index ce1c2936cb..dd9019b255 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -524,12 +524,18 @@ void Attributable::setWritten(bool val, EnqueueAsynchronously ea) switch (ea) { - case EnqueueAsynchronously::Yes: { + case EnqueueAsynchronously::OnlyAsync: { Parameter param; param.target_status = val; IOHandler()->enqueue(IOTask(this, param)); + return; + } + case EnqueueAsynchronously::Both: { + Parameter param; + param.target_status = val; + IOHandler()->enqueue(IOTask(this, param)); + break; } - break; case EnqueueAsynchronously::No: break; }