A few years back, I was working with some very large datasets in a codebase. As the volume of data for processing grew, both the analytics runtime and the memory consumption blew up. After some profiling, I found that one line in the code performed an operation like
z = df.groupby("ID").agg({"VAL1": ",".join, "VAL2": ",".join}).reset_index()
This single line caused a ~30 GB spike in memory usage (baseline usage without the spike was around 20 GB), bringing total memory usage to ~50 GB, and ~50% of the processing time was spent here.
Analyzing your data can give you great insights that help you choose the right approach.
In this post, I'll share a custom approach to perform fast grouping and aggregation. This method shines when the number of unique values in the columns to be aggregated is small (e.g., ≤ 10), achieving significant performance improvements over conventional approaches. We’ll also dive into how this technique aligns with concepts like one-hot encoding and cross-tabulation while avoiding the inefficiencies of full matrix operations.
The df in question had exactly these properties: the unique counts for VAL1 and VAL2 were 7 and 13 respectively. So we can use this approach to optimize the process and see how it performs.
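Before reaching for this optimization, it's worth confirming the shape of your data. A quick cardinality check on the columns being aggregated (a sketch, assuming `df` is already loaded and using the column names from above):

# Sketch: confirm low cardinality in the columns to be aggregated
for col in ["VAL1", "VAL2"]:
    print(col, df[col].nunique())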
The Approach
def better_groupby(df, group_col, agg_cols):
    # Integer label for each row's group key, plus the unique group keys
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        # Integer label for each row's value in the column being aggregated
        labels, uniques = pd.factorize(df[col].fillna(value=''))
        # Count matrix: one row per group, one column per unique value
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        # Replace positive counts with the value itself, zeros with ''
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, '')
        df_[col] = ''
        # Join each row; collapse comma runs left by empty cells, then strip the ends
        df_[col] = df_.agg(",".join, axis=1).str.replace(r",+", ",", regex=True).str.strip(",")
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)
    return res[[group_col] + agg_cols]
Let's walk through how it works, line by line.
>>> df
  tid val1
0   1    a
1   1    b
2   2    c
3   3    d
4   3    e
5   3    f
>>> better_groupby(df, "tid", ["val1"])
  tid   val1
0   1    a,b
1   2      c
2   3  d,e,f
- `g_labels, g_uniques = pd.factorize(df[group_col])` — this gives a label for each value of `df[group_col]`, along with the unique values of `df[group_col]`. In our case, `g_labels` will be `array([0, 0, 1, 2, 2, 2])` and `g_uniques` will be `Index(['1', '2', '3'], dtype='object')`.
- `labels, uniques = pd.factorize(df[col].fillna(value=""))` — this gives labels and uniques for the column to be aggregated. In our case, `labels` will be `array([0, 1, 2, 3, 4, 5])` and `uniques` will be `Index(['a', 'b', 'c', 'd', 'e', 'f'], dtype='object')`.
- The creation of the matrix with `np.zeros` and the subsequent scattered increments with `np.add.at(mat, (g_labels, labels), 1)` mimic a form of one-hot encoding, where each unique value of `val1` gets assigned a column index.
>>> mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
>>> np.add.at(mat, (g_labels, labels), 1)
>>> mat
array([[1, 1, 0, 0, 0, 0],
       [0, 0, 1, 0, 0, 0],
       [0, 0, 0, 1, 1, 1]])
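A quick note on why `np.add.at` is used instead of plain fancy-indexed addition: `mat[g_labels, labels] += 1` is buffered, so repeated (group, value) index pairs would each be counted only once. `np.add.at` is the unbuffered equivalent that accumulates every occurrence. A small illustration:

>>> counts = np.zeros(3, dtype=np.int64)
>>> idx = np.array([0, 0, 1])  # index 0 appears twice
>>> counts[idx] += 1  # buffered: each index incremented at most once
>>> counts
array([1, 1, 0])
>>> counts = np.zeros(3, dtype=np.int64)
>>> np.add.at(counts, idx, 1)  # unbuffered: duplicates accumulate
>>> counts
array([2, 1, 0])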
- If you look closely, the columns of the matrix above represent the values of `val1` and the rows represent the values of `tid`; each cell holds the number of times that value occurred in that group (all 0s and 1s in our example). Let's take a look at the next step.
>>> df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
>>> df_
a b c d e f
1 1 1 0 0 0 0
2 0 0 1 0 0 0
3 0 0 0 1 1 1
- Now all we do is replace each `1` with its column name and each `0` with an empty string, then concatenate all columns across each row into a single `,`-separated string. Note that any count of 1 or more produces the value exactly once, so duplicates within a group are collapsed. Since empty cells leave runs of consecutive commas behind, we also collapse those before stripping the leading and trailing ones.
>>> for c in df_.columns:
df_[c] = np.where(df_[c] >= 1, c, "")
>>> df_
a b c d e f
1 a b
2 c
3 d e f
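As an aside, the per-column loop above can also be written as a single broadcasted `np.where` over the whole matrix; a sketch, reusing `mat`, `uniques`, and `g_uniques` from the steps above:

>>> names = np.where(mat >= 1, uniques.to_numpy(dtype=object), "")
>>> df_ = pd.DataFrame(names, index=g_uniques, columns=uniques)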
>>> df_[col] = ""
df_[col] = df_.agg(','.join, axis=1).str.strip(',')
df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
>>> df_
val1
1 a,b
2 c
3 d,e,f
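As mentioned at the start, the count matrix we built by hand is exactly a cross-tabulation, so you can sanity-check that intermediate step with `pd.crosstab` (useful for verification, though it carries more overhead than the `factorize` + `np.add.at` route):

>>> pd.crosstab(df["tid"], df["val1"])
val1  a  b  c  d  e  f
tid
1     1  1  0  0  0  0
2     0  0  1  0  0  0
3     0  0  0  1  1  1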
Great! Now, how does it actually perform?
Performance Comparison
I've written some utility functions for measuring time and memory. I've also generated a sample dataframe with 1 million rows and stored it in a CSV.
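The exact script used to generate the data isn't shown here, but a hypothetical generator that produces a CSV of the right shape (many groups, low-cardinality value columns matching the 7 and 13 unique counts from earlier) might look like this:

# make_data.py (hypothetical generator, not the original script)
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 1_000_000

df = pd.DataFrame({
    "tid": rng.integers(0, 100_000, size=n),  # ~100k groups (an assumption)
    "val1": rng.choice(list("abcdefg"), size=n),  # 7 unique values
    "val2": rng.choice([f"v{i}" for i in range(13)], size=n),  # 13 unique values
})
df.to_csv("./data.csv", index=False)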
# utils.py
from contextlib import contextmanager
import os
import psutil
import time


@contextmanager
def memory_usage():
    process = psutil.Process(os.getpid())
    memory_before = process.memory_info().rss / (1024 ** 2)
    print(f"Memory usage before: {memory_before:.2f} MB")
    try:
        yield
    finally:
        memory_after = process.memory_info().rss / (1024 ** 2)
        print(f"Memory usage after: {memory_after:.2f} MB")
        print(f"Memory used: {memory_after - memory_before:.2f} MB")


@contextmanager
def timer():
    start = time.perf_counter()
    try:
        yield
    finally:
        end = time.perf_counter()
        print(f"Time taken: {end - start} seconds")
# normal.py
import pandas as pd

from utils import *


def group(df, group_col, agg_cols):
    return df.groupby(group_col).agg({col: ",".join for col in agg_cols}).reset_index()


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = group(df, "tid", ["val1", "val2"])
# optimized.py
import numpy as np
import pandas as pd

from utils import *


def better_groupby(df, group_col, agg_cols):
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        labels, uniques = pd.factorize(df[col].fillna(value=""))
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, "")
        df_[col] = ""
        # Collapse comma runs from empty cells before stripping the ends
        df_[col] = df_.agg(",".join, axis=1).str.replace(r",+", ",", regex=True).str.strip(",")
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)
    return res[[group_col] + agg_cols]


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = better_groupby(df, "tid", ["val1", "val2"])
$ python normal.py
Memory usage before: 620.79 MB
Time taken: 4.337185889999091 seconds
Memory usage after: 1422.88 MB
Memory used: 802.09 MB

$ python optimized.py
Memory usage before: 620.86 MB
Time taken: 3.781618000997696 seconds
Memory usage after: 735.93 MB
Memory used: 115.07 MB
So, we see an improvement in
- runtime: from 4.34 seconds down to 3.78 seconds (~13% faster)
- memory consumption: from 802.09 MB down to 115.07 MB (~86% less)
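One caveat before swapping this in: the output is not byte-for-byte identical to the plain groupby join. Because the matrix only records whether a value occurred, duplicates within a group are collapsed, and values come out in first-appearance order of the uniques rather than in row order within each group. If that's acceptable for your use case, here is a sketch of a sanity check that compares the two results as de-duplicated sets (assuming `group` and `better_groupby` from the scripts above):

# check.py (hypothetical sanity check)
import pandas as pd
from normal import group
from optimized import better_groupby

def normalize(s):
    # Compare joined strings as sorted, de-duplicated tuples of tokens
    return s.map(lambda x: tuple(sorted(set(x.split(",")))))

df = pd.read_csv("./data.csv")
a = group(df, "tid", ["val1", "val2"]).sort_values("tid").reset_index(drop=True)
b = better_groupby(df, "tid", ["val1", "val2"]).sort_values("tid").reset_index(drop=True)

for col in ["val1", "val2"]:
    assert normalize(a[col]).equals(normalize(b[col]))
print("Outputs agree up to ordering and duplicates")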