diff --git a/crt/aws-c-http b/crt/aws-c-http index acf313990..8acb511e6 160000 --- a/crt/aws-c-http +++ b/crt/aws-c-http @@ -1 +1 @@ -Subproject commit acf31399077300c522315612dd2be09cfe48b5b8 +Subproject commit 8acb511e681f24060c436cfe3885ebf58b827839 diff --git a/include/aws/crt/http/HttpRequestResponse.h b/include/aws/crt/http/HttpRequestResponse.h index 7114c2ba2..f38124aa5 100644 --- a/include/aws/crt/http/HttpRequestResponse.h +++ b/include/aws/crt/http/HttpRequestResponse.h @@ -60,6 +60,13 @@ namespace Aws */ bool SetBody(const std::shared_ptr &body) noexcept; + /** + * Sets an async input stream as the message body + * @param body the async input stream representing the message body + * @return future indicating success/failure + */ + std::future SetBody(const std::shared_ptr &body) noexcept; + /** * Gets the number of headers contained in this request * @return the number of headers contained in this request @@ -101,6 +108,7 @@ namespace Aws Allocator *m_allocator; struct aws_http_message *m_message; std::shared_ptr m_bodyStream; + std::shared_ptr m_asyncBodyStream; }; /** diff --git a/include/aws/crt/io/Stream.h b/include/aws/crt/io/Stream.h index b41f36413..82c21aa33 100644 --- a/include/aws/crt/io/Stream.h +++ b/include/aws/crt/io/Stream.h @@ -7,8 +7,11 @@ #include #include #include +#include #include +#include + namespace Aws { namespace Crt @@ -192,6 +195,48 @@ namespace Aws private: std::shared_ptr m_stream; }; + + /** + * Interface for asynchronous input streams. + * Used for async HTTP request bodies. + */ + class AWS_CRT_CPP_API AsyncInputStream : public std::enable_shared_from_this, + public RefCounted + { + public: + virtual ~AsyncInputStream(); + + AsyncInputStream(const AsyncInputStream &) = delete; + AsyncInputStream &operator=(const AsyncInputStream &) = delete; + AsyncInputStream(AsyncInputStream &&) = delete; + AsyncInputStream &operator=(AsyncInputStream &&) = delete; + + explicit operator bool() const noexcept { return IsValid(); } + + virtual bool IsValid() const noexcept = 0; + + /// @private + aws_async_input_stream *GetUnderlyingStream() noexcept { return &m_underlying_stream; } + + protected: + Allocator *m_allocator; + aws_async_input_stream m_underlying_stream; + + AsyncInputStream(Aws::Crt::Allocator *allocator = ApiAllocator()); + + /** + * Asynchronously read into buffer. + * @return future - true on success (including EOF/no data available), false on error + */ + virtual std::future ReadImpl(ByteBuf &buffer) noexcept = 0; + + private: + static void s_Destroy(aws_async_input_stream *stream); + static aws_future_bool *s_Read(aws_async_input_stream *stream, aws_byte_buf *dest); + + static aws_async_input_stream_vtable s_vtable; + }; + } // namespace Io } // namespace Crt } // namespace Aws diff --git a/source/http/HttpRequestResponse.cpp b/source/http/HttpRequestResponse.cpp index 9be9aed87..8c766b404 100644 --- a/source/http/HttpRequestResponse.cpp +++ b/source/http/HttpRequestResponse.cpp @@ -57,12 +57,24 @@ namespace Aws bool HttpMessage::SetBody(const std::shared_ptr &body) noexcept { m_bodyStream = body; + m_asyncBodyStream = nullptr; aws_http_message_set_body_stream( m_message, m_bodyStream && *m_bodyStream ? m_bodyStream->GetUnderlyingStream() : nullptr); return true; } + std::future HttpMessage::SetBody(const std::shared_ptr &body) noexcept + { + m_asyncBodyStream = body; + m_bodyStream = nullptr; + aws_http_message_set_async_body_stream(m_message, nullptr); + + std::promise promise; + promise.set_value(body == nullptr || body->IsValid()); + return promise.get_future(); + } + size_t HttpMessage::GetHeaderCount() const noexcept { return aws_http_message_get_header_count(m_message); diff --git a/source/io/Stream.cpp b/source/io/Stream.cpp index b16267f10..8466eff8d 100644 --- a/source/io/Stream.cpp +++ b/source/io/Stream.cpp @@ -6,7 +6,9 @@ #include #include #include +#include +#include #include namespace Aws @@ -232,6 +234,47 @@ namespace Aws { return m_stream->peek(); } + + /*** AsyncInputStream implementation ***/ + + AsyncInputStream::~AsyncInputStream() {} + + void AsyncInputStream::s_Destroy(aws_async_input_stream *stream) + { + auto impl = static_cast(stream->impl); + impl->ReleaseRef(); + } + + aws_future_bool *AsyncInputStream::s_Read(aws_async_input_stream *stream, aws_byte_buf *dest) + { + auto impl = static_cast(stream->impl); + auto future = aws_future_bool_new(impl->m_allocator); + + std::shared_future cppFuture = impl->ReadImpl(*dest).share(); + + aws_future_bool_acquire(future); + std::thread( + [future, cppFuture]() + { + bool result = cppFuture.get(); + aws_future_bool_set_result(future, result); + aws_future_bool_release(future); + }) + .detach(); + + return future; + } + + aws_async_input_stream_vtable AsyncInputStream::s_vtable = { + AsyncInputStream::s_Destroy, + AsyncInputStream::s_Read, + }; + + AsyncInputStream::AsyncInputStream(Aws::Crt::Allocator *allocator) : m_allocator(allocator) + { + aws_async_input_stream_init_base(&m_underlying_stream, allocator, &s_vtable, this); + } + } // namespace Io } // namespace Crt } // namespace Aws