diff --git a/pkg/config/store/config.go b/pkg/config/store/config.go index dbf617996..33ee06685 100644 --- a/pkg/config/store/config.go +++ b/pkg/config/store/config.go @@ -18,6 +18,9 @@ package store import ( + "os" + "strings" + "github.com/apache/dubbo-admin/pkg/config" ) @@ -31,16 +34,53 @@ const ( Postgres Type = "postgres" ) +// DeploymentMode represents the deployment mode of the store +type DeploymentMode string + +const ( + // DeploymentModeSingle represents single instance deployment + DeploymentModeSingle DeploymentMode = "single" + // DeploymentModeMasterSlave represents master-slave deployment + DeploymentModeMasterSlave DeploymentMode = "master-slave" +) + // Config defines the ResourceStore configuration type Config struct { config.BaseConfig // Type of Store used in Admin Type Type `json:"type"` Address string `json:"address"` + // DeploymentMode specifies the deployment mode (single or master-slave) + DeploymentMode DeploymentMode `json:"deploymentMode"` + // UseDBIndex forces using database-backed index (overrides auto-detection) + UseDBIndex *bool `json:"useDbIndex,omitempty"` } func DefaultStoreConfig() *Config { return &Config{ - Type: Memory, + Type: Memory, + DeploymentMode: detectDeploymentMode(), + UseDBIndex: nil, // Auto-detect based on deployment mode + } +} + +// detectDeploymentMode detects the deployment mode from environment +func detectDeploymentMode() DeploymentMode { + mode := os.Getenv("DUBBO_ADMIN_DB_DEPLOYMENT_MODE") + if mode != "" { + mode = strings.ToLower(mode) + if mode == "master-slave" || mode == "cluster" { + return DeploymentModeMasterSlave + } + } + return DeploymentModeSingle +} + +// ShouldUseDBIndex determines whether to use database-backed index +func (c *Config) ShouldUseDBIndex() bool { + if c.UseDBIndex != nil { + return *c.UseDBIndex } + // Auto-detect: use DB index for master-slave deployment + return c.DeploymentMode == DeploymentModeMasterSlave } diff --git a/pkg/console/service/instance.go b/pkg/console/service/instance.go index ccfe2f949..6951d926f 100644 --- a/pkg/console/service/instance.go +++ b/pkg/console/service/instance.go @@ -41,11 +41,13 @@ import ( // SearchInstanceByIp search instance by ip func SearchInstanceByIp(ctx consolectx.Context, req *model.SearchReq) (*model.SearchPaginationResult, error) { - pageData, err := manager.PageListByIndexes[*meshresource.InstanceResource]( + pageData, err := manager.PageListByIndexesWithPrefix[*meshresource.InstanceResource]( ctx.ResourceManager(), meshresource.InstanceKind, map[string]string{ - index.ByMeshIndex: req.Mesh, + index.ByMeshIndex: req.Mesh, + }, + map[string]string{ index.ByInstanceIpIndex: req.Keywords, }, req.PageReq) @@ -72,11 +74,13 @@ func SearchInstanceByIp(ctx consolectx.Context, req *model.SearchReq) (*model.Se // SearchInstanceByName search instance by name func SearchInstanceByName(ctx consolectx.Context, req *model.SearchReq) (*model.SearchPaginationResult, error) { - pageData, err := manager.PageListByIndexes[*meshresource.InstanceResource]( + pageData, err := manager.PageListByIndexesWithPrefix[*meshresource.InstanceResource]( ctx.ResourceManager(), meshresource.InstanceKind, map[string]string{ - index.ByMeshIndex: req.Mesh, + index.ByMeshIndex: req.Mesh, + }, + map[string]string{ index.ByInstanceNameIndex: req.Keywords, }, req.PageReq) diff --git a/pkg/console/service/service.go b/pkg/console/service/service.go index be851266e..414799b77 100644 --- a/pkg/console/service/service.go +++ b/pkg/console/service/service.go @@ -118,13 +118,15 @@ func SearchServices(ctx consolectx.Context, req *model.ServiceSearchReq) (*model }, nil } -// SearchServicesByKeywords search services by keywords, for now only support accurate search +// SearchServicesByKeywords search services by keywords, now supports prefix search func SearchServicesByKeywords(ctx consolectx.Context, req *model.ServiceSearchReq) (*model.SearchPaginationResult, error) { - pageData, err := manager.PageListByIndexes[*meshresource.ServiceProviderMetadataResource]( + pageData, err := manager.PageListByIndexesWithPrefix[*meshresource.ServiceProviderMetadataResource]( ctx.ResourceManager(), meshresource.ServiceProviderMetadataKind, map[string]string{ - index.ByMeshIndex: req.Mesh, + index.ByMeshIndex: req.Mesh, + }, + map[string]string{ index.ByServiceProviderServiceName: req.Keywords, }, req.PageReq, diff --git a/pkg/core/manager/manager.go b/pkg/core/manager/manager.go index 44861ca7f..3a15fe504 100644 --- a/pkg/core/manager/manager.go +++ b/pkg/core/manager/manager.go @@ -36,6 +36,10 @@ type ReadOnlyResourceManager interface { ListByIndexes(rk model.ResourceKind, indexes map[string]string) ([]model.Resource, error) // PageListByIndexes page list the resources with the given indexes, indexes is a map of index name and index value PageListByIndexes(rk model.ResourceKind, indexes map[string]string, pr model.PageReq) (*model.PageData[model.Resource], error) + // ListByIndexesWithPrefix returns the resources with the given indexes and prefix indexes + ListByIndexesWithPrefix(rk model.ResourceKind, indexes map[string]string, prefixIndexes map[string]string) ([]model.Resource, error) + // PageListByIndexesWithPrefix page list the resources with the given indexes and prefix indexes + PageListByIndexesWithPrefix(rk model.ResourceKind, indexes map[string]string, prefixIndexes map[string]string, pr model.PageReq) (*model.PageData[model.Resource], error) // PageSearchResourceByConditions page fuzzy search resource by conditions, conditions cannot be empty // TODO support multiple conditions PageSearchResourceByConditions(rk model.ResourceKind, conditions []string, pr model.PageReq) (*model.PageData[model.Resource], error) @@ -128,6 +132,39 @@ func (rm *resourcesManager) PageListByIndexes( return pageData, nil } +func (rm *resourcesManager) ListByIndexesWithPrefix( + rk model.ResourceKind, + indexes map[string]string, + prefixIndexes map[string]string) ([]model.Resource, error) { + + rs, err := rm.storeRouter.ResourceKindRoute(rk) + if err != nil { + return nil, err + } + resources, err := rs.ListByIndexesWithPrefix(indexes, prefixIndexes) + if err != nil { + return nil, err + } + return resources, nil +} + +func (rm *resourcesManager) PageListByIndexesWithPrefix( + rk model.ResourceKind, + indexes map[string]string, + prefixIndexes map[string]string, + pr model.PageReq) (*model.PageData[model.Resource], error) { + + rs, err := rm.storeRouter.ResourceKindRoute(rk) + if err != nil { + return nil, err + } + pageData, err := rs.PageListByIndexesWithPrefix(indexes, prefixIndexes, pr) + if err != nil { + return nil, err + } + return pageData, nil +} + func (rm *resourcesManager) PageSearchResourceByConditions(rk model.ResourceKind, conditions []string, pr model.PageReq) (*model.PageData[model.Resource], error) { //TODO implement me panic("implement me") diff --git a/pkg/core/manager/manager_helper.go b/pkg/core/manager/manager_helper.go index 80e40ba80..618bf9afb 100644 --- a/pkg/core/manager/manager_helper.go +++ b/pkg/core/manager/manager_helper.go @@ -137,3 +137,59 @@ func PageSearchResourceByConditions[T model.Resource]( } return newPageData, nil } + +// ListByIndexesWithPrefix is a helper function of ResourceManager.ListByIndexesWithPrefix +func ListByIndexesWithPrefix[T model.Resource]( + rm ReadOnlyResourceManager, + rk model.ResourceKind, + indexes map[string]string, + prefixIndexes map[string]string) ([]T, error) { + + resources, err := rm.ListByIndexesWithPrefix(rk, indexes, prefixIndexes) + if err != nil { + return nil, err + } + + typedResources := make([]T, len(resources)) + for i, resource := range resources { + typedResource, ok := resource.(T) + if !ok { + return nil, bizerror.NewAssertionError(rk, reflect.TypeOf(typedResource).Name()) + } + typedResources[i] = typedResource + } + + return typedResources, nil +} + +// PageListByIndexesWithPrefix is a helper function of ResourceManager.PageListByIndexesWithPrefix +func PageListByIndexesWithPrefix[T model.Resource]( + rm ReadOnlyResourceManager, + rk model.ResourceKind, + indexes map[string]string, + prefixIndexes map[string]string, + pr model.PageReq) (*model.PageData[T], error) { + + pageData, err := rm.PageListByIndexesWithPrefix(rk, indexes, prefixIndexes, pr) + if err != nil { + return nil, err + } + + typedResources := make([]T, len(pageData.Data)) + for i, resource := range pageData.Data { + typedResource, ok := resource.(T) + if !ok { + return nil, bizerror.NewAssertionError(rk, reflect.TypeOf(typedResource).Name()) + } + typedResources[i] = typedResource + } + newPageData := &model.PageData[T]{ + Pagination: model.Pagination{ + Total: pageData.Total, + PageOffset: pageData.PageOffset, + PageSize: pageData.PageSize, + }, + Data: typedResources, + } + return newPageData, nil +} diff --git a/pkg/core/store/store.go b/pkg/core/store/store.go index 15d2117e0..8ca88cfd6 100644 --- a/pkg/core/store/store.go +++ b/pkg/core/store/store.go @@ -40,6 +40,11 @@ type ResourceStore interface { ListByIndexes(indexes map[string]string) ([]model.Resource, error) // PageListByIndexes list resources by indexes pageable, indexes is map of index name and index value PageListByIndexes(indexes map[string]string, pq model.PageReq) (*model.PageData[model.Resource], error) + // ListByIndexesWithPrefix list resources by indexes with prefix matching support + // prefixIndexes is map of index name and prefix value + ListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string) ([]model.Resource, error) + // PageListByIndexesWithPrefix list resources by indexes with prefix matching support, pageable + PageListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string, pq model.PageReq) (*model.PageData[model.Resource], error) } // ManagedResourceStore includes both functional interfaces and lifecycle interfaces diff --git a/pkg/store/dbcommon/gorm_store.go b/pkg/store/dbcommon/gorm_store.go index e823189b5..0bad151a6 100644 --- a/pkg/store/dbcommon/gorm_store.go +++ b/pkg/store/dbcommon/gorm_store.go @@ -27,11 +27,13 @@ import ( "k8s.io/client-go/tools/cache" "github.com/apache/dubbo-admin/pkg/common/bizerror" + storeconfig "github.com/apache/dubbo-admin/pkg/config/store" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" "github.com/apache/dubbo-admin/pkg/core/store" "github.com/apache/dubbo-admin/pkg/core/store/index" + "github.com/apache/dubbo-admin/pkg/store/indexer" ) // GormStore is a GORM-backed store implementation for Dubbo resources @@ -41,7 +43,8 @@ type GormStore struct { pool *ConnectionPool // Shared connection pool with reference counting kind model.ResourceKind address string - indices *Index // In-memory index with thread-safe operations + indices *indexer.IndexImpl + config *storeconfig.Config // Store configuration stopCh chan struct{} } @@ -49,11 +52,25 @@ var _ store.ManagedResourceStore = &GormStore{} // NewGormStore creates a new GORM store for the specified resource kind func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return NewGormStoreWithConfig(kind, address, pool, storeconfig.DefaultStoreConfig()) +} + +// NewGormStoreWithConfig creates a new GORM store with custom configuration +func NewGormStoreWithConfig(kind model.ResourceKind, address string, pool *ConnectionPool, config *storeconfig.Config) *GormStore { + // Create appropriate index based on configuration + var idx indexer.Index + if config.ShouldUseDBIndex() { + idx = indexer.NewDBIndex(pool.GetDB(), kind.ToString()) + } else { + idx = indexer.NewMemoryIndex() + } + return &GormStore{ kind: kind, address: address, pool: pool, - indices: NewIndex(), + indices: indexer.NewIndexImpl(idx), + config: config, stopCh: make(chan struct{}), } } @@ -352,11 +369,12 @@ func (gs *GormStore) Resync() error { } func (gs *GormStore) Index(indexName string, obj interface{}) ([]interface{}, error) { - if !gs.indices.IndexExists(indexName) { + indexers := gs.indices.GetIndexers() + indexFunc, exists := indexers[indexName] + if !exists { return nil, fmt.Errorf("index %s does not exist", indexName) } - indexFunc := gs.indices.GetIndexers()[indexName] indexValues, err := indexFunc(obj) if err != nil { return nil, err @@ -370,7 +388,8 @@ func (gs *GormStore) Index(indexName string, obj interface{}) ([]interface{}, er } func (gs *GormStore) IndexKeys(indexName, indexedValue string) ([]string, error) { - if !gs.indices.IndexExists(indexName) { + indexers := gs.indices.GetIndexers() + if _, exists := indexers[indexName]; !exists { return nil, fmt.Errorf("index %s does not exist", indexName) } @@ -390,7 +409,8 @@ func (gs *GormStore) IndexKeys(indexName, indexedValue string) ([]string, error) } func (gs *GormStore) ListIndexFuncValues(indexName string) []string { - if !gs.indices.IndexExists(indexName) { + indexers := gs.indices.GetIndexers() + if _, exists := indexers[indexName]; !exists { return []string{} } @@ -398,7 +418,8 @@ func (gs *GormStore) ListIndexFuncValues(indexName string) []string { } func (gs *GormStore) ByIndex(indexName, indexedValue string) ([]interface{}, error) { - if !gs.indices.IndexExists(indexName) { + indexers := gs.indices.GetIndexers() + if _, exists := indexers[indexName]; !exists { return nil, fmt.Errorf("index %s does not exist", indexName) } @@ -485,12 +506,17 @@ func (gs *GormStore) PageListByIndexes(indexes map[string]string, pq model.PageR } func (gs *GormStore) findByIndex(indexName, indexedValue string) ([]interface{}, error) { - if !gs.indices.IndexExists(indexName) { + indexers := gs.indices.GetIndexers() + if _, exists := indexers[indexName]; !exists { return nil, fmt.Errorf("index %s does not exist", indexName) } - // Get resource keys from in-memory index - keys := gs.indices.GetKeys(indexName, indexedValue) + // Get resource keys from in-memory index using Query + query := indexer.NewQuery(indexName, indexer.OperatorEquals, indexedValue) + keys, err := gs.indices.Query(query) + if err != nil { + return nil, err + } if len(keys) == 0 { return []interface{}{}, nil @@ -582,3 +608,145 @@ func (gs *GormStore) rebuildIndices() error { logger.Infof("Rebuilt indices for %s: loaded %d resources", gs.kind.ToString(), len(models)) return nil } + +// IndexKeysByPrefix returns all resource keys matching the prefix for a given index +func (gs *GormStore) IndexKeysByPrefix(indexName, prefix string) ([]string, error) { + indexers := gs.indices.GetIndexers() + if _, exists := indexers[indexName]; !exists { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + query := indexer.NewQuery(indexName, indexer.OperatorPrefix, prefix) + return gs.indices.Query(query) +} + +// ByIndexPrefix returns all resources matching the prefix for a given index +func (gs *GormStore) ByIndexPrefix(indexName, prefix string) ([]interface{}, error) { + keys, err := gs.IndexKeysByPrefix(indexName, prefix) + if err != nil { + return nil, err + } + + if len(keys) == 0 { + return []interface{}{}, nil + } + + resources, err := gs.GetByKeys(keys) + if err != nil { + return nil, err + } + + result := make([]interface{}, len(resources)) + for i, resource := range resources { + result[i] = resource + } + + return result, nil +} + +// ListByIndexesWithPrefix lists resources by indexes with prefix matching support +func (gs *GormStore) ListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string) ([]model.Resource, error) { + keys, err := gs.getKeysByIndexesWithPrefix(indexes, prefixIndexes) + if err != nil { + return nil, err + } + + return gs.GetByKeys(keys) +} + +// PageListByIndexesWithPrefix lists resources by indexes with prefix matching support, pageable +func (gs *GormStore) PageListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string, pq model.PageReq) (*model.PageData[model.Resource], error) { + keys, err := gs.getKeysByIndexesWithPrefix(indexes, prefixIndexes) + if err != nil { + return nil, err + } + + sort.Strings(keys) + total := len(keys) + + if pq.PageOffset < 0 || pq.PageOffset > total { + return nil, store.ErrorInvalidOffset + } + + // Calculate page range + end := pq.PageOffset + pq.PageSize + if end > total { + end = total + } + + // Get only the keys for current page + pageKeys := keys[pq.PageOffset:end] + + // Batch fetch resources for current page (single DB query) + resources, err := gs.GetByKeys(pageKeys) + if err != nil { + return nil, err + } + + return model.NewPageData(total, pq.PageOffset, pq.PageSize, resources), nil +} + +// getKeysByIndexesWithPrefix gets resource keys by combining exact and prefix indexes +func (gs *GormStore) getKeysByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string) ([]string, error) { + if len(indexes) == 0 && len(prefixIndexes) == 0 { + return gs.ListKeys(), nil + } + + var keySet map[string]struct{} + first := true + + // Process exact match indexes + for indexName, indexValue := range indexes { + keys, err := gs.IndexKeys(indexName, indexValue) + if err != nil { + return nil, err + } + + if first { + keySet = make(map[string]struct{}, len(keys)) + for _, key := range keys { + keySet[key] = struct{}{} + } + first = false + } else { + nextSet := make(map[string]struct{}, len(keys)) + for _, key := range keys { + if _, exists := keySet[key]; exists { + nextSet[key] = struct{}{} + } + } + keySet = nextSet + } + } + + // Process prefix match indexes + for indexName, prefix := range prefixIndexes { + keys, err := gs.IndexKeysByPrefix(indexName, prefix) + if err != nil { + return nil, err + } + + if first { + keySet = make(map[string]struct{}, len(keys)) + for _, key := range keys { + keySet[key] = struct{}{} + } + first = false + } else { + nextSet := make(map[string]struct{}, len(keys)) + for _, key := range keys { + if _, exists := keySet[key]; exists { + nextSet[key] = struct{}{} + } + } + keySet = nextSet + } + } + + result := make([]string, 0, len(keySet)) + for key := range keySet { + result = append(result, key) + } + + return result, nil +} diff --git a/pkg/store/dbcommon/index.go b/pkg/store/dbcommon/index.go deleted file mode 100644 index 5c5356195..000000000 --- a/pkg/store/dbcommon/index.go +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dbcommon - -import ( - "fmt" - "sync" - - set "github.com/duke-git/lancet/v2/datastructure/set" - "k8s.io/client-go/tools/cache" - - "github.com/apache/dubbo-admin/pkg/core/resource/model" -) - -// ValueIndex represents the mapping from indexed values to resource keys for a single index. -// Structure: map[indexedValue]set[resourceKey] -// Example: map["default"]{"resource1", "resource2", "resource3"} -type ValueIndex struct { - values map[string]set.Set[string] -} - -// NewValueIndex creates a new ValueIndex -func NewValueIndex() *ValueIndex { - return &ValueIndex{ - values: make(map[string]set.Set[string]), - } -} - -// Add adds a resource key to the specified indexed value -func (vi *ValueIndex) Add(indexedValue, resourceKey string) { - if vi.values[indexedValue] == nil { - vi.values[indexedValue] = set.New[string]() - } - vi.values[indexedValue].Add(resourceKey) -} - -// Remove removes a resource key from the specified indexed value -// Returns true if the value entry becomes empty after removal -func (vi *ValueIndex) Remove(indexedValue, resourceKey string) bool { - if vi.values[indexedValue] == nil { - return false - } - - vi.values[indexedValue].Delete(resourceKey) - - // Check if the set is now empty - if vi.values[indexedValue].Size() == 0 { - delete(vi.values, indexedValue) - return true - } - - return false -} - -// GetKeys returns all resource keys for the specified indexed value -func (vi *ValueIndex) GetKeys(indexedValue string) []string { - if vi.values[indexedValue] == nil { - return []string{} - } - return vi.values[indexedValue].ToSlice() -} - -// GetAllValues returns all indexed values in this ValueIndex -func (vi *ValueIndex) GetAllValues() []string { - if len(vi.values) == 0 { - return []string{} - } - - result := make([]string, 0, len(vi.values)) - for value := range vi.values { - result = append(result, value) - } - return result -} - -// IsEmpty returns true if the ValueIndex has no entries -func (vi *ValueIndex) IsEmpty() bool { - return len(vi.values) == 0 -} - -// Index is a thread-safe in-memory index structure that manages multiple named indices. -// Each index maps values to sets of resource keys. -// -// Structure: map[indexName]*ValueIndex -// Example: map["mesh"]*ValueIndex where ValueIndex contains {"default": {"res1", "res2"}} -type Index struct { - mu sync.RWMutex - indices map[string]*ValueIndex // map[indexName]*ValueIndex - indexers cache.Indexers // Index functions for creating indices -} - -// NewIndex creates a new empty Index instance -func NewIndex() *Index { - return &Index{ - indices: make(map[string]*ValueIndex), - indexers: cache.Indexers{}, - } -} - -// AddIndexers adds new indexer functions to the Index -// Returns an error if an indexer with the same name already exists -func (idx *Index) AddIndexers(newIndexers cache.Indexers) error { - idx.mu.Lock() - defer idx.mu.Unlock() - - for name, indexFunc := range newIndexers { - if _, exists := idx.indexers[name]; exists { - return fmt.Errorf("indexer %s already exists", name) - } - idx.indexers[name] = indexFunc - } - - return nil -} - -// GetIndexers returns a copy of all registered indexers -func (idx *Index) GetIndexers() cache.Indexers { - idx.mu.RLock() - defer idx.mu.RUnlock() - - result := make(cache.Indexers, len(idx.indexers)) - for k, v := range idx.indexers { - result[k] = v - } - return result -} - -// UpdateResource atomically updates all indices for a resource -// If oldResource is nil, it's treated as an add operation -// If oldResource is not nil, it's treated as an update operation (remove old, add new) -// This is a high-level atomic operation that handles all indexers internally -func (idx *Index) UpdateResource(newResource model.Resource, oldResource model.Resource) { - idx.mu.Lock() - defer idx.mu.Unlock() - - // Remove old resource from indices if this is an update - if oldResource != nil { - idx.removeResourceUnsafe(oldResource) - } - - // Add new resource to indices - idx.addResourceUnsafe(newResource) -} - -// RemoveResource atomically removes a resource from all indices -func (idx *Index) RemoveResource(resource model.Resource) { - idx.mu.Lock() - defer idx.mu.Unlock() - - idx.removeResourceUnsafe(resource) -} - -// GetKeys returns all resource keys for a given index name and value -// Returns an empty slice if the index name or value doesn't exist -func (idx *Index) GetKeys(indexName, indexValue string) []string { - idx.mu.RLock() - defer idx.mu.RUnlock() - - valueIndex := idx.indices[indexName] - if valueIndex == nil { - return []string{} - } - - return valueIndex.GetKeys(indexValue) -} - -// ListIndexFuncValues returns all indexed values for a given index name -// This directly retrieves values from the in-memory index without recalculating -func (idx *Index) ListIndexFuncValues(indexName string) []string { - idx.mu.RLock() - defer idx.mu.RUnlock() - - valueIndex := idx.indices[indexName] - if valueIndex == nil { - return []string{} - } - - return valueIndex.GetAllValues() -} - -// Clear removes all entries from the index -func (idx *Index) Clear() { - idx.mu.Lock() - defer idx.mu.Unlock() - idx.indices = make(map[string]*ValueIndex) -} - -// IndexExists checks if an indexer with the given name exists -func (idx *Index) IndexExists(indexName string) bool { - idx.mu.RLock() - defer idx.mu.RUnlock() - _, exists := idx.indexers[indexName] - return exists -} - -// addResourceUnsafe adds a resource to all indices (must be called with lock held) -func (idx *Index) addResourceUnsafe(resource model.Resource) { - for indexName, indexFunc := range idx.indexers { - values, err := indexFunc(resource) - if err != nil { - continue - } - - // Ensure the ValueIndex exists for this index name - if idx.indices[indexName] == nil { - idx.indices[indexName] = NewValueIndex() - } - - // Add resource key to each indexed value - for _, value := range values { - idx.indices[indexName].Add(value, resource.ResourceKey()) - } - } -} - -// removeResourceUnsafe removes a resource from all indices (must be called with lock held) -func (idx *Index) removeResourceUnsafe(resource model.Resource) { - for indexName, indexFunc := range idx.indexers { - values, err := indexFunc(resource) - if err != nil { - continue - } - - valueIndex := idx.indices[indexName] - if valueIndex == nil { - continue - } - - // Remove resource key from each indexed value - for _, value := range values { - valueIndex.Remove(value, resource.ResourceKey()) - } - - // Clean up empty ValueIndex - if valueIndex.IsEmpty() { - delete(idx.indices, indexName) - } - } -} diff --git a/pkg/store/indexer/db_index.go b/pkg/store/indexer/db_index.go new file mode 100644 index 000000000..85f826365 --- /dev/null +++ b/pkg/store/indexer/db_index.go @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "fmt" + "sync" + + "gorm.io/gorm" +) + +type DBIndex struct { + db *gorm.DB + resourceKind string + mu sync.RWMutex + indexers Indexers +} + +func NewDBIndex(db *gorm.DB, resourceKind string) *DBIndex { + return &DBIndex{ + db: db, + resourceKind: resourceKind, + indexers: make(Indexers), + } +} + +func (d *DBIndex) AddIndexers(indexers Indexers) error { + d.mu.Lock() + defer d.mu.Unlock() + + for name, indexFunc := range indexers { + if _, exists := d.indexers[name]; exists { + return fmt.Errorf("indexer %s already exists", name) + } + d.indexers[name] = indexFunc + } + return nil +} + +func (d *DBIndex) GetIndexers() Indexers { + d.mu.RLock() + defer d.mu.RUnlock() + + result := make(Indexers, len(d.indexers)) + for k, v := range d.indexers { + result[k] = v + } + return result +} + +func (d *DBIndex) Add(indexName, indexValue, resourceKey string) error { + model := IndexModel{ + IndexName: indexName, + IndexValue: indexValue, + ResourceKey: resourceKey, + ResourceKind: d.resourceKind, + } + return d.db.Create(&model).Error +} + +func (d *DBIndex) Remove(indexName, indexValue, resourceKey string) error { + return d.db.Where( + "index_name = ? AND index_value = ? AND resource_key = ? AND resource_kind = ?", + indexName, indexValue, resourceKey, d.resourceKind, + ).Delete(&IndexModel{}).Error +} + +func (d *DBIndex) Query(query Query) ([]string, error) { + var models []IndexModel + tx := d.db.Where("index_name = ? AND resource_kind = ?", query.IndexName, d.resourceKind) + + switch query.Operator { + case OperatorEquals: + tx = tx.Where("index_value = ?", query.Value) + case OperatorPrefix: + tx = tx.Where("index_value LIKE ?", query.Value+"%") + default: + return nil, fmt.Errorf("unsupported operator: %s", query.Operator) + } + + if err := tx.Find(&models).Error; err != nil { + return nil, err + } + + keys := make([]string, 0, len(models)) + for _, model := range models { + keys = append(keys, model.ResourceKey) + } + return keys, nil +} + +func (d *DBIndex) Clear() { + d.db.Where("resource_kind = ?", d.resourceKind).Delete(&IndexModel{}) +} + +// ListIndexFuncValues returns all distinct index values for a given index name +func (d *DBIndex) ListIndexFuncValues(indexName string) []string { + var values []string + d.db.Model(&IndexModel{}). + Where("index_name = ? AND resource_kind = ?", indexName, d.resourceKind). + Distinct("index_value"). + Pluck("index_value", &values) + return values +} diff --git a/pkg/store/indexer/db_index_test.go b/pkg/store/indexer/db_index_test.go new file mode 100644 index 000000000..2c36cd8f8 --- /dev/null +++ b/pkg/store/indexer/db_index_test.go @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "testing" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + + if err := db.AutoMigrate(&IndexModel{}); err != nil { + t.Fatalf("failed to migrate: %v", err) + } + + return db +} + +func TestNewDBIndex(t *testing.T) { + db := setupTestDB(t) + idx := NewDBIndex(db, "Service") + if idx == nil { + t.Fatal("NewDBIndex returned nil") + } + if idx.db == nil { + t.Error("db is nil") + } + if idx.resourceKind != "Service" { + t.Errorf("resourceKind = %s, want Service", idx.resourceKind) + } + if idx.indexers == nil { + t.Error("indexers map is nil") + } +} + +func TestDBIndexAddRemove(t *testing.T) { + tests := []struct { + name string + adds []struct{ indexName, indexValue, resourceKey string } + removes []struct{ indexName, indexValue, resourceKey string } + wantQuery Query + wantCount int + }{ + { + name: "add single entry", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + wantQuery: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 1, + }, + { + name: "add and remove", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + removes: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + wantQuery: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := setupTestDB(t) + idx := NewDBIndex(db, "Service") + for _, add := range tt.adds { + if err := idx.Add(add.indexName, add.indexValue, add.resourceKey); err != nil { + t.Fatalf("Add failed: %v", err) + } + } + for _, rm := range tt.removes { + if err := idx.Remove(rm.indexName, rm.indexValue, rm.resourceKey); err != nil { + t.Fatalf("Remove failed: %v", err) + } + } + results, err := idx.Query(tt.wantQuery) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != tt.wantCount { + t.Errorf("got %d results, want %d", len(results), tt.wantCount) + } + }) + } +} + +func TestDBIndexQuery(t *testing.T) { + tests := []struct { + name string + adds []struct{ indexName, indexValue, resourceKey string } + query Query + wantCount int + }{ + { + name: "exact match", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + {"app_name", "dubbo-admin", "key2"}, + }, + query: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 1, + }, + { + name: "prefix match", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service-a", "key1"}, + {"app_name", "dubbo-service-b", "key2"}, + {"app_name", "dubbo-admin", "key3"}, + }, + query: NewQuery("app_name", OperatorPrefix, "dubbo-service"), + wantCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := setupTestDB(t) + idx := NewDBIndex(db, "Service") + for _, add := range tt.adds { + if err := idx.Add(add.indexName, add.indexValue, add.resourceKey); err != nil { + t.Fatalf("Add failed: %v", err) + } + } + results, err := idx.Query(tt.query) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != tt.wantCount { + t.Errorf("got %d results, want %d", len(results), tt.wantCount) + } + }) + } +} + +func TestDBIndexClear(t *testing.T) { + db := setupTestDB(t) + idx := NewDBIndex(db, "Service") + + idx.Add("app_name", "dubbo-service", "key1") + idx.Add("app_name", "dubbo-admin", "key2") + + idx.Clear() + + results, err := idx.Query(NewQuery("app_name", OperatorPrefix, "dubbo")) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 0 { + t.Errorf("got %d results after Clear, want 0", len(results)) + } +} + +func TestDBIndexResourceKindIsolation(t *testing.T) { + db := setupTestDB(t) + serviceIdx := NewDBIndex(db, "Service") + instanceIdx := NewDBIndex(db, "Instance") + + serviceIdx.Add("app_name", "dubbo-service", "service-key1") + instanceIdx.Add("app_name", "dubbo-service", "instance-key1") + + serviceResults, err := serviceIdx.Query(NewQuery("app_name", OperatorEquals, "dubbo-service")) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(serviceResults) != 1 || serviceResults[0] != "service-key1" { + t.Errorf("Service index got %v, want [service-key1]", serviceResults) + } + + instanceResults, err := instanceIdx.Query(NewQuery("app_name", OperatorEquals, "dubbo-service")) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(instanceResults) != 1 || instanceResults[0] != "instance-key1" { + t.Errorf("Instance index got %v, want [instance-key1]", instanceResults) + } +} diff --git a/pkg/store/indexer/index.go b/pkg/store/indexer/index.go new file mode 100644 index 000000000..c019577d0 --- /dev/null +++ b/pkg/store/indexer/index.go @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "k8s.io/client-go/tools/cache" +) + +type IndexFunc func(obj interface{}) ([]string, error) + +type Indexers map[string]IndexFunc + +type Index interface { + AddIndexers(indexers Indexers) error + GetIndexers() Indexers + + Add(indexName, indexValue, resourceKey string) error + Remove(indexName, indexValue, resourceKey string) error + + Query(query Query) ([]string, error) + + // ListIndexFuncValues returns all index values for a given index name + ListIndexFuncValues(indexName string) []string + + Clear() +} + +type IndexImpl struct { + index Index + indexers cache.Indexers +} + +func NewIndexImpl(index Index) *IndexImpl { + return &IndexImpl{ + index: index, + indexers: make(cache.Indexers), + } +} + +func (idx *IndexImpl) AddIndexers(newIndexers cache.Indexers) error { + for name, fn := range newIndexers { + idx.indexers[name] = fn + } + + // Convert and register to underlying index + converted := make(Indexers) + for name, fn := range newIndexers { + // Type conversion: both are func(interface{}) ([]string, error) + converted[name] = IndexFunc(fn) + } + return idx.index.AddIndexers(converted) +} + +func (idx *IndexImpl) GetIndexers() cache.Indexers { + result := make(cache.Indexers, len(idx.indexers)) + for k, v := range idx.indexers { + result[k] = v + } + return result +} + +func (idx *IndexImpl) UpdateResource(newResource, oldResource model.Resource) { + if oldResource != nil { + idx.RemoveResource(oldResource) + } + + for indexName, indexFunc := range idx.indexers { + values, err := indexFunc(newResource) + if err != nil { + continue + } + for _, value := range values { + idx.index.Add(indexName, value, newResource.ResourceKey()) + } + } +} + +func (idx *IndexImpl) RemoveResource(resource model.Resource) { + for indexName, indexFunc := range idx.indexers { + values, err := indexFunc(resource) + if err != nil { + continue + } + for _, value := range values { + idx.index.Remove(indexName, value, resource.ResourceKey()) + } + } +} + +func (idx *IndexImpl) Query(query Query) ([]string, error) { + return idx.index.Query(query) +} + +func (idx *IndexImpl) Clear() { + idx.index.Clear() +} + +func (idx *IndexImpl) ListIndexFuncValues(indexName string) []string { + return idx.index.ListIndexFuncValues(indexName) +} diff --git a/pkg/store/indexer/index_model.go b/pkg/store/indexer/index_model.go new file mode 100644 index 000000000..b5a68f18e --- /dev/null +++ b/pkg/store/indexer/index_model.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "time" +) + +type IndexModel struct { + ID uint `gorm:"primarykey"` + IndexName string `gorm:"type:varchar(255);not null;index:idx_name_value"` + IndexValue string `gorm:"type:varchar(512);not null;index:idx_name_value;index:idx_prefix"` + ResourceKey string `gorm:"type:varchar(512);not null"` + ResourceKind string `gorm:"type:varchar(128);not null;index"` + CreatedAt time.Time `gorm:"autoCreateTime"` + UpdatedAt time.Time `gorm:"autoUpdateTime"` +} + +func (IndexModel) TableName() string { + return "resource_indexes" +} diff --git a/pkg/store/indexer/memory_index.go b/pkg/store/indexer/memory_index.go new file mode 100644 index 000000000..a1745da2c --- /dev/null +++ b/pkg/store/indexer/memory_index.go @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "fmt" + "sync" +) + +type MemoryIndex struct { + mu sync.RWMutex + indices map[string]*RadixTree + indexers Indexers +} + +func NewMemoryIndex() *MemoryIndex { + return &MemoryIndex{ + indices: make(map[string]*RadixTree), + indexers: make(Indexers), + } +} + +func (m *MemoryIndex) AddIndexers(indexers Indexers) error { + m.mu.Lock() + defer m.mu.Unlock() + + for name, indexFunc := range indexers { + if _, exists := m.indexers[name]; exists { + return fmt.Errorf("indexer %s already exists", name) + } + m.indexers[name] = indexFunc + } + return nil +} + +func (m *MemoryIndex) GetIndexers() Indexers { + m.mu.RLock() + defer m.mu.RUnlock() + + result := make(Indexers, len(m.indexers)) + for k, v := range m.indexers { + result[k] = v + } + return result +} + +func (m *MemoryIndex) Add(indexName, indexValue, resourceKey string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.indices[indexName] == nil { + m.indices[indexName] = NewRadixTree() + } + + m.indices[indexName].Insert(indexValue, resourceKey) + return nil +} + +func (m *MemoryIndex) Remove(indexName, indexValue, resourceKey string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.indices[indexName] == nil { + return nil + } + + m.indices[indexName].Delete(indexValue, resourceKey) + return nil +} + +func (m *MemoryIndex) Query(query Query) ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + tree := m.indices[query.IndexName] + if tree == nil { + return []string{}, nil + } + + switch query.Operator { + case OperatorEquals: + return tree.ExactSearch(query.Value), nil + case OperatorPrefix: + return tree.SearchPrefix(query.Value), nil + default: + return nil, fmt.Errorf("unsupported operator: %s", query.Operator) + } +} + +func (m *MemoryIndex) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + m.indices = make(map[string]*RadixTree) +} + +// ListIndexFuncValues returns all index values for a given index name +func (m *MemoryIndex) ListIndexFuncValues(indexName string) []string { + m.mu.RLock() + defer m.mu.RUnlock() + + tree := m.indices[indexName] + if tree == nil { + return []string{} + } + + return tree.GetAllIndexValues() +} diff --git a/pkg/store/indexer/memory_index_test.go b/pkg/store/indexer/memory_index_test.go new file mode 100644 index 000000000..cac78b939 --- /dev/null +++ b/pkg/store/indexer/memory_index_test.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "sync" + "testing" +) + +func TestNewMemoryIndex(t *testing.T) { + idx := NewMemoryIndex() + if idx == nil { + t.Fatal("NewMemoryIndex returned nil") + } + if idx.indices == nil { + t.Error("indices map is nil") + } + if idx.indexers == nil { + t.Error("indexers map is nil") + } +} + +func TestMemoryIndexAddRemove(t *testing.T) { + tests := []struct { + name string + adds []struct{ indexName, indexValue, resourceKey string } + removes []struct{ indexName, indexValue, resourceKey string } + wantQuery Query + wantCount int + }{ + { + name: "add single entry", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + wantQuery: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 1, + }, + { + name: "add and remove", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + removes: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + }, + wantQuery: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + idx := NewMemoryIndex() + for _, add := range tt.adds { + idx.Add(add.indexName, add.indexValue, add.resourceKey) + } + for _, rm := range tt.removes { + idx.Remove(rm.indexName, rm.indexValue, rm.resourceKey) + } + results, err := idx.Query(tt.wantQuery) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != tt.wantCount { + t.Errorf("got %d results, want %d", len(results), tt.wantCount) + } + }) + } +} + +func TestMemoryIndexQuery(t *testing.T) { + tests := []struct { + name string + adds []struct{ indexName, indexValue, resourceKey string } + query Query + wantCount int + }{ + { + name: "exact match", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service", "key1"}, + {"app_name", "dubbo-admin", "key2"}, + }, + query: NewQuery("app_name", OperatorEquals, "dubbo-service"), + wantCount: 1, + }, + { + name: "prefix match", + adds: []struct{ indexName, indexValue, resourceKey string }{ + {"app_name", "dubbo-service-a", "key1"}, + {"app_name", "dubbo-service-b", "key2"}, + {"app_name", "dubbo-admin", "key3"}, + }, + query: NewQuery("app_name", OperatorPrefix, "dubbo-service"), + wantCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + idx := NewMemoryIndex() + for _, add := range tt.adds { + idx.Add(add.indexName, add.indexValue, add.resourceKey) + } + results, err := idx.Query(tt.query) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != tt.wantCount { + t.Errorf("got %d results, want %d", len(results), tt.wantCount) + } + }) + } +} +func TestMemoryIndexConcurrent(t *testing.T) { + idx := NewMemoryIndex() + var wg sync.WaitGroup + + for i := 0; i < 50; i++ { + wg.Add(2) + go func() { + defer wg.Done() + idx.Add("app_name", "dubbo-service", "key1") + }() + go func() { + defer wg.Done() + idx.Query(NewQuery("app_name", OperatorPrefix, "dubbo")) + }() + } + + wg.Wait() +} diff --git a/pkg/store/indexer/operator.go b/pkg/store/indexer/operator.go new file mode 100644 index 000000000..71ea679b7 --- /dev/null +++ b/pkg/store/indexer/operator.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +type Operator string + +const ( + OperatorEquals Operator = "equals" + OperatorPrefix Operator = "prefix" + OperatorContains Operator = "contains" + OperatorSuffix Operator = "suffix" +) + +type Query struct { + IndexName string + Operator Operator + Value string +} + +func NewQuery(indexName string, operator Operator, value string) Query { + return Query{ + IndexName: indexName, + Operator: operator, + Value: value, + } +} diff --git a/pkg/store/indexer/radix_tree.go b/pkg/store/indexer/radix_tree.go new file mode 100644 index 000000000..12b89c79e --- /dev/null +++ b/pkg/store/indexer/radix_tree.go @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "strings" + "sync" + + set "github.com/duke-git/lancet/v2/datastructure/set" +) + +// RadixNode is a node in the compressed Trie (Radix Tree) +type RadixNode struct { + // prefix is the string segment stored in this node + prefix string + // keys stores all resource keys that end at this node + keys set.Set[string] + // children maps the first byte of child prefixes to child nodes + children map[byte]*RadixNode + // isLeaf marks whether this node contains actual resource keys + isLeaf bool +} + +// RadixTree is a thread-safe compressed Trie tree for prefix matching +type RadixTree struct { + root *RadixNode + mu sync.RWMutex + size int +} + +// NewRadixTree creates a new compressed Trie tree +func NewRadixTree() *RadixTree { + return &RadixTree{ + root: &RadixNode{ + prefix: "", + keys: set.New[string](), + children: make(map[byte]*RadixNode), + isLeaf: false, + }, + size: 0, + } +} + +func (rt *RadixTree) Size() int { + rt.mu.RLock() + defer rt.mu.RUnlock() + return rt.size +} + +func newNode(prefix string) *RadixNode { + return &RadixNode{ + prefix: prefix, + keys: set.New[string](), + children: make(map[byte]*RadixNode), + isLeaf: false, + } +} + +func (rt *RadixTree) addKeyToNode(node *RadixNode, resourceKey string) bool { + if node.keys == nil { + node.keys = set.New[string]() + } + if !node.keys.Contain(resourceKey) { + node.keys.Add(resourceKey) + node.isLeaf = true + rt.size++ + return true + } + return false +} + +func (node *RadixNode) getChild(value string) (*RadixNode, bool) { + if value == "" { + return nil, false + } + child, exists := node.children[value[0]] + return child, exists +} + +// Insert inserts an index value and its associated resource key +func (rt *RadixTree) Insert(indexValue, resourceKey string) { + rt.mu.Lock() + defer rt.mu.Unlock() + + if indexValue == "" { + return + } + + rt.insert(rt.root, indexValue, resourceKey) +} + +func (rt *RadixTree) insert(node *RadixNode, indexValue, resourceKey string) { + if node.prefix == "" || indexValue == "" { + if indexValue == "" { + rt.addKeyToNode(node, resourceKey) + return + } + + child, exists := node.getChild(indexValue) + if !exists { + child = newNode(indexValue) + child.keys.Add(resourceKey) + child.isLeaf = true + node.children[indexValue[0]] = child + rt.size++ + return + } + + rt.insert(child, indexValue, resourceKey) + return + } + + commonLen := rt.commonPrefixLength(node.prefix, indexValue) + + if commonLen == len(node.prefix) { + remaining := indexValue[commonLen:] + + if remaining == "" { + rt.addKeyToNode(node, resourceKey) + return + } + + child, exists := node.getChild(remaining) + if !exists { + child = newNode(remaining) + child.keys.Add(resourceKey) + child.isLeaf = true + node.children[remaining[0]] = child + rt.size++ + return + } + + rt.insert(child, remaining, resourceKey) + return + } + + rt.splitNode(node, commonLen, indexValue, resourceKey) +} + +func (rt *RadixTree) splitNode(node *RadixNode, commonLen int, indexValue, resourceKey string) { + commonPrefix := node.prefix[:commonLen] + oldSuffix := node.prefix[commonLen:] + newSuffix := indexValue[commonLen:] + + middleNode := newNode(commonPrefix) + + oldChild := &RadixNode{ + prefix: oldSuffix, + keys: node.keys, + children: node.children, + isLeaf: node.isLeaf, + } + middleNode.children[oldSuffix[0]] = oldChild + + if newSuffix != "" { + newChild := newNode(newSuffix) + newChild.keys.Add(resourceKey) + newChild.isLeaf = true + middleNode.children[newSuffix[0]] = newChild + } else { + middleNode.keys.Add(resourceKey) + middleNode.isLeaf = true + } + + node.prefix = middleNode.prefix + node.keys = middleNode.keys + node.children = middleNode.children + node.isLeaf = middleNode.isLeaf + + rt.size++ +} + +func (rt *RadixTree) commonPrefixLength(s1, s2 string) int { + minLen := len(s1) + if len(s2) < minLen { + minLen = len(s2) + } + + for i := 0; i < minLen; i++ { + if s1[i] != s2[i] { + return i + } + } + + return minLen +} + +func (rt *RadixTree) SearchPrefix(prefix string) []string { + rt.mu.RLock() + defer rt.mu.RUnlock() + + if prefix == "" { + return rt.getAllKeys(rt.root) + } + + node := rt.findPrefixNode(rt.root, prefix) + if node == nil { + return []string{} + } + + return rt.getAllKeys(node) +} + +func (rt *RadixTree) findNode(node *RadixNode, value string, exactMatch bool) *RadixNode { + if value == "" { + return node + } + + if node.prefix == "" { + child, exists := node.getChild(value) + if !exists { + return nil + } + return rt.findNode(child, value, exactMatch) + } + + if value == node.prefix { + return node + } + + if strings.HasPrefix(value, node.prefix) { + remaining := value[len(node.prefix):] + child, exists := node.getChild(remaining) + if !exists { + return nil + } + return rt.findNode(child, remaining, exactMatch) + } + + if !exactMatch && strings.HasPrefix(node.prefix, value) { + return node + } + + return nil +} + +func (rt *RadixTree) findPrefixNode(node *RadixNode, prefix string) *RadixNode { + return rt.findNode(node, prefix, false) +} + +func (rt *RadixTree) findExactNode(node *RadixNode, value string) *RadixNode { + return rt.findNode(node, value, true) +} + +func (rt *RadixTree) getAllKeys(node *RadixNode) []string { + if node == nil { + return []string{} + } + + var result []string + + // Collect keys from current node + if node.isLeaf && node.keys != nil { + result = append(result, node.keys.ToSlice()...) + } + + // Recursively collect keys from all children + for _, child := range node.children { + result = append(result, rt.getAllKeys(child)...) + } + + return result +} + +// ExactSearch finds resource keys that exactly match the given value +func (rt *RadixTree) ExactSearch(indexValue string) []string { + rt.mu.RLock() + defer rt.mu.RUnlock() + + node := rt.findExactNode(rt.root, indexValue) + if node == nil || !node.isLeaf { + return []string{} + } + + return node.keys.ToSlice() +} + +// GetAllIndexValues returns all index values stored in the tree +func (rt *RadixTree) GetAllIndexValues() []string { + rt.mu.RLock() + defer rt.mu.RUnlock() + + return rt.collectIndexValues(rt.root, "") +} + +// collectIndexValues recursively collects all index values from the tree +func (rt *RadixTree) collectIndexValues(node *RadixNode, currentPath string) []string { + if node == nil { + return []string{} + } + + var result []string + fullPath := currentPath + node.prefix + + // If this node is a leaf, add the full path as an index value + if node.isLeaf { + result = append(result, fullPath) + } + + // Recursively collect from all children + for _, child := range node.children { + result = append(result, rt.collectIndexValues(child, fullPath)...) + } + + return result +} + +func (rt *RadixTree) Delete(indexValue, resourceKey string) bool { + rt.mu.Lock() + defer rt.mu.Unlock() + + return rt.delete(rt.root, indexValue, resourceKey, nil, 0) +} + +func (rt *RadixTree) removeKeyFromNode(node *RadixNode, resourceKey string, parent *RadixNode, parentKey byte) bool { + if node.keys == nil || !node.keys.Contain(resourceKey) { + return false + } + + node.keys.Delete(resourceKey) + rt.size-- + + if node.keys.Size() == 0 { + node.isLeaf = false + rt.mergeNode(node, parent, parentKey) + } + + return true +} + +func (rt *RadixTree) delete(node *RadixNode, indexValue, resourceKey string, parent *RadixNode, parentKey byte) bool { + if node == nil { + return false + } + + if indexValue == "" { + return rt.removeKeyFromNode(node, resourceKey, parent, parentKey) + } + + if node.prefix == "" { + child, exists := node.getChild(indexValue) + if !exists { + return false + } + return rt.delete(child, indexValue, resourceKey, node, indexValue[0]) + } + + if indexValue == node.prefix { + return rt.removeKeyFromNode(node, resourceKey, parent, parentKey) + } + + if strings.HasPrefix(indexValue, node.prefix) { + remaining := indexValue[len(node.prefix):] + child, exists := node.getChild(remaining) + if !exists { + return false + } + return rt.delete(child, remaining, resourceKey, node, remaining[0]) + } + + return false +} + +func (rt *RadixTree) mergeNode(node *RadixNode, parent *RadixNode, parentKey byte) { + if !node.isLeaf && len(node.children) == 0 { + if parent != nil { + delete(parent.children, parentKey) + } + return + } + + if !node.isLeaf && len(node.children) == 1 { + var onlyChild *RadixNode + for _, child := range node.children { + onlyChild = child + break + } + + node.prefix = node.prefix + onlyChild.prefix + node.keys = onlyChild.keys + node.children = onlyChild.children + node.isLeaf = onlyChild.isLeaf + } +} diff --git a/pkg/store/indexer/radix_tree_test.go b/pkg/store/indexer/radix_tree_test.go new file mode 100644 index 000000000..5a6cff826 --- /dev/null +++ b/pkg/store/indexer/radix_tree_test.go @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package indexer + +import ( + "sync" + "testing" +) + +func TestNewRadixTree(t *testing.T) { + tree := NewRadixTree() + if tree == nil { + t.Fatal("NewRadixTree returned nil") + } + if tree.Size() != 0 { + t.Errorf("Expected size 0, got %d", tree.Size()) + } + if tree.root == nil { + t.Error("Root node is nil") + } +} + +func TestInsert(t *testing.T) { + tests := []struct { + name string + inserts []struct{ index, key string } + wantSize int + }{ + { + name: "single insert", + inserts: []struct{ index, key string }{{"dubbo-service", "key1"}}, + wantSize: 1, + }, + { + name: "multiple inserts", + inserts: []struct{ index, key string }{ + {"dubbo-service-a", "key1"}, + {"dubbo-service-b", "key2"}, + {"dubbo-admin", "key3"}, + }, + wantSize: 3, + }, + { + name: "duplicate insert", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + {"dubbo-service", "key1"}, + }, + wantSize: 1, + }, + { + name: "same index different keys", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + {"dubbo-service", "key2"}, + }, + wantSize: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tree := NewRadixTree() + for _, ins := range tt.inserts { + tree.Insert(ins.index, ins.key) + } + if got := tree.Size(); got != tt.wantSize { + t.Errorf("Size() = %d, want %d", got, tt.wantSize) + } + }) + } +} + +func TestSearchPrefix(t *testing.T) { + tests := []struct { + name string + inserts []struct{ index, key string } + prefix string + wantCount int + }{ + { + name: "basic prefix match", + inserts: []struct{ index, key string }{ + {"dubbo-service-a", "key1"}, + {"dubbo-service-b", "key2"}, + {"dubbo-admin", "key3"}, + }, + prefix: "dubbo-service", + wantCount: 2, + }, + { + name: "empty prefix returns all", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + {"dubbo-admin", "key2"}, + }, + prefix: "", + wantCount: 2, + }, + { + name: "no match", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + }, + prefix: "spring", + wantCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tree := NewRadixTree() + for _, ins := range tt.inserts { + tree.Insert(ins.index, ins.key) + } + results := tree.SearchPrefix(tt.prefix) + if got := len(results); got != tt.wantCount { + t.Errorf("SearchPrefix() returned %d results, want %d", got, tt.wantCount) + } + }) + } +} + +func TestExactSearch(t *testing.T) { + tests := []struct { + name string + inserts []struct{ index, key string } + searchKey string + wantCount int + wantKey string + }{ + { + name: "exact match", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + {"dubbo-service-a", "key2"}, + }, + searchKey: "dubbo-service", + wantCount: 1, + wantKey: "key1", + }, + { + name: "no match", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + }, + searchKey: "dubbo-service-a", + wantCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tree := NewRadixTree() + for _, ins := range tt.inserts { + tree.Insert(ins.index, ins.key) + } + results := tree.ExactSearch(tt.searchKey) + if got := len(results); got != tt.wantCount { + t.Errorf("ExactSearch() returned %d results, want %d", got, tt.wantCount) + } + if tt.wantCount > 0 && len(results) > 0 && results[0] != tt.wantKey { + t.Errorf("ExactSearch() returned key %s, want %s", results[0], tt.wantKey) + } + }) + } +} + +func TestDelete(t *testing.T) { + tests := []struct { + name string + inserts []struct{ index, key string } + deleteIndex string + deleteKey string + wantDeleted bool + wantSize int + wantRemaining string + }{ + { + name: "basic delete", + inserts: []struct{ index, key string }{{"dubbo-service", "key1"}}, + deleteIndex: "dubbo-service", + deleteKey: "key1", + wantDeleted: true, + wantSize: 0, + }, + { + name: "delete one of multiple keys", + inserts: []struct{ index, key string }{ + {"dubbo-service", "key1"}, + {"dubbo-service", "key2"}, + }, + deleteIndex: "dubbo-service", + deleteKey: "key1", + wantDeleted: true, + wantSize: 1, + wantRemaining: "key2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tree := NewRadixTree() + for _, ins := range tt.inserts { + tree.Insert(ins.index, ins.key) + } + + deleted := tree.Delete(tt.deleteIndex, tt.deleteKey) + if deleted != tt.wantDeleted { + t.Errorf("Delete() = %v, want %v", deleted, tt.wantDeleted) + } + if got := tree.Size(); got != tt.wantSize { + t.Errorf("Size() = %d, want %d", got, tt.wantSize) + } + + if tt.wantRemaining != "" { + results := tree.ExactSearch(tt.deleteIndex) + if len(results) != 1 || results[0] != tt.wantRemaining { + t.Errorf("Expected remaining key %s, got %v", tt.wantRemaining, results) + } + } + }) + } +} + +func TestConcurrentInsert(t *testing.T) { + tree := NewRadixTree() + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + tree.Insert("dubbo-service", "key") + }(i) + } + + wg.Wait() + if tree.Size() != 1 { + t.Errorf("Expected size 1 after concurrent inserts, got %d", tree.Size()) + } +} + +func TestConcurrentReadWrite(t *testing.T) { + tree := NewRadixTree() + tree.Insert("dubbo-service-a", "key1") + tree.Insert("dubbo-service-b", "key2") + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(2) + go func() { + defer wg.Done() + tree.SearchPrefix("dubbo-service") + }() + go func() { + defer wg.Done() + tree.Insert("dubbo-admin", "key3") + }() + } + + wg.Wait() +} diff --git a/pkg/store/memory/store.go b/pkg/store/memory/store.go index fa0b3a6b7..5afcf5f83 100644 --- a/pkg/store/memory/store.go +++ b/pkg/store/memory/store.go @@ -30,11 +30,13 @@ import ( "github.com/apache/dubbo-admin/pkg/core/runtime" "github.com/apache/dubbo-admin/pkg/core/store" "github.com/apache/dubbo-admin/pkg/core/store/index" + "github.com/apache/dubbo-admin/pkg/store/indexer" ) type resourceStore struct { rk coremodel.ResourceKind storeProxy cache.Indexer + indices *indexer.IndexImpl } var _ store.ManagedResourceStore = &resourceStore{} @@ -55,6 +57,14 @@ func (rs *resourceStore) Init(_ runtime.BuilderContext) error { }, indexers, ) + + // Create IndexWrapper with MemoryIndex for prefix query support + memIndex := indexer.NewMemoryIndex() + rs.indices = indexer.NewIndexImpl(memIndex) + if err := rs.indices.AddIndexers(indexers); err != nil { + return err + } + return nil } @@ -63,15 +73,56 @@ func (rs *resourceStore) Start(_ runtime.Runtime, _ <-chan struct{}) error { } func (rs *resourceStore) Add(obj interface{}) error { - return rs.storeProxy.Add(obj) + if err := rs.storeProxy.Add(obj); err != nil { + return err + } + + // Update indices + if resource, ok := obj.(coremodel.Resource); ok { + rs.indices.UpdateResource(resource, nil) + } + + return nil } func (rs *resourceStore) Update(obj interface{}) error { - return rs.storeProxy.Update(obj) + resource, ok := obj.(coremodel.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + // Get old resource for index update + oldObj, exists, err := rs.storeProxy.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + + var oldResource coremodel.Resource + if exists { + oldResource, _ = oldObj.(coremodel.Resource) + } + + if err := rs.storeProxy.Update(obj); err != nil { + return err + } + + // Update indices + rs.indices.UpdateResource(resource, oldResource) + + return nil } func (rs *resourceStore) Delete(obj interface{}) error { - return rs.storeProxy.Delete(obj) + if err := rs.storeProxy.Delete(obj); err != nil { + return err + } + + // Remove from indices + if resource, ok := obj.(coremodel.Resource); ok { + rs.indices.RemoveResource(resource) + } + + return nil } func (rs *resourceStore) List() []interface{} { @@ -204,3 +255,100 @@ func (rs *resourceStore) getKeysByIndexes(indexes map[string]string) ([]string, } return keySet.ToSlice(), nil } + +// getKeysByIndexesWithPrefix gets resource keys by combining exact and prefix indexes +func (rs *resourceStore) getKeysByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string) ([]string, error) { + if len(indexes) == 0 && len(prefixIndexes) == 0 { + return []string{}, nil + } + + keySet := set.New[string]() + first := true + + // Process exact match indexes using IndexWrapper + for indexName, indexValue := range indexes { + query := indexer.NewQuery(indexName, indexer.OperatorEquals, indexValue) + keys, err := rs.indices.Query(query) + if err != nil { + return nil, err + } + + if first { + keySet = set.FromSlice(keys) + first = false + } else { + nextSet := set.FromSlice(keys) + keySet = keySet.Intersection(nextSet) + } + } + + // Process prefix match indexes using IndexWrapper + for indexName, prefix := range prefixIndexes { + query := indexer.NewQuery(indexName, indexer.OperatorPrefix, prefix) + keys, err := rs.indices.Query(query) + if err != nil { + return nil, err + } + + if first { + keySet = set.FromSlice(keys) + first = false + } else { + nextSet := set.FromSlice(keys) + keySet = keySet.Intersection(nextSet) + } + } + + return keySet.ToSlice(), nil +} + +// ListByIndexesWithPrefix lists resources by indexes with prefix matching support +func (rs *resourceStore) ListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string) ([]coremodel.Resource, error) { + keys, err := rs.getKeysByIndexesWithPrefix(indexes, prefixIndexes) + if err != nil { + return nil, err + } + + resources, err := rs.GetByKeys(keys) + if err != nil { + return nil, err + } + + resources = slices.SortBy(resources, func(r coremodel.Resource) string { + return r.ResourceKey() + }) + + return resources, nil +} + +// PageListByIndexesWithPrefix lists resources by indexes with prefix matching support, pageable +func (rs *resourceStore) PageListByIndexesWithPrefix(indexes map[string]string, prefixIndexes map[string]string, pq coremodel.PageReq) (*coremodel.PageData[coremodel.Resource], error) { + keys, err := rs.getKeysByIndexesWithPrefix(indexes, prefixIndexes) + if err != nil { + return nil, err + } + + sort.Strings(keys) + total := len(keys) + + if pq.PageOffset < 0 || pq.PageOffset > total { + return nil, store.ErrorInvalidOffset + } + + // Calculate page range + end := pq.PageOffset + pq.PageSize + if end > total { + end = total + } + + // Get only the keys for current page + pageKeys := keys[pq.PageOffset:end] + + // Batch fetch resources for current page + resources, err := rs.GetByKeys(pageKeys) + if err != nil { + return nil, err + } + + return coremodel.NewPageData(total, pq.PageOffset, pq.PageSize, resources), nil +}