Dask Working Notes

Rotate Long Column Wide - Multiple

The goal here: for each (bene_id, hdr_icn) group, collapse the values of several columns into single ";"-joined strings, so that many rows per group become one wide row.

import pandas as pd
import dask.dataframe as dd


def convert_dates_to_strings(df, date_columns: list, date_format) -> dd.DataFrame:
    for col in date_columns:
        df[col] = df[col].dt.strftime(date_format)
        # "01011800" (i.e. 1800-01-01) appears to be a missing-date sentinel; blank it out
        df[col] = df[col].replace("01011800", "")
    return df


df = pd.DataFrame(
    {
        "bene_id": ["1", "1", "1", "1", "1"],
        "hdr_icn": ["1", "1", "2", "2", "2"],
        "rot1": ["A", "B", "C", " ", "E"],
        "rot2": ["X", "Y", "Z", " ", " "],
        "rot3": [
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
        ],
    }
)
ddf = dd.from_pandas(df, npartitions=4)
ddf = convert_dates_to_strings(ddf, ["rot3"], "%m%d%Y")
ddf = ddf.set_index("bene_id").persist()


def agg_func(x):
    return pd.Series(
        dict(
            # 14.7 ms ± 287 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
            # rot1="%s" % ";".join(x["rot1"]),
            # rot2="%s" % ";".join(x["rot2"]),
            # rot3="%s" % ";".join(x["rot3"]),
            # 14.7 ms ± 493 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
            # rot1=f'{";".join(x["rot1"])}',
            # rot2=f'{";".join(x["rot2"])}',
            # rot3=f'{";".join(x["rot3"])}',
            # 14.8 ms ± 314 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
            rot1=";".join(x["rot1"]),
            rot2=";".join(x["rot2"]),
            rot3=";".join(x["rot3"]),
        )
    )


ddf.groupby(["bene_id", "hdr_icn"]).apply(
    agg_func, meta={"rot1": "str", "rot2": "str", "rot3": "str"}
).compute()
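
For this toy frame the result has one row per (bene_id, hdr_icn) pair, along these lines (row order within a group depends on the shuffle, so the order of the joined values may vary):

                  rot1   rot2                        rot3
bene_id hdr_icn
1       1          A;B    X;Y           01012018;01012018
        2        C; ;E   Z; ;  01012018;01012018;01012018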

We can condense this code by passing a list of columns to agg_func. Note that without meta, Dask has to infer it by running agg_func on a dummy sample, and will warn about it.

import pandas as pd
import dask.dataframe as dd


def convert_dates_to_strings(df, date_columns: list, date_format) -> dd.DataFrame:
    for col in date_columns:
        df[col] = df[col].dt.strftime(date_format)
        df[col] = df[col].replace("01011800", "")
    return df


df = pd.DataFrame(
    {
        "bene_id": ["1", "1", "1", "1", "1"],
        "hdr_icn": ["1", "1", "2", "2", "2"],
        "rot1": ["A", "B", "C", " ", "E"],
        "rot2": ["X", "Y", "Z", " ", " "],
        "rot3": [
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
        ],
    }
)
ddf = dd.from_pandas(df, npartitions=4)
ddf = convert_dates_to_strings(ddf, ["rot3"], "%m%d%Y")
ddf = ddf.set_index("bene_id").persist()


def agg_func(x, cols):
    return pd.Series({col: ";".join(x[col]) for col in cols})


ddf.groupby(["bene_id", "hdr_icn"]).apply(agg_func, ["rot1", "rot2", "rot3"]).compute()

We can avoid that inference step, and shave a bit of runtime, by providing .apply(...) with the expected meta output.

import pandas as pd
import dask.dataframe as dd


def convert_dates_to_strings(df, date_columns: list, date_format) -> dd.DataFrame:
    for col in date_columns:
        df[col] = df[col].dt.strftime(date_format)
        df[col] = df[col].replace("01011800", "")
    return df


df = pd.DataFrame(
    {
        "bene_id": ["1", "1", "1", "1", "1"],
        "hdr_icn": ["1", "1", "2", "2", "2"],
        "rot1": ["A", "B", "C", " ", "E"],
        "rot2": ["X", "Y", "Z", " ", " "],
        "rot3": [
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
        ],
    }
)
ddf = dd.from_pandas(df, npartitions=4)
ddf = convert_dates_to_strings(ddf, ["rot3"], "%m%d%Y")
ddf = ddf.set_index("bene_id").persist()


def agg_func(x, cols):
    return pd.Series({col: ";".join(x[col]) for col in cols})


# On a subset of data, pass the groupby-apply result to dask.dataframe.utils.make_meta()
ddf_meta = dd.utils.make_meta(
    ddf.groupby(["bene_id", "hdr_icn"]).apply(agg_func, ["rot1", "rot2", "rot3"])
)
# You can see what the meta object needs to look like by inspecting ddf_meta and ddf_meta.index
>>> ddf_meta
Empty DataFrame
Columns: [rot1, rot2, rot3]
Index: []

>>> ddf_meta.index
MultiIndex(levels=[['a', 'b'], ['foo']],
           codes=[[], []],
           names=['bene_id', 'hdr_icn'])

# The ddf_meta object won't be available at runtime, so we use the output above
# to define the object's properties by hand

typed_ddf_meta = pd.DataFrame(
    columns=["rot1", "rot2", "rot3"],
    index=pd.MultiIndex(
        levels=[["a", "b"], ["foo"]], codes=[[], []], names=["bene_id", "hdr_icn"]
    ),
)
typed_ddf_meta = typed_ddf_meta.astype(dtype={"rot1": "str", "rot2": "str", "rot3": "str"})

# Note that typed_ddf_meta and ddf_meta are now equivalent

# You can now pass typed_ddf_meta as the meta object to .apply() - you should see a marginal speedup
ddf.groupby(["bene_id", "hdr_icn"]).apply(
    agg_func, ["rot1", "rot2", "rot3"], meta=typed_ddf_meta
).compute()
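
During development you can sanity-check a handwritten meta against the derived one. Dask's meta check mostly cares about column names and dtypes, so a sketch like this is usually enough:

# Development-only check: the handwritten meta should carry the same
# columns and dtypes as the derived ddf_meta above
assert list(typed_ddf_meta.columns) == list(ddf_meta.columns)
assert (typed_ddf_meta.dtypes == ddf_meta.dtypes).all()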

It wouldn't make sense to run the .groupby() at runtime, every time, just to generate the ddf_meta information, so we define it by hand below:

import pandas as pd
import dask.dataframe as dd


def convert_dates_to_strings(df, date_columns: list, date_format) -> dd.DataFrame:
    for col in date_columns:
        df[col] = df[col].dt.strftime(date_format)
        df[col] = df[col].replace("01011800", "")
    return df


df = pd.DataFrame(
    {
        "bene_id": ["1", "1", "1", "1", "1"],
        "hdr_icn": ["1", "1", "2", "2", "2"],
        "rot1": ["A", "B", "C", " ", "E"],
        "rot2": ["X", "Y", "Z", " ", " "],
        "rot3": [
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
            pd.to_datetime("2018-01-01"),
        ],
    }
)
ddf = dd.from_pandas(df, npartitions=4)
ddf = convert_dates_to_strings(ddf, ["rot3"], "%m%d%Y")
ddf = ddf.set_index("bene_id").persist()


def agg_func(x, cols):
    return pd.Series({col: ";".join(x[col]) for col in cols})


# Create ddf_meta object representing expected output from .apply()
ddf_meta = pd.DataFrame(
    columns=["rot1", "rot2", "rot3"],
    index=pd.MultiIndex(
        levels=[["a", "b"], ["foo"]], codes=[[], []], names=["bene_id", "hdr_icn"]
    ),
)
ddf_meta = ddf_meta.astype(dtype={"rot1": "str", "rot2": "str", "rot3": "str"})

# Pass ddf_meta to .apply(..., meta=ddf_meta)
ddf.groupby(["bene_id", "hdr_icn"]).apply(
    agg_func, ["rot1", "rot2", "rot3"], meta=ddf_meta
).compute()

Joining on Index + Separate Column

Notes on merging two Dask DataFrames that share an index (hdr_icn_num) plus an extra key column (ver).

Setup Data

import pandas as pd
import dask.dataframe as dd

from io import StringIO

ddf1_string = StringIO(
    """hdr_icn_num,ver,col1
1,1,foo
2,1,foo
3,1,foo
4,1,foo
5,1,foo
    """
)
ddf1 = dd.from_pandas(pd.read_csv(ddf1_string, sep=","), npartitions=5)
ddf1 = ddf1.set_index("hdr_icn_num")

ddf2_string = StringIO(
    """hdr_icn_num,ver,col2,col3
1,1,bar!,baz
1,2,X,X
2,1,bar!,
2,2,X,X
3,1,bar!,baz
3,2,X,X
4,1,bar!,
4,2,X,X
5,1,bar!,baz
5,2,X,X
    """
)

ddf2 = dd.from_pandas(pd.read_csv(ddf2_string, sep=",", keep_default_na=False), npartitions=5)
ddf2 = ddf2.set_index("hdr_icn_num")

ddf1.head(10, ddf1.npartitions)

ddf2.head(10, ddf2.npartitions)

Works

join_df = dd.merge(
    left=ddf1,
    right=ddf2,
    on=["hdr_icn_num"],
    left_on=["hdr_icn_num", "ver"],
    right_on=["hdr_icn_num", "ver"],
    how="left",
)
join_df.head(10, join_df.npartitions)

Works

join_df = dd.merge(
    left=ddf1,
    right=ddf2,
    left_on=["hdr_icn_num", "ver"],
    right_on=["hdr_icn_num", "ver"],
    how="left",
)
join_df.head(10, join_df.npartitions)

Does Not Work

pandas (and therefore Dask) does not allow combining left_index/right_index with left_on/right_on, so this raises a merge error:

join_df = dd.merge(
    left=ddf1,
    right=ddf2,
    left_index=True,
    right_index=True,
    left_on=["ver"],
    right_on=["ver"],
    how="left",
)
join_df.head(10, join_df.npartitions)
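
A workaround sketch: materialize the index as a column with reset_index and merge on both columns explicitly. This gives up the index-based alignment, so Dask may need a shuffle to perform the join.

join_df = dd.merge(
    left=ddf1.reset_index(),
    right=ddf2.reset_index(),
    on=["hdr_icn_num", "ver"],
    how="left",
)
join_df.head(10, join_df.npartitions)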

Task / Scheduler Notes

Run on Scheduler

client.run_on_scheduler takes a custom function. If the function needs access to the Scheduler object and its API, it should accept a dask_scheduler keyword argument.

client = Client("etcetc")

# Shows which result keys are held in memory on each worker
client.has_what()

def get_task_stream(dask_scheduler):
    # Returns a record of tasks the scheduler has recently processed
    return dask_scheduler.get_task_stream()

def kill_task_across_clients(dask_scheduler, task_key):
    # Cancel a task, no matter which client it belongs to
    # task_key == 'etc_etc-c07efb0ea6dd8c15df9a948ccd19e28c'
    for client in dask_scheduler.clients:
        dask_scheduler.cancel_key(key=task_key, client=client)
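
# Hypothetical usage: run_on_scheduler forwards extra keyword arguments to the
# custom function (the task key shown is the placeholder from the docstring above)
client.run_on_scheduler(
    kill_task_across_clients, task_key="etc_etc-c07efb0ea6dd8c15df9a948ccd19e28c"
)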

task_stream = client.run_on_scheduler(get_task_stream)

def user_info() -> dict:
    """Return a dict of user/group information for the current process."""
    import os
    import getpass

    return dict(
        uid=os.geteuid(),
        gid=os.getgid(),
        egid=os.getegid(),
        groups=os.getgroups(),
        user=getpass.getuser(),
    )

# Retrieve user info for the scheduler owner
client.run_on_scheduler(user_info)
# Retrieve user info for the local process, for comparison
user_info()
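
The same function can be fanned out with client.run, which executes it on every worker and returns a dict keyed by worker address (a sketch, using the same client as above):

# Retrieve user info for each worker process; returns {worker_address: result}
client.run(user_info)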