Skip to content

Commit c07016c

Browse files
obdev shenyunlong
authored and committed
revert commit 4c0ae9138d1daccad6fa89deee802cce6645b701 and commit 2e865c128a3072550384423840df795b6f79dd2b
Co-authored-by: shenyunlong.syl <ylshen0919@gmail.com>
1 parent 3842221 commit c07016c

10 files changed

+252
-644
lines changed

src/share/ob_debug_sync_point.h

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -757,8 +757,6 @@ class ObString;
757757
ACT(BEFORE_INC_MAJOR_DDL_MERGE_UPDATE_TABLE_STORE,)\
758758
ACT(AFTER_INC_MAJOR_MERGE_GET_SSTABLE,)\
759759
ACT(AFTER_GET_MTL_TENANT_LOCK,)\
760-
ACT(BEFORE_DO_EMBEDDING_TASK,)\
761-
ACT(BEFORE_COMPLETE_EMBEDDING_TASK,)\
762760
ACT(MAX_DEBUG_SYNC_POINT,)
763761

764762
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

src/share/vector_index/ob_hybrid_vector_refresh_task.cpp

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -508,7 +508,6 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
508508
storage::ObTableScanParam *&table_scan_param = task_ctx->table_scan_param_;
509509
schema::ObTableParam *&table_param = task_ctx->table_param_;
510510
storage::ObValueRowIterator &delta_delete_iter = task_ctx->delta_delete_iter_;
511-
ObCollationType col_type = CS_TYPE_INVALID;
512511
int64_t dim = 0;
513512
int64_t loop_cnt = 0;
514513
int64_t http_timeout_us = 0;
@@ -529,8 +528,6 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
529528
LOG_WARN("unexpected error", K(ret), KPC(task_ctx));
530529
} else if (OB_FAIL(adaptor.get_dim(dim))) {
531530
LOG_WARN("get dim failed", K(ret));
532-
} else if (OB_FAIL(ObVectorIndexUtil::get_index_column_collation_type(tenant_id_, adaptor.get_embedded_table_id(), col_type))) {
533-
LOG_WARN("failed to get chunc column col_type", K(ret), K(adaptor));
534531
} else {
535532
if (OB_NOT_NULL(tsc_iter) || OB_NOT_NULL(table_scan_param) || OB_NOT_NULL(table_param)) {
536533
if (OB_ISNULL(tsc_iter) || OB_ISNULL(table_scan_param) || OB_ISNULL(table_param)) {
@@ -659,7 +656,6 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
659656
const ObAiModelEndpointInfo *endpoint = task_ctx->endpoint_; // endpoint should not be null after init.
660657
task_ctx->embedding_task_ = new(task_buf)ObEmbeddingTask(task_ctx->allocator_);
661658
ObPluginVectorIndexService *service = MTL(ObPluginVectorIndexService *);
662-
663659
if (OB_ISNULL(service)) {
664660
ret = OB_ERR_UNEXPECTED;
665661
LOG_WARN("unexpected null ptr", K(ret), KPC(service));
@@ -668,8 +664,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
668664
} else if (OB_FAIL(ob_write_string(task_ctx->allocator_, endpoint->get_url(), url, true))) {
669665
LOG_WARN("fail to write string", K(ret));
670666
} else if (OB_FAIL(task_ctx->embedding_task_->init(url, endpoint->get_request_model_name(),
671-
endpoint->get_provider(), access_key, chunk_array, col_type, dim, http_timeout_us,
672-
http_max_retries, ctx_->task_status_.task_id_, ObEmbeddingTasSourceType::ASYNC_INDEX))) {
667+
endpoint->get_provider(), access_key, chunk_array, dim, http_timeout_us, http_max_retries))) {
673668
LOG_WARN("failed to init embedding task", K(ret), KPC(endpoint));
674669
} else {
675670
ObEmbeddingTaskHandler *embedding_handler = nullptr;

src/share/vector_index/ob_vector_embedding_handler.cpp

Lines changed: 208 additions & 385 deletions
Large diffs are not rendered by default.

src/share/vector_index/ob_vector_embedding_handler.h

Lines changed: 9 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -155,12 +155,6 @@ class ObEmbeddingTaskPhaseManager {
155155
}
156156
};
157157

158-
enum class ObEmbeddingTasSourceType
159-
{
160-
INDEX_PIPELINE = 0,
161-
ASYNC_INDEX = 1,
162-
};
163-
164158
class ObEmbeddingTaskHandler;
165159

166160
// Constants for field lengths
@@ -175,14 +169,10 @@ class ObEmbeddingTask
175169
const ObString &provider,
176170
const ObString &user_key,
177171
const ObIArray<ObString> &input_chunks,
178-
const ObCollationType col_type,
179172
int64_t dimension,
180173
int64_t http_timeout_us,
181174
int64_t http_max_retries,
182-
int64_t source_task_id,
183-
ObEmbeddingTasSourceType source_task_type,
184-
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr,
185-
bool always_retry = false);
175+
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr);
186176
template <typename ThreadPoolType>
187177
int do_work(ThreadPoolType *thread_pool);
188178
int64_t get_task_id() const { return task_id_; }
@@ -191,56 +181,29 @@ class ObEmbeddingTask
191181

192182
TO_STRING_KV(K_(is_inited),
193183
K_(task_id),
194-
K_(phase),
195184
K_(model_url),
196185
K_(model_name),
186+
K_(user_key),
197187
K(input_chunks_.count()),
198188
K(output_vectors_.count()),
199189
K_(dimension),
200190
K_(batch_size),
201-
K_(current_batch_idx),
202-
K_(http_total_retry_count),
203-
K_(http_retry_count),
204-
K_(http_max_retry_count),
205-
K_(http_retry_start_time_us),
206-
K_(http_last_retry_time_us),
207191
K_(processed_chunks),
208192
K_(total_chunks),
209-
K_(process_callback_offset),
210-
K_(col_type),
211-
K_(current_input_token_limit),
212-
K_(need_cancel),
213-
K_(source_task_id),
214-
K_(source_task_type),
215-
K_(always_retry),
216-
K_(next_handle_task_time_us),
217-
K_(internal_error_code));
193+
K_(process_callback_offset));
218194
bool is_completed();
219195
void retain_if_managed();
220196
void release_if_managed();
221197
int get_async_result(ObArray<float*> &output_vectors);
222198
// 公共方法用于外部设置任务失败
223199
int mark_task_failed(int error_code);
224200
int maybe_callback();
225-
// Wait for task completion with periodic check interval
226-
// @param check_interval_us: the interval to wait before returning, must be > 0
227-
// @return:
228-
// - OB_INVALID_ARGUMENT: if check_interval_us <= 0
229-
// - OB_NOT_INIT: if task is not initialized
230-
// - OB_SUCCESS: if task completed (callback_done_ is true)
231-
// - OB_TIMEOUT: if task has exceeded wait_for_completion_timeout_us_ (real timeout)
232-
// - OB_EAGAIN: if wait timed out but task not yet exceeded total timeout,
233-
// caller should check cancel status and retry
234-
// Usage: call in a loop, check for cancel between calls, handle OB_TIMEOUT as task failure
235-
int wait_for_completion(int64_t check_interval_us);
201+
int wait_for_completion();
236202
int wake_up();
237203
void disable_callback();
238204
void set_callback_done();
239205
bool need_callback() { return cb_handle_ != nullptr ? true : false; };
240-
void set_need_cancel() { ATOMIC_STORE(&need_cancel_, true); }
241-
bool need_cancel() const { return ATOMIC_LOAD(&need_cancel_); }
242-
common::ObCurTraceId::TraceId get_trace_id() const { return trace_id_; }
243-
void set_always_retry(bool always_retry) { always_retry_ = always_retry; }
206+
244207
public:
245208
static const ObString MODEL_URL_NAME;
246209
static const ObString MODEL_NAME_NAME;
@@ -252,35 +215,28 @@ class ObEmbeddingTask
252215
static const ObString USER_KEY_NAME;
253216
static const ObString INPUT_NAME;
254217
static const ObString DIMENSIONS_NAME;
255-
static const ObString REQUEST_TOO_LARGE_ERROR_MSG;
256218

257219
static const int64_t HTTP_REQUEST_TIMEOUT; // 20 seconds
258220

259221
// Reschedule related constants
260222
static const int64_t MAX_RESCHEDULE_RETRY_CNT;
261223
static const int64_t RESCHEDULE_RETRY_INTERVAL_US;
262-
static const int64_t MAX_NEXT_HANDLE_INTERVAL_US;
263224

264225
// HTTP retry related constants
265226
static const int64_t MAX_HTTP_RETRY_CNT;
266227
static const int64_t HTTP_RETRY_BASE_INTERVAL_US;
267228
static const int64_t HTTP_RETRY_MAX_INTERVAL_US;
268229
static const int64_t HTTP_RETRY_MULTIPLIER;
269-
static const int64_t ALWAYS_RETRY_MIN_INTERVAL_US;
270-
static const int64_t MIN_EMBEDDING_MODEL_RPM;
271-
static const int64_t EMBEDDING_MODEL_WAIT_RATIO;
272230

273231
// Callback related constants
274232
static const int64_t CALLBACK_BATCH_SIZE;
275-
static const int64_t MAX_INPUT_TOKEN; // Default max token count for each input: 512
276233

277234
private:
278-
void init_members(); // Common initialization for all constructors
279235
void reset();
280236
bool is_finished() const; // Internal use only - no lock needed
281-
void set_stop(int ret_code);
237+
void set_stop();
282238
int set_phase(ObEmbeddingTaskPhase new_phase);
283-
int complete_task(int result_code, bool finished = true);
239+
int complete_task(ObEmbeddingTaskPhase new_phase, int result_code, bool finished = true);
284240
int start_async_work();
285241
int check_async_progress();
286242

@@ -300,24 +256,14 @@ class ObEmbeddingTask
300256
int parse_embedding_response(const char *response_data, size_t response_size);
301257

302258
// Helper methods for retry logic
303-
bool should_retry_http_request(int64_t http_error_code, const ObString &http_error_msg) const;
259+
bool should_retry_http_request(int64_t http_error_code) const;
304260
bool is_batch_size_related_error(int64_t http_error_code) const;
305261
int64_t calculate_retry_interval() const;
306262
int adjust_batch_size_for_retry();
307-
int adjust_input_token_limit_for_retry();
308263
void reset_retry_state();
309264
int map_http_error_to_internal_error(int64_t http_error_code) const;
310265
void try_increase_batch_size();
311266
int init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us);
312-
bool is_request_too_large(int64_t http_error_code, const ObString &content) const;
313-
int truncate_text_by_token_count(ObString &text, const ObCollationType cs_type, const int64_t max_token_count) const;
314-
bool can_retry_request() const { return http_retry_count_ < http_max_retry_count_; }
315-
bool is_ready_to_handle() const; // Check if current time exceeds next_handle_task_time_us_
316-
int finish_task();
317-
int try_rescheule_task();
318-
int calc_max_wait_completion_time_us(int64_t http_timeout_us, int64_t http_max_retry_count,
319-
int64_t batch_size, int64_t input_chunks_count,
320-
int64_t &max_wait_completion_time_us) const;
321267

322268
struct HttpResponseData {
323269
HttpResponseData(ObIAllocator &allocator) : data(nullptr), size(0), allocator(allocator) {}
@@ -419,31 +365,20 @@ class ObEmbeddingTask
419365
int64_t wait_for_completion_timeout_us_; // For controlling the maximum timeout of waiting for completion
420366

421367
bool need_retry_flag_;
422-
bool always_retry_; // If true, task can retry even after exceeding max retry count, but must wait at least 3 minutes
423-
int64_t last_exceeded_retry_time_us_; // Time when retry count was exceeded
424368

425369
// Batch size adjustment for retry
426370
uint32_t original_batch_size_;
427371
bool batch_size_adjusted_;
428372
uint32_t current_batch_size_;
429373
uint32_t successful_requests_count_;
430374

431-
// Input token limit adjustment for retry (when request too large)
432-
int64_t current_input_token_limit_;
433-
434375
ObThreadCond task_cond_;
435376
bool callback_done_;
436377

437378
// TODO(fanfangyao.ffy): use taskhandle to manage task reference count
438379
// ref_cnt_ is only used to track the reference count of the post create embedding task
439380
int64_t ref_cnt_;
440-
ObCollationType col_type_;
441-
bool need_cancel_;
442-
common::ObCurTraceId::TraceId trace_id_;
443-
int64_t source_task_id_;
444-
ObEmbeddingTasSourceType source_task_type_;
445-
int64_t next_handle_task_time_us_;
446-
ObEmbeddingTaskHandler *thread_pool_;
381+
447382
private:
448383
DISALLOW_COPY_AND_ASSIGN(ObEmbeddingTask);
449384
};

src/share/vector_index/ob_vector_index_util.cpp

Lines changed: 0 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -3622,70 +3622,6 @@ int ObVectorIndexUtil::check_index_param(
36223622

36233623
return ret;
36243624
}
3625-
// index_table_id must be table which has vector column
3626-
int ObVectorIndexUtil::get_index_column_collation_type(
3627-
const int64_t tenant_id,
3628-
const uint64_t index_table_id,
3629-
ObCollationType &col_type)
3630-
{
3631-
int ret = OB_SUCCESS;
3632-
const ObTableSchema *data_table_schema = nullptr;
3633-
const ObTableSchema *table_schema = nullptr;
3634-
int64_t main_table_id = OB_INVALID_ID;
3635-
ObArray<uint64_t> tmp_column_ids;
3636-
col_type = CS_TYPE_INVALID;
3637-
ObSchemaGetterGuard schema_guard;
3638-
3639-
if (!is_valid_tenant_id(tenant_id) || OB_INVALID_ID == index_table_id) {
3640-
ret = OB_INVALID_ARGUMENT;
3641-
LOG_WARN("invalid argument", K(ret), K(index_table_id));
3642-
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
3643-
LOG_WARN("fail to get tenant schema guard", K(ret), K(MTL_ID()));
3644-
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, index_table_id, table_schema))) {
3645-
LOG_WARN("fail to get table scheam", K(ret), K(tenant_id), K(index_table_id));
3646-
} else if (OB_ISNULL(table_schema)) {
3647-
ret = OB_TABLE_NOT_EXIST;
3648-
LOG_INFO("table not exit", K(ret), K(tenant_id), K(index_table_id));
3649-
} else if (OB_FALSE_IT(main_table_id = table_schema->get_data_table_id())) {
3650-
} else if (OB_INVALID_ID == main_table_id) {
3651-
ret = OB_ERR_UNEXPECTED;
3652-
LOG_WARN("unexpected invalid id", K(ret), K(main_table_id));
3653-
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, main_table_id, data_table_schema))) {
3654-
LOG_WARN("fail to get table scheam", K(ret), K(tenant_id), K(index_table_id));
3655-
} else if (OB_ISNULL(data_table_schema)) {
3656-
ret = OB_TABLE_NOT_EXIST;
3657-
LOG_INFO("table not exit", K(ret), K(tenant_id), K(main_table_id));
3658-
} else if (OB_FAIL(table_schema->get_column_ids(tmp_column_ids))) {
3659-
LOG_WARN("fail to get index table all column ids", K(ret), K(data_table_schema));
3660-
} else {
3661-
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_column_ids.count() && col_type == CS_TYPE_INVALID; ++i) {
3662-
const ObColumnSchemaV2 *col_schema = data_table_schema->get_column_schema(tmp_column_ids[i]);
3663-
if (OB_ISNULL(col_schema)) {
3664-
ret = OB_ERR_UNEXPECTED;
3665-
LOG_WARN("unexpected null column schema ptr", K(ret));
3666-
} else if (!col_schema->is_vec_hnsw_vector_column()) {
3667-
// only need vector column
3668-
} else {
3669-
ObArray<uint64_t> cascaded_column_ids;
3670-
if (OB_FAIL(col_schema->get_cascaded_column_ids(cascaded_column_ids))) {
3671-
LOG_WARN("failed to get cascaded column ids", K(ret));
3672-
} else {
3673-
for (int64_t j = 0; OB_SUCC(ret) && j < cascaded_column_ids.count() && col_type == CS_TYPE_INVALID; ++j) {
3674-
const ObColumnSchemaV2 *cascaded_column = NULL;
3675-
if (OB_ISNULL(cascaded_column = data_table_schema->get_column_schema(cascaded_column_ids.at(j)))) {
3676-
ret = OB_ERR_UNEXPECTED;
3677-
LOG_WARN("unexpected cascaded column", K(ret));
3678-
} else {
3679-
col_type = cascaded_column->get_collation_type();
3680-
LOG_DEBUG("get vector index collation type", K(col_type));
3681-
}
3682-
}
3683-
}
3684-
}
3685-
}
3686-
}
3687-
return ret;
3688-
}
36893625

36903626
int ObVectorIndexUtil::get_vector_index_type(
36913627
sql::ObRawExpr *&raw_expr,

src/share/vector_index/ob_vector_index_util.h

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -832,9 +832,6 @@ class ObVectorIndexUtil final
832832
const ObString &new_idx_params,
833833
const ObTableSchema &index_table_schema,
834834
bool &need_embedding_when_rebuild);
835-
836-
static int get_index_column_collation_type(const int64_t tenant_id, const uint64_t index_table_id, ObCollationType &col_type);
837-
838835
private:
839836
static void save_column_schema(
840837
const ObColumnSchemaV2 *&old_column,

src/storage/ddl/ob_ddl_pipeline.cpp

Lines changed: 4 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -93,8 +93,7 @@ ObVectorIndexTabletContext::ObVectorIndexTabletContext()
9393
lob_inrow_threshold_(0), rowkey_cnt_(0), column_cnt_(0), snapshot_version_(0), index_type_(share::VIAT_MAX), helper_(nullptr),
9494
allocator_("VecIndexCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
9595
memory_context_(MTL(ObPluginVectorIndexService *)->get_memory_context()),
96-
all_vsag_use_mem_(MTL(ObPluginVectorIndexService *)->get_all_vsag_use_mem()),
97-
table_id_(0)
96+
all_vsag_use_mem_(MTL(ObPluginVectorIndexService *)->get_all_vsag_use_mem())
9897
{
9998

10099
}
@@ -121,8 +120,6 @@ int ObVectorIndexTabletContext::init(
121120
rowkey_cnt_ = ddl_table_schema.table_item_.rowkey_column_num_;
122121
column_cnt_ = ddl_table_schema.column_items_.count();
123122
snapshot_version_ = snapshot_version;
124-
table_id_ = ddl_table_schema.table_id_;
125-
126123
if (schema::is_vec_index_snapshot_data_type(index_type)) {
127124
if (OB_FAIL(init_hnsw_index(ddl_table_schema))) {
128125
LOG_WARN("init hnsw index failed", K(ret));
@@ -1664,18 +1661,13 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
16641661
ret = OB_ERR_UNEXPECTED;
16651662
LOG_WARN("error unexpected, vector index ctx is null", K(ret));
16661663
} else {
1667-
const uint64_t table_id = vector_index_ctx->table_id_;
16681664
vec_dim_ = vector_index_ctx->vec_dim_;
16691665
rowkey_cnt_ = vector_index_ctx->rowkey_cnt_;
16701666
text_col_idx_ = vector_index_ctx->vector_chunk_col_idx_;
16711667
extra_column_idxs_.reset();
16721668
ObVectorIndexParam index_param;
1673-
ObSchemaGetterGuard schema_guard;
1674-
ObCollationType col_type = CS_TYPE_INVALID;
16751669

1676-
if (OB_FAIL(ObVectorIndexUtil::get_index_column_collation_type(MTL_ID(), table_id, col_type))) {
1677-
LOG_WARN("fail to get vector column collation type", K(ret), K(text_col_idx_), K(table_id));
1678-
} else if (OB_FAIL(vector_index_ctx->build_extra_column_idxs(static_cast<int32_t>(text_col_idx_), extra_column_idxs_))) {
1670+
if (OB_FAIL(vector_index_ctx->build_extra_column_idxs(static_cast<int32_t>(text_col_idx_), extra_column_idxs_))) {
16791671
LOG_WARN("build_extra_column_idxs failed", K(ret), K(text_col_idx_));
16801672
} else if (OB_FAIL(ObVectorIndexUtil::parser_params_from_string(vector_index_ctx->vec_idx_param_, ObVectorIndexType::VIT_HNSW_INDEX, index_param, false))) {
16811673
LOG_WARN("failed to parser params from string", K(ret));
@@ -1687,12 +1679,12 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
16871679
ret = OB_ALLOCATE_MEMORY_FAILED;
16881680
LOG_WARN("failed to alloc ObEmbeddingTaskMgr", K(ret));
16891681
} else {
1690-
embedmgr_ = new (buf) ObEmbeddingTaskMgr(*this);
1682+
embedmgr_ = new (buf) ObEmbeddingTaskMgr();
16911683
}
16921684
}
16931685

16941686
if (OB_SUCC(ret)) {
1695-
if (OB_FAIL(embedmgr_->init(model_id_, col_type))) {
1687+
if (OB_FAIL(embedmgr_->init(model_id_))) {
16961688
embedmgr_->~ObEmbeddingTaskMgr();
16971689
op_allocator_.free(embedmgr_);
16981690
embedmgr_ = nullptr;

src/storage/ddl/ob_ddl_pipeline.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -169,7 +169,6 @@ TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(snapshot_version), K_(i
169169
common::ObArenaAllocator allocator_;
170170
lib::MemoryContext &memory_context_;
171171
uint64_t *all_vsag_use_mem_;
172-
uint64_t table_id_;
173172
};
174173

175174
class ObVectorIndexRowIterator

0 commit comments

Comments
 (0)