11import logging
22from collections import OrderedDict
3- from datetime import datetime , timedelta , timezone
3+ from datetime import datetime , timedelta , timezone , date
44from typing import Dict , List , Optional , Any , Union
55from enum import Enum
66
7- from bumiworker .bumiworker .modules .base import ModuleBase
8- from bumiworker .bumiworker .modules .pricing .aws_cloudwatch import DEFAULT as CWL_PRICING
7+ from bumiworker .bumiworker .modules .base import ModuleBase , DAYS_IN_MONTH
98
109SUPPORTED_CLOUD_TYPES = ("aws_cnr" ,)
1110
@@ -145,43 +144,74 @@ def _is_inactive(self, resource: Dict, days_threshold: int) -> bool:
145144 except Exception :
146145 return False
147146
def _real_saving_payload(
        self,
        resource: Dict[str, Any],
        today: date
) -> Optional[Dict[str, float]]:
    """
    Build the saving payload for a log group from its real ClickHouse
    expenses.

    :param resource: mongo resource document for the log group; must carry
        ``cloud_account_id`` and ``resource_id`` for the expense lookup
    :param today: exclusive end of the one-month expense window
    :return: dict with ``saving``, ``current_cost_month`` and ``stored_gb``,
        or None when identifiers are missing or the expense query failed
    """
    account = resource.get("cloud_account_id")
    res_id = resource.get("resource_id")
    # Without both identifiers the expense lookup cannot be performed.
    if not (account and res_id):
        return None

    monthly_cost = self._log_group_monthly_cost(account, res_id, today)
    if monthly_cost is None:
        # Lookup error (not "zero cost") — caller skips this resource.
        return None

    bytes_stored = int(resource.get("stored_bytes") or 0)
    gb_stored = bytes_stored / BYTES_PER_GIB if bytes_stored else 0.0

    cost = float(monthly_cost)
    return {
        # Negative totals (e.g. credits/refunds) must not surface as savings.
        "saving": cost if cost > 0.0 else 0.0,
        "current_cost_month": cost,
        "stored_gb": gb_stored,
    }
172+
def _log_group_monthly_cost(
        self,
        cloud_account_id: str,
        resource_id: str,
        today: date
) -> Optional[float]:
    """
    Total the daily CUR expenses recorded for a log group over the trailing
    month window ending at *today* (exclusive).

    :param cloud_account_id: internal cloud account id the expenses belong to
    :param resource_id: resource id of the log group in the expenses table
    :param today: exclusive upper bound of the date window
    :return: summed cost as float, or None when the ClickHouse query failed
        (so callers can tell "lookup error" apart from "zero cost")
    """
    window_start = today - timedelta(days=DAYS_IN_MONTH)
    query = """
        SELECT date, sum(cost)
        FROM expenses
        WHERE cloud_account_id = %(cloud_account_id)s
          AND resource_id = %(resource_id)s
          AND date >= %(start_date)s
          AND date < %(end_date)s
        GROUP BY date
    """
    params = {
        "cloud_account_id": cloud_account_id,
        "resource_id": resource_id,
        "start_date": window_start,
        "end_date": today,
    }
    try:
        rows = self.clickhouse_client.query(
            query=query, parameters=params).result_rows
    except Exception as exc:
        # Best-effort: a failed expense lookup must not abort the module run.
        LOG.warning(
            "Failed to fetch CUR expenses for log group %s: %s",
            resource_id, str(exc)
        )
        return None

    total_cost = 0.0
    for _day, daily_cost in rows:
        try:
            total_cost += float(daily_cost or 0)
        except (TypeError, ValueError):
            # Tolerate malformed cost values instead of failing the sum.
            pass
    return total_cost
185215
186216 def _aggregate_resources (
187217 self , cloud_account_id : str ) -> List [Dict [str , Any ]]:
@@ -233,6 +263,10 @@ def _get(self):
233263 ca_map = self .get_cloud_accounts (
234264 SUPPORTED_CLOUD_TYPES , skip_cloud_accounts )
235265
266+ today = (
267+ datetime .utcfromtimestamp (self .created_at ).date ()
268+ if self .created_at else self ._utc_now ().date ()
269+ )
236270 result : List [Dict [str , Any ]] = []
237271
238272 for ca in ca_map :
@@ -253,7 +287,10 @@ def _get(self):
253287 if not self ._is_inactive (r , days_threshold ):
254288 continue
255289
256- saving = self ._estimate_saving (r )
290+ saving_payload = self ._real_saving_payload (r , today )
291+ if not saving_payload :
292+ continue
293+ saving = saving_payload .get ("saving" , 0.0 )
257294 ca_info = ca_map .get (r ['cloud_account_id' ], {})
258295
259296 metrics = self ._get_metrics (r )
@@ -263,7 +300,7 @@ def _get(self):
263300 query_occurrences = self ._count_occurrences_in_threshold (
264301 metrics .get (MetricKey .QUERY .value , []), days_threshold )
265302
266- result . append ( {
303+ item = {
267304 'cloud_resource_id' : r .get ('resource_id' ),
268305 'resource_id' : r .get ('resource_id' ),
269306 'resource_name' : r .get ('name' ),
@@ -282,7 +319,13 @@ def _get(self):
282319 'ingestion' : int (ingestion_occurrences ),
283320 'query' : int (query_occurrences ),
284321 'saving' : saving ,
285- })
322+ }
323+ if "current_cost_month" in saving_payload :
324+ item ["current_cost_month" ] = round (
325+ saving_payload ["current_cost_month" ], 2 )
326+ if "stored_gb" in saving_payload :
327+ item ["stored_gb" ] = round (saving_payload ["stored_gb" ], 3 )
328+ result .append (item )
286329
287330 return result
288331
@@ -294,4 +337,4 @@ def main(organization_id, config_client, created_at, **kwargs):
294337
295338
def get_module_email_name():
    """Return the display name of this module for recommendation e-mails."""
    email_name = 'Inactive CloudWatch Log Groups'
    return email_name