
Commit b9b4e60

Add multi tenancy support (#99)
1 parent cab6e75 commit b9b4e60

15 files changed, +435 -183 lines changed

packages/flare/bin/constants.py

Lines changed: 8 additions & 2 deletions
@@ -11,17 +11,23 @@
 
 class PasswordKeys(Enum):
     API_KEY = "api_key"
-    TENANT_ID = "tenant_id"
+    TENANT_IDS = "tenant_ids"
     INGEST_FULL_EVENT_DATA = "ingest_full_event_data"
     SEVERITIES_FILTER = "severities_filter"
     SOURCE_TYPES_FILTER = "source_types_filter"
 
 
 class DataStoreKeys(Enum):
-    LAST_INGESTED_TENANT_ID = "last_ingested_tenant_id"
     START_DATE = "start_date"
     TIMESTAMP_LAST_FETCH = "timestamp_last_fetch"
 
+    SECTION_METADATA = "metadata"
+    SECTION_TENANT_DATA = "tenant_data"
+
     @staticmethod
     def get_next_token(tenant_id: int) -> str:
         return f"next_{tenant_id}"
+
+    @staticmethod
+    def get_earliest_ingested(tenant_id: int) -> str:
+        return f"timestamp_earliest_ingested_{tenant_id}"

packages/flare/bin/cron_job_ingest_events.py

Lines changed: 69 additions & 60 deletions
@@ -43,7 +43,7 @@ def main(
         return
 
     api_key = get_api_key(storage_passwords=storage_passwords)
-    tenant_id = get_tenant_id(storage_passwords=storage_passwords)
+    tenant_ids = get_tenant_ids(storage_passwords=storage_passwords)
     ingest_full_event_data = get_ingest_full_event_data(
         storage_passwords=storage_passwords
     )
@@ -52,36 +52,71 @@
 
     data_store.set_last_fetch(datetime.now(timezone.utc))
 
-    # If the tenant has changed, update the start date so that future requests will be based off today
-    # If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day
-    # that tenant was switched in the first place.
-    if data_store.get_last_tenant_id() != tenant_id:
-        data_store.set_start_date((datetime.now(timezone.utc) - timedelta(days=30)))
+    total_events_fetched_count = 0
 
-    data_store.set_last_tenant_id(tenant_id)
+    for tenant_id in tenant_ids:
+        events_fetched_count = 0
 
-    events_fetched_count = 0
-    for event, next_token in fetch_feed(
-        logger=logger,
-        api_key=api_key,
-        tenant_id=tenant_id,
-        ingest_full_event_data=ingest_full_event_data,
-        severities=severities_filter,
-        source_types=source_types_filter,
-        flare_api_cls=flare_api_cls,
-        data_store=data_store,
-    ):
-        data_store.set_last_fetch(datetime.now(timezone.utc))
+        # The earliest ingested date serves as a low water mark to look
+        # for identifiers 30 days prior to the day a tenant was first configured.
+        start_date = data_store.get_earliest_ingested_by_tenant(tenant_id)
+        if not start_date:
+            start_date = datetime.now(timezone.utc) - timedelta(days=30)
+            data_store.set_earliest_ingested_by_tenant(tenant_id, start_date)
+
+        for event, next_token in fetch_feed(
+            logger=logger,
+            api_key=api_key,
+            tenant_id=tenant_id,
+            ingest_full_event_data=ingest_full_event_data,
+            severities=severities_filter,
+            source_types=source_types_filter,
+            flare_api_cls=flare_api_cls,
+            data_store=data_store,
+        ):
+            data_store.set_last_fetch(datetime.now(timezone.utc))
+
+            data_store.set_next_by_tenant(tenant_id, next_token)
+
+            event["tenant_id"] = tenant_id
 
-        data_store.set_next_by_tenant(tenant_id, next_token)
+            # stdout is picked up by splunk and this is how events
+            # are ingested after being retrieved from Flare.
+            print(json.dumps(event), flush=True)
 
-        # stdout is picked up by splunk and this is how events
-        # are ingested after being retrieved from Flare.
-        print(json.dumps(event), flush=True)
+            events_fetched_count += 1
+        logger.info(f"Fetched {events_fetched_count} events on tenant {tenant_id}")
+        total_events_fetched_count += events_fetched_count
 
-        events_fetched_count += 1
+    logger.info(f"Fetched {total_events_fetched_count} events across all tenants")
+
+
+def fetch_feed(
+    logger: Logger,
+    api_key: str,
+    tenant_id: int,
+    ingest_full_event_data: bool,
+    severities: list[str],
+    source_types: list[str],
+    flare_api_cls: type[FlareAPI],
+    data_store: ConfigDataStore,
+) -> Iterator[tuple[dict, str]]:
+    flare_api: FlareAPI = flare_api_cls(api_key=api_key, tenant_id=tenant_id)
 
-    logger.info(f"Fetched {events_fetched_count} events")
+    try:
+        next = data_store.get_next_by_tenant(tenant_id)
+        start_date = data_store.get_earliest_ingested_by_tenant(tenant_id)
+        logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}")
+        for event_next in flare_api.fetch_feed_events(
+            next=next,
+            start_date=start_date,
+            ingest_full_event_data=ingest_full_event_data,
+            severities=severities,
+            source_types=source_types,
+        ):
+            yield event_next
+    except Exception as e:
+        logger.error(f"Exception={e}")
 
 
 def get_storage_password_value(
@@ -103,18 +138,20 @@ def get_api_key(storage_passwords: client.StoragePasswords) -> str:
     return api_key
 
 
-def get_tenant_id(storage_passwords: client.StoragePasswords) -> int:
-    stored_tenant_id = get_storage_password_value(
-        storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_ID.value
+def get_tenant_ids(storage_passwords: client.StoragePasswords) -> list[int]:
+    stored_tenant_ids = get_storage_password_value(
+        storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_IDS.value
     )
     try:
-        tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None
+        tenant_ids: Optional[list[int]] = (
+            json.loads(stored_tenant_ids) if stored_tenant_ids else None
+        )
     except Exception:
         pass
 
-    if not tenant_id:
-        raise Exception("Tenant ID not found")
-    return tenant_id
+    if not tenant_ids:
+        raise Exception("Tenant IDs not found")
+    return tenant_ids
 
 
 def get_ingest_full_event_data(storage_passwords: client.StoragePasswords) -> bool:
@@ -151,34 +188,6 @@ def get_source_types_filter(storage_passwords: client.StoragePasswords) -> list[
     return []
 
 
-def fetch_feed(
-    logger: Logger,
-    api_key: str,
-    tenant_id: int,
-    ingest_full_event_data: bool,
-    severities: list[str],
-    source_types: list[str],
-    flare_api_cls: type[FlareAPI],
-    data_store: ConfigDataStore,
-) -> Iterator[tuple[dict, str]]:
-    try:
-        flare_api: FlareAPI = flare_api_cls(api_key=api_key, tenant_id=tenant_id)
-
-        next = data_store.get_next_by_tenant(tenant_id)
-        start_date = data_store.get_start_date()
-        logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}")
-        for event_next in flare_api.fetch_feed_events(
-            next=next,
-            start_date=start_date,
-            ingest_full_event_data=ingest_full_event_data,
-            severities=severities,
-            source_types=source_types,
-        ):
-            yield event_next
-    except Exception as e:
-        logger.error(f"Exception={e}")
-
-
 def get_splunk_service(logger: Logger, token: str) -> client.Service:
     try:
         splunk_service = client.connect(
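The tenant_ids secret is now expected to hold a JSON array rather than a single integer. A minimal, self-contained sketch of that parsing, mirroring get_tenant_ids() above; the stored string and the helper name are illustrative, and the sketch falls back to None on parse errors instead of pass:

import json
from typing import Optional


def parse_tenant_ids(stored_tenant_ids: Optional[str]) -> list[int]:
    # Mirrors get_tenant_ids(): the stored secret is expected to be a JSON
    # array such as "[111, 222]"; a missing or unparsable value raises.
    try:
        tenant_ids: Optional[list[int]] = (
            json.loads(stored_tenant_ids) if stored_tenant_ids else None
        )
    except Exception:
        tenant_ids = None

    if not tenant_ids:
        raise Exception("Tenant IDs not found")
    return tenant_ids


print(parse_tenant_ids("[111, 222]"))  # [111, 222]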

packages/flare/bin/data_store.py

Lines changed: 39 additions & 46 deletions
@@ -16,14 +16,14 @@
 
 class ConfigDataStore:
     def __init__(self) -> None:
-        config_store = configparser.ConfigParser()
+        config_store = configparser.RawConfigParser()
         config_store.read(config_path)
 
         # Add data sections
-        if "metadata" not in config_store.sections():
-            config_store.add_section("metadata")
-        if "next_tokens" not in config_store.sections():
-            config_store.add_section("next_tokens")
+        if DataStoreKeys.SECTION_METADATA.value not in config_store.sections():
+            config_store.add_section(DataStoreKeys.SECTION_METADATA.value)
+        if DataStoreKeys.SECTION_TENANT_DATA.value not in config_store.sections():
+            config_store.add_section(DataStoreKeys.SECTION_TENANT_DATA.value)
         self._store = config_store
 
     def _commit(self) -> None:
@@ -33,47 +33,12 @@ def _commit(self) -> None:
     def _sync(self) -> None:
         self._store.read(config_path)
 
-    def get_last_tenant_id(self) -> Optional[int]:
-        self._sync()
-        last_ingested_tenant_id = self._store.get(
-            "metadata", DataStoreKeys.LAST_INGESTED_TENANT_ID.value, fallback=None
-        )
-
-        try:
-            return int(last_ingested_tenant_id) if last_ingested_tenant_id else None
-        except Exception:
-            pass
-        return None
-
-    def set_last_tenant_id(self, tenant_id: int) -> None:
-        self._store.set(
-            "metadata", DataStoreKeys.LAST_INGESTED_TENANT_ID.value, str(tenant_id)
-        )
-        self._commit()
-
-    def get_start_date(self) -> Optional[datetime]:
-        self._sync()
-        start_date = self._store.get(
-            "metadata", DataStoreKeys.START_DATE.value, fallback=None
-        )
-
-        if start_date:
-            try:
-                return datetime.fromisoformat(start_date)
-            except Exception:
-                pass
-        return None
-
-    def set_start_date(self, start_date: datetime) -> None:
-        self._store.set(
-            "metadata", DataStoreKeys.START_DATE.value, start_date.isoformat()
-        )
-        self._commit()
-
     def get_last_fetch(self) -> Optional[datetime]:
         self._sync()
         last_fetched = self._store.get(
-            "metadata", DataStoreKeys.TIMESTAMP_LAST_FETCH.value, fallback=None
+            DataStoreKeys.SECTION_METADATA.value,
+            DataStoreKeys.TIMESTAMP_LAST_FETCH.value,
+            fallback=None,
         )
 
         if last_fetched:
@@ -85,7 +50,7 @@ def get_last_fetch(self) -> Optional[datetime]:
 
     def set_last_fetch(self, last_fetch: datetime) -> None:
         self._store.set(
-            "metadata",
+            DataStoreKeys.SECTION_METADATA.value,
             DataStoreKeys.TIMESTAMP_LAST_FETCH.value,
             last_fetch.isoformat(),
         )
@@ -94,7 +59,7 @@ def set_last_fetch(self, last_fetch: datetime) -> None:
     def get_next_by_tenant(self, tenant_id: int) -> Optional[str]:
         self._sync()
         return self._store.get(
-            "next_tokens",
+            DataStoreKeys.SECTION_TENANT_DATA.value,
            DataStoreKeys.get_next_token(tenant_id=tenant_id),
             fallback=None,
         )
@@ -104,5 +69,33 @@ def set_next_by_tenant(self, tenant_id: int, next: Optional[str]) -> None:
             return
 
         self._store.set(
-            "next_tokens", DataStoreKeys.get_next_token(tenant_id=tenant_id), next
+            DataStoreKeys.SECTION_TENANT_DATA.value,
+            DataStoreKeys.get_next_token(tenant_id=tenant_id),
+            next,
+        )
+        self._commit()
+
+    def get_earliest_ingested_by_tenant(self, tenant_id: int) -> Optional[datetime]:
+        self._sync()
+        earliest_ingested = self._store.get(
+            DataStoreKeys.SECTION_TENANT_DATA.value,
+            DataStoreKeys.get_earliest_ingested(tenant_id=tenant_id),
+            fallback=None,
         )
+
+        if earliest_ingested:
+            try:
+                return datetime.fromisoformat(earliest_ingested)
+            except Exception:
+                pass
+        return None
+
+    def set_earliest_ingested_by_tenant(
+        self, tenant_id: int, earliest_ingested: datetime
+    ) -> None:
+        self._store.set(
+            DataStoreKeys.SECTION_TENANT_DATA.value,
+            DataStoreKeys.get_earliest_ingested(tenant_id=tenant_id),
+            earliest_ingested.isoformat(),
+        )
+        self._commit()
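A minimal, self-contained sketch of the on-disk layout the store now produces, using configparser.RawConfigParser directly; the section and key names match the diff, while the tenant ID, values, and file name are illustrative. Note that RawConfigParser, unlike ConfigParser, performs no % interpolation, so values are stored verbatim.

import configparser
from datetime import datetime, timezone

store = configparser.RawConfigParser()
store.add_section("metadata")
store.add_section("tenant_data")

tenant_id = 123  # illustrative
store.set("metadata", "timestamp_last_fetch", datetime.now(timezone.utc).isoformat())
store.set("tenant_data", f"next_{tenant_id}", "some-pagination-token")
store.set(
    "tenant_data",
    f"timestamp_earliest_ingested_{tenant_id}",
    datetime(2024, 3, 6, tzinfo=timezone.utc).isoformat(),
)

# All tenants share one [tenant_data] section; keys are suffixed with the
# tenant ID, e.g.:
#
#   [tenant_data]
#   next_123 = some-pagination-token
#   timestamp_earliest_ingested_123 = 2024-03-06T00:00:00+00:00
with open("example_flare_store.conf", "w") as f:
    store.write(f)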

packages/flare/bin/flare_external_requests.py

Lines changed: 1 addition & 3 deletions
@@ -90,13 +90,11 @@ def handle_GET(self) -> None:
 
         data_store = ConfigDataStore()
         last_fetched_timestamp = data_store.get_last_fetch()
-        last_tenant_id = data_store.get_last_tenant_id()
 
         status_resp = {
             "last_fetched_at": last_fetched_timestamp.isoformat()
             if last_fetched_timestamp is not None
-            else None,
-            "last_tenant_id": last_tenant_id,
+            else None
         }
         logger.debug(f"FlareIngestionStatus: {status_resp}")
         self.response.setHeader("Content-Type", "application/json")
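With the last-tenant field removed, the status endpoint now reports only the last fetch time. A small sketch of the resulting payload; the timestamp is illustrative:

import json
from datetime import datetime, timezone

last_fetched_timestamp = datetime(2024, 3, 6, 14, 0, tzinfo=timezone.utc)  # illustrative
status_resp = {
    "last_fetched_at": last_fetched_timestamp.isoformat()
    if last_fetched_timestamp is not None
    else None
}
print(json.dumps(status_resp))  # {"last_fetched_at": "2024-03-06T14:00:00+00:00"}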

packages/flare/tests/bin/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ def mock_env(mock_config_file: Path) -> Generator[None, None, None]:
 @pytest.fixture
 def data_store(mock_env: None) -> Generator[ConfigDataStore, None, None]:
     # Creates an instance of ConfigDataStore with mocked dependencies.
-    with mock.patch("configparser.ConfigParser.read") as mock_read:
+    with mock.patch("configparser.RawConfigParser.read") as mock_read:
         mock_read.return_value = None
         store = ConfigDataStore()
         store._commit = lambda: None

packages/flare/tests/bin/test_data_store.py

Lines changed: 7 additions & 11 deletions
@@ -3,17 +3,6 @@
 from datetime import timezone
 
 
-def test_get_and_set_last_tenant_id(data_store: ConfigDataStore) -> None:
-    data_store.set_last_tenant_id(123)
-    assert data_store.get_last_tenant_id() == 123
-
-
-def test_get_and_set_start_date(data_store: ConfigDataStore) -> None:
-    date = datetime(2024, 3, 6, 12, 0, 0, tzinfo=timezone.utc)
-    data_store.set_start_date(date)
-    assert data_store.get_start_date() == date
-
-
 def test_get_and_set_last_fetch(data_store: ConfigDataStore) -> None:
     date = datetime(2024, 3, 6, 14, 0, 0, tzinfo=timezone.utc)
     data_store.set_last_fetch(date)
@@ -25,3 +14,10 @@ def test_get_and_set_next_by_tenant(data_store: ConfigDataStore) -> None:
     next_token = "next_token_value"
     data_store.set_next_by_tenant(tenant_id, next_token)
     assert data_store.get_next_by_tenant(tenant_id) == "next_token_value"
+
+
+def test_get_and_set_earliest_ingested_by_tenant(data_store: ConfigDataStore) -> None:
+    tenant_id = 789
+    date = datetime(2024, 3, 6, 14, 0, 0, tzinfo=timezone.utc)
+    data_store.set_earliest_ingested_by_tenant(tenant_id, date)
+    assert data_store.get_earliest_ingested_by_tenant(tenant_id) == date
