Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
strategy:
matrix:
python-version: [3.9, 3.10, 3.11]
python-version: [3.9, "3.10", 3.11]

steps:
- uses: actions/checkout@v2
Expand Down
40 changes: 37 additions & 3 deletions tap_sumologic/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import time
from datetime import datetime
from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Iterable, Mapping, Optional

from tap_sumologic.client import SumoLogicStream

Expand All @@ -24,6 +24,7 @@ def __init__(
quantization: Optional[int] = None,
rollup: Optional[str] = None,
timeshift: Optional[int] = None,
table_config: Optional[dict] = None,
) -> None:
"""Class initialization.

Expand All @@ -40,8 +41,17 @@ def __init__(
quantization: see tap.py
rollup: see tap.py
timeshift: see tap.py
table_config: the table configuration for lazy schema discovery

"""
# Initialize with a placeholder schema if None
if schema is None:
# Provide a minimal schema that will be replaced during first sync
schema = {"type": "object", "properties": {}, "key_properties": []}
schema_provided = False
else:
schema_provided = True

super().__init__(tap=tap, schema=schema)

if primary_keys is None:
Expand All @@ -57,14 +67,38 @@ def __init__(
self.quantization = quantization
self.rollup = rollup
self.timeshift = timeshift
self.table_config = table_config
self._schema_discovered = schema_provided

def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
def get_records( # noqa: C901
self, context: Optional[Mapping[str, Any]]
) -> Iterable[Dict[str, Any]]:
"""Return a generator of row-type dictionary objects.

The optional `context` argument is used to identify a specific slice of the
stream if partitioning is required for the stream. Most implementations do not
require partitioning and should ignore the `context` argument.
"""
# Perform lazy schema discovery if needed
if not self._schema_discovered and self.table_config:
self.logger.info(
f"Discovering schema for stream '{self.name}' before processing..."
)
# Type ignore because _tap is typed as Tap but we know it's TapSumoLogic
discovered_schema = self._tap.get_schema_for_table( # type: ignore
self.table_config
)

# Update the internal schema (use _schema instead of schema property)
self._schema = discovered_schema

# Update primary keys from discovered schema if not set
if not self.primary_keys and "key_properties" in discovered_schema:
self.primary_keys = discovered_schema["key_properties"]

self._schema_discovered = True
self.logger.info(f"Schema discovered for stream '{self.name}'")

self.logger.info("Running query in sumologic to get records")

records = []
Expand Down Expand Up @@ -119,7 +153,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
self.logger.info(f"Got {self.query_type} {count} of {record_count}")

recs = response[self.query_type]
# extract the result maps to put them in the list of records

for rec in recs:
records.append({**rec["map"], **custom_columns})

Expand Down
25 changes: 20 additions & 5 deletions tap_sumologic/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,33 +165,47 @@ def discover_streams(self) -> List[SearchJobStream]: # type: ignore
streams = []
for stream in self.config["tables"]:
schema_config = stream.get("schema")

# Resolve an explicitly configured schema now (file path or inline dict).
# When none is configured, leave it as None so that the stream can discover
# the schema lazily when it is first processed.
schema = None
if isinstance(schema_config, str):
self.logger.info("Found path to a schema, not doing discovery.")
self.logger.info(
"Found path to a schema, will load it when stream starts."
)
with open(schema_config, "r") as f:
schema = json.load(f)

elif isinstance(schema_config, dict):
self.logger.info("Found schema in config, not doing discovery.")
self.logger.info(
"Found schema in config, will use it when stream starts."
)
builder = SchemaBuilder()
builder.add_schema(schema_config)
schema = builder.to_schema()

else:
self.logger.info("No schema found. Inferring schema from API call.")
schema = self.get_schema_for_table(stream)
self.logger.info(
"No schema found. Will infer schema from API call"
" when stream starts."
)
schema = None

if stream["query_type"] not in ("records", "messages", "metrics"):
raise ValueError(
f"Invalid query_type: {stream['query_type']}. "
"Must be one of 'records', 'messages' or 'metrics'."
)

# Determine primary keys - use from config or defer to schema discovery
primary_keys = stream["primary_keys"] or []

streams.append(
SearchJobStream(
tap=self,
name=stream["table_name"],
query_type=stream["query_type"],
primary_keys=stream["primary_keys"] or schema["key_properties"],
primary_keys=primary_keys,
replication_key=stream.get(
"replication_key", self.config.get("replication_key", "")
),
Expand All @@ -202,6 +216,7 @@ def discover_streams(self) -> List[SearchJobStream]: # type: ignore
quantization=stream.get("quantization"),
rollup=stream.get("rollup"),
timeshift=stream.get("timeshift"),
table_config=stream,
)
)

Expand Down