@@ -41,19 +41,62 @@ def insert(self, objs: Union[OBJECT, List[OBJECT]], **kwargs):
4141 del obj ["_id" ]
4242 self ._post_insert_hook (objs )
4343
44- def upsert (
45- self ,
46- objs : Union [Dict [str , Any ], List [Dict [str , Any ]]],
47- filter_fields : List [str ],
48- update_fields : Union [List [str ], None ] = None ,
49- ** kwargs
50- ):
44+
45+ def index (self ,
46+ objs : Union [OBJECT , List [OBJECT ]],
47+ index_name : Optional [str ] = None ,
48+ replace : bool = False ,
49+ unique : bool = False ,
50+ ** kwargs ):
51+ """
52+ Create indexes on the collection.
53+
54+ :param objs: Field(s) to index.
55+ :param index_name: Optional name for the index.
56+ :param replace: If True, the index will be dropped and recreated.
57+ :param unique: If True, creates a unique index (default: False).
58+ """
59+
60+ if not isinstance (objs , list ):
61+ objs = [objs ]
62+
63+ existing_indexes = self .mongo_collection .index_information ()
64+
65+ for obj in objs :
66+ field_exists = False
67+ index_to_drop = None
68+
69+ # Extract existing index details
70+ for index_name_existing , index_details in existing_indexes .items ():
71+ indexed_fields = [field [0 ] for field in index_details .get ("key" , [])] # Extract field names
72+
73+ if obj in indexed_fields : # If this field is already indexed
74+ field_exists = True
75+ index_to_drop = index_name_existing if replace else None
76+
77+ # Drop the index if replace=True and index_to_drop is valid
78+ if index_to_drop :
79+ self .mongo_collection .drop_index (index_to_drop )
80+ logging .debug (f"Dropped existing index: { index_to_drop } " )
81+
82+ # Create the new index only if it doesn't exist or was dropped
83+ if not field_exists or replace :
84+ self .mongo_collection .create_index (obj , name = index_name , unique = unique )
85+ logging .debug (f"Created new index: { index_name } on field { obj } , unique={ unique } " )
86+ else :
87+ logging .debug (f"Index already exists for field { obj } , skipping creation." )
88+
89+ def upsert (self ,
90+ objs : Union [OBJECT , List [OBJECT ]],
91+ filter_fields : List [str ],
92+ update_fields : Optional [List [str ]] = None ,
93+ ** kwargs ):
5194 """
5295 Upsert one or more documents into the MongoDB collection.
5396
54- :params: objs (Union[Dict[str, Any], List[Dict[str, Any]]]) : The document(s) to insert or update.
55- :params: filter_fields (List[str]) : List of field names to use as the filter for matching existing documents.
56- :params: update_fields (Union[List[str], None]) : List of field names to include in the update. If None, all fields are updated.
97+ :param objs: The document(s) to insert or update.
98+ :param filter_fields: List of field names to use as the filter for matching existing documents.
99+ :param update_fields: List of field names to include in the update. If None, all fields are updated.
57100 """
58101 if not isinstance (objs , list ):
59102 objs = [objs ]
@@ -64,20 +107,22 @@ def upsert(
64107 if not filter_criteria :
65108 raise ValueError ("At least one valid filter field must be present in each object." )
66109
67- # Handle None for update_fields: Update all fields if None
68- if update_fields is None :
69- update_data = {k : v for k , v in obj .items () if v is not None }
70- else :
71- update_data = {field : obj [field ] for field in update_fields if field in obj and obj [field ] is not None }
110+ # Check if a document already exists
111+ existing_doc = self .mongo_collection .find_one (filter_criteria )
72112
73- # Use MongoDB's $set operator to update only the specified fields
74- update_operation = {"$set" : update_data }
113+ if existing_doc :
114+ # Update only changed fields
115+ updates = {key : obj [key ] for key in update_fields if key in obj and obj [key ] != existing_doc .get (key )}
75116
76- self .mongo_collection .update_one (
77- filter = filter_criteria ,
78- update = update_operation ,
79- upsert = True ,
80- )
117+ if updates :
118+ self .mongo_collection .update_one (filter_criteria , {"$set" : updates })
119+ logging .debug (f"Updated existing document: { filter_criteria } with { updates } " )
120+ else :
121+ logging .debug (f"No changes detected for document: { filter_criteria } . Skipping update." )
122+ else :
123+ # Insert a new document
124+ self .mongo_collection .insert_one (obj )
125+ logging .debug (f"Inserted new document: { obj } " )
81126
82127 def query (self , query : Query , limit : Optional [int ] = None , offset : Optional [int ] = None , ** kwargs ) -> QueryResult :
83128 mongo_filter = self ._build_mongo_filter (query .where_clause )
0 commit comments