Skip to content

Commit f3c2012

Browse files
authored
Merge pull request #462 from stefanseefeld/dask
Use dask to parallelize chunk computation as much as possible.
2 parents 82c9de7 + 14110a0 commit f3c2012

File tree

4 files changed

+78
-22
lines changed

4 files changed

+78
-22
lines changed

docs/source/whatsnew/0.6.0.txt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
Release |version|
2+
-----------------
3+
4+
:Release: |version|
5+
:Date: TBD
6+
7+
New Features
8+
------------
9+
10+
None
11+
12+
Experimental Features
13+
---------------------
14+
15+
.. warning::
16+
17+
Experimental features are subject to change.
18+
19+
None
20+
21+
New Backends
22+
------------
23+
24+
None
25+
26+
Improved Backends
27+
-----------------
28+
29+
* Optimized `chunks` backend to allow parallel evaluation using `dask`
30+
whenever the iterable is a list of callables.
31+
32+
API Changes
33+
-----------
34+
35+
None
36+
37+
Bug Fixes
38+
---------
39+
40+
None
41+
42+
Miscellaneous
43+
-------------
44+
45+
None

odo/backends/csv.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ..temp import Temp
3131
from ..numpy_dtype import dshape_to_pandas
3232
from .pandas import coerce_datetimes
33+
from functools import partial
3334

3435
dialect_terms = '''delimiter doublequote escapechar lineterminator quotechar
3536
quoting skipinitialspace strict'''.split()
@@ -321,11 +322,8 @@ def CSV_to_chunks_of_dataframes(c, chunksize=2 ** 20, **kwargs):
321322
else:
322323
rest = []
323324

324-
def _():
325-
yield first
326-
for df in rest:
327-
yield df
328-
return chunks(pd.DataFrame)(_)
325+
data = [first] + rest
326+
return chunks(pd.DataFrame)(data)
329327

330328

331329
@discover.register(CSV)
@@ -368,10 +366,8 @@ def resource_glob(uri, **kwargs):
368366
@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))),
                  cost=10.0)
def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs):
    """Convert a collection of CSVs into chunked DataFrames.

    Rather than converting each CSV eagerly, wrap every conversion as a
    deferred ``functools.partial`` so that ``Chunks.__iter__`` can
    evaluate the list of callables in parallel when the chunks are
    actually consumed.
    """
    deferred = []
    for csv in csvs:
        deferred.append(partial(convert, chunks(pd.DataFrame), csv, **kwargs))
    return chunks(pd.DataFrame)(deferred)
375371

376372

377373
@convert.register(Temp(CSV), (pd.DataFrame, chunks(pd.DataFrame)))

odo/chunks.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from toolz import memoize, first, peek
66
from datashape import discover, var
77
from .utils import cls_name, copydoc
8+
from dask.threaded import get as dsk_get
89

910

1011
class Chunks(object):
@@ -34,8 +35,18 @@ def __init__(self, data):
3435
def __iter__(self):
    """Iterate over the underlying chunks.

    Three cases, matching how ``self.data`` may be supplied:

    * a callable: call it and return the resulting iterator;
    * a non-empty list whose first element is callable: treat the list
      as deferred chunk producers, evaluate them all at once with the
      dask threaded scheduler, cache the materialized results back onto
      ``self.data``, and iterate over those;
    * anything else: iterate ``self.data`` directly.
    """
    if callable(self.data):
        return self.data()
    data = self.data
    if isinstance(data, list) and data and callable(data[0]):
        # Build a trivial dask graph with one zero-argument task per
        # callable, then evaluate the whole batch in one get() call.
        # Replacing self.data with the results means later iterations
        # reuse them instead of recomputing.
        keys = ['p%d' % idx for idx in range(len(data))]
        graph = dict((key, (func,)) for key, func in zip(keys, data))
        self.data = dsk_get(graph, keys)
    return iter(self.data)
3950

4051

4152
@memoize

odo/convert.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .chunks import chunks, Chunks
1212
from .numpy_dtype import dshape_to_numpy
1313
from .utils import records_to_tuples
14+
from functools import partial
1415

1516

1617
convert = NetworkDispatcher('convert')
@@ -207,34 +208,37 @@ def _():
207208
def iterator_to_DataFrame_chunks(seq, chunksize=1024, **kwargs):
    """Convert an iterator of records into ``chunks(pd.DataFrame)``.

    Parameters
    ----------
    seq : iterable
        The records to convert, partitioned into groups of ``chunksize``
        elements.
    chunksize : int, optional
        Number of records per DataFrame chunk (default 1024).
    **kwargs
        Forwarded to ``convert``.  ``add_index=True`` selects the slow,
        sequential path that threads a running integer index across the
        produced frames.

    Returns
    -------
    chunks(pd.DataFrame)
    """
    seq2 = partition_all(chunksize, seq)

    if not kwargs.get('add_index', False):
        # Simple case: each partition converts independently, so hand a
        # list of deferred callables to chunks() and let Chunks.__iter__
        # evaluate them in parallel via dask.  Using partial(convert, ...)
        # directly (instead of wrapping a lambda) matches the deferred
        # style of convert_glob_of_csvs_to_chunks_of_dataframes.
        data = [partial(convert, pd.DataFrame, d, **kwargs) for d in seq2]
        if not data:
            # Empty input: still yield one (empty) DataFrame so callers
            # see a well-formed chunk sequence.
            data = [convert(pd.DataFrame, [], **kwargs)]
        return chunks(pd.DataFrame)(data)

    # TODO: Decide whether we should support the `add_index` flag at all.
    # If so, we need to post-process the converted DataFrame objects
    # sequentially, so we can't parallelize the process.
    try:
        first, rest = next(seq2), seq2
    except StopIteration:
        def _():
            yield convert(pd.DataFrame, [], **kwargs)
    else:
        df = convert(pd.DataFrame, first, **kwargs)
        df1, n1 = _add_index(df, 0)

        def _():
            # Thread the running index offset n through every chunk so
            # the concatenated frames carry a contiguous integer index.
            n = n1
            yield df1
            for i in rest:
                df = convert(pd.DataFrame, i, **kwargs)
                df, n = _add_index(df, n)
                yield df
    return chunks(pd.DataFrame)(_)
232240

233241

234-
def _ignore_index(df, start):
235-
return df, start
236-
237-
238242
def _add_index(df, start, _idx_type=getattr(pd, 'RangeIndex',
239243
compose(pd.Index, np.arange))):
240244
stop = start + len(df)

0 commit comments

Comments
 (0)