Add schema introspection and raw SQL support for Dremio adapters#66
Conversation
Dremio Adapter Enhancements:
- Add FK introspection via pg_catalog for PostgreSQL sources
- Add table/column comments via pg_description
- Add nested type introspection via LIMIT 0 queries
- Add ADBC Flight SQL driver support (faster queries)
- Fix table path quoting for schemas with hyphens/spaces
- Add _qualify_table_names() for automatic table qualification

New Features:
- Add supports_sql property and execute_sql() to Database base class
- Add --sql flag to CLI query command for raw SQL execution
- Add execute_sql() to DuckDB adapter

The Dremio adapters now support:
- Schema introspection with FK relationships from PostgreSQL sources
- Raw SQL queries via CLI: linkml-store -d dremio://... query --sql '...'
- Automatic table name qualification when schema is configured
- Proper quoting of source names containing hyphens/spaces

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
023969b to 96e8d8b
Pull request overview
This PR adds significant new functionality to the linkml-store package, focusing on enhanced schema introspection for Dremio adapters and raw SQL query support.
Changes:
- Added raw SQL query execution support via a new `--sql` CLI flag for SQL-capable backends (DuckDB, Dremio)
- Implemented foreign key introspection from PostgreSQL-backed Dremio sources using `pg_catalog` queries
- Added ADBC Flight SQL driver support for improved Dremio query performance
- Fixed table path quoting to handle schemas containing hyphens and spaces
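The new `supports_sql` / `execute_sql` hooks on the `Database` base class can be sketched roughly as below. This is an illustrative stand-in, not the actual linkml-store code: `sqlite3` substitutes for a SQL-capable backend such as DuckDB or Dremio, and everything beyond the two member names from the PR description is an assumption.

```python
# Hedged sketch: SQL support as an opt-in capability on a Database base class.
# sqlite3 stands in for DuckDB/Dremio; the real adapters differ in detail.
import sqlite3
from typing import Any, List, Tuple


class Database:
    """Base class: adapters without SQL support inherit these defaults."""

    @property
    def supports_sql(self) -> bool:
        return False

    def execute_sql(self, sql: str) -> List[Tuple[Any, ...]]:
        raise NotImplementedError(f"{type(self).__name__} does not support raw SQL")


class SQLiteDatabase(Database):
    """Stand-in for a SQL-capable adapter (DuckDB/Dremio in the PR)."""

    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")

    @property
    def supports_sql(self) -> bool:
        return True

    def execute_sql(self, sql: str) -> List[Tuple[Any, ...]]:
        # Execute the raw SQL and return all result rows
        return self._conn.execute(sql).fetchall()
```

With this shape, the CLI can check `supports_sql` before attempting `execute_sql`, so non-SQL backends fail with a clear error rather than an attribute lookup failure.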
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 26 comments.
| File | Description |
|---|---|
| tests/test_cli.py | Added comprehensive tests for the new `--sql` flag, including mutual-exclusivity validation |
| src/linkml_store/cli.py | Implemented the `--sql` flag for raw SQL queries with proper validation and error handling |
| src/linkml_store/api/database.py | Added base `supports_sql` property and `execute_sql` method to the `Database` interface |
| src/linkml_store/api/stores/duckdb/duckdb_database.py | Implemented SQL support for the DuckDB adapter |
| src/linkml_store/api/stores/dremio_rest/dremio_rest_database.py | Added FK/comment introspection, nested schema support, SQL execution, and improved table path quoting |
| src/linkml_store/api/stores/dremio/dremio_database.py | Added ADBC support, FK/comment introspection, nested schema support, environment-variable credential handling, and improved table path quoting |
The FK relationship logic on line 724 assumes both source and target tables are in the same schema by setting `target_schema=pg_schema`. However, foreign keys can reference tables in different schemas. This assumption could result in incorrect schema information. Consider querying the actual target schema from the database.

```python
# Build FK info list
    source_schema=fk.get("source_schema", pg_schema),
    target_schema=fk.get("target_schema", pg_schema),
```
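One hedged way to resolve the true target schema, per the comment above, is to join `pg_namespace` a second time on `confrelid` so each constraint row carries its own target schema. The sketch below is illustrative, not code from this PR; the alias names (`src_nsp`, `tgt_nsp`) and the helper function are assumptions.

```python
# Sketch only: build a pg_catalog query that reports the *target* schema of
# each foreign key instead of assuming it equals the source schema.
def build_fk_target_schema_sql(source_name: str) -> str:
    return f'''
    SELECT
        con.conname AS constraint_name,
        src_nsp.nspname AS source_schema,
        tgt_nsp.nspname AS target_schema
    FROM "{source_name}".pg_catalog.pg_constraint con
    JOIN "{source_name}".pg_catalog.pg_class src ON con.conrelid = src.oid
    JOIN "{source_name}".pg_catalog.pg_namespace src_nsp ON src.relnamespace = src_nsp.oid
    JOIN "{source_name}".pg_catalog.pg_class tgt ON con.confrelid = tgt.oid
    JOIN "{source_name}".pg_catalog.pg_namespace tgt_nsp ON tgt.relnamespace = tgt_nsp.oid
    WHERE con.contype = 'f'
    '''
```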
```python
result = database.execute_sql(sql)
write_output(result.rows, output_type, target=output)
```

The CLI passes user-provided SQL directly to `database.execute_sql()` without any validation or sanitization. While the underlying database adapters should handle this safely, there's no input validation at the CLI layer. Consider adding basic validation to check for empty strings or other obviously invalid input before executing.
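A minimal sketch of the kind of CLI-layer check the reviewer suggests; the function name and error messages here are hypothetical, not part of the actual CLI code.

```python
# Illustrative sketch: reject obviously invalid --sql input before it
# reaches the adapter. Names and messages are assumptions.
def validate_raw_sql(sql: object) -> str:
    if not isinstance(sql, str):
        raise ValueError("--sql expects a string query")
    stripped = sql.strip()
    if not stripped:
        raise ValueError("--sql was given an empty query")
    return stripped
```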
```python
# Add foreign key relationships
if include_foreign_keys:
    schema_to_use = path or default_schema
```

The method directly overrides a slot's description with "Foreign key to {tgt_class}" without checking if there's already a meaningful description. If the slot had important documentation, it would be lost. Consider appending to the existing description or only setting it if no description exists:

```python
# Preserve any existing description; append FK info if present
fk_note = f"Foreign key to {tgt_class}"
if slot.description:
    if fk_note not in slot.description:
        slot.description = f"{slot.description} {fk_note}."
else:
    slot.description = fk_note
```
```python
query_obj = Query(from_table=collection.alias, select_cols=select_clause, where_clause=where_clause, limit=limit)
result = collection.query(query_obj)
```

The variable name `query_obj` is introduced to avoid shadowing the `query` parameter name from the function signature. However, checking the context, the original variable was named `query` and was a `Query` object being created. The rename is valid, but it would be clearer to name the parameter something like `query_params` to avoid this shadowing issue in the first place. This is a minor naming issue but could cause confusion.
```python
def _qualify_table_names(self, sql: str) -> str:
    """Qualify unqualified table names in SQL using configured schema.

    Handles FROM and JOIN clauses, qualifying table names that don't
    already contain dots or quotes.

    Args:
        sql: SQL query string.

    Returns:
        SQL with qualified table names.
    """
    default_schema = self._connection_info.get("default_schema")
    if not default_schema:
        return sql

    # Pattern matches FROM/JOIN followed by an unqualified table name
    # Unqualified = no dots, no quotes, just a simple identifier
    # Captures: (FROM|JOIN) (tablename) (optional: AS? alias | WHERE | ORDER | LIMIT | GROUP | ; | end)
    pattern = r'(?i)((?:FROM|JOIN)\s+)([a-zA-Z_][a-zA-Z0-9_]*)(\s+(?:AS\s+)?[a-zA-Z_][a-zA-Z0-9_]*|\s+(?:WHERE|ORDER|GROUP|LIMIT|HAVING|UNION|INTERSECT|EXCEPT|ON|LEFT|RIGHT|INNER|OUTER|CROSS|FULL|;)|$)'

    def replace_table(match):
        prefix = match.group(1)  # "FROM " or "JOIN "
        table = match.group(2)   # table name
        suffix = match.group(3)  # rest (alias, WHERE, etc.)

        # Check if this looks like a keyword (not a table name)
        keywords = {'WHERE', 'ORDER', 'GROUP', 'LIMIT', 'HAVING', 'UNION',
                    'INTERSECT', 'EXCEPT', 'SELECT', 'AS', 'ON', 'AND', 'OR',
                    'LEFT', 'RIGHT', 'INNER', 'OUTER', 'CROSS', 'FULL', 'JOIN'}
        if table.upper() in keywords:
            return match.group(0)

        qualified = self._get_table_path(table)
        return f"{prefix}{qualified}{suffix}"

    return re.sub(pattern, replace_table, sql)
```
The `_qualify_table_names` method has the same SQL injection vulnerability as in the REST adapter. The regex-based approach can be bypassed, and table names are embedded without proper validation. This is especially concerning since this method processes user-provided SQL queries. Consider using a SQL parser or implementing strict validation of identifiers.
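The strict-identifier-validation alternative mentioned above can be sketched as follows. The regex and function name are illustrative assumptions, not code from this PR: anything that is not a plain identifier is rejected before it can be spliced into SQL.

```python
import re

# Sketch: allow only plain identifiers (letters, digits, underscore, $),
# rejecting everything else rather than trying to rewrite arbitrary SQL.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*\Z")


def safe_quote_identifier(name: str) -> str:
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return f'"{name}"'
```

A full SQL parser (rather than a regex over FROM/JOIN clauses) would be more robust still, at the cost of an extra dependency.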
```python
    FROM "{source_name}".pg_catalog.pg_class c
    JOIN "{source_name}".pg_catalog.pg_attribute a ON a.attrelid = c.oid
    JOIN "{source_name}".pg_catalog.pg_namespace nsp ON c.relnamespace = nsp.oid
    WHERE a.attnum > 0 AND NOT a.attisdropped
    '''

if pg_schema:
```

The array parsing logic assumes a specific format (`{1}` or `{1,2}`) but doesn't validate the input. If the database returns arrays in a different format, the parsing could fail silently or produce incorrect results. Consider adding validation or more robust parsing that handles edge cases:
```python
# Helper to robustly parse PostgreSQL-style integer array strings
def _parse_pg_int_array(value: Any, field_name: str) -> List[int]:
    """
    Parse a PostgreSQL integer array text representation (e.g. '{1,2}').
    Returns an empty list and logs a warning if the format is unexpected
    or contains non-integer values.
    """
    if value is None:
        return []
    s = str(value).strip()
    if s == "" or s == "{}":
        return []
    # Expect a simple integer array format like '{1,2,3}'
    if not re.fullmatch(r"\{([0-9]+(,[0-9]+)*)?\}", s):
        logger.warning("Unexpected array format for %s: %r", field_name, s)
        return []
    nums: List[int] = []
    inner = s.strip("{}")
    for part in inner.split(","):
        if not part:
            continue
        try:
            nums.append(int(part))
        except ValueError:
            logger.warning(
                "Non-integer value in %s array: %r", field_name, part
            )
    return nums

# Build FK info list
fk_list = []
for _, fk in fk_df.iterrows():
    # Parse array strings like '{1}' or '{1,2}' with validation
    src_nums = _parse_pg_int_array(fk["source_col_nums"], "source_col_nums")
    tgt_nums = _parse_pg_int_array(fk["target_col_nums"], "target_col_nums")
```
```python
if pg_schema:
    col_sql += f" AND nsp.nspname = '{pg_schema}'"
```

The same array parsing issue exists here. The code assumes PostgreSQL arrays are formatted as `{1}` or `{1,2}` without validation, which could lead to parsing failures.
```python
def _parse_pg_int_array(value: Any) -> List[int]:
    """
    Safely parse a PostgreSQL int[] textual representation (e.g. "{1,2}")
    into a list of integers. Returns an empty list on malformed input.
    """
    if value is None:
        return []
    text = str(value).strip()
    if not text:
        return []
    # Expect a simple one-dimensional array in the form "{...}"
    match = re.fullmatch(r"\{(.*)\}", text)
    if not match:
        logger.warning("Unexpected PostgreSQL array format: %r", text)
        return []
    inner = match.group(1).strip()
    if not inner:
        return []
    nums: List[int] = []
    for part in inner.split(","):
        p = part.strip()
        if not p:
            continue
        try:
            nums.append(int(p))
        except ValueError:
            logger.warning("Non-integer value in PostgreSQL array %r: %r", text, p)
    return nums

for _, fk in fk_df.iterrows():
    # Parse array strings like '{1}' or '{1,2}' with validation
    src_nums = _parse_pg_int_array(fk["source_col_nums"])
    tgt_nums = _parse_pg_int_array(fk["target_col_nums"])
```
```python
import pandas as pd
from linkml_runtime import SchemaView
from linkml_runtime.linkml_model import ClassDefinition, SlotDefinition
```

The import `from linkml_runtime.linkml_model import ClassDefinition` is added but `ClassDefinition` is never used in the code. This is an unused import that should be removed to keep the code clean:

```python
from linkml_runtime.linkml_model import SlotDefinition
```
```python
        table_name: Optional table name to filter results.

    Returns:
        List of ForeignKeyInfo objects describing FK relationships.
    """
    if schema_name is None:
        schema_name = self._connection_info.get("default_schema") or self._connection_info.get("path")

    if not schema_name:
        logger.warning("No schema specified for FK introspection")
        return []

    source_name, pg_schema = self._get_source_from_schema(schema_name)
    source_type = self._detect_source_type(source_name)

    if source_type != "postgresql":
        logger.info(f"FK introspection only supported for PostgreSQL sources, not {source_type}")
        return []

    # Query FK constraints from pg_catalog
    fk_sql = f'''
    SELECT
        con.conname as constraint_name,
        src_class.relname as source_table,
        tgt_class.relname as target_table,
        con.conkey as source_col_nums,
        con.confkey as target_col_nums,
        con.conrelid as source_oid,
        con.confrelid as target_oid
    FROM "{source_name}".pg_catalog.pg_constraint con
    JOIN "{source_name}".pg_catalog.pg_class src_class ON con.conrelid = src_class.oid
    JOIN "{source_name}".pg_catalog.pg_class tgt_class ON con.confrelid = tgt_class.oid
    JOIN "{source_name}".pg_catalog.pg_namespace nsp ON src_class.relnamespace = nsp.oid
    WHERE con.contype = 'f'
    '''

    if pg_schema:
        fk_sql += f" AND nsp.nspname = '{pg_schema}'"
    if table_name:
        fk_sql += f" AND src_class.relname = '{table_name}'"

    # Query column info for resolving column numbers to names
    col_sql = f'''
    SELECT
        c.oid as table_oid,
        a.attnum as col_num,
        a.attname as col_name
    FROM "{source_name}".pg_catalog.pg_class c
    JOIN "{source_name}".pg_catalog.pg_attribute a ON a.attrelid = c.oid
    JOIN "{source_name}".pg_catalog.pg_namespace nsp ON c.relnamespace = nsp.oid
    WHERE a.attnum > 0 AND NOT a.attisdropped
    '''

    if pg_schema:
        col_sql += f" AND nsp.nspname = '{pg_schema}'"

    try:
        fk_df = self._execute_query(fk_sql)
        col_df = self._execute_query(col_sql)

        # Build column lookup: (table_oid, col_num) -> col_name
        col_lookup = {}
        for _, row in col_df.iterrows():
            key = (row["table_oid"], row["col_num"])
            col_lookup[key] = row["col_name"]

        # Build FK info list
        fk_list = []

        for _, fk in fk_df.iterrows():
            # Parse array strings like '{1}' or '{1,2}'
            src_nums = [int(x) for x in str(fk["source_col_nums"]).strip("{}").split(",") if x]
            tgt_nums = [int(x) for x in str(fk["target_col_nums"]).strip("{}").split(",") if x]

            src_cols = [col_lookup.get((fk["source_oid"], n), f"col_{n}") for n in src_nums]
            tgt_cols = [col_lookup.get((fk["target_oid"], n), f"col_{n}") for n in tgt_nums]
```
The foreign key SQL queries construct SQL strings using f-strings with user-controlled values like `source_name`, `pg_schema`, and `table_name` without any sanitization or parameterization. For example, on line 668, `source_name` is directly embedded: `FROM "{source_name}".pg_catalog.pg_constraint`. If `source_name` contains quotes or other special characters, this could lead to SQL injection. While these values come from connection info, they should still be properly escaped or validated.
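A minimal sketch of the escaping the comment asks for, assuming Dremio follows the common SQL convention of doubling embedded quotes inside quoted identifiers and string literals; the function names are illustrative, not part of this PR.

```python
# Sketch: double embedded quotes so values from connection info cannot
# break out of the quoted identifier or string literal they are placed in.
def quote_source_name(name: str) -> str:
    """Double-quoted identifier with embedded double quotes escaped."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value: str) -> str:
    """Single-quoted SQL string literal with embedded quotes escaped."""
    return "'" + value.replace("'", "''") + "'"
```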
```python
from urllib.parse import parse_qs, urlparse
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
```

Import of `Union` is not used:

```python
from typing import Any, Dict, List, Optional, Tuple
```
Summary
- FK introspection via `pg_catalog` for PostgreSQL-backed Dremio sources
- Table/column comments via `pg_description`
- Nested type introspection via `LIMIT 0` queries for complex types (ARRAY, STRUCT)
- `--sql` flag to CLI for raw SQL queries

New Features
Schema Introspection
The Dremio adapters now detect FK relationships from PostgreSQL sources:
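As a hedged sketch, the `ForeignKeyInfo` objects mentioned in the introspection docstring might have a shape like the following; the actual field names in linkml-store may differ, and this dataclass is an assumption inferred from the `pg_catalog` query columns.

```python
from dataclasses import dataclass, field
from typing import List


# Assumed shape only: fields mirror what the FK introspection query selects
# (constraint name, source/target tables, resolved column lists, schemas).
@dataclass
class ForeignKeyInfo:
    constraint_name: str
    source_table: str
    target_table: str
    source_columns: List[str] = field(default_factory=list)
    target_columns: List[str] = field(default_factory=list)
    source_schema: str = ""
    target_schema: str = ""
```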
Raw SQL Queries via CLI
Collection Queries with Proper Quoting
Test plan
- `--sql` flag with DuckDB
- `--sql` flag with Dremio REST adapter

🤖 Generated with Claude Code