Source code for tubular.aggregations
"""Contains transformers for performing data aggregations."""
from enum import Enum
from typing import Any
import narwhals as nw
from beartype import beartype
from beartype.typing import Annotated, List, Optional
from beartype.vale import Is
from tubular._utils import (
_convert_dataframe_to_narwhals,
_return_narwhals_or_native_dataframe,
block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.mixins import DropOriginalMixin
from tubular.types import DataFrame, ListOfStrs, NumericTypes
class ColumnsOverRowAggregationOptions(str, Enum):
"""Aggregation options for ColumnsOverRowAggregationTransformer."""
MIN = "min"
MAX = "max"
MEAN = "mean"
SUM = "sum"
# not currently easy to implement row-wise
# median or count, so leaving out for now
class RowsOverColumnsAggregationOptions(str, Enum):
"""Aggregation options for RowsOverColumnAggregationTransformer."""
MIN = "min"
MAX = "max"
MEAN = "mean"
SUM = "sum"
MEDIAN = "median"
COUNT = "count"
ListOfColumnsOverRowAggregations = Annotated[
List,
Is[
lambda list_value: all(
entry in ColumnsOverRowAggregationOptions._value2member_map_
for entry in list_value
)
],
]
ListOfRowsOverColumnsAggregations = Annotated[
List,
Is[
lambda list_value: all(
entry in RowsOverColumnsAggregationOptions._value2member_map_
for entry in list_value
)
],
]
@register
class BaseAggregationTransformer(BaseTransformer, DropOriginalMixin):
"""Base class for aggregation transformers.
This class provides the foundation for aggregation-based transformations,
handling common setup tasks such as validating aggregation methods and
managing column specifications.
Attributes:
----------
columns : Union[str, list[str]]
Columns to apply the transformations to.
aggregations : list[str]
Aggregation methods to apply.
drop_original : bool
Indicator for dropping original columns.
verbose : bool
Indicator for verbose output.
built_from_json: bool
indicates if transformer was reconstructed from json,
which limits it's supported functionality to .transform
polars_compatible: bool
Indicates if transformer will work with polars frames
jsonable: bool
class attribute, indicates if transformer supports to/from_json methods
FITS: bool
class attribute, indicates whether transform requires fit to be run first
lazyframe_compatible: bool
class attribute, indicates whether transformer works with lazyframes
Example:
-------
```pycon
>>> BaseAggregationTransformer(
... columns="a",
... aggregations=["min", "max"],
... )
BaseAggregationTransformer(aggregations=['min', 'max'], columns=['a'])
```
"""
polars_compatible = True
lazyframe_compatible = True
FITS = False
jsonable = True
@beartype
def __init__(
self,
columns: str | ListOfStrs,
aggregations: (
ListOfColumnsOverRowAggregations | ListOfRowsOverColumnsAggregations
),
drop_original: bool = False,
**kwargs: bool,
) -> None:
"""Initialise class.
Parameters
----------
columns : list[str]
List of column names to apply the aggregation transformations to.
aggregations : list[str]
List of aggregation methods to apply. Valid methods include 'min', 'max',
'mean', 'median', and 'count'.
drop_original : bool, optional
Whether to drop the original columns after transformation. Default is False.
kwargs: bool
parameters for base class, e.g. verbose
"""
super().__init__(columns=columns, **kwargs)
self.aggregations = aggregations
self.drop_original = drop_original
self.is_fitted_ = True # Does not fit
@block_from_json
def to_json(self) -> dict[str, Any]:
"""Dump transformer to json dict.
Returns:
-------
dict[str, Any]:
jsonified transformer. Nested dict containing levels for attributes
set at init and fit.
Example:
-------
```pycon
>>> baseAggregationTransformer = BaseAggregationTransformer(
... columns="a",
... aggregations=["min", "max"],
... )
>>> baseAggregationTransformer.to_json() # doctest: +NORMALIZE_WHITESPACE
{'tubular_version': ...,
'classname': 'BaseAggregationTransformer',
'init': {'columns': ['a'],
'copy': False,
'verbose': False,
'return_native': True,
'aggregations': ['min', 'max'],
'drop_original': False},
'fit': {'is_fitted_': True}}
```
"""
json_dict = super().to_json()
json_dict["init"].update(
{"aggregations": self.aggregations, "drop_original": self.drop_original}
)
return json_dict
@beartype
def transform(
self,
X: DataFrame,
return_native_override: Optional[bool] = None,
) -> DataFrame:
"""Perform pre-transform safety checks.
Parameters
----------
X : DataFrame
DataFrame to transform by aggregating specified columns.
return_native_override: Optional[bool]
option to override return_native attr in transformer,
useful when calling parent methods
Returns
-------
DataFrame
checked dataframe to transform.
Raises
------
TypeError: If columns are non-numeric.
Examples
--------
```pycon
>>> import polars as pl
>>> transformer = BaseAggregationTransformer(
... columns="a",
... aggregations=["min", "max"],
... )
>>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> # base transformers have no effect on data
>>> transformer.transform(test_df)
shape: (2, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1 ┆ 3 │
│ 2 ┆ 4 │
└─────┴─────┘
```
"""
return_native = self._process_return_native(return_native_override)
X = _convert_dataframe_to_narwhals(X)
X = super().transform(X, return_native_override=False)
schema = X.collect_schema()
non_numerical_columns = [
col for col in self.columns if schema[col] not in NumericTypes
]
# convert to list and sort for consistency in return
non_numerical_columns = list(non_numerical_columns)
non_numerical_columns.sort()
if len(non_numerical_columns) != 0:
msg = f"{self.classname}: attempting to call transformer on non-numeric columns {non_numerical_columns}, which is not supported" # noqa:E501
raise TypeError(msg)
return _return_narwhals_or_native_dataframe(X, return_native=return_native)
[docs]
@register
class AggregateRowsOverColumnTransformer(BaseAggregationTransformer):
"""Aggregation transformer.
Aggregate rows over specified columns,
where rows are grouped by provided key column.
Attributes:
----------
columns : Union[str, list[str]]
List of column names to apply the aggregation transformations to.
aggregations : list[str]
List of aggregation methods to apply.
key : str
Column name to group by for aggregation.
drop_original : bool, optional
Whether to drop the original columns after transformation. Default is False.
built_from_json: bool
indicates if transformer was reconstructed from json,
which limits it's supported functionality to .transform
polars_compatible: bool
Indicates if transformer will work with polars frames
jsonable: bool
class attribute, indicates if transformer supports to/from_json methods
FITS: bool
class attribute, indicates whether transform requires fit to be run first
lazyframe_compatible: bool
class attribute, indicates whether transformer works with lazyframes
Example:
-------
```pycon
>>> AggregateRowsOverColumnTransformer(
... columns="a",
... aggregations=["min", "max"],
... key="b",
... )
AggregateRowsOverColumnTransformer(aggregations=['min', 'max'], columns=['a'],
key='b')
```
"""
polars_compatible = True
lazyframe_compatible = True
FITS = False
jsonable = True
@beartype
def __init__(
self,
columns: str | ListOfStrs,
aggregations: ListOfRowsOverColumnsAggregations,
key: str,
drop_original: bool = False,
**kwargs: bool,
) -> None:
"""Initialise class.
Parameters
----------
columns : Union[str, list[str]]
List of column names to apply the aggregation transformations to.
aggregations : list[str]
List of aggregation methods to apply.
key : str
Column name to group by for aggregation.
drop_original : bool, optional
Whether to drop the original columns after transformation. Default is False.
kwargs: bool
parameters for base class, e.g. verbose
"""
super().__init__(
columns=columns,
aggregations=aggregations,
drop_original=drop_original,
**kwargs,
)
self.key = key
self.is_fitted_ = True # Does not fit
[docs]
@block_from_json
def to_json(self) -> dict[str, Any]:
"""Dump transformer to json dict.
Returns:
-------
dict[str, Any]:
jsonified transformer. Nested dict containing levels for attributes
set at init and fit.
Example:
-------
```pycon
>>> transformer = AggregateRowsOverColumnTransformer(
... columns="a",
... key="c",
... aggregations=["min", "max"],
... )
>>> transformer.to_json() # doctest: +NORMALIZE_WHITESPACE
{'tubular_version': ...,
'classname': 'AggregateRowsOverColumnTransformer',
'init': {'columns': ['a'],
'copy': False,
'verbose': False,
'return_native': True,
'aggregations': ['min', 'max'],
'drop_original': False,
'key': 'c'},
'fit': {'is_fitted_': True}}
```
"""
json_dict = super().to_json()
json_dict["init"].update({"key": self.key})
return json_dict
[docs]
def get_feature_names_out(self) -> list[str]:
"""List features modified/created by the transformer.
Returns
-------
list[str]:
list of features modified/created by the transformer
Examples
--------
```pycon
>>> transformer = AggregateRowsOverColumnTransformer(
... columns="a",
... aggregations=["min", "max"],
... key="b",
... )
>>> transformer.get_feature_names_out()
['a_min', 'a_max']
```
"""
return [f"{col}_{agg}" for col in self.columns for agg in self.aggregations]
[docs]
@beartype
def transform(
self,
X: DataFrame,
) -> DataFrame:
"""Transform the dataframe by aggregating rows over specified columns.
Parameters
----------
X : DataFrame
DataFrame to transform by aggregating specified columns.
Returns
-------
DataFrame
Transformed DataFrame with aggregated columns.
Raises
------
ValueError
If the key column is not found in the DataFrame.
Examples
--------
```pycon
>>> import polars as pl
>>> transformer = AggregateRowsOverColumnTransformer(
... columns="a",
... aggregations=["min", "max"],
... key="b",
... )
>>> test_df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 1, 2], "c": [1, 2, 3]})
>>> transformer.transform(test_df)
shape: (3, 5)
┌─────┬─────┬─────┬───────┬───────┐
│ a ┆ b ┆ c ┆ a_min ┆ a_max │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═════╪═════╪═════╪═══════╪═══════╡
│ 1 ┆ 1 ┆ 1 ┆ 1 ┆ 2 │
│ 2 ┆ 1 ┆ 2 ┆ 1 ┆ 2 │
│ 3 ┆ 2 ┆ 3 ┆ 3 ┆ 3 │
└─────┴─────┴─────┴───────┴───────┘
```
"""
X = _convert_dataframe_to_narwhals(X)
X = super().transform(X, return_native_override=False)
if self.key not in X.collect_schema().names():
msg = f"{self.classname()}: key '{self.key}' not found in dataframe columns"
raise ValueError(msg)
expr_dict = {
f"{col}_{agg}": getattr(nw.col(col), agg)().over(self.key)
for col in self.columns
for agg in self.aggregations
}
X = X.with_columns(**expr_dict) if expr_dict else X
X = DropOriginalMixin.drop_original_column(
X,
self.drop_original,
self.columns,
return_native=False,
)
# Use mixin method to drop original columns
return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs]
@register
class AggregateColumnsOverRowTransformer(BaseAggregationTransformer):
"""Aggregate provided columns over each row.
This transformer aggregates data within specified columns
and can optionally drop the original columns post-transformation.
Attributes:
----------
columns : Union[str,list[str]]
List of column names to apply the aggregation transformations to.
aggregations : list[str]
List of aggregation methods to apply.
drop_original : bool, optional
Whether to drop the original columns after transformation. Default is False.
built_from_json: bool
indicates if transformer was reconstructed from json,
which limits it's supported functionality to .transform
polars_compatible: bool
Indicates if transformer will work with polars frames
jsonable: bool
class attribute, indicates if transformer supports to/from_json methods
FITS: bool
class attribute, indicates whether transform requires fit to be run first
lazyframe_compatible: bool
class attribute, indicates whether transformer works with lazyframes
Example:
-------
```pycon
>>> AggregateColumnsOverRowTransformer(
... columns=["a", "b"],
... aggregations=["min", "max"],
... )
AggregateColumnsOverRowTransformer(aggregations=['min', 'max'],
columns=['a', 'b'])
```
"""
polars_compatible = True
lazyframe_compatible = True
FITS = False
jsonable = True
@beartype
def __init__(
self,
columns: str | ListOfStrs,
aggregations: ListOfColumnsOverRowAggregations,
drop_original: bool = False,
**kwargs: bool,
) -> None:
"""Initialise class.
Parameters
----------
columns : Union[str,list[str]]
List of column names to apply the aggregation transformations to.
aggregations : list[str]
List of aggregation methods to apply.
drop_original : bool, optional
Whether to drop the original columns after transformation. Default is False.
kwargs: bool
parameters for base class, e.g. verbose
"""
super().__init__(
columns=columns,
aggregations=aggregations,
drop_original=drop_original,
**kwargs,
)
self.is_fitted_ = True # Does not fit
[docs]
def get_feature_names_out(self) -> list[str]:
"""List features modified/created by the transformer.
Returns
-------
list[str]:
list of features modified/created by the transformer
Examples
--------
```pycon
>>> transformer = AggregateColumnsOverRowTransformer(
... columns=["a", "b"],
... aggregations=["min", "max"],
... )
>>> transformer.get_feature_names_out()
['a_b_min', 'a_b_max']
```
"""
return ["_".join(self.columns) + "_" + agg for agg in self.aggregations]
[docs]
@beartype
def transform(
self,
X: DataFrame,
) -> DataFrame:
"""Transform the dataframe by aggregating provided columns over each row.
Parameters
----------
X : DataFrame
DataFrame to transform by aggregating provided columns over each row
Returns
-------
DataFrame
Transformed DataFrame with aggregated columns.
Example:
--------
```pycon
>>> import polars as pl
>>> transformer = AggregateColumnsOverRowTransformer(
... columns=["a", "b"],
... aggregations=["min", "max"],
... )
>>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
>>> transformer.transform(test_df)
shape: (2, 5)
┌─────┬─────┬─────┬─────────┬─────────┐
│ a ┆ b ┆ c ┆ a_b_min ┆ a_b_max │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═════╪═════╪═════╪═════════╪═════════╡
│ 1 ┆ 3 ┆ 5 ┆ 1 ┆ 3 │
│ 2 ┆ 4 ┆ 6 ┆ 2 ┆ 4 │
└─────┴─────┴─────┴─────────┴─────────┘
```
"""
X = _convert_dataframe_to_narwhals(X)
X = super().transform(X, return_native_override=False)
expr_map = (
{
"min": nw.min_horizontal(*self.columns),
"max": nw.max_horizontal(*self.columns),
"sum": nw.sum_horizontal(*self.columns),
"mean": nw.mean_horizontal(*self.columns),
}
if self.columns
else {}
)
transform_dict = (
{
"_".join(self.columns) + "_" + aggregation: expr_map[aggregation].alias(
"_".join(self.columns) + "_" + aggregation,
)
for aggregation in self.aggregations
}
if expr_map
else {}
)
X = X.with_columns(**transform_dict) if transform_dict else X
X = DropOriginalMixin.drop_original_column(
X,
self.drop_original,
self.columns,
return_native=False,
)
# Use mixin method to drop original columns
return _return_narwhals_or_native_dataframe(X, self.return_native)