-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathdata_loader.py
More file actions
126 lines (107 loc) · 4.53 KB
/
data_loader.py
File metadata and controls
126 lines (107 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Data Loader CLI STAC_API Ingestion Tool."""
import os
from typing import Any
import click
import orjson
from httpx import Client
def load_data(filepath: str) -> dict[str, Any]:
    """Load and parse the JSON document stored at ``filepath``.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The parsed JSON content.

    Raises:
        click.Abort: If the file does not exist or does not contain valid
            JSON. A red message is written to stderr before aborting.
    """
    try:
        # orjson accepts bytes directly, so the file is read in binary mode.
        with open(filepath, "rb") as file:
            return orjson.loads(file.read())
    except FileNotFoundError as e:
        click.secho(f"File not found: {filepath}", fg="red", err=True)
        raise click.Abort() from e
    except orjson.JSONDecodeError as e:
        # A malformed file should end the CLI run cleanly, not with a traceback.
        click.secho(f"Invalid JSON in {filepath}: {e}", fg="red", err=True)
        raise click.Abort() from e
def load_collection(client: Client, collection_id: str, data_dir: str) -> None:
    """Create the STAC collection described by ``collection.json`` via the API.

    The document is read from ``data_dir``, its ``id`` is overwritten with
    ``collection_id``, and the result is POSTed to ``/collections``. The
    outcome is echoed to the console; HTTP error statuses are reported but
    do not raise.
    """
    payload = load_data(os.path.join(data_dir, "collection.json"))
    payload["id"] = collection_id
    response = client.post("/collections", json=payload)
    status = response.status_code

    # Every outcome reports the status code first.
    click.echo(f"Status code: {status}")
    if status in (200, 201):
        click.echo(f"Added collection: {payload['id']}")
        return
    if status == 409:
        click.echo(f"Collection: {payload['id']} already exists")
        return
    click.echo(f"Error writing {payload['id']} collection. Message: {response.text}")
def load_items(
    client: Client, collection_id: str, use_bulk: bool, data_dir: str
) -> None:
    """Load the collection and its items from ``data_dir`` through the API.

    The feature collection file is the lexicographically first ``*.json``
    file in ``data_dir`` other than ``collection.json``. The collection is
    created first, then the items are sent either in a single bulk request
    or one request per item.

    Args:
        client: HTTP client used for all API requests.
        collection_id: ID assigned to the collection and to every item.
        use_bulk: When true, send all items in one bulk request.
        data_dir: Directory holding ``collection.json`` and the feature
            collection file.

    Raises:
        click.Abort: If no feature collection file is found in ``data_dir``.
    """
    with os.scandir(data_dir) as entries:
        # os.scandir yields entries in arbitrary order; taking the minimum
        # makes the choice deterministic when several candidates exist.
        feature_file = min(
            (
                entry.path
                for entry in entries
                if entry.is_file()
                and entry.name.endswith(".json")
                and entry.name != "collection.json"
            ),
            default=None,
        )

    if feature_file is None:
        click.secho(
            "No feature collection files found in the specified directory.",
            fg="red",
            err=True,
        )
        raise click.Abort()

    feature_collection = load_data(feature_file)

    # The collection is created before any item is posted to it.
    load_collection(client, collection_id, data_dir)
    if use_bulk:
        load_items_bulk_insert(client, collection_id, feature_collection)
    else:
        load_items_one_by_one(client, collection_id, feature_collection)
def load_items_one_by_one(
    client: Client, collection_id: str, feature_collection: dict[str, Any]
) -> None:
    """Load STAC items into the database one by one.

    Each entry of ``feature_collection["features"]`` has its ``collection``
    field set to ``collection_id`` (the input dict is modified in place) and
    is POSTed to ``/collections/{collection_id}/items``. The status code and
    outcome of every request are echoed to the console. Failed requests are
    reported and the loop continues with the next item.

    Args:
        client: HTTP client used for the requests.
        collection_id: ID of the collection receiving the items.
        feature_collection: Mapping with a ``features`` list of item dicts,
            each carrying an ``id`` key.
    """
    for feature in feature_collection["features"]:
        feature["collection"] = collection_id
        resp = client.post(f"/collections/{collection_id}/items", json=feature)
        click.echo(f"Status code: {resp.status_code}")
        if resp.status_code in (200, 201):
            click.echo(f"Added item: {feature['id']}")
        elif resp.status_code == 409:
            click.echo(f"Item: {feature['id']} already exists")
        else:
            # Report unexpected statuses instead of dropping them silently,
            # mirroring how load_collection reports errors.
            click.echo(f"Error writing {feature['id']} item. Message: {resp.text}")
def load_items_bulk_insert(
    client: Client, collection_id: str, feature_collection: dict[str, Any]
) -> None:
    """Load STAC items into the database via bulk insert.

    Every entry of ``feature_collection["features"]`` has its ``collection``
    field set to ``collection_id`` (the input dict is modified in place),
    then the whole feature collection is POSTed in a single request to
    ``/collections/{collection_id}/items``. The status code and outcome are
    echoed to the console; error statuses are reported but do not raise.

    Args:
        client: HTTP client used for the request.
        collection_id: ID of the collection receiving the items.
        feature_collection: Mapping with a ``features`` list of item dicts.
    """
    for feature in feature_collection["features"]:
        feature["collection"] = collection_id

    resp = client.post(f"/collections/{collection_id}/items", json=feature_collection)
    click.echo(f"Status code: {resp.status_code}")
    if resp.status_code in (200, 201):
        click.echo("Bulk inserted items successfully.")
    elif resp.status_code == 204:
        click.echo("Bulk update successful, no content returned.")
    elif resp.status_code == 409:
        click.echo("Conflict detected, some items might already exist.")
    else:
        # Report unexpected statuses instead of dropping them silently.
        click.echo(
            f"Error bulk inserting items into {collection_id} collection. "
            f"Message: {resp.text}"
        )
@click.command()
@click.option("--base-url", required=True, help="Base URL of the STAC API")
@click.option(
    "--collection-id",
    default="test-collection",
    help="ID of the collection to which items are added",
)
@click.option("--use-bulk", is_flag=True, help="Use bulk insert method for items")
@click.option(
    "--data-dir",
    # The value is scanned as a directory, so reject regular files at parse
    # time rather than failing later inside os.scandir.
    type=click.Path(exists=True, file_okay=False),
    default="sample_data/",
    help="Directory containing collection.json and feature collection file",
)
def main(base_url: str, collection_id: str, use_bulk: bool, data_dir: str) -> None:
    """Load STAC items into the database."""
    # NOTE: the docstring above is what click prints as the command's --help
    # text, so it is kept short and user-facing.
    # The client is a context manager so it is closed when loading finishes
    # or aborts.
    with Client(base_url=base_url) as client:
        load_items(client, collection_id, use_bulk, data_dir)
# Invoke the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()