1- from typing import Any , Optional
1+ import time
2+ from typing import Any , Optional , Tuple
23
34from graphsenselib .db .asynchronous .services .addresses_service import AddressesService
45from graphsenselib .db .asynchronous .services .blocks_service import BlocksService
1314from graphsenselib .db .asynchronous .services .tokens_service import TokensService
1415from graphsenselib .db .asynchronous .services .txs_service import TxsService
1516from graphsenselib .tagstore .db import TagstoreDbAsync , Taxonomies
17+ from graphsenselib .tagstore .db .queries import TagPublic
1618
1719from gsrest .builtin .plugins .obfuscate_tags .obfuscate_tags import (
1820 GROUPS_HEADER_NAME ,
@@ -47,6 +49,78 @@ async def setup_cache(cls, db_engine: Any, app: Any):
4749 }
4850
4951
52+ class TagAccessLoggerTagstoreProxy :
53+ """Adds logging for which tags are accessed from the tagstore
54+ it intercepts calls to the tagstore DB
55+ and logs returned tags to redis.
56+ """
57+
58+ def __init__ (
59+ self , tagstore_db : TagstoreDbAsync , redis_client : Any , key_prefix : str
60+ ):
61+ self .tagstore_db = tagstore_db
62+ self .redis_client = redis_client
63+ self .key_prefix = key_prefix
64+
65+ def __getattr__ (self , name ):
66+ """Proxy all method calls to the underlying tagstore_db"""
67+ attr = getattr (self .tagstore_db , name )
68+
69+ if callable (attr ):
70+
71+ async def wrapper (* args , ** kwargs ):
72+ # Call the original method
73+ result = await attr (* args , ** kwargs )
74+
75+ # Log tag access if this method returns TagPublic objects
76+ should_log , is_list = self ._should_log_result (result )
77+ if self .redis_client and should_log :
78+ if is_list :
79+ for tag in result :
80+ await self ._log_tag_access (name , tag , * args , ** kwargs )
81+ else :
82+ await self ._log_tag_access (name , result , * args , ** kwargs )
83+
84+ return result
85+
86+ return wrapper
87+ else :
88+ return attr
89+
90+ def _should_log_result (self , result : Any ) -> Tuple [bool , bool ]:
91+ """Determine if this result should be logged based on data type"""
92+
93+ if not result :
94+ return False , False
95+
96+ # Check if result is a PublicTag
97+ if isinstance (result , TagPublic ):
98+ return True , False
99+
100+ # Check if result is a list of TagPublic objects
101+ if hasattr (result , "__iter__" ) and not isinstance (result , str ):
102+ try :
103+ # Check if all items in the iterable are TagPublic objects
104+ for item in result :
105+ if isinstance (item , TagPublic ):
106+ return True , True
107+ break # Only check first item for performance
108+ except (TypeError , StopIteration ):
109+ pass
110+
111+ return False , False
112+
113+ async def _log_tag_access (self , method_name : str , tag : TagPublic , * args , ** kwargs ):
114+ """Log tag access information to Redis"""
115+
116+ current_time = time .localtime ()
117+ timestamp = time .strftime ("%Y-%m-%d" , current_time )
118+ key = "|" .join (
119+ (self .key_prefix , timestamp , tag .creator , tag .network , tag .identifier )
120+ )
121+ await self .redis_client .incr (key )
122+
123+
50124class ServiceContainer :
51125 def __init__ (
52126 self ,
@@ -55,10 +129,17 @@ def __init__(
55129 tagstore_engine : any ,
56130 concepts_cache_service : ConceptsCacheService ,
57131 logger : any ,
132+ redis_client : Optional [Any ] = None ,
133+ log_tag_access_prefix : Optional [str ] = None ,
58134 ):
135+ tsdb = TagstoreDbAsync (tagstore_engine )
59136 self .config = config
60137 self .db = db
61- self .tagstore_db = TagstoreDbAsync (tagstore_engine )
138+ self .tagstore_db = (
139+ TagAccessLoggerTagstoreProxy (tsdb , redis_client , log_tag_access_prefix )
140+ if log_tag_access_prefix
141+ else tsdb
142+ )
62143 self .logger = logger
63144 self .category_cache_service = concepts_cache_service
64145
0 commit comments