-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathnormalized_chunks.py
More file actions
93 lines (80 loc) · 2.88 KB
/
normalized_chunks.py
File metadata and controls
93 lines (80 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from dataclasses import dataclass
from functools import cached_property
from os.path import exists, join, splitext, basename
from tempfile import mkdtemp
import pandas as pd
from fsspec.implementations.local import LocalFileSystem
from pandas import DataFrame
from utz import YM, err
from ctbk.blob import DvcBlob
from ctbk.normalized import ParquetEngine, normalize_df, dedupe_sort
from ctbk.paths import S3
from ctbk.tables_dir import Tables
from ctbk.tripdata_month import TripdataMonth
from ctbk.util.df import save
from ctbk.util.region import Region, get_regions
@dataclass
class NormalizedChunks(DvcBlob):
    """Per-region normalized Parquet "chunks" for one calendar month.

    For a given month (``ym``), loads each region's raw tripdata CSV,
    normalizes it, concatenates the per-region tables by name, dedupes/sorts,
    and writes one ``<name>.parquet`` per table under ``S3/normalized/<ym>``.
    """
    ym: YM                                 # month these chunks cover
    engine: ParquetEngine = 'auto'         # passed through to the Parquet writer
    DIR = 'normalized'                     # subdirectory under the S3 root

    @property
    def path(self) -> str:
        """Directory holding this month's normalized parquet files."""
        return join(S3, self.DIR, str(self.ym))

    @cached_property
    def fs(self):
        # NOTE(review): despite the S3-rooted path, files are accessed via the
        # local filesystem — presumably a local mirror of the bucket; confirm.
        return LocalFileSystem()

    def paths(self) -> dict[str, str]:
        """Map table name -> existing parquet path (``<name>_<suffix>`` style files)."""
        fs = self.fs
        paths = fs.glob(f'{self.path}/*_*.parquet')
        return { splitext(basename(path))[0]: path for path in paths }

    def normalized_region(self, region: Region) -> Tables:
        """Load and normalize one region's tripdata for this month."""
        ym = self.ym
        csv = TripdataMonth(ym=ym, region=region)
        return normalize_df(csv.df(), src=csv.zip_basename, region=region)

    def dfs(self) -> Tables:
        """Build the month's tables: concat per-region frames by name, then dedupe/sort."""
        dfs_dict: dict[str, list[DataFrame]] = {}
        for region in get_regions(self.ym):
            for name, df in self.normalized_region(region).items():
                if name not in dfs_dict:
                    dfs_dict[name] = []
                dfs_dict[name].append(df)
        rv = {}
        for name, dfs in dfs_dict.items():
            df = pd.concat(dfs)
            df = dedupe_sort(df, f'{self.ym}/{name}')
            rv[name] = df
        return rv

    @property
    def save_kwargs(self):
        """Keyword args forwarded to ``save`` for each parquet write."""
        return dict(write_kwargs=dict(index=False, engine=self.engine))

    def save(self) -> Tables:
        """Write all tables to ``self.path``, cleaning up on failure.

        On failure before any successful write, removes the directory if this
        call created it. After a successful write, moves any pre-existing
        parquet files not among the freshly-written names to a temp dir.
        """
        dir_path = self.path
        rmdir = False
        if not exists(dir_path):
            self.fs.mkdir(dir_path)
            rmdir = True  # only remove the dir on failure if we created it
        rm_paths: dict[str, str] = {}
        try:
            _dfs = self.dfs()
            dfs: Tables = {}
            for name, df in _dfs.items():
                # Bug fix: the original reassigned ``path`` here, so each
                # iteration nested the new file path inside the previous one
                # (``…/a.parquet/b.parquet``), and the ``finally`` cleanup then
                # deleted that file path instead of the directory.
                file_path = f'{dir_path}/{name}.parquet'
                save(df, url=file_path, **self.save_kwargs)
                dfs[name] = df
            rmdir = False  # all writes succeeded; keep the directory
            rm_paths = {
                name: p
                for name, p in self.paths().items()
                if name not in dfs
            }
            return dfs
        finally:
            if rmdir:
                err(f"Removing directory {dir_path} after failed write")
                self.fs.delete(dir_path)  # TODO: remove all directory levels that were created
            if rm_paths:
                # Preserve (rather than delete) stale parquets by parking them in a tmpdir
                tmpdir = mkdtemp()
                err(f"Moving untracked parquets to {tmpdir}: {rm_paths}")
                for name, p in rm_paths.items():
                    self.fs.mv(p, f'{tmpdir}/{name}.parquet')