Source code for tubular.aggregations

"""Contains transformers for performing data aggregations."""

from typing import Any

import narwhals as nw
from beartype import beartype
from beartype.typing import Optional

from tubular._utils import (
    _convert_dataframe_to_narwhals,
    _return_narwhals_or_native_dataframe,
    block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.functions.aggregations import (
    ListOfColumnsOverRowAggregations,
    ListOfRowsOverColumnsAggregations,
    aggregate_over_columns,
    aggregate_over_rows,
)
from tubular.mixins import DropOriginalMixin
from tubular.types import DataFrame, ListOfStrs, NumericTypes


@register
class BaseAggregationTransformer(BaseTransformer, DropOriginalMixin):
    """Base class for aggregation transformers.

    This class provides the foundation for aggregation-based transformations:
    it stores the requested aggregation methods and the ``drop_original``
    flag, serialises them in ``to_json``, and its ``transform`` checks that
    every configured column is numeric before any aggregation is attempted.

    Attributes:
    ----------
    columns : Union[str, list[str]]
        Columns to apply the transformations to.

    aggregations : list[str]
        Aggregation methods to apply.

    drop_original : bool
        Indicator for dropping original columns.

    verbose : bool
        Indicator for verbose output.

    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform

    polars_compatible: bool
        Indicates if transformer will work with polars frames

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> BaseAggregationTransformer(
    ...     columns="a",
    ...     aggregations=["min", "max"],
    ... )
    BaseAggregationTransformer(aggregations=['min', 'max'], columns=['a'])

    ```

    """

    polars_compatible = True

    lazyframe_compatible = True

    FITS = False

    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: (
            ListOfColumnsOverRowAggregations | ListOfRowsOverColumnsAggregations
        ),
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str, list[str]]
            Column name, or list of column names, to apply the aggregation
            transformations to.
        aggregations : list[str]
            List of aggregation methods to apply. Valid methods include 'min', 'max',
            'mean', 'median', and 'count'.
        drop_original : bool, optional
            Whether to drop the original columns after transformation. Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(columns=columns, **kwargs)

        self.aggregations = aggregations

        self.drop_original = drop_original
        self.is_fitted_ = True  # Does not fit

    @block_from_json
    def to_json(self) -> dict[str, Any]:
        """Dump transformer to json dict.

        Extends the parent's json dict by adding ``aggregations`` and
        ``drop_original`` to the ``init`` level.

        Returns:
        -------
        dict[str, Any]:
            jsonified transformer. Nested dict containing levels for attributes
            set at init and fit.


        Example:
        -------
        ```pycon
        >>> baseAggregationTransformer = BaseAggregationTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ... )
        >>> baseAggregationTransformer.to_json()  # doctest: +NORMALIZE_WHITESPACE
        {'tubular_version': ...,
        'classname': 'BaseAggregationTransformer',
        'init': {'columns': ['a'],
        'copy': False,
        'verbose': False,
        'return_native': True,
        'aggregations': ['min', 'max'],
        'drop_original': False},
        'fit': {'is_fitted_': True}}

        ```

        """
        json_dict = super().to_json()
        json_dict["init"].update(
            {"aggregations": self.aggregations, "drop_original": self.drop_original}
        )
        return json_dict

    @beartype
    def transform(
        self,
        X: DataFrame,
        return_native_override: Optional[bool] = None,
    ) -> DataFrame:
        """Perform pre-transform safety checks.

        Runs the parent transform and then verifies that every column in
        ``self.columns`` has a numeric dtype. The data itself is not modified
        by this method.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating specified columns.

        return_native_override: Optional[bool]
            option to override return_native attr in transformer,
            useful when calling parent methods

        Returns
        -------
        DataFrame
            checked dataframe to transform.

        Raises
        ------
        TypeError: If columns are non-numeric.

        Examples
        --------
        ```pycon
        >>> import polars as pl

        >>> transformer = BaseAggregationTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ... )

        >>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})

        >>> # base transformers have no effect on data
        >>> transformer.transform(test_df)
        shape: (2, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 3   │
        │ 2   ┆ 4   │
        └─────┴─────┘

        ```

        """
        return_native = self._process_return_native(return_native_override)

        X = _convert_dataframe_to_narwhals(X)

        # the schema check below uses the narwhals API, so the parent call is
        # asked not to hand back a native frame
        X = super().transform(X, return_native_override=False)

        schema = X.collect_schema()

        # sorted so the error message is consistent between runs
        non_numerical_columns = sorted(
            col for col in self.columns if schema[col] not in NumericTypes
        )
        if non_numerical_columns:
            # classname is called (not just referenced) so the message carries
            # the class name, matching how subclasses build their messages
            msg = f"{self.classname()}: attempting to call transformer on non-numeric columns {non_numerical_columns}, which is not supported"  # noqa:E501
            raise TypeError(msg)

        return _return_narwhals_or_native_dataframe(X, return_native=return_native)


@register
class AggregateRowsOverColumnTransformer(BaseAggregationTransformer):
    """Aggregation transformer working over rows grouped by a key column.

    For each configured column and each requested aggregation, a new column
    named ``<column>_<aggregation>`` is added, where rows are grouped by the
    provided key column.

    Attributes:
    ----------
    columns : Union[str, list[str]]
        List of column names to apply the aggregation transformations to.

    aggregations : list[str]
        List of aggregation methods to apply.

    key : str
        Column name to group by for aggregation.

    drop_original : bool, optional
        Whether to drop the original columns after transformation.
        Default is False.

    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform

    polars_compatible: bool
        Indicates if transformer will work with polars frames

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> AggregateRowsOverColumnTransformer(  # doctest: +NORMALIZE_WHITESPACE
    ...     columns="a",
    ...     aggregations=["min", "max"],
    ...     key="b",
    ... )
    AggregateRowsOverColumnTransformer(aggregations=['min', 'max'], columns=['a'],
                                       key='b')

    ```

    """

    polars_compatible = True

    lazyframe_compatible = True

    FITS = False

    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: ListOfRowsOverColumnsAggregations,
        key: str,
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str, list[str]]
            List of column names to apply the aggregation transformations to.
        aggregations : list[str]
            List of aggregation methods to apply.
        key : str
            Column name to group by for aggregation.
        drop_original : bool, optional
            Whether to drop the original columns after transformation.
            Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(
            columns=columns,
            aggregations=aggregations,
            drop_original=drop_original,
            **kwargs,
        )

        self.is_fitted_ = True  # Does not fit
        self.key = key

    @block_from_json
    def to_json(self) -> dict[str, Any]:
        """Dump transformer to json dict.

        Takes the parent's json dict and records ``key`` under ``init``.

        Returns:
        -------
        dict[str, Any]:
            jsonified transformer. Nested dict containing levels for attributes
            set at init and fit.

        Example:
        -------
        ```pycon
        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     key="c",
        ...     aggregations=["min", "max"],
        ... )
        >>> transformer.to_json()  # doctest: +NORMALIZE_WHITESPACE
        {'tubular_version': ...,
        'classname': 'AggregateRowsOverColumnTransformer',
        'init': {'columns': ['a'],
        'copy': False,
        'verbose': False,
        'return_native': True,
        'aggregations': ['min', 'max'],
        'drop_original': False,
        'key': 'c'},
        'fit': {'is_fitted_': True}}

        ```

        """
        json_dict = super().to_json()
        json_dict["init"]["key"] = self.key
        return json_dict

    def get_feature_names_out(self) -> list[str]:
        """List features modified/created by the transformer.

        Names take the form ``<column>_<aggregation>``, ordered by column
        first and aggregation second.

        Returns
        -------
        list[str]:
            list of features modified/created by the transformer

        Examples
        --------
        ```pycon
        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ...     key="b",
        ... )

        >>> transformer.get_feature_names_out()
        ['a_min', 'a_max']

        ```

        """
        feature_names = []
        for col in self.columns:
            feature_names.extend(f"{col}_{agg}" for agg in self.aggregations)
        return feature_names

    def get_transform_exprs(self) -> list[nw.Expr]:
        """Get transform expressions.

        Returns
        -------
        list[nw.Expr]:
            transform expressions for class

        """
        exprs = aggregate_over_rows(
            columns=self.columns,
            key=self.key,
            aggregations=self.aggregations,
        )
        return exprs

    @beartype
    def transform(
        self,
        X: DataFrame,
    ) -> DataFrame:
        """Transform the dataframe by aggregating rows over specified columns.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating specified columns.

        Returns
        -------
        DataFrame
            Transformed DataFrame with aggregated columns.

        Raises
        ------
        ValueError
            If the key column is not found in the DataFrame.

        Examples
        --------
        ```pycon
        >>> import polars as pl

        >>> transformer = AggregateRowsOverColumnTransformer(
        ...     columns="a",
        ...     aggregations=["min", "max"],
        ...     key="b",
        ... )

        >>> test_df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 1, 2], "c": [1, 2, 3]})

        >>> transformer.transform(test_df)
        shape: (3, 5)
        ┌─────┬─────┬─────┬───────┬───────┐
        │ a   ┆ b   ┆ c   ┆ a_min ┆ a_max │
        │ --- ┆ --- ┆ --- ┆ ---   ┆ ---   │
        │ i64 ┆ i64 ┆ i64 ┆ i64   ┆ i64   │
        ╞═════╪═════╪═════╪═══════╪═══════╡
        │ 1   ┆ 1   ┆ 1   ┆ 1     ┆ 2     │
        │ 2   ┆ 1   ┆ 2   ┆ 1     ┆ 2     │
        │ 3   ┆ 2   ┆ 3   ┆ 3     ┆ 3     │
        └─────┴─────┴─────┴───────┴───────┘

        ```

        """
        frame = _convert_dataframe_to_narwhals(X)

        # parent performs the shared pre-transform checks
        frame = super().transform(frame, return_native_override=False)

        available_columns = frame.collect_schema().names()
        if self.key not in available_columns:
            msg = f"{self.classname()}: key '{self.key}' not found in dataframe columns"
            raise ValueError(msg)

        self.transform_exprs = self.get_transform_exprs()
        if self.transform_exprs:
            frame = frame.with_columns(*self.transform_exprs)

        # Use mixin method to drop original columns
        frame = DropOriginalMixin.drop_original_column(
            frame,
            self.drop_original,
            self.columns,
            return_native=False,
        )

        return _return_narwhals_or_native_dataframe(frame, self.return_native)


@register
class AggregateColumnsOverRowTransformer(BaseAggregationTransformer):
    """Aggregate provided columns over each row.

    Each requested aggregation yields one new column, named by joining the
    configured column names with underscores and appending the aggregation
    name. The original columns can optionally be dropped post-transformation.

    Attributes:
    ----------
    columns : Union[str,list[str]]
        List of column names to apply the aggregation transformations to.

    aggregations : list[str]
        List of aggregation methods to apply.

    drop_original : bool, optional
        Whether to drop the original columns after transformation.
        Default is False.

    built_from_json: bool
        indicates if transformer was reconstructed from json,
        which limits its supported functionality to .transform

    polars_compatible: bool
        Indicates if transformer will work with polars frames

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Example:
    -------
    ```pycon
    >>> AggregateColumnsOverRowTransformer(  # doctest: +NORMALIZE_WHITESPACE
    ...     columns=["a", "b"],
    ...     aggregations=["min", "max"],
    ... )
    AggregateColumnsOverRowTransformer(aggregations=['min', 'max'],
                                       columns=['a', 'b'])

    ```

    """

    polars_compatible = True

    lazyframe_compatible = True

    FITS = False

    jsonable = True

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs,
        aggregations: ListOfColumnsOverRowAggregations,
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class.

        Parameters
        ----------
        columns : Union[str,list[str]]
            List of column names to apply the aggregation transformations to.
        aggregations : list[str]
            List of aggregation methods to apply.
        drop_original : bool, optional
            Whether to drop the original columns after transformation.
            Default is False.
        kwargs: bool
            parameters for base class, e.g. verbose

        """
        super().__init__(
            columns=columns,
            aggregations=aggregations,
            drop_original=drop_original,
            **kwargs,
        )

        self.is_fitted_ = True  # Does not fit

    def get_feature_names_out(self) -> list[str]:
        """List features modified/created by the transformer.

        Returns
        -------
        list[str]:
            list of features modified/created by the transformer

        Examples
        --------
        ```pycon
        >>> transformer = AggregateColumnsOverRowTransformer(
        ...     columns=["a", "b"],
        ...     aggregations=["min", "max"],
        ... )

        >>> transformer.get_feature_names_out()
        ['a_b_min', 'a_b_max']

        ```

        """
        # every output name shares the same "<col1>_<col2>_..._" prefix
        prefix = "_".join(self.columns) + "_"
        return [prefix + agg for agg in self.aggregations]

    def get_transform_exprs(self) -> list[nw.Expr]:
        """Get transform expressions.

        Returns
        -------
        list[nw.Expr]:
            transform expressions for class

        """
        exprs = aggregate_over_columns(
            columns=self.columns,
            aggregations=self.aggregations,
        )
        return exprs

    @beartype
    def transform(
        self,
        X: DataFrame,
    ) -> DataFrame:
        """Transform the dataframe by aggregating provided columns over each row.

        Parameters
        ----------
        X : DataFrame
            DataFrame to transform by aggregating provided columns over each row

        Returns
        -------
        DataFrame
            Transformed DataFrame with aggregated columns.

        Example:
        --------
        ```pycon
        >>> import polars as pl

        >>> transformer = AggregateColumnsOverRowTransformer(
        ...     columns=["a", "b"],
        ...     aggregations=["min", "max"],
        ... )

        >>> test_df = pl.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})

        >>> transformer.transform(test_df)
        shape: (2, 5)
        ┌─────┬─────┬─────┬─────────┬─────────┐
        │ a   ┆ b   ┆ c   ┆ a_b_min ┆ a_b_max │
        │ --- ┆ --- ┆ --- ┆ ---     ┆ ---     │
        │ i64 ┆ i64 ┆ i64 ┆ i64     ┆ i64     │
        ╞═════╪═════╪═════╪═════════╪═════════╡
        │ 1   ┆ 3   ┆ 5   ┆ 1       ┆ 3       │
        │ 2   ┆ 4   ┆ 6   ┆ 2       ┆ 4       │
        └─────┴─────┴─────┴─────────┴─────────┘

        ```

        """
        frame = _convert_dataframe_to_narwhals(X)

        # parent performs the shared pre-transform checks
        frame = super().transform(frame, return_native_override=False)

        row_exprs = self.get_transform_exprs()
        if row_exprs:
            frame = frame.with_columns(*row_exprs)

        # Use mixin method to drop original columns
        frame = DropOriginalMixin.drop_original_column(
            frame,
            self.drop_original,
            self.columns,
            return_native=False,
        )

        return _return_narwhals_or_native_dataframe(frame, self.return_native)