175 changes: 175 additions & 0 deletions runtime/onert/backend/trix/ops/BulkPipelineManager.cc
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BulkPipelineManager.h"

#include <iostream>
#include <stdexcept>
#include <thread>
#include <chrono>

namespace onert
{
namespace backend
{
namespace trix
{
namespace ops
{

BulkPipelineManager::BulkPipelineManager(const PipelineConfig &config) : _config(config)
{
// DO NOTHING
}

BulkPipelineManager::~BulkPipelineManager() { shutdown(); }

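// Creates and prepares all models listed in the config. Returns false and shuts
// the pipeline down again if any step fails.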
bool BulkPipelineManager::initialize()
{
if (_initialized.load())
{
// Already initialized
return true;
}

try
{
createModels();
prepareModels();

_initialized = true;
return true;
}
catch (const std::exception &e)
{
std::cerr << "Failed to initialize pipeline: " << e.what() << std::endl;
shutdown();
return false;
}
}

void BulkPipelineManager::shutdown()
{
if (!_initialized.load())
{
return;
}

_initialized = false;

// Wait until all executions are finished
while (_executing.load())
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

// Release models and clear buffer pool
for (auto &model : _models)
{
if (model)
{
model->release();
}
}
_models.clear();
}

void BulkPipelineManager::execute(const std::vector<const IPortableTensor *> &inputs,
std::vector<IPortableTensor *> &outputs)
{
if (!_initialized.load())
{
throw std::runtime_error("Pipeline is not initialized");
}

if (_models.empty())
{
throw std::runtime_error("No models in pipeline");
}

_executing = true;

try
{
auto current_inputs = inputs;
auto current_outputs = outputs;

for (size_t i = 0; i < _models.size(); ++i)
{
auto &model = _models[i];
if (!model || !model->isPrepared())
{
throw std::runtime_error("Model at index " + std::to_string(i) + " is not prepared");
}

// Wait for buffer ready before execution
model->waitForBufferReady();

// Execute model
model->run(current_inputs, current_outputs);

// The input of the next model is the output of the current model
if (i < _models.size() - 1)
{
current_inputs.clear();
for (const auto &output : current_outputs)
{
current_inputs.push_back(output);
}
}
}
}
catch (...)
{
_executing = false;
throw;
}

_executing = false;
}

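// Instantiates one BulkPipelineModel per configured path, all bound to the
// configured device.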
void BulkPipelineManager::createModels()
{
_models.clear();
_models.reserve(_config.model_paths.size());

for (size_t i = 0; i < _config.model_paths.size(); ++i)
{
auto model = std::make_shared<BulkPipelineModel>(_config.model_paths[i], _config.device_id);
if (!model->initialize())
{
throw std::runtime_error("Failed to initialize model: " + model->modelPath());
}
_models.push_back(model);
}
}

void BulkPipelineManager::prepareModels()
{
for (auto &model : _models)
{
if (!model->prepare())
{
throw std::runtime_error("Failed to prepare model: " + model->modelPath());
}
}
}

} // namespace ops
} // namespace trix
} // namespace backend
} // namespace onert
80 changes: 80 additions & 0 deletions runtime/onert/backend/trix/ops/BulkPipelineManager.h
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__
#define __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__

#include <memory>
#include <vector>
#include <string>
#include <atomic>
#include <backend/IPortableTensor.h>
#include "BulkPipelineModel.h"

namespace onert
{
namespace backend
{
namespace trix
{
namespace ops
{

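// Runs a sequence of BulkPipelineModel instances as a single pipeline: the models
// are created and prepared once, then executed in order, with each model's outputs
// fed as the next model's inputs.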
class BulkPipelineManager
{
public:
struct PipelineConfig
{
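// Paths to the model binaries, executed in the order listed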
std::vector<std::string> model_paths;
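// Device index handed to every BulkPipelineModel in the pipeline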
int device_id{0};
};

public:
explicit BulkPipelineManager(const PipelineConfig &config);
~BulkPipelineManager();

// Disallow copying
BulkPipelineManager(const BulkPipelineManager &) = delete;
BulkPipelineManager &operator=(const BulkPipelineManager &) = delete;

bool initialize();
void shutdown();
bool isInitialized() const { return _initialized; }

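// Runs every model in order; throws std::runtime_error if the pipeline is not
// initialized or any model is not prepared.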
void execute(const std::vector<const IPortableTensor *> &inputs,
std::vector<IPortableTensor *> &outputs);

private:
void createModels();
void prepareModels();

private:
PipelineConfig _config;
std::atomic<bool> _initialized{false};
std::atomic<bool> _executing{false};

std::vector<std::shared_ptr<BulkPipelineModel>> _models;
};

} // namespace ops
} // namespace trix
} // namespace backend
} // namespace onert

#endif // __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__
85 changes: 85 additions & 0 deletions runtime/onert/backend/trix/ops/test/BulkPipelineManager.test.cc
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "../BulkPipelineManager.h"
#include <gtest/gtest.h>

#include "mock_syscalls.h"

using namespace onert::backend::trix::ops;
using namespace onert::backend::trix::ops::test;

class BulkPipelineManagerTest : public ::testing::Test
{
protected:
void SetUp() override
{
BulkPipelineManager::PipelineConfig config;
config.device_id = 0;
config.model_paths.push_back("model_path");
manager = std::make_unique<BulkPipelineManager>(config);

// Reset all mock syscalls before each test
MockSyscallsManager::getInstance().resetAll();

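// Make fread() succeed and fill in a minimal npubin_meta so model loading
// passes without a real model binary on disk.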
MockSyscallsManager::getInstance().setFreadHook(
[](void *ptr, size_t size, size_t, FILE *) -> int {
if (size == NPUBIN_META_SIZE)
{
auto meta = reinterpret_cast<npubin_meta *>(ptr);
meta->program_size = 1024;
meta->weight_size = 1024;
meta->size = 4096;
}
return 1;
});

MockSyscallsManager::getInstance().setIoctlHook(
[](int, unsigned long request, void *arg) -> int {
// Get Version
if (request == _IOR(0x88, 1, unsigned int))
{
// Return version 3.2.X.X for trix backend sanity checking
*static_cast<int *>(arg) = 0x3020000;
}
return 0;
});
}
void TearDown() override {}

std::unique_ptr<BulkPipelineManager> manager;
};

TEST_F(BulkPipelineManagerTest, test_initialize)
{
EXPECT_TRUE(manager->initialize());
EXPECT_TRUE(manager->isInitialized());
}

TEST_F(BulkPipelineManagerTest, test_shutdown)
{
EXPECT_TRUE(manager->initialize());
manager->shutdown();
EXPECT_FALSE(manager->isInitialized());
}

TEST_F(BulkPipelineManagerTest, test_execute)
{
EXPECT_TRUE(manager->initialize());
const std::vector<const onert::backend::IPortableTensor *> inputs;
std::vector<onert::backend::IPortableTensor *> outputs;
EXPECT_NO_THROW(manager->execute(inputs, outputs));
}