simple dask map_partitions example
Solution 1
There is an example in the map_partitions docs that achieves exactly what you are trying to do:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
When you call map_partitions (just like when you call .apply() on a pandas.DataFrame), the function that you try to map (or apply) will be given a dataframe as its first argument. In the case of dask.dataframe.map_partitions this first argument will be a partition, and in the case of pandas.DataFrame.apply it will be the whole dataframe. This means that your function has to accept a dataframe (partition) as its first argument, and in your case it could look like this:
def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])
Note that the assignment of the new column in this case happens (i.e. gets scheduled to happen) BEFORE you call .compute(). In your example you assign the column AFTER you call .compute(), which rather defeats the purpose of using dask: once you call .compute(), the results of that operation are loaded into memory if there is enough space for them (if not, you just get a MemoryError).
So for your example to work you could:
1) Use a function (with column names as arguments):

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')
# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()
result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory
2) Use a lambda (with column names hardcoded in the function):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))
# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()
result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory
Update:
To apply a function on a row-by-row basis, here is a quote from the post you linked:

map/apply

You can map a function row-wise across a series with map:

df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply:

df.apply(func, axis=1)
I.e. for the example function in your question, it might look like this:

def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]

Since you will be applying it on a row-by-row basis, the function's first argument will be a series (i.e. each row of a dataframe is a series).

To apply this function you might then call it like this:

dds_out = ddf.apply(
    test_f,
    args=('col_1', 'col_2'),
    axis=1,
    meta=('result', int)
).compute(get=get)

This will return a series named 'result'.
I guess you could also call .apply on each partition with a function, but it does not look to be any more efficient than calling .apply on the dataframe directly. But maybe your tests will prove otherwise.
Solution 2
Your test_f takes two arguments: col_1 and col_2. You pass a single argument, ddf.

Try something like:

In [5]: dd.map_partitions(test_f, ddf['col_1'], ddf['col_2'])
Out[5]:
Dask Series Structure:
npartitions=8
0 int64
1250 ...
...
8750 ...
9999 ...
dtype: int64
Dask Name: test_f, 32 tasks
Comments
-
user1700890 almost 2 years
I read the following SO thread and now am trying to understand it. Here is my example:

import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1': random.sample(range(10000), 10000),
                   'col_2': random.sample(range(10000), 10000)})

def test_f(col_1, col_2):
    return col_1 * col_2

ddf = dd.from_pandas(df, npartitions=8)
ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
It generates the error below. What am I doing wrong? Also, I am not clear on how to pass additional parameters to the function in map_partitions.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    136     try:
--> 137         yield
    138     except Exception as e:

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132

TypeError: test_f() got an unexpected keyword argument 'columns'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
    469         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    470         """
--> 471         return map_partitions(func, self, *args, **kwargs)
    472
    473     @insert_meta_param_description(pad=12)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
   3163
   3164     if meta is no_default:
-> 3165         meta = _emulate(func, *args, **kwargs)
   3166
   3167     if all(isinstance(arg, Scalar) for arg in args):

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3129     """
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132
   3133

~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78             except StopIteration as exc:
     79                 # Suppress StopIteration *unless* it's the same exception that

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    148             ).format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 150         raise ValueError(msg)
    151
    152

ValueError: Metadata inference failed in `test_f`.
Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)
Traceback:
---------
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
    yield
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
-
user1700890 over 6 years
I just tried it, it does not work: ValueError: Metadata inference failed in test_f.
-
user1700890 over 6 years
So if I understand correctly, in order to achieve row-by-row operations with dask, I will need to create two functions: one will accept partitions, and the second one will apply my target function to each row (of a partition) in the regular pandas sense.
-
user1700890 over 6 years
Thank you for the update. I am still reading and trying your update. Quick question: does apply from dask compute in parallel?
-
Primer over 6 years
As far as I know it does.
-
Matt Camp over 5 years
I've got my own problems I'm trying to solve using dask and had a quick question. The dask documentation says not to mutate the input, but you're mutating df by assigning columns to it. Is this just a misunderstanding on my part? dask.pydata.org/en/latest/…
-
Primer over 5 years
Your understanding is correct - the example shown above does mutate the input. This should be avoided where possible, just as the docs recommend.
-
SummerEla over 5 years
You can resolve ValueError: Metadata inference failed in.. by adding a meta='dtype' argument to the above statement, where dtype is your expected data type.
-
denson about 5 years
SummerEla Could you please add an answer that includes your suggestion?
-
Yash M over 2 years
I am getting this error when trying to run the same using meta: can't pickle _thread.RLock objects. Any suggestions?