Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/common/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ if (WIN32)
target_link_libraries(${TARGET_NAME} PRIVATE Shlwapi)
endif()

find_package(Threads REQUIRED)
target_link_libraries(${TARGET_NAME} PRIVATE Threads::Threads)

target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${UTIL_INCLUDE_DIR}>)

ov_add_clang_format_target(${TARGET_NAME}_clang FOR_TARGETS ${TARGET_NAME})
Expand Down
10 changes: 10 additions & 0 deletions src/common/util/include/openvino/util/file_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ std::filesystem::path get_plugin_path(const std::filesystem::path& plugin,
*/
std::vector<uint8_t> load_binary(const std::filesystem::path& path);

/**
* @brief Reads data from file into buffer with optimized method (Parallel IO)
* @param path File path
* @param buffer Destination buffer
* @param size Number of bytes to read
* @param offset Offset in file
* @return true if read successful
*/
bool read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset);

/**
* @brief save binary data to file
* @param path - binary file path to store
Expand Down
196 changes: 194 additions & 2 deletions src/common/util/src/file_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
#include "openvino/util/file_util.hpp"

#include <algorithm>
#include <atomic>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <string_view>
#include <thread>
#include <vector>

#include "openvino/util/common_util.hpp"

Expand All @@ -18,11 +24,16 @@
# define NOMINMAX
# endif
# include <direct.h>
# include <malloc.h>
# include <shlwapi.h>
# include <windows.h>
#else
# include <dirent.h>
# include <dlfcn.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <sys/types.h>
# include <unistd.h>
#endif

std::filesystem::path ov::util::get_directory(const std::filesystem::path& path) {
Expand Down Expand Up @@ -250,12 +261,193 @@ std::filesystem::path ov::util::get_plugin_path(const std::filesystem::path& plu
/**
 * @brief Loads the whole content of a binary file into memory.
 *
 * The buffer is sized to the file size reported by the filesystem and then filled with a single read.
 *
 * @param path Path of the file to load.
 * @return File content, or an empty vector if the file could not be opened.
 */
std::vector<uint8_t> ov::util::load_binary(const std::filesystem::path& path) {
    std::vector<uint8_t> buffer;
    if (auto input = std::ifstream(path, std::ios::binary); input.is_open()) {
        // resize() (not reserve()) so the bytes written by read() are inside the vector's size
        // and are actually returned to the caller.
        buffer.resize(std::filesystem::file_size(path));
        input.read(reinterpret_cast<char*>(buffer.data()), static_cast<std::streamsize>(buffer.size()));
    }
    return buffer;
}

#ifdef _WIN32
bool ov::util::read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this implementation to a dedicated Windows file under the os folder.

if (path.empty())
return false;

// CreateFileW expects wchar_t*
const std::wstring& wpath = path.native();

HANDLE hFile = CreateFileW(wpath.c_str(),
Comment on lines +276 to +278
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const std::wstring& wpath = path.native();
HANDLE hFile = CreateFileW(wpath.c_str(),
HANDLE hFile = CreateFileW(path.c_str(),

GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (hFile == INVALID_HANDLE_VALUE)
return false;

// Safety check: File size
LARGE_INTEGER fileSize;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LARGE_INTEGER fileSize;
LARGE_INTEGER file_size;

Use snake_case for variables

if (GetFileSizeEx(hFile, &fileSize)) {
if (static_cast<unsigned long long>(fileSize.QuadPart) < offset + size) {
CloseHandle(hFile);
return false;
}
}
CloseHandle(hFile);

const size_t num_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()), size / (1024 * 1024));
if (num_threads <= 1) {
// Fallback to single threaded read if size is small or bad concurrency
HANDLE s_hFile = CreateFileW(wpath.c_str(),
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (s_hFile == INVALID_HANDLE_VALUE)
return false;

OVERLAPPED ov = {0};
ov.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((offset >> 32) & 0xFFFFFFFF);
DWORD bytesRead = 0;
// Note: ReadFile takes DWORD (32-bit) for size. If size > 4GB, this simple fallback needs loop.
// But for single threaded simple read we might just fail or loop.
// Let's implement loop for correctness.

char* current_ptr = static_cast<char*>(buffer);
size_t remaining_size = size;
size_t current_file_offset = offset;
bool success = true;

while (remaining_size > 0) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));
ov.Offset = static_cast<DWORD>(current_file_offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((current_file_offset >> 32) & 0xFFFFFFFF);

if (!ReadFile(s_hFile, current_ptr, to_read, &bytesRead, &ov) || bytesRead != to_read) {
if (GetLastError() != ERROR_IO_PENDING) {
success = false;
break;
}
}
remaining_size -= bytesRead;
current_ptr += bytesRead;
current_file_offset += bytesRead;
}
CloseHandle(s_hFile);
return success;
}

std::vector<std::future<void>> futures;
size_t chunk_size = size / num_threads;
chunk_size = (chunk_size + 4095) & ~4095;

size_t current_offset = 0;
std::atomic<bool> overall_status{true};

for (size_t i = 0; i < num_threads; i++) {
size_t read_size = (i == num_threads - 1) ? (size - current_offset) : chunk_size;
if (read_size == 0)
break;

void* ptr = static_cast<char*>(buffer) + current_offset;
size_t file_offset = offset + current_offset;

futures.emplace_back(std::async(std::launch::async, [wpath, file_offset, ptr, read_size, &overall_status] {
HANDLE t_hFile = CreateFileW(wpath.c_str(),
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (t_hFile == INVALID_HANDLE_VALUE) {
overall_status = false;
return;
}

size_t remaining_size = read_size;
char* current_ptr = static_cast<char*>(ptr);
size_t current_file_offset = file_offset;

while (remaining_size > 0 && overall_status) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));

OVERLAPPED ov = {0};
ov.Offset = static_cast<DWORD>(current_file_offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((current_file_offset >> 32) & 0xFFFFFFFF);

DWORD bytesRead = 0;
if (!ReadFile(t_hFile, current_ptr, to_read, &bytesRead, &ov) || bytesRead != to_read) {
if (GetLastError() != ERROR_IO_PENDING) {
overall_status = false;
break;
}
Comment on lines +375 to +387
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BLOCKER] Windows implementation: on ReadFile failure you treat ERROR_IO_PENDING as non-fatal, but the handles are opened without FILE_FLAG_OVERLAPPED and there is no GetOverlappedResult/event wait. If ERROR_IO_PENDING ever occurs, bytesRead can remain 0, causing an infinite loop or silent short read. Suggestion: either open with FILE_FLAG_OVERLAPPED and properly wait for completion, or (simpler) treat any ReadFile failure / short read as an immediate error and return false.

Suggested change
while (remaining_size > 0 && overall_status) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));
OVERLAPPED ov = {0};
ov.Offset = static_cast<DWORD>(current_file_offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((current_file_offset >> 32) & 0xFFFFFFFF);
DWORD bytesRead = 0;
if (!ReadFile(t_hFile, current_ptr, to_read, &bytesRead, &ov) || bytesRead != to_read) {
if (GetLastError() != ERROR_IO_PENDING) {
overall_status = false;
break;
}
LARGE_INTEGER li;
li.QuadPart = static_cast<LONGLONG>(file_offset);
if (!SetFilePointerEx(t_hFile, li, nullptr, FILE_BEGIN)) {
overall_status = false;
CloseHandle(t_hFile);
return;
}
while (remaining_size > 0 && overall_status) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));
DWORD bytesRead = 0;
if (!ReadFile(t_hFile, current_ptr, to_read, &bytesRead, nullptr) || bytesRead != to_read) {
overall_status = false;
break;

Copilot uses AI. Check for mistakes.
}

remaining_size -= bytesRead;
current_ptr += bytesRead;
current_file_offset += bytesRead;
}
CloseHandle(t_hFile);
}));

current_offset += read_size;
}

for (auto& f : futures) {
f.get();
}
return overall_status;
}
#else
/**
 * @brief Reads `size` bytes starting at `offset` of a file into `buffer`, splitting large reads across threads.
 *
 * Requests smaller than 1 MiB per available hardware thread are served by a single sequential read.
 * Otherwise the range is split into 4 KiB-aligned chunks and every chunk is read by its own task through
 * an independent stream, so each task has its own file position.
 *
 * @param path   File path.
 * @param buffer Destination buffer; must provide room for at least `size` bytes.
 * @param size   Number of bytes to read.
 * @param offset Offset in the file of the first byte to read.
 * @return true only if every requested byte was read; false if the file cannot be opened, the requested
 *         range lies past the end of the file, or any chunk fails to open, seek or read completely.
 */
bool ov::util::read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset) {
    constexpr size_t min_bytes_per_thread = 1024 * 1024;
    constexpr size_t chunk_alignment = 4096;

    std::ifstream ifs(path, std::ios::binary);
    if (!ifs.is_open())
        return false;

    if (buffer == nullptr && size > 0)
        return false;

    // Reject ranges past the end of the file up front. If the size cannot be queried the check is skipped
    // and the reads below report the failure themselves.
    std::error_code ec;
    const auto file_size = std::filesystem::file_size(path, ec);
    if (!ec && (offset > file_size || size > file_size - offset))
        return false;

    const size_t num_threads =
        std::min(static_cast<size_t>(std::thread::hardware_concurrency()), size / min_bytes_per_thread);
    // Fallback to single thread if not enough work or threads
    if (num_threads <= 1) {
        ifs.seekg(static_cast<std::streamoff>(offset), std::ios::beg);
        ifs.read(static_cast<char*>(buffer), static_cast<std::streamsize>(size));
        return ifs.good();
    }

    const size_t chunk_size = ((size / num_threads) + chunk_alignment - 1) & ~(chunk_alignment - 1);

    std::vector<std::future<bool>> futures;
    futures.reserve(num_threads);
    size_t current_offset = 0;

    // The file is opened in each task to have independent file pointers
    for (size_t i = 0; i < num_threads && current_offset < size; ++i) {
        const size_t remaining = size - current_offset;
        // Rounding the chunk size up can make the chunks cover more than `size`; clamp so no chunk
        // writes past the end of the destination buffer. The last task takes whatever is left.
        const size_t read_size = (i == num_threads - 1) ? remaining : std::min(chunk_size, remaining);

        char* const dst = static_cast<char*>(buffer) + current_offset;
        const size_t file_offset = offset + current_offset;

        futures.emplace_back(std::async(std::launch::async, [path, file_offset, dst, read_size]() -> bool {
            std::ifstream t_ifs(path, std::ios::binary);
            if (!t_ifs.is_open())
                return false;
            t_ifs.seekg(static_cast<std::streamoff>(file_offset), std::ios::beg);
            if (!t_ifs.good())
                return false;
            t_ifs.read(dst, static_cast<std::streamsize>(read_size));
            return t_ifs.good() && static_cast<size_t>(t_ifs.gcount()) == read_size;
        }));

        current_offset += read_size;
    }

    // Wait for every task (even after a failure) so no task still writes into `buffer` after return.
    bool all_chunks_read = true;
    for (auto& f : futures) {
        all_chunks_read = f.get() && all_chunks_read;
    }
    return all_chunks_read;
}
Comment on lines +433 to +448
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BLOCKER] read_binary_file_parallel on non-Windows always returns true and doesn’t validate that each thread successfully opened/seeked/read the requested bytes. This can silently return partially/uninitialized data and cause model corruption. Please propagate per-thread read/seek failures via an atomic status, validate file size vs offset+size, and return false if any chunk read fails (and consider a single-thread fallback when parallel read fails).

Copilot uses AI. Check for mistakes.
#endif

void ov::util::save_binary(const std::filesystem::path& path, const void* binary, size_t bin_size) {
if (std::ofstream out_file(path, std::ios::binary); out_file.is_open()) {
out_file.write(reinterpret_cast<const char*>(binary), bin_size);
Expand Down
13 changes: 13 additions & 0 deletions src/inference/src/dev/core_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,19 @@ ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::load_model_from_cache(
}
}

// Pass the cached blob file path to plugins that support it (e.g. GPU plugin)
// so they can use optimized parallel I/O to read weights directly from the blob file
if (!cacheContent.m_blob_id.empty() && util::contains(plugin.get_property(ov::supported_properties),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding logic to the core should be avoided, especially for device-specific properties.
The cache entry is managed by the cache manager, and there should not be any logic here to add such a property or to bypass what the cache manager has opened. Also, using any hardcoded path is not correct.

The proper solution is to open the stream (fast version) or the mmap version, which allows better parallel reads.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I also hoped to do so, but ifstream cannot meet the requirements of a parallel read, because each thread needs to seek to a different offset to read its data. Could you give a test sample of such a parallel read with a stream?

ov::PropertyName("GPU_CACHED_BLOB_PATH"))) {
if (auto cache_dir_it = config.find(ov::cache_dir.name()); cache_dir_it != config.end()) {
auto blob_path = std::filesystem::path(cache_dir_it->second.as<std::string>()) /
(cacheContent.m_blob_id + ".blob");
if (ov::util::file_exists(blob_path)) {
update_config["GPU_CACHED_BLOB_PATH"] = util::path_to_string(blob_path);
}
}
}

ov::util::VariantVisitor model_importer{
[&](const ov::Tensor& compiled_blob) -> ov::SoPtr<ov::ICompiledModel> {
const ov::Tensor compiled_blob_without_header{compiled_blob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ class BinaryInputBuffer : public InputBuffer<BinaryInputBuffer> {
virtual void read(void* const data, std::streamsize size) {
auto const read_size = _stream.rdbuf()->sgetn(reinterpret_cast<char*>(data), size);
OPENVINO_ASSERT(read_size == size,
"[GPU] Failed to read " + std::to_string(size) + " bytes from stream! Read " + std::to_string(read_size));
"[GPU] Failed to read " + std::to_string(size) + " bytes to stream! Read " + std::to_string(read_size));
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion message says "Failed to read ... bytes to stream" but this function reads from the input stream. Please change wording back to "from stream" to avoid confusing diagnostics when cache import fails.

Suggested change
"[GPU] Failed to read " + std::to_string(size) + " bytes to stream! Read " + std::to_string(read_size));
"[GPU] Failed to read " + std::to_string(size) + " bytes from stream! Read " + std::to_string(read_size));

Copilot uses AI. Check for mistakes.
}

/// Returns a reference to the `_stream` member, i.e. the same input stream that `read` pulls bytes from.
std::istream& get_stream() { return _stream; }

/// Stores the given untyped pointer in `_impl_params`; the pointee's type and ownership are not visible here.
void setKernelImplParams(void* impl_params) { _impl_params = impl_params; }
/// Returns the pointer currently held in `_impl_params`.
void* getKernelImplParams() const { return _impl_params; }

Expand Down
Loading
Loading