Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ if(NOT CMAKE_CROSSCOMPILING)
add_subdirectory(bin/elasticurl_cpp)
add_subdirectory(bin/mqtt5_app)
add_subdirectory(bin/mqtt5_canary)
add_subdirectory(bin/stream_comparison)
endif()
endif()
endif()
28 changes: 28 additions & 0 deletions bin/stream_comparison/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
project(stream_comparison CXX)

set(STREAM_COMPARISON_PROJECT_NAME stream_comparison)
add_executable(${STREAM_COMPARISON_PROJECT_NAME} main.cpp)

aws_add_sanitizers(${STREAM_COMPARISON_PROJECT_NAME})

set_target_properties(${STREAM_COMPARISON_PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
set_target_properties(${STREAM_COMPARISON_PROJECT_NAME} PROPERTIES CXX_STANDARD ${CMAKE_CXX_STANDARD})

if (MSVC)
if(AWS_STATIC_MSVC_RUNTIME_LIBRARY OR STATIC_CRT)
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE "/MT$<$<CONFIG:Debug>:d>")
else()
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE "/MD$<$<CONFIG:Debug>:d>")
endif()
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE /W4 /WX)
else ()
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror)
endif ()

target_compile_definitions(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE $<$<CONFIG:Debug>:DEBUG_BUILD>)

target_include_directories(${STREAM_COMPARISON_PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

target_link_libraries(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE aws-crt-cpp)
233 changes: 233 additions & 0 deletions bin/stream_comparison/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*
* Comparison test: InputStream (sync) vs AsyncInputStream (async)
* Demonstrates CPU usage difference when data source is slow.
*/

#include <aws/crt/Api.h>
#include <aws/crt/io/Stream.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

using namespace Aws::Crt;

static std::atomic<int> g_readCallCount{0};
static const int CHUNK_COUNT = 5;
static const int CHUNK_DELAY_MS = 500;

/**
* Synchronous stream with simulated slow data source.
* CRT polls ReadImpl() repeatedly - causes hot loop when data isn't ready.
*/
class SlowSyncStream : public Io::InputStream
{
mutable int m_chunksRemaining = CHUNK_COUNT;
mutable std::chrono::steady_clock::time_point m_nextDataTime;

public:
SlowSyncStream() : Io::InputStream(), m_nextDataTime(std::chrono::steady_clock::now()) {}

bool IsValid() const noexcept override { return true; }

protected:
bool ReadImpl(ByteBuf &buffer) noexcept override
{
g_readCallCount++;

auto now = std::chrono::steady_clock::now();

// No data ready yet - return without writing
if (now < m_nextDataTime)
{
return true;
}

// EOF
if (m_chunksRemaining <= 0)
{
return true;
}

// Write chunk
const char *chunk = "chunk";
aws_byte_buf_write(&buffer, (const uint8_t *)chunk, 5);
m_chunksRemaining--;
m_nextDataTime = now + std::chrono::milliseconds(CHUNK_DELAY_MS);
return true;
}

bool ReadSomeImpl(ByteBuf &buffer) noexcept override
{
return ReadImpl(buffer);
}

Io::StreamStatus GetStatusImpl() const noexcept override
{
Io::StreamStatus status;
status.is_valid = true;
status.is_end_of_stream = (m_chunksRemaining <= 0);
return status;
}

int64_t GetLengthImpl() const noexcept override
{
return -1; // Unknown length
}

bool SeekImpl(int64_t, Io::StreamSeekBasis) noexcept override { return false; }
int64_t PeekImpl() const noexcept override { return 0; }
};

/**
* Asynchronous stream with simulated slow data source.
* ReadImpl() called once per chunk - callback fires when data ready.
*/
class SlowAsyncStream : public Io::AsyncInputStream
{
int m_chunksRemaining = CHUNK_COUNT;

public:
SlowAsyncStream() : Io::AsyncInputStream() {}

bool IsValid() const noexcept override { return true; }

// Public wrapper for testing
void Read(ByteBuf &buffer, std::function<void(bool)> onComplete)
{
ReadImpl(buffer, std::move(onComplete));
}

protected:
void ReadImpl(ByteBuf &buffer, std::function<void(bool)> onComplete) noexcept override
{
g_readCallCount++;

if (m_chunksRemaining <= 0)
{
onComplete(true); // EOF
return;
}

// Simulate async wait for data
std::thread(
[this, &buffer, onComplete]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(CHUNK_DELAY_MS));

const char *chunk = "chunk";
aws_byte_buf_write(&buffer, (const uint8_t *)chunk, 5);
m_chunksRemaining--;
onComplete(true);
})
.detach();
}
};

void testSyncStream()
{
g_readCallCount = 0;
auto stream = std::make_shared<SlowSyncStream>();

uint8_t buf[64];
ByteBuf buffer = aws_byte_buf_from_empty_array(buf, sizeof(buf));

auto start = std::chrono::steady_clock::now();

// Simulate CRT polling loop
while (true)
{
Io::StreamStatus status;
stream->GetStatus(status);
if (status.is_end_of_stream)
break;

buffer.len = 0;
stream->Read(buffer);
}

auto elapsed = std::chrono::steady_clock::now() - start;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();

std::cout << "=== InputStream (sync) ===" << std::endl;
std::cout << "ReadImpl calls: " << g_readCallCount << std::endl;
std::cout << "Time: " << ms << "ms" << std::endl;
std::cout << std::endl;
}

void testAsyncStream()
{
g_readCallCount = 0;
auto stream = std::make_shared<SlowAsyncStream>();

uint8_t buf[64];
ByteBuf buffer = aws_byte_buf_from_empty_array(buf, sizeof(buf));

std::mutex mtx;
std::condition_variable cv;
bool done = false;
int chunksRead = 0;

auto start = std::chrono::steady_clock::now();

std::function<void()> readNext = [&]()
{
buffer.len = 0;
stream->Read(buffer, [&](bool success)
{
if (!success || buffer.len == 0)
{
std::lock_guard<std::mutex> lock(mtx);
done = true;
cv.notify_one();
return;
}
chunksRead++;
if (chunksRead >= CHUNK_COUNT)
{
std::lock_guard<std::mutex> lock(mtx);
done = true;
cv.notify_one();
}
else
{
readNext();
}
});
};

readNext();

std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&] { return done; });

auto elapsed = std::chrono::steady_clock::now() - start;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();

std::cout << "=== AsyncInputStream (async) ===" << std::endl;
std::cout << "ReadImpl calls: " << g_readCallCount << std::endl;
std::cout << "Time: " << ms << "ms" << std::endl;
std::cout << std::endl;
}

int main()
{
ApiHandle apiHandle;

std::cout << "Stream Comparison Test" << std::endl;
std::cout << "Chunks: " << CHUNK_COUNT << ", Delay: " << CHUNK_DELAY_MS << "ms each" << std::endl;
std::cout << "Expected time: ~" << (CHUNK_COUNT * CHUNK_DELAY_MS) << "ms" << std::endl;
std::cout << std::endl;

testSyncStream();
testAsyncStream();

std::cout << "Sync stream polls continuously (high CPU)." << std::endl;
std::cout << "Async stream waits for callback (idle CPU)." << std::endl;

return 0;
}
8 changes: 8 additions & 0 deletions include/aws/crt/http/HttpRequestResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ namespace Aws
*/
bool SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept;

/**
* Sets an async input stream as the message body
* @param body the async input stream representing the message body
* @return future<bool> indicating success/failure
*/
std::future<bool> SetBody(const std::shared_ptr<Aws::Crt::Io::AsyncInputStream> &body) noexcept;

/**
* Gets the number of headers contained in this request
* @return the number of headers contained in this request
Expand Down Expand Up @@ -101,6 +108,7 @@ namespace Aws
Allocator *m_allocator;
struct aws_http_message *m_message;
std::shared_ptr<Aws::Crt::Io::InputStream> m_bodyStream;
std::shared_ptr<Aws::Crt::Io::AsyncInputStream> m_asyncBodyStream;
};

/**
Expand Down
46 changes: 46 additions & 0 deletions include/aws/crt/io/Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
#include <aws/crt/Exports.h>
#include <aws/crt/RefCounted.h>
#include <aws/crt/Types.h>
#include <aws/io/async_stream.h>
#include <aws/io/stream.h>

#include <future>
#include <functional>

namespace Aws
{
namespace Crt
Expand Down Expand Up @@ -192,6 +196,48 @@ namespace Aws
private:
std::shared_ptr<Aws::Crt::Io::IStream> m_stream;
};

/**
* Interface for asynchronous input streams.
* Used for async HTTP request bodies.
*/
class AWS_CRT_CPP_API AsyncInputStream : public std::enable_shared_from_this<AsyncInputStream>,
public RefCounted<AsyncInputStream>
{
public:
virtual ~AsyncInputStream();

AsyncInputStream(const AsyncInputStream &) = delete;
AsyncInputStream &operator=(const AsyncInputStream &) = delete;
AsyncInputStream(AsyncInputStream &&) = delete;
AsyncInputStream &operator=(AsyncInputStream &&) = delete;

explicit operator bool() const noexcept { return IsValid(); }

virtual bool IsValid() const noexcept = 0;

/// @private
aws_async_input_stream *GetUnderlyingStream() noexcept { return &m_underlying_stream; }

protected:
Allocator *m_allocator;
aws_async_input_stream m_underlying_stream;

AsyncInputStream(Aws::Crt::Allocator *allocator = ApiAllocator());

/**
* Asynchronously read into buffer.
* Call onComplete(true) on success (including EOF), or onComplete(false) on error.
*/
virtual void ReadImpl(ByteBuf &buffer, std::function<void(bool)> onComplete) noexcept = 0;

private:
static void s_Destroy(aws_async_input_stream *stream);
static aws_future_bool *s_Read(aws_async_input_stream *stream, aws_byte_buf *dest);

static aws_async_input_stream_vtable s_vtable;
};

} // namespace Io
} // namespace Crt
} // namespace Aws
12 changes: 12 additions & 0 deletions source/http/HttpRequestResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,24 @@ namespace Aws
bool HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept
{
m_bodyStream = body;
m_asyncBodyStream = nullptr;
aws_http_message_set_body_stream(
m_message, m_bodyStream && *m_bodyStream ? m_bodyStream->GetUnderlyingStream() : nullptr);

return true;
}

std::future<bool> HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::AsyncInputStream> &body) noexcept
{
m_asyncBodyStream = body;
m_bodyStream = nullptr;
aws_http_message_set_async_body_stream(m_message, nullptr);

std::promise<bool> promise;
promise.set_value(body == nullptr || body->IsValid());
return promise.get_future();
}

size_t HttpMessage::GetHeaderCount() const noexcept
{
return aws_http_message_get_header_count(m_message);
Expand Down
Loading
Loading