Source code for tubular.aggregations
"""Contains transformers for performing data aggregations."""
from typing import Any
import narwhals as nw
from beartype import beartype
from beartype.typing import Optional
from tubular._utils import (
_convert_dataframe_to_narwhals,
_return_narwhals_or_native_dataframe,
block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.functions.aggregations import (
ListOfColumnsOverRowAggregations,
ListOfRowsOverColumnsAggregations,
aggregate_over_columns,
aggregate_over_rows,
)
from tubular.mixins import DropOriginalMixin
from tubular.types import DataFrame, ListOfStrs, NumericTypes
@register
class BaseAggregationTransformer(BaseTransformer, DropOriginalMixin):
    """Base class for aggregation transformers.

    Stores the aggregation settings shared by the concrete aggregation
    transformers and provides the pre-transform check that every column
    selected for aggregation has a numeric dtype.

    Attributes:
    ----------
    columns : Union[str, list[str]]
        Columns to apply the transformations to.
    aggregations : list[str]
        Aggregation methods to apply.
    drop_original : bool
        Indicator for dropping original columns.
    verbose : bool
        Indicator for verbose output.
    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform
    polars_compatible: bool
        Indicates if transformer will work with polars frames
    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods
    FITS: bool
        class attribute, indicates whether transform requires fit to be run first
    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> BaseAggregationTransformer(
    ...     columns="a",
    ...     aggregations=["min", "max"],
    ... )
    BaseAggregationTransformer(aggregations=['min', 'max'], columns=['a'])

    ```

    """

    polars_compatible = True
    lazyframe_compatible = True
    FITS = False
    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: (
            ListOfColumnsOverRowAggregations | ListOfRowsOverColumnsAggregations
        ),
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str, list[str]]
            Column name(s) to apply the aggregation transformations to.
        aggregations : list[str]
            List of aggregation methods to apply, e.g. ``["min", "max"]``.
            Accepted values are those allowed by the
            ``ListOfColumnsOverRowAggregations`` /
            ``ListOfRowsOverColumnsAggregations`` type annotations, which
            are enforced by beartype at call time.
        drop_original : bool, optional
            Whether to drop the original columns after transformation.
            Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(columns=columns, **kwargs)
        self.aggregations = aggregations
        self.drop_original = drop_original
        # No fit step exists for this transformer, so it is always "fitted".
        self.is_fitted_ = True

    @block_from_json
    def to_json(self) -> dict[str, Any]:
        """Dump transformer to json dict.

        Extends the parent's json dict by adding ``aggregations`` and
        ``drop_original`` to the ``"init"`` level.

        Returns:
        -------
        dict[str, Any]:
            jsonified transformer. Nested dict containing levels for attributes
            set at init and fit.

        Example:
        -------
        ```pycon
        >>> baseAggregationTransformer = BaseAggregationTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ... )
        >>> baseAggregationTransformer.to_json()  # doctest: +NORMALIZE_WHITESPACE
        {'tubular_version': ...,
         'classname': 'BaseAggregationTransformer',
         'init': {'columns': ['a'],
                  'copy': False,
                  'verbose': False,
                  'return_native': True,
                  'aggregations': ['min', 'max'],
                  'drop_original': False},
         'fit': {'is_fitted_': True}}

        ```

        """
        json_dict = super().to_json()
        json_dict["init"].update(
            {"aggregations": self.aggregations, "drop_original": self.drop_original}
        )
        return json_dict

    @beartype
    def transform(
        self,
        X: DataFrame,
        return_native_override: Optional[bool] = None,
    ) -> DataFrame:
        """Perform pre-transform safety checks.

        Runs the parent transform and then verifies that every column in
        ``self.columns`` has a dtype contained in ``NumericTypes``. The data
        itself is not modified by this method.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating specified columns.
        return_native_override: Optional[bool]
            option to override return_native attr in transformer,
            useful when calling parent methods

        Returns
        -------
        DataFrame
            checked dataframe to transform.

        Raises
        ------
        TypeError: If columns are non-numeric.

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = BaseAggregationTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ... )
        >>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
        >>> # base transformers have no effect on data
        >>> transformer.transform(test_df)
        shape: (2, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 3   │
        │ 2   ┆ 4   │
        └─────┴─────┘

        ```

        """
        return_native = self._process_return_native(return_native_override)

        X = _convert_dataframe_to_narwhals(X)
        # Stay in narwhals while the checks below run; conversion back to the
        # caller's preferred form happens once, at the end.
        X = super().transform(X, return_native_override=False)

        schema = X.collect_schema()

        # Sorted so the error message is deterministic regardless of the
        # order in which columns were supplied.
        non_numerical_columns = sorted(
            col for col in self.columns if schema[col] not in NumericTypes
        )

        if non_numerical_columns:
            # classname is invoked as a call here, in line with how it is
            # used elsewhere in this module.
            msg = (
                f"{self.classname()}: attempting to call transformer on "
                f"non-numeric columns {non_numerical_columns}, "
                "which is not supported"
            )
            raise TypeError(msg)

        return _return_narwhals_or_native_dataframe(X, return_native=return_native)
[docs]
@register
class AggregateRowsOverColumnTransformer(BaseAggregationTransformer):
    """Aggregation transformer.

    Computes the requested aggregations of each selected column across rows,
    with rows grouped by the values of a key column.

    Attributes:
    ----------
    columns : Union[str, list[str]]
        List of column names to apply the aggregation transformations to.
    aggregations : list[str]
        List of aggregation methods to apply.
    key : str
        Column name to group by for aggregation.
    drop_original : bool, optional
        Whether to drop the original columns after transformation. Default is False.
    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform
    polars_compatible: bool
        Indicates if transformer will work with polars frames
    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods
    FITS: bool
        class attribute, indicates whether transform requires fit to be run first
    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> AggregateRowsOverColumnTransformer(
    ...     columns="a",
    ...     aggregations=["min", "max"],
    ...     key="b",
    ... )
    AggregateRowsOverColumnTransformer(aggregations=['min', 'max'], columns=['a'],
                                       key='b')

    ```

    """

    polars_compatible = True
    lazyframe_compatible = True
    FITS = False
    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: ListOfRowsOverColumnsAggregations,
        key: str,
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str, list[str]]
            Column name(s) whose values are aggregated.
        aggregations : list[str]
            Aggregation methods to compute for each column.
        key : str
            Name of the column whose values define the row groups.
        drop_original : bool, optional
            If True, the columns in ``columns`` are removed from the output.
            Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(
            columns=columns,
            aggregations=aggregations,
            drop_original=drop_original,
            **kwargs,
        )
        self.key = key
        # There is nothing to fit, so the transformer is usable immediately.
        self.is_fitted_ = True

    @block_from_json
    def to_json(self) -> dict[str, Any]:
        """Dump transformer to json dict.

        Takes the parent's json dict and records ``key`` in its ``"init"``
        level.

        Returns:
        -------
        dict[str, Any]:
            jsonified transformer. Nested dict containing levels for attributes
            set at init and fit.

        Example:
        -------
        ```pycon
        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     key="c",
        ...     aggregations=["min", "max"],
        ... )
        >>> transformer.to_json()  # doctest: +NORMALIZE_WHITESPACE
        {'tubular_version': ...,
         'classname': 'AggregateRowsOverColumnTransformer',
         'init': {'columns': ['a'],
                  'copy': False,
                  'verbose': False,
                  'return_native': True,
                  'aggregations': ['min', 'max'],
                  'drop_original': False,
                  'key': 'c'},
         'fit': {'is_fitted_': True}}

        ```

        """
        serialised = super().to_json()
        serialised["init"].update(key=self.key)
        return serialised

    def get_feature_names_out(self) -> list[str]:
        """List features modified/created by the transformer.

        Names take the form ``<column>_<aggregation>``, ordered by column
        first and aggregation second.

        Returns
        -------
        list[str]:
            list of features modified/created by the transformer

        Examples
        --------
        ```pycon
        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ...     key="b",
        ... )
        >>> transformer.get_feature_names_out()
        ['a_min', 'a_max']

        ```

        """
        feature_names: list[str] = []
        for col in self.columns:
            feature_names.extend(f"{col}_{agg}" for agg in self.aggregations)
        return feature_names

    def get_transform_exprs(self) -> list[nw.Expr]:
        """Get transform expressions.

        Delegates to ``aggregate_over_rows`` with this transformer's
        columns, key and aggregations.

        Returns
        -------
        list[nw.Expr]: transform expressions for class

        """
        return aggregate_over_rows(
            columns=self.columns,
            key=self.key,
            aggregations=self.aggregations,
        )

    @beartype
    def transform(
        self,
        X: DataFrame,
    ) -> DataFrame:
        """Transform the dataframe by aggregating rows over specified columns.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating specified columns.

        Returns
        -------
        DataFrame
            Transformed DataFrame with aggregated columns.

        Raises
        ------
        ValueError
            If the key column is not found in the DataFrame.
        TypeError
            Raised by the parent transform if any of the columns to
            aggregate is non-numeric.

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ...     key="b",
        ... )
        >>> test_df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 1, 2], "c": [1, 2, 3]})
        >>> transformer.transform(test_df)
        shape: (3, 5)
        ┌─────┬─────┬─────┬───────┬───────┐
        │ a   ┆ b   ┆ c   ┆ a_min ┆ a_max │
        │ --- ┆ --- ┆ --- ┆ ---   ┆ ---   │
        │ i64 ┆ i64 ┆ i64 ┆ i64   ┆ i64   │
        ╞═════╪═════╪═════╪═══════╪═══════╡
        │ 1   ┆ 1   ┆ 1   ┆ 1     ┆ 2     │
        │ 2   ┆ 1   ┆ 2   ┆ 1     ┆ 2     │
        │ 3   ┆ 2   ┆ 3   ┆ 3     ┆ 3     │
        └─────┴─────┴─────┴───────┴───────┘

        ```

        """
        frame = _convert_dataframe_to_narwhals(X)
        # Parent checks run on the narwhals frame; native conversion is
        # deferred to the final return.
        frame = super().transform(frame, return_native_override=False)

        available_columns = frame.collect_schema().names()
        if self.key not in available_columns:
            msg = f"{self.classname()}: key '{self.key}' not found in dataframe columns"
            raise ValueError(msg)

        # The expressions are stored on the instance as `transform_exprs`.
        self.transform_exprs = self.get_transform_exprs()
        if self.transform_exprs:
            frame = frame.with_columns(*self.transform_exprs)

        # Use mixin method to drop original columns
        frame = DropOriginalMixin.drop_original_column(
            frame,
            self.drop_original,
            self.columns,
            return_native=False,
        )

        return _return_narwhals_or_native_dataframe(frame, self.return_native)
[docs]
@register
class AggregateColumnsOverRowTransformer(BaseAggregationTransformer):
    """Aggregate provided columns over each row.

    For every row, the values of the selected columns are combined with each
    requested aggregation. The original columns can optionally be dropped
    afterwards.

    Attributes:
    ----------
    columns : Union[str,list[str]]
        List of column names to apply the aggregation transformations to.
    aggregations : list[str]
        List of aggregation methods to apply.
    drop_original : bool, optional
        Whether to drop the original columns after transformation. Default is False.
    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform
    polars_compatible: bool
        Indicates if transformer will work with polars frames
    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods
    FITS: bool
        class attribute, indicates whether transform requires fit to be run first
    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> AggregateColumnsOverRowTransformer(
    ...     columns=["a", "b"],
    ...     aggregations=["min", "max"],
    ... )
    AggregateColumnsOverRowTransformer(aggregations=['min', 'max'],
                                       columns=['a', 'b'])

    ```

    """

    polars_compatible = True
    lazyframe_compatible = True
    FITS = False
    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: ListOfColumnsOverRowAggregations,
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str,list[str]]
            Column name(s) whose values are aggregated within each row.
        aggregations : list[str]
            Aggregation methods to compute.
        drop_original : bool, optional
            If True, the columns in ``columns`` are removed from the output.
            Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(
            columns=columns,
            aggregations=aggregations,
            drop_original=drop_original,
            **kwargs,
        )
        # There is nothing to fit, so the transformer is usable immediately.
        self.is_fitted_ = True

    def get_feature_names_out(self) -> list[str]:
        """List features modified/created by the transformer.

        Each name is the selected column names joined with underscores,
        followed by an underscore and the aggregation name.

        Returns
        -------
        list[str]:
            list of features modified/created by the transformer

        Examples
        --------
        ```pycon
        >>> transformer = AggregateColumnsOverRowTransformer(
        ...     columns=["a", "b"],
        ...     aggregations=["min", "max"],
        ... )
        >>> transformer.get_feature_names_out()
        ['a_b_min', 'a_b_max']

        ```

        """
        prefix = "_".join(self.columns) + "_"
        return [prefix + agg for agg in self.aggregations]

    def get_transform_exprs(self) -> list[nw.Expr]:
        """Get transform expressions.

        Delegates to ``aggregate_over_columns`` with this transformer's
        columns and aggregations.

        Returns
        -------
        list[nw.Expr]: transform expressions for class

        """
        return aggregate_over_columns(
            columns=self.columns,
            aggregations=self.aggregations,
        )

    @beartype
    def transform(
        self,
        X: DataFrame,
    ) -> DataFrame:
        """Transform the dataframe by aggregating provided columns over each row.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating provided columns over each row

        Returns
        -------
        DataFrame
            Transformed DataFrame with aggregated columns.

        Raises
        ------
        TypeError
            Raised by the parent transform if any of the columns to
            aggregate is non-numeric.

        Example:
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = AggregateColumnsOverRowTransformer(
        ...     columns=["a", "b"],
        ...     aggregations=["min", "max"],
        ... )
        >>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
        >>> transformer.transform(test_df)
        shape: (2, 5)
        ┌─────┬─────┬─────┬─────────┬─────────┐
        │ a   ┆ b   ┆ c   ┆ a_b_min ┆ a_b_max │
        │ --- ┆ --- ┆ --- ┆ ---     ┆ ---     │
        │ i64 ┆ i64 ┆ i64 ┆ i64     ┆ i64     │
        ╞═════╪═════╪═════╪═════════╪═════════╡
        │ 1   ┆ 3   ┆ 5   ┆ 1       ┆ 3       │
        │ 2   ┆ 4   ┆ 6   ┆ 2       ┆ 4       │
        └─────┴─────┴─────┴─────────┴─────────┘

        ```

        """
        frame = _convert_dataframe_to_narwhals(X)
        # Parent checks run on the narwhals frame; native conversion is
        # deferred to the final return.
        frame = super().transform(frame, return_native_override=False)

        exprs = self.get_transform_exprs()
        if exprs:
            frame = frame.with_columns(*exprs)

        # Use mixin method to drop original columns
        frame = DropOriginalMixin.drop_original_column(
            frame,
            self.drop_original,
            self.columns,
            return_native=False,
        )

        return _return_narwhals_or_native_dataframe(frame, self.return_native)