Source code for tubular.nominal
"""Contains transformers that apply encodings to nominal columns."""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any, Literal
import narwhals as nw
import numpy as np
from beartype import beartype
from narwhals.dtypes import DType # noqa: F401
from typing_extensions import deprecated
from tubular._stats import (
_get_mean_calculation_expressions,
_get_median_calculation_expression,
)
from tubular._utils import (
_collect_frame,
_collect_series,
_convert_dataframe_to_narwhals,
_convert_series_to_narwhals,
_is_null,
_return_narwhals_or_native_dataframe,
block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.mapping import BaseMappingTransformer, BaseMappingTransformMixin
from tubular.mixins import DropOriginalMixin, WeightColumnMixin
from tubular.types import (
DataFrame,
FloatBetweenZeroOne,
LazyFrame,
ListOfStrs,
PositiveInt,
Series,
)
if TYPE_CHECKING:
import pandas as pd
[docs]
@register
class GroupRareLevelsTransformer(BaseTransformer, WeightColumnMixin):
"""Group together rare levels of nominal variables into a new rare level.
Rare levels are defined by a cut off percentage, which can either be based on the
number of rows or sum of weights. Any levels below this cut off value will be
grouped into the rare level.
Attributes
----------
cut_off_percent : float
Cut off percentage (either in terms of number of rows or sum of weight) for a given
nominal level to be considered rare.
non_rare_levels : dict
Created in fit. A dict of non-rare levels (i.e. levels with more than cut_off_percent weight or rows)
that is used to identify rare levels in transform.
rare_level_name : any
Must be of the same type as columns.
Label for the new nominal level that will be added to group together rare levels (as
defined by cut_off_percent).
record_rare_levels : bool
Should the 'rare' levels that will be grouped together be recorded? If not, they will be lost
after the fit and the only information remaining will be the 'non-rare' levels.
rare_levels_record : dict
Only created (in fit) if record_rare_levels is True. This is dict containing a list of
levels that were grouped into 'rare' for each column the transformer was applied to.
weights_column : str
Name of weights columns to use if cut_off_percent should be in terms of sum of weight
not number of rows.
unseen_levels_to_rare : bool
If True, unseen levels in new data will be passed to rare, if set to false they will be left unchanged.
training_data_levels : dict[set]
Dictionary containing the set of values present in the training data for each column in self.columns. It
will only exist if unseen_levels_to_rare is set to False.
built_from_json: bool
indicates if transformer was reconstructed from json, which limits its supported
functionality to .transform
polars_compatible : bool
class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework
jsonable: bool
class attribute, indicates if transformer supports to/from_json methods
FITS: bool
class attribute, indicates whether transform requires fit to be run first
lazyframe_compatible: bool
class attribute, indicates whether transformer works with lazyframes
Examples
--------
```pycon
>>> GroupRareLevelsTransformer(
... columns="a",
... cut_off_percent=0.02,
... rare_level_name="rare_level",
... )
GroupRareLevelsTransformer(columns=['a'], cut_off_percent=0.02,
rare_level_name='rare_level')
```
"""
polars_compatible = True
lazyframe_compatible = True
jsonable = True
FITS = True
@beartype
def __init__(  # noqa: PLR0917, PLR0913
    self,
    columns: str | ListOfStrs | None = None,
    cut_off_percent: FloatBetweenZeroOne = 0.01,
    weights_column: str | None = None,
    rare_level_name: str | ListOfStrs = "rare",
    record_rare_levels: bool = True,
    unseen_levels_to_rare: bool = True,
    **kwargs: bool,
) -> None:
    """Store the rare-level grouping configuration on the instance.

    Parameters
    ----------
    columns : None or str or list, default = None
        Columns to transform, if the default of None is supplied all object and
        category columns in X are used.

    cut_off_percent : float, default = 0.01
        Cut off for the percent of rows or percent of weight for a level, levels
        below this value will be grouped.

    weights_column : None or str, default = None
        Name of weights column that should be used so cut_off_percent applies to
        sum of weights rather than number of rows.

    rare_level_name : any, default = 'rare'
        Must be of the same type as columns.
        Label for the new 'rare' level.

    record_rare_levels : bool, default = True
        If True, the levels grouped into 'rare' are recorded per column in the
        rare_levels_record attribute during fit. Care should be taken with
        nominal variables that have many levels, as all of their rare levels
        will be stored in this attribute.

    unseen_levels_to_rare : bool, default = True
        If True, unseen levels in new data will be passed to rare, if set to
        false they will be left unchanged.

    **kwargs
        Arbitrary keyword arguments passed onto BaseTransformer.init method.

    """
    # the parent initialiser handles `columns` and any shared options
    super().__init__(columns=columns, **kwargs)

    # threshold for rarity, and the optional weights it is measured in
    self.cut_off_percent = cut_off_percent
    self.weights_column = weights_column

    # label applied to grouped levels
    self.rare_level_name = rare_level_name

    # bookkeeping / unseen-level behaviour switches
    self.record_rare_levels = record_rare_levels
    self.unseen_levels_to_rare = unseen_levels_to_rare
[docs]
@block_from_json
def to_json(self) -> dict[str, dict[str, Any]]:
    """Serialise the fitted transformer into a json-style nested dict.

    The parent's dump is extended with this class's init arguments and with
    the attributes learnt in fit. `training_data_levels` is only included
    when `unseen_levels_to_rare` is False, and `rare_levels_record` only when
    `record_rare_levels` is True.

    Returns
    -------
    dict[str, dict[str, Any]]:
        jsonified transformer. Nested dict containing levels for attributes
        set at init and fit.

    Examples
    --------
    ```pycon
    >>> import tests.test_data as d
    >>> df = d.create_df_8("pandas")
    >>> x = GroupRareLevelsTransformer(
    ...     columns=["b", "c"], cut_off_percent=0.4, unseen_levels_to_rare=False
    ... )
    >>> x.fit(df)
    GroupRareLevelsTransformer(columns=['b', 'c'], cut_off_percent=0.4,
                               unseen_levels_to_rare=False)
    >>> x.to_json()
    {'tubular_version': ..., 'classname': 'GroupRareLevelsTransformer', 'init': {'columns': ['b', 'c'], 'copy': False, 'verbose': False, 'return_native': True, 'cut_off_percent': 0.4, 'weights_column': None, 'rare_level_name': 'rare', 'record_rare_levels': True, 'unseen_levels_to_rare': False}, 'fit': {'is_fitted_': True, 'non_rare_levels': {'b': ['w'], 'c': ['a']}, 'training_data_levels': {'b': ['w', 'x', 'y', 'z'], 'c': ['a', 'b', 'c']}, 'rare_levels_record': {'b': ['x', 'y', 'z'], 'c': ['b', 'c']}}}

    ```

    """
    self.check_is_fitted(["non_rare_levels"])

    json_dict = super().to_json()

    # init arguments specific to this transformer, in signature order
    init_section = json_dict["init"]
    for init_arg in (
        "cut_off_percent",
        "weights_column",
        "rare_level_name",
        "record_rare_levels",
        "unseen_levels_to_rare",
    ):
        init_section[init_arg] = getattr(self, init_arg)

    fit_section = json_dict["fit"]
    fit_section["non_rare_levels"] = self.non_rare_levels

    # these fitted attributes are only dumped when the matching option asks
    # for them; each is verified to exist before being read
    conditional_attrs = (
        ("training_data_levels", not self.unseen_levels_to_rare),
        ("rare_levels_record", self.record_rare_levels),
    )
    for attr_name, wanted in conditional_attrs:
        if wanted:
            self.check_is_fitted([attr_name])
            fit_section[attr_name] = getattr(self, attr_name)

    return json_dict
@beartype
def _check_str_like_columns(self, schema: nw.Schema) -> None:
    """Verify that every column in self.columns has a str-like dtype.

    A column counts as str-like when its schema dtype is one of
    nw.String, nw.Categorical or nw.Object.

    Parameters
    ----------
    schema: nw.Schema
        schema of input data

    Raises
    ------
    TypeError: if columns are not str-like

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> import narwhals as nw
    >>> transformer = GroupRareLevelsTransformer(
    ...     columns="a",
    ...     cut_off_percent=0.02,
    ...     rare_level_name="rare_level",
    ... )
    >>> # non erroring example
    >>> test_df = pl.DataFrame({"a": ["w", "x"], "b": ["y", "z"]})
    >>> schema = nw.from_native(test_df).schema
    >>> transformer._check_str_like_columns(schema)

    >>> # erroring example
    >>> test_df = pl.DataFrame({"a": [1, 2], "b": ["y", "z"]})
    >>> schema = nw.from_native(test_df).schema
    >>> transformer._check_str_like_columns(schema)
    Traceback (most recent call last):
    ...
    TypeError: ...

    ```

    """
    accepted_dtypes = {nw.String, nw.Categorical, nw.Object}

    accepted_columns = {
        name for name in self.columns if schema[name] in accepted_dtypes
    }
    non_str_like_columns = set(self.columns) - accepted_columns

    # nothing offending: the check passes silently
    if not non_str_like_columns:
        return

    msg = f"{self.classname()}: transformer must run on str-like columns, but got non str-like {non_str_like_columns}"
    raise TypeError(msg)
[docs]
@block_from_json
@beartype
def fit(
    self,
    X: DataFrame,
    y: Series | LazyFrame | None = None,
) -> GroupRareLevelsTransformer:
    """Record non-rare levels for categorical variables.

    When transform is called, only levels recorded in non_rare_levels during
    fit will remain unchanged - all other levels will be grouped. If
    record_rare_levels is True then the rare levels will also be recorded.

    The label for the rare levels must be of the same type as the columns.

    Null values are not treated as a level: they are left out of
    non_rare_levels, rare_levels_record and training_data_levels.

    Parameters
    ----------
    X : DataFrame
        Data to identify non-rare levels from.

    y : Series or LazyFrame or None, default = None
        Optional argument only required for the transformer to work with sklearn pipelines.

    Returns
    -------
    GroupRareLevelsTransformer: fitted class instance

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = GroupRareLevelsTransformer(
    ...     columns="a",
    ...     cut_off_percent=0.02,
    ...     rare_level_name="rare_level",
    ... )
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]})
    >>> transformer.fit(test_df)
    GroupRareLevelsTransformer(columns=['a'], cut_off_percent=0.02,
                               rare_level_name='rare_level')

    ```

    """
    X = _convert_dataframe_to_narwhals(X)
    y = _convert_series_to_narwhals(y)

    super().fit(X, y)

    # with no user supplied weights, fall back to a generated unit weights
    # column so the same weighted logic serves the row-count case
    weights_column = self.weights_column
    if self.weights_column is None:
        X, weights_column = WeightColumnMixin._create_unit_weights_column(
            X,
            return_native=False,
            verbose=self.verbose,
        )

    WeightColumnMixin.check_weights_column(self, X, weights_column)

    valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr(
        weights_column, self.verbose
    )
    X = X.filter(valid_weights_filter_expr)

    schema = X.collect_schema()
    self._check_str_like_columns(schema)

    self.non_rare_levels = {}
    self.rare_levels_record = {}
    present_levels = {}

    # share of the total weight held by each level (applied to grouped sums)
    total_weight = _collect_frame(X.select(nw.col(weights_column).sum())).item()
    level_weights_expr = nw.col(weights_column) / total_weight

    if not self.unseen_levels_to_rare:
        self.training_data_levels = {}

    for c in self.columns:
        group = X.group_by(c).agg(nw.col(weights_column).sum())

        # keep the level where it clears the cut off, null it otherwise
        non_rare_levels_expr = (
            nw.when(level_weights_expr >= self.cut_off_percent)
            .then(nw.col(c))
            .otherwise(None)
            .alias(f"{c}_non_rare_levels")
        )

        results = group.select(non_rare_levels_expr, nw.col(c))
        results = _collect_frame(results).to_dict(as_series=True)

        self.non_rare_levels[c] = sorted(
            val
            for val in results[f"{c}_non_rare_levels"].unique().to_list()
            if not _is_null(val)
        )

        # drop nulls before sorting: a null group key cannot be ordered
        # against string levels (sorted would raise TypeError), and nulls
        # are passed through untouched by transform so are not a level
        present_levels[c] = sorted(
            value for value in results[c].unique().to_list() if not _is_null(value)
        )

        if self.record_rare_levels:
            # single sort keyed on str (previously sorted twice)
            self.rare_levels_record[c] = sorted(
                set(present_levels[c]).difference(self.non_rare_levels[c]),
                key=str,
            )

        if not self.unseen_levels_to_rare:
            self.training_data_levels[c] = present_levels[c]

    self.is_fitted_ = True

    return self
[docs]
@beartype
def transform(self, X: DataFrame) -> DataFrame:
    """Replace rare levels in self.columns with the 'rare' label.

    Values found in non_rare_levels are kept, as are nulls. When
    unseen_levels_to_rare is False, values that were not present in the
    training data are kept as well. Everything else becomes
    rare_level_name.

    Parameters
    ----------
    X : DataFrame
        Data with categorical variables to apply rare level grouping to.

    Returns
    -------
    X : DataFrame
        Transformed input X with rare levels grouped into a new rare level.

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = GroupRareLevelsTransformer(
    ...     columns="a",
    ...     cut_off_percent=0.5,
    ...     rare_level_name="rare_level",
    ... )
    >>> test_df = pl.DataFrame({"a": ["x", "x", "y"], "b": ["w", "z", "z"]})
    >>> _ = transformer.fit(test_df)
    >>> transformer.transform(test_df)
    shape: (3, 2)
    ┌────────────┬─────┐
    │ a          ┆ b   │
    │ ---        ┆ --- │
    │ str        ┆ str │
    ╞════════════╪═════╡
    │ x          ┆ w   │
    │ x          ┆ z   │
    │ rare_level ┆ z   │
    └────────────┴─────┘

    ```

    """
    X = BaseTransformer.transform(self, X, return_native_override=False)
    X = _convert_dataframe_to_narwhals(X)

    schema = X.collect_schema()
    self._check_str_like_columns(schema)

    self.check_is_fitted(["non_rare_levels"])

    grouped_exprs = []
    for col in self.columns:
        # a value is kept when it is one of the fitted non rare levels ...
        keep_condition = nw.col(col).is_in(self.non_rare_levels[col])
        if not self.unseen_levels_to_rare:
            # ... and, when unseen levels are NOT mapped to rare, also when
            # it was never seen in the training data
            keep_condition = keep_condition | ~nw.col(col).is_in(
                self.training_data_levels[col]
            )

        is_categorical = schema[col] in {nw.Categorical, nw.Enum}

        # categorical-like columns go via String so the rare label can be
        # written in, and are cast to an Enum afterwards
        kept_values = nw.col(col).cast(nw.String) if is_categorical else nw.col(col)

        grouped = (
            nw.when(keep_condition | nw.col(col).is_null())
            .then(kept_values)
            .otherwise(nw.lit(self.rare_level_name))
        )

        if is_categorical:
            grouped = grouped.cast(
                nw.Enum(self.non_rare_levels[col] + [self.rare_level_name]),
            )

        grouped_exprs.append(grouped)

    if grouped_exprs:
        X = X.with_columns(*grouped_exprs)

    return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs]
@register
class MeanResponseTransformer(
BaseTransformer,
WeightColumnMixin,
DropOriginalMixin,
):
"""Convert categorical variables to numeric by mapping levels to the mean response for level.
For a continuous or binary response the categorical columns specified will have values
replaced with the mean response for each category.
For an n > 1 level categorical response, up to n binary responses can be created, which in
turn can then be used to encode each categorical column specified. This will generate up
to n * len(columns) new columns, with names of the form {column}_{response_level}. The
original columns will be removed from the dataframe. This functionality is controlled using
the 'level' parameter. Note that the above only works for a n > 1 level categorical response.
Do not use 'level' parameter for a n = 1 level numerical response. In this case, use the standard
mean response transformer without the 'level' parameter.
If a categorical variable contains null values these will not be transformed.
The same weights and prior are applied to each response level in the multi-level case.
Attributes
----------
columns : str or list
Categorical columns to encode in the input data.
weights_column : str or None
Weights column to use when calculating the mean response.
prior : int, default = 0
Regularisation parameter, can be thought of roughly as the size a category should be in order for
its statistics to be considered reliable (hence default value of 0 means no regularisation).
level : str, int, float, list or None, default = None
Parameter to control encoding against a multi-level categorical response. If None the response will be
treated as binary or continuous, if 'all' all response levels will be encoded against and if it is a list of
levels then only the levels specified will be encoded against.
response_levels : list
Only created in the multi-level case. Generated from level, list of all the response levels to encode against.
mappings : dict
Created in fit. A nested Dict of {column names : column specific mapping dictionary} pairs. Column
specific mapping dictionaries contain {initial value : mapped value} pairs.
mapped_columns : list
Only created in the multi-level case. A list of the new columns produced by encoded the columns in self.columns
against multiple response levels, of the form {column}_{level}.
transformer_dict : dict
Only created in the multi-level case. A dictionary of the form level : transformer containing the mean response
transformers for each level to be encoded against.
unseen_levels_encoding_dict: dict
Dict containing the values (based on chosen unseen_level_handling) derived from the encoded columns to use when handling unseen levels in data passed to transform method.
return_type: Literal['Float32', 'Float64']
What type to cast return column as. Defaults to 'Float32'.
built_from_json: bool
indicates if transformer was reconstructed from json, which limits its supported
functionality to .transform
polars_compatible : bool
class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework
jsonable: bool
class attribute, indicates if transformer supports to/from_json methods
FITS: bool
class attribute, indicates whether transform requires fit to be run first
lazyframe_compatible: bool
class attribute, indicates whether transformer works with lazyframes
Examples
--------
```pycon
>>> import polars as pl
>>> transformer = MeanResponseTransformer(
... columns="a",
... prior=1,
... unseen_level_handling="mean",
... )
>>> transformer
MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean')
>>> # once fit, transformer can also be dumped to json and reinitialised
>>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [0, 1]})
>>> _ = transformer.fit(test_df[["a"]], test_df["b"])
>>> json_dump = transformer.to_json()
>>> json_dump
{'tubular_version': ..., 'classname': 'MeanResponseTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'weights_column': None, 'prior': 1, 'level': None, 'unseen_level_handling': 'mean', 'return_type': 'Float32', 'drop_original': True}, 'fit': {'is_fitted_': True, 'mappings': {'a': {'x': 0.25, 'y': 0.75}}, 'return_dtypes': {'a': 'Float32'}, 'column_to_encoded_columns': {'a': ['a']}, 'encoded_columns': ['a'], 'unseen_levels_encoding_dict': {'a': 0.5}}}
>>> MeanResponseTransformer.from_json(json_dump)
MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean')
```
"""
polars_compatible = True
jsonable = True
lazyframe_compatible = True
FITS = True
@beartype
def __init__(  # noqa: PLR0917, PLR0913
    self,
    columns: str | list[str] | None = None,
    weights_column: str | None = None,
    prior: PositiveInt = 0,
    level: float | int | str | list | None = None,
    unseen_level_handling: float
    | int
    | Literal["mean", "median", "min", "max"]
    | None = None,
    return_type: Literal["Float32", "Float64"] = "Float32",
    drop_original: bool = True,
    **kwargs: bool,
) -> None:
    """Store the encoding configuration and work out the multi-level mode.

    Parameters
    ----------
    columns : None or str or list, default = None
        Columns to transform, if the default of None is supplied all object and
        category columns in X are used.

    weights_column : str or None
        Weights column to use when calculating the mean response.

    prior : int, default = 0
        Regularisation parameter, can be thought of roughly as the size a
        category should be in order for its statistics to be considered
        reliable (hence default value of 0 means no regularisation).

    level : str, int, float, list or None, default = None
        Parameter to control encoding against a multi-level categorical
        response. For a continuous or binary response, leave this as None. In
        the multi-level case, set to 'all' to encode against every response
        level or provide a list of response levels to encode against. A single
        str/int/float level is wrapped into a one element list.

    unseen_level_handling : str("mean", "median", "min", "max") or int/float, default = None
        Parameter to control the logic for handling unseen levels of the
        categorical features to encode in data when using transform method.
        Default value of None will output error when attempting to use
        transform on data with unseen levels in categorical columns to encode.
        Set this parameter to one of the options above in order to encode
        unseen levels in each categorical column with the mean, median etc. of
        each column. One can also pass an arbitrary int/float value to use for
        encoding unseen levels.

    return_type: Literal['Float32', 'Float64']
        What type to cast return column as, consider exploring Float32 to save
        memory. Defaults to 'Float32'.

    drop_original: bool
        controls whether original columns are dropped after encoded columns created.

    **kwargs
        Arbitrary keyword arguments passed onto BaseTransformer.init method.

    """
    self.weights_column = weights_column
    self.prior = prior
    self.unseen_level_handling = unseen_level_handling
    self.return_type = return_type
    self.drop_original = drop_original

    # a lone level (anything other than the 'all' keyword) is wrapped in a
    # list so that it can share the handling of user supplied lists
    if isinstance(level, (str, int, float)) and level != "all":
        level = [level]

    # multi-level mode applies to 'all' and to any list of levels
    self.MULTI_LEVEL = level == "all" or isinstance(level, list)
    self.level = level

    BaseTransformer.__init__(self, columns=columns, **kwargs)
[docs]
@block_from_json
def to_json(self) -> dict[str, dict[str, Any]]:
    """Dump transformer to json dict.

    The parent's dump is extended with this class's init arguments and with
    the attributes learnt in fit. `unseen_levels_encoding_dict` is included
    whenever `unseen_level_handling` was configured, i.e. is not None. This
    includes the numeric settings 0 and 0.0.

    Returns
    -------
    dict[str, dict[str, Any]]:
        jsonified transformer. Nested dict containing levels for attributes
        set at init and fit.

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = MeanResponseTransformer(columns=["a"])
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [0, 1]})
    >>> _ = transformer.fit(test_df[["a"]], test_df["b"])
    >>> transformer.to_json()
    {'tubular_version': ..., 'classname': 'MeanResponseTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'weights_column': None, 'prior': 0, 'level': None, 'unseen_level_handling': None, 'return_type': 'Float32', 'drop_original': True}, 'fit': {'is_fitted_': True, 'mappings': {'a': {'x': 0.0, 'y': 1.0}}, 'return_dtypes': {'a': 'Float32'}, 'column_to_encoded_columns': {'a': ['a']}, 'encoded_columns': ['a']}}

    ```

    """
    self.check_is_fitted(
        [
            "mappings",
            "return_dtypes",
            "column_to_encoded_columns",
            "encoded_columns",
        ],
    )

    json_dict = super().to_json()

    json_dict["init"].update(
        {
            "weights_column": self.weights_column,
            "prior": self.prior,
            "level": self.level,
            "unseen_level_handling": self.unseen_level_handling,
            "return_type": self.return_type,
            "drop_original": self.drop_original,
        },
    )

    # make sure mappings dict is sorted for consistent repr
    mappings = {
        key: {
            value: self.mappings[key][value] for value in sorted(self.mappings[key])
        }
        for key in sorted(self.mappings)
    }

    json_dict["fit"].update(
        {
            "mappings": mappings,
            "return_dtypes": self.return_dtypes,
            "column_to_encoded_columns": self.column_to_encoded_columns,
            "encoded_columns": self.encoded_columns,
        },
    )

    # compare against None instead of relying on truthiness: 0 and 0.0 are
    # valid numeric settings for unseen_level_handling, and a truthiness
    # test would silently leave their fitted encodings out of the dump
    if self.unseen_level_handling is not None:
        self.check_is_fitted(["unseen_levels_encoding_dict"])
        json_dict["fit"]["unseen_levels_encoding_dict"] = (
            self.unseen_levels_encoding_dict
        )

    return json_dict
[docs]
def get_feature_names_out(self) -> list[str]:
    """Return the names of the columns this transformer writes.

    Three cases are handled:

    - level == 'all': the names are only known after fit, so the fitted
      `encoded_columns` attribute is returned (after a fitted check).
    - not multi-level: the input columns are encoded in place, so
      `self.columns` is returned.
    - explicit levels: one `{column}_{level}` name per column/level pair,
      columns varying slowest.

    Returns
    -------
    list[str]:
        list of features modified/created by the transformer

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=1,
    ...     unseen_level_handling="mean",
    ... )
    >>> transformer.get_feature_names_out()
    ['a']

    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=1,
    ...     level=["x", "y"],
    ...     unseen_level_handling="mean",
    ... )
    >>> transformer.get_feature_names_out()
    ['a_x', 'a_y']

    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=1,
    ...     level="all",
    ...     unseen_level_handling="mean",
    ... )
    >>> transformer.get_feature_names_out()
    Traceback (most recent call last):
    ...
    sklearn.exceptions.NotFittedError: ...

    >>> test_df = pl.DataFrame({"a": ["x", "y", "x"], "b": ["cat", "dog", "rat"]})
    >>> _ = transformer.fit(test_df, test_df["b"])
    >>> transformer.get_feature_names_out()
    ['a_cat', 'a_dog', 'a_rat']

    ```

    """
    # with level 'all' the response levels are only discovered during fit
    if self.level == "all":
        self.check_is_fitted("encoded_columns")
        return self.encoded_columns

    # single level: columns are overwritten under their own names
    if not self.MULTI_LEVEL:
        return self.columns

    names = []
    for column in self.columns:
        names.extend(column + "_" + str(level) for level in self.level)
    return names
@block_from_json
def _prior_regularisation(
    self,
    global_means: dict[str, float],
    groups: dict[str, nw.DataFrame],
) -> dict[str, nw.Expr]:
    """Compute prior-regularised encodings for every encoded column.

    Encodings of infrequent categories are pushed towards the global mean.
    The formula used is:

    (weighted_sum + prior*global_mean)/(weight_sum + prior)

    where weighted_sum and weight_sum are the per-group aggregates found in
    `groups`.

    Parameters
    ----------
    global_means: dict[str, float]
        dictionary of global means per binary target

    groups: dict[str, nw.DataFrame]
        dict of grouped dataframes per input column

    Returns
    -------
    dict
        keyed by encoded column. Each value is the collected frame holding
        the `{encoded_column}_mapped` values alongside the original column's
        levels.
        NOTE(review): the annotation says nw.Expr, but the values come from
        `_collect_frame` - confirm and update the annotation.

    # this private method is not intended to be used outside
    # of the fit process, so not including examples

    """
    return_dtype = getattr(nw, self.return_type)

    encodings = {}
    for encoded_column in self.encoded_columns:
        response_column = self.encoded_columns_to_response_columns[encoded_column]
        source_column = self.encoded_columns_to_columns[encoded_column]

        numerator = nw.col(f"{response_column}_weighted_sum") + (
            global_means[response_column] * nw.lit(self.prior)
        )
        # NOTE(review): the cast applies to the denominator only, not to the
        # quotient - confirm this is the intended precision handling
        denominator = (nw.col("weight_sum") + nw.lit(self.prior)).cast(return_dtype)

        mapped_expr = (numerator / denominator).alias(encoded_column + "_mapped")

        encodings[encoded_column] = _collect_frame(
            groups[source_column].select(
                mapped_expr,
                nw.col(source_column),
            )
        )

    return encodings
@block_from_json
def _setup_fit_multi_level(
    self,
    y_vals: list[int | float],
    response_column: str,
) -> None:
    """Set attrs needed for fit, for multi level case.

    Sets `response_levels`, `column_to_encoded_columns`,
    `encoded_columns_to_response_columns` and `response_columns`. Levels are
    converted with `str` wherever a column name is built, so non-string
    (int/float) levels do not raise on string concatenation.

    Parameters
    ----------
    y_vals: list[Union[int, float]]
        y values present in data

    response_column: str
        name of response column

    # this private method is not intended to be used outside
    # of the fit process, so not including examples

    Raises
    ------
    ValueError: if user provided levels are not present in y

    """
    self.response_levels = self.level

    if self.level == "all":
        # encode against every level found in the response
        self.response_levels = y_vals

    elif any(level not in y_vals for level in self.level):
        msg = "Levels contains a level to encode against that is not present in the response."
        raise ValueError(msg)

    self.column_to_encoded_columns = {
        c: [c + "_" + str(level) for level in self.response_levels]
        for c in self.columns
    }

    self.encoded_columns_to_response_columns = {
        c + "_" + str(level): response_column + "_" + str(level)
        for c in self.columns
        for level in self.response_levels
    }

    # str(level) keeps these names identical to the values stored in
    # encoded_columns_to_response_columns above, and avoids a TypeError
    # when a level is an int or float
    self.response_columns = [
        response_column + "_" + str(level) for level in self.response_levels
    ]
def _check_for_failed_fit(self) -> None:
    """Raise if fit produced null encodings.

    A column has failed when any value in its mapping is null, or when
    unseen level handling is configured and the column's entry in
    `unseen_levels_encoding_dict` is null. The scan stops at the first
    failing column, so the error names at most one column.

    Raises
    ------
    ValueError: if mapping values or unseen_level_handling_dict values
    have come out as None unexpectedly

    """

    def _has_failed(col: str) -> bool:
        """Tell whether the fitted values for `col` contain a null."""
        if any(_is_null(value) for value in self.mappings[col].values()):
            return True
        return bool(self.unseen_level_handling) and _is_null(
            self.unseen_levels_encoding_dict[col]
        )

    first_failure = next(
        (col for col in self.encoded_columns if _has_failed(col)),
        None,
    )
    if first_failure is None:
        return

    failed_columns = [first_failure]
    msg = f"fit has failed for columns {failed_columns}, it is possible that all rows are invalid - check for null/negative weights, all null columns, or other invalid conditions listed in the docstring"
    raise ValueError(msg)
@block_from_json
def _setup_fit_single_level(self, response_column: str) -> None:
    """Set attrs needed for fit, for non-multi level case.

    Every column is encoded in place (it maps to itself) against the one
    response column.

    Parameters
    ----------
    response_column: str
        name of response column

    # this private method is not intended to be used outside
    # of the fit process, so not including examples

    """
    columns = self.columns

    # arbitrary len 1 iterable so logic can be shared with multi level
    self.response_levels = ["SINGLE_LEVEL"]

    self.column_to_encoded_columns = {}
    for column in columns:
        self.column_to_encoded_columns[column] = [column]

    self.encoded_columns_to_response_columns = {
        column: response_column for column in columns
    }

    self.response_columns = [response_column]
[docs]
@block_from_json
@beartype
def fit(self, X: DataFrame, y: Series | LazyFrame) -> MeanResponseTransformer:  # noqa:PLR0914, will simplify in future issue
    """Identify mapping of categorical levels to mean response values.

    If the user specified the weights_column arg when initialising the
    transformer the weighted mean response will be calculated using that
    column.

    In the multi-level case this method learns which response levels are
    present and are to be encoded against. Response levels are converted with
    `str` when building the per-level response column names, so non-string
    (int/float) levels do not raise on string concatenation here.

    Parameters
    ----------
    X : DataFrame
        Data with categorical variable columns to transform.

    y : Series or LazyFrame
        Response variable or target.

    Returns
    -------
    MeanResponseTransformer: fitted class instance

    Raises
    ------
    ValueError: if y contains null values

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=1,
    ...     unseen_level_handling="mean",
    ... )
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]})
    >>> transformer.fit(test_df, test_df["target"])
    MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean')

    ```

    """
    X = _convert_dataframe_to_narwhals(X)
    y = _convert_series_to_narwhals(y)

    # Collect lazy y to enable operations like .unique().to_list()
    y = _collect_series(y)

    BaseTransformer.fit(self, X, y)

    self.mappings = {}
    self.unseen_levels_encoding_dict = {}

    # with no user supplied weights, fall back to a generated unit weights
    # column so the same weighted logic serves the unweighted case
    weights_column = self.weights_column
    if self.weights_column is None:
        X, weights_column = WeightColumnMixin._create_unit_weights_column(
            X,
            return_native=False,
            verbose=self.verbose,
        )

    WeightColumnMixin.check_weights_column(self, X, weights_column)

    valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr(
        weights_column, self.verbose
    )

    y_vals = y.unique().to_list()

    if (response_null_count := y.is_null().sum()) > 0:
        msg = f"{self.classname()}: y has {response_null_count} null values"
        raise ValueError(msg)

    X_y = self._combine_X_y(X, y, return_native_override=False)
    # presumably the name _combine_X_y gives the response column - confirm
    response_column = "_temporary_response"

    X_y = X_y.filter(valid_weights_filter_expr)

    if self.MULTI_LEVEL:
        self._setup_fit_multi_level(y_vals, response_column)
    else:
        self._setup_fit_single_level(response_column)

    self.encoded_columns_to_columns = {
        encoded_column: c
        for c in self.columns
        for encoded_column in self.column_to_encoded_columns[c]
    }

    self.encoded_columns = [
        encoded_column
        for c in self.columns
        for encoded_column in self.column_to_encoded_columns[c]
    ]
    self.encoded_columns.sort()

    # start by creating new columns as clones
    encoded_column_exprs = {
        encoded_column: nw.col(
            self.encoded_columns_to_columns[encoded_column],
        ).alias(encoded_column)
        for encoded_column in self.encoded_columns
    }

    # then setup binary response expressions for each level
    if self.MULTI_LEVEL:
        # the f-string stringifies the level, so int/float levels produce a
        # valid column name instead of raising on str + number
        response_exprs = {
            f"{response_column}_{level}": nw.col(response_column) == level
            for level in self.response_levels
        }
    else:
        response_exprs = {response_column: nw.col(response_column)}

    # output names are set by the dict keys when passed to with_columns
    weighted_response_exprs = {
        "weighted_" + binary_response_column: response_exprs[binary_response_column]
        * nw.col(weights_column)
        for binary_response_column in self.response_columns
    }

    all_response_exprs = {**response_exprs, **weighted_response_exprs}

    # materialise these for global mean
    # calculations to work with
    X_y = X_y.with_columns(**all_response_exprs)

    global_mean_exprs = _get_mean_calculation_expressions(
        self.response_columns,
        weights_column,
    )

    global_means = _collect_frame(X_y.select(**global_mean_exprs)).to_dict(
        as_series=False
    )
    # each entry is a single-value list, unwrap to the scalar
    global_means = {
        binary_response_column: global_means[binary_response_column][0]
        for binary_response_column in self.response_columns
    }

    # now get the weighted response per group
    aggs = {
        c: [
            nw.col(weights_column).sum().alias("weight_sum"),
            *[
                nw.col("weighted_" + binary_response_column)
                .sum()
                .alias(f"{binary_response_column}_weighted_sum")
                for binary_response_column in self.response_columns
            ],
        ]
        for c in self.columns
    }
    groups = {c: X_y.group_by(c).agg(aggs[c]) for c in self.columns}

    # the previous two then make up the inputs for our encoding algorithm
    prior_encodings = self._prior_regularisation(
        global_means,
        groups,
    )

    results_dict = {
        c: prior_encodings[c].to_dict(as_series=False) for c in prior_encodings
    }

    # pair each level of the source column with its encoded value
    self.mappings.update(
        {
            encoded_column: dict(
                zip(
                    results_dict[encoded_column][
                        self.encoded_columns_to_columns[encoded_column]
                    ],
                    results_dict[encoded_column][encoded_column + "_mapped"],
                    strict=False,
                ),
            )
            for encoded_column in self.encoded_columns
        },
    )

    # set this attr up for BaseMappingTransformerMixin
    # this is used to cast the narwhals mapping df, so uses narwhals types
    self.return_dtypes = dict.fromkeys(self.encoded_columns, self.return_type)

    # use BaseMappingTransformer init to process args
    # extract null_mappings from mappings etc
    base_mapping_transformer = BaseMappingTransformer(
        mappings=self.mappings,
        return_dtypes=self.return_dtypes,
    )

    self.mappings = base_mapping_transformer.mappings
    self.mappings_from_null = base_mapping_transformer.mappings_from_null
    self.return_dtypes = base_mapping_transformer.return_dtypes

    self._fit_unseen_level_handling_dict(X_y, encoded_column_exprs, weights_column)

    self._check_for_failed_fit()

    self.is_fitted_ = True

    return self
@beartype
@block_from_json
def _fit_unseen_level_handling_dict(
    self,
    X_y: DataFrame,
    encoded_column_exprs: dict[str, nw.Expr],
    weights_column: str,
) -> None:
    """Learn the value that unseen levels are mapped to for each encoded column.

    The behaviour depends on the unseen_level_handling attr:
    - int/float: the provided value is stored directly for every encoded column
    - "mean"/"median": the statistic is calculated via the tubular._stats helpers,
    which are passed the weights column
    - any other string (e.g. "min"/"max"): the narwhals expression method of that
    name is called on the mapped column
    - anything else (e.g. None): nothing is learnt

    Parameters
    ----------
    X_y : DataFrame
        Data with categorical variable columns to transform, also containing the
        response column(s).

    encoded_column_exprs: dict[str, nw.Expr]
        dict of format str: expression for creating initial encoded columns. Only
        used by the median option, which materialises a sorted frame per
        encoded column.

    weights_column : str
        name of weights column

    # this private method is not intended to be used outside
    # of the fit process, so not including examples

    """
    handling = self.unseen_level_handling

    # numeric option: nothing to calculate, the value is used as provided
    if isinstance(handling, (int, float)):
        self.unseen_levels_encoding_dict.update(
            dict.fromkeys(self.encoded_columns, handling)
        )
        return

    # only string options require any fitting
    if not isinstance(handling, str):
        return

    # expressions applying the learnt mappings to the original columns,
    # one per encoded column
    mapping_expressions = {}
    for col in self.columns:
        for encoded_col in self.column_to_encoded_columns[col]:
            mapped_dtype = getattr(nw, self.return_dtypes[encoded_col])
            mapping_expressions[encoded_col] = (
                nw.col(col)
                .alias(encoded_col)
                .replace_strict(
                    self.mappings[encoded_col],
                    return_dtype=mapped_dtype,
                )
            )

    # median is fit column by column, as each column needs its own
    # filter + sort before the median expression can be evaluated
    if handling == "median":
        for encoded_col in self.encoded_columns:
            source_col = self.encoded_columns_to_columns[encoded_col]
            source_not_null = ~nw.col(source_col).is_null()

            X_sorted = (
                X_y.with_columns(**encoded_column_exprs)
                .filter(source_not_null)
                .sort(encoded_col)
            )

            median_expr = _get_median_calculation_expression(
                values_column=self.encoded_columns_to_response_columns[encoded_col],
                weights_column=weights_column,
            )

            median_frame = _collect_frame(X_sorted.select(median_expr))
            self.unseen_levels_encoding_dict[encoded_col] = median_frame.item(0, 0)
        return

    unseen_level_exprs = {}
    if handling == "mean":
        unseen_level_exprs.update(
            _get_mean_calculation_expressions(
                self.encoded_columns,
                weights_column,
                initial_columns_exprs=mapping_expressions,
            ),
        )
    else:
        # remaining string options (min/max) are expression methods of the same name
        for encoded_col in self.encoded_columns:
            aggregate = getattr(mapping_expressions[encoded_col], handling)
            unseen_level_exprs[encoded_col] = aggregate()

    # evaluate all of the expressions in a single pass over the data
    collected = _collect_frame(X_y.select(**unseen_level_exprs))
    unseen_level_results = collected.to_dict(as_series=True)

    self.unseen_levels_encoding_dict = {
        encoded_col: unseen_level_results[encoded_col].item(0)
        for encoded_col in self.encoded_columns
    }
[docs]
@beartype
def transform(self, X: DataFrame) -> DataFrame:
    """Apply mean response encoding stored in the mappings attribute to columns.

    Levels that are not present in the learnt mappings are replaced with the
    value stored in unseen_levels_encoding_dict whenever unseen_level_handling
    has been configured (including a numeric value of 0). Otherwise no
    fallback value is supplied (default=None is passed to replace_strict).

    Parameters
    ----------
    X : DataFrame
        Data with nominal columns to transform.

    Returns
    -------
    X : DataFrame
        Transformed input X with levels mapped according to mappings dict.

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> # example with no prior
    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=0,
    ...     unseen_level_handling="mean",
    ... )
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]})
    >>> _ = transformer.fit(test_df, test_df["target"])
    >>> transformer.transform(test_df)
    shape: (2, 3)
    ┌─────┬─────┬────────┐
    │ a   ┆ b   ┆ target │
    │ --- ┆ --- ┆ ---    │
    │ f32 ┆ i64 ┆ i64    │
    ╞═════╪═════╪════════╡
    │ 0.0 ┆ 1   ┆ 0      │
    │ 1.0 ┆ 2   ┆ 1      │
    └─────┴─────┴────────┘

    >>> # example with prior
    >>> transformer = MeanResponseTransformer(
    ...     columns="a",
    ...     prior=1,
    ...     unseen_level_handling="mean",
    ... )
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]})
    >>> _ = transformer.fit(test_df, test_df["target"])
    >>> transformer.transform(test_df)
    shape: (2, 3)
    ┌──────┬─────┬────────┐
    │ a    ┆ b   ┆ target │
    │ ---  ┆ --- ┆ ---    │
    │ f32  ┆ i64 ┆ i64    │
    ╞══════╪═════╪════════╡
    │ 0.25 ┆ 1   ┆ 0      │
    │ 0.75 ┆ 2   ┆ 1      │
    └──────┴─────┴────────┘

    ```

    """
    self.check_is_fitted(
        [
            "mappings",
            "return_dtypes",
            "column_to_encoded_columns",
            "encoded_columns",
        ],
    )

    X = _convert_dataframe_to_narwhals(X)

    X = super().transform(
        X,
        return_native_override=False,
    )

    # compare against None rather than relying on truthiness: a numeric
    # unseen_level_handling of 0 is a valid fallback value, and a truthiness
    # check would silently discard it
    handle_unseen_levels = self.unseen_level_handling is not None

    transform_expressions = {
        encoded_col: nw.col(col)
        .alias(encoded_col)
        .replace_strict(
            self.mappings[encoded_col],
            default=self.unseen_levels_encoding_dict[encoded_col]
            if handle_unseen_levels
            else None,
        )
        .cast(getattr(nw, self.return_dtypes[encoded_col]))
        for col in self.columns
        for encoded_col in self.column_to_encoded_columns[col]
    }

    # with_columns is skipped entirely when there is nothing to encode
    if transform_expressions:
        X = X.with_columns(**transform_expressions)

    # original columns which were encoded in place (encoded column shares the
    # original name) hold the output, so are excluded from the drop list
    columns_to_drop = [col for col in self.columns if col not in self.encoded_columns]

    X = DropOriginalMixin.drop_original_column(
        X,
        self.drop_original,
        columns_to_drop,
        return_native=False,
    )

    return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs]
@register
class OneHotEncodingTransformer(
    DropOriginalMixin,
    BaseTransformer,
):
    """Transformer to convert categorical variables into dummy columns.

    Attributes
    ----------
    wanted_values : dict[str, list[str]] or None
        Optional levels to encode for each column, as provided at init.

    separator : str
        Separator used in naming for dummy columns.

    drop_original : bool
        Should original columns be dropped after creating dummy fields?

    categories_ : dict[str, list]
        Created in fit. Levels that will be encoded for each column.

    new_feature_names_ : dict[str, list[str]]
        Created in fit. Names of the dummy columns created for each column.

    built_from_json: bool
        indicates if transformer was reconstructed from json, which limits its supported
        functionality to .transform

    polars_compatible : bool
        class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    Examples
    --------
    ```pycon
    >>> import polars as pl
    >>> transformer = OneHotEncodingTransformer(
    ...     columns="a",
    ... )
    >>> transformer
    OneHotEncodingTransformer(columns=['a'])
    >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]})
    >>> _ = transformer.fit(test_df)
    >>> # transformer can also be dumped to json and reinitialised
    >>> json_dump = transformer.to_json()
    >>> json_dump
    {'tubular_version': ..., 'classname': 'OneHotEncodingTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'wanted_values': None, 'separator': '_', 'drop_original': False}, 'fit': {'is_fitted_': True, 'categories_': {'a': ['x', 'y']}, 'new_feature_names_': {'a': ['a_x', 'a_y']}}}
    >>> OneHotEncodingTransformer.from_json(json_dump)
    OneHotEncodingTransformer(columns=['a'])

    ```

    """

    polars_compatible = True

    lazyframe_compatible = True

    jsonable = True

    FITS = True

    # maximum number of levels a single column may have before fit errors
    MAX_LEVELS = 100

    @beartype
    def __init__(
        self,
        columns: str | ListOfStrs | None = None,
        wanted_values: dict[str, ListOfStrs] | None = None,
        separator: str = "_",
        drop_original: bool = False,
        **kwargs: bool,
    ) -> None:
        """Initialise class instance.

        Parameters
        ----------
        columns : str or list of strings or None, default = None
            Names of columns to transform. If the default of None is supplied all object and category
            columns in X are used.

        wanted_values: dict[str, list[str]] or None, default = None
            Optional parameter to select specific column levels to be transformed. If it is None, all levels in the categorical column will be encoded. It will take the format {col1: [level_1, level_2, ...]}.

        separator : str
            Used to create dummy column names, the name will take
            the format [categorical feature][separator][category level]

        drop_original : bool, default = False
            Should original columns be dropped after creating dummy fields?

        **kwargs
            Arbitrary keyword arguments passed onto BaseTransformer.init method.

        Raises
        ------
            ValueError: if keys of wanted_values arg do not exactly match columns arg

        """
        BaseTransformer.__init__(
            self,
            columns=columns,
            **kwargs,
        )

        # wanted_values must cover exactly the columns being transformed
        if wanted_values and set(wanted_values.keys()) != set(self.columns):
            msg = f"{self.classname()}: keys of wanted values should match provided columns"
            raise ValueError(msg)

        self.wanted_values = wanted_values
        self.drop_original = drop_original
        self.separator = separator

    @block_from_json
    def to_json(self) -> dict[str, dict[str, Any]]:
        """Dump transformer to json dict.

        Returns
        -------
        dict[str, dict[str, Any]]:
            jsonified transformer. Nested dict containing levels for attributes
            set at init and fit.

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = OneHotEncodingTransformer(columns=["a"])
        >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]})
        >>> _ = transformer.fit(test_df)
        >>> # version will vary for local vs CI, so use ... as generic match
        >>> transformer.to_json()
        {'tubular_version': ..., 'classname': 'OneHotEncodingTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'wanted_values': None, 'separator': '_', 'drop_original': False}, 'fit': {'is_fitted_': True, 'categories_': {'a': ['x', 'y']}, 'new_feature_names_': {'a': ['a_x', 'a_y']}}}

        ```

        """
        self.check_is_fitted(["categories_", "new_feature_names_"])

        json_dict = super().to_json()

        # extend the parent's json with the attrs specific to this transformer
        json_dict["init"].update(
            {
                "wanted_values": self.wanted_values,
                "separator": self.separator,
                "drop_original": self.drop_original,
            },
        )
        json_dict["fit"].update(
            {
                "categories_": self.categories_,
                "new_feature_names_": self.new_feature_names_,
            },
        )

        return json_dict

    def get_feature_names_out(self) -> list[str]:
        """List features modified/created by the transformer.

        Returns
        -------
        list[str]:
            list of features modified/created by the transformer

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = OneHotEncodingTransformer(
        ...     columns="a",
        ...     wanted_values={"a": ["cat", "dog"]},
        ... )
        >>> transformer.get_feature_names_out()
        ['a_cat', 'a_dog']

        >>> transformer = OneHotEncodingTransformer(
        ...     columns="a",
        ... )
        >>> transformer.get_feature_names_out()
        Traceback (most recent call last):
        ...
        sklearn.exceptions.NotFittedError: ...
        >>> test_df = pl.DataFrame({"a": ["cat", "dog", "rat"]})
        >>> _ = transformer.fit(test_df)
        >>> transformer.get_feature_names_out()
        ['a_cat', 'a_dog', 'a_rat']

        ```

        """
        # if wanted values is not provided, this function
        # depends on fit having been called
        if not self.wanted_values:
            self.check_is_fitted("categories_")

            return [
                output_column
                for column in self.columns
                for output_column in self._get_feature_names(column)
            ]

        # otherwise the output names are fully determined by the init args
        return [
            column + self.separator + str(level)
            for column in self.columns
            for level in self.wanted_values[column]
        ]

    @block_from_json
    @beartype
    def fit(
        self,
        X: DataFrame,
        y: Series | LazyFrame | None = None,
    ) -> OneHotEncodingTransformer:
        """Get list of levels for each column to be transformed.

        This defines which dummy columns will be created in transform.

        Parameters
        ----------
        X : DataFrame
            Data to identify levels from.

        y : None
            Ignored. This parameter exists only for compatibility with sklearn.pipeline.Pipeline.

        Returns
        -------
            OneHotEncodingTransformer: fitted class instance

        Raises
        ------
            ValueError: if column has more than MAX_LEVELS (100) levels

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = OneHotEncodingTransformer(
        ...     columns="a",
        ... )
        >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]})
        >>> transformer.fit(test_df)
        OneHotEncodingTransformer(columns=['a'])

        ```

        """
        X = _convert_dataframe_to_narwhals(X)
        y = _convert_series_to_narwhals(y)

        BaseTransformer.fit(self, X=X, y=y)

        self.categories_ = {}
        self.new_feature_names_ = {}

        # materialise only the columns being encoded
        results = X.select(nw.col(c) for c in self.columns)
        results = _collect_frame(results)
        results_dict = results.to_dict()

        for c in self.columns:
            results_list = results_dict[c].unique().to_list()

            non_null_results_list = [val for val in results_list if not _is_null(val)]

            if self.verbose and len(non_null_results_list) < len(results_list):
                warnings.warn(
                    f"{self.classname()}: Column {c} contains null values which will be ignored during fitting",
                    stacklevel=2,
                )

            # if the user has provided a 'wanted_values' as a list of expected dummies,
            # then there is actually nothing we need to fit on data here
            self.categories_[c] = (
                self.wanted_values[c]
                if self.wanted_values
                else sorted(non_null_results_list)
            )

            # Check each field has no more than MAX_LEVELS categories/levels
            level_count = len(self.categories_[c])
            if level_count > self.MAX_LEVELS:
                msg = f"{self.classname()}: column {c} has over {self.MAX_LEVELS} unique values - consider another type of encoding"
                raise ValueError(msg)

            self.new_feature_names_[c] = self._get_feature_names(column=c)

        self.is_fitted_ = True

        return self

    @beartype
    def _get_feature_names(
        self,
        column: str,
    ) -> list[str]:
        """Get list of features that will be output by transformer.

        Parameters
        ----------
        column: str
            column to get dummy feature names for

        Returns
        -------
            list[str]: list of output features

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = OneHotEncodingTransformer(
        ...     columns="a",
        ... )
        >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]})
        >>> _ = transformer.fit(test_df)
        >>> transformer._get_feature_names("a")
        ['a_x', 'a_y']

        ```

        """
        return [
            column + self.separator + str(level) for level in self.categories_[column]
        ]

    @beartype
    def transform(
        self,
        X: DataFrame,
        return_native_override: bool | None = None,
    ) -> DataFrame:
        """Create new dummy columns from categorical fields.

        Parameters
        ----------
        X : DataFrame
            Data to apply one hot encoding to.

        return_native_override: Optional[bool]
            option to override return_native attr in transformer, useful when calling parent
            methods. Controls whether transformer returns narwhals or native type.

        Returns
        -------
        X_transformed : DataFrame
            Transformed input X with dummy columns derived from categorical columns added. If drop_original
            = True then the original categorical columns that the dummies are created from will not be in
            the output X.

        Examples
        --------
        ```pycon
        >>> import polars as pl
        >>> transformer = OneHotEncodingTransformer(
        ...     columns="a",
        ... )
        >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]})
        >>> _ = transformer.fit(test_df)
        >>> transformer.transform(test_df)
        shape: (2, 4)
        ┌─────┬─────┬───────┬───────┐
        │ a   ┆ b   ┆ a_x   ┆ a_y   │
        │ --- ┆ --- ┆ ---   ┆ ---   │
        │ str ┆ i64 ┆ bool  ┆ bool  │
        ╞═════╪═════╪═══════╪═══════╡
        │ x   ┆ 1   ┆ true  ┆ false │
        │ y   ┆ 2   ┆ false ┆ true  │
        └─────┴─────┴───────┴───────┘

        ```

        """
        return_native = self._process_return_native(return_native_override)

        # Check that transformer has been fit before calling transform
        self.check_is_fitted(["categories_", "new_feature_names_"])

        X = _convert_dataframe_to_narwhals(X)
        X = BaseTransformer.transform(self, X, return_native_override=False)

        transform_expressions = {}
        for c in self.columns:
            # build the lookup once per column rather than scanning the list per level
            expected_names = set(self.new_feature_names_[c])

            for level in self.categories_[c]:
                dummy_name = c + self.separator + str(level)

                if dummy_name in expected_names:
                    # boolean dummy: is the row equal to this level?
                    transform_expressions[dummy_name] = nw.col(c) == level

        # make column order consistent
        sorted_keys = sorted(transform_expressions)

        if transform_expressions:
            X = X.with_columns(
                **{key: transform_expressions[key] for key in sorted_keys},
            )

        # Drop original columns if self.drop_original is True
        X = DropOriginalMixin.drop_original_column(
            X,
            self.drop_original,
            self.columns,
            return_native=False,
        )

        return _return_narwhals_or_native_dataframe(X, return_native)
# DEPRECATED TRANSFORMERS
[docs]
@deprecated(
    """This transformer has not been selected for conversion to polars/narwhals,
and so has been deprecated. If it is useful to you, please raise an issue
for it to be modernised
""",
)
class OrdinalEncoderTransformer(
    BaseMappingTransformMixin,
    WeightColumnMixin,
):
    """Encode categorical variables into ascending rank-ordered integer values variables.

    Levels are ranked by their target-mean response and mapped to that rank.

    Values will be sorted in ascending order only i.e. categorical level with lowest target mean response to
    be encoded as 1, the next highest value as 2 and so on.

    If a categorical variable contains null values these will not be transformed.

    Attributes
    ----------
    weights_column : str or None
        Weights column to use when calculating the mean response.

    mappings : dict
        Created in fit. Dict of key (column names) value (mapping of categorical levels to numeric,
        ordinal encoded response values) pairs.

    return_dtypes : dict
        Dict of key (column names) value (name of dtype that column is cast to) pairs,
        initialised to "Int8" for every column.

    built_from_json: bool
        indicates if transformer was reconstructed from json, which limits its supported
        functionality to .transform

    polars_compatible : bool
        class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    deprecated: bool
        indicates if class has been deprecated

    """

    polars_compatible = False

    lazyframe_compatible = False

    jsonable = False

    FITS = True

    deprecated = True

    @beartype
    def __init__(
        self,
        columns: str | list[str],
        weights_column: str | None = None,
        **kwargs: bool,
    ) -> None:
        """Initialise class instance.

        Parameters
        ----------
        columns : str or list
            Columns to transform.

        weights_column : str or None
            Weights column to use when calculating the mean response.

        **kwargs
            Arbitrary keyword arguments passed onto BaseTransformer.init method.

        """
        self.weights_column = weights_column

        BaseTransformer.__init__(self, columns=columns, **kwargs)

        # this transformer shouldn't really be used with huge numbers of levels
        # so setup to use int8 type
        # if there are more levels than this, will get a type error
        self.return_dtypes = dict.fromkeys(self.columns, "Int8")

    def _check_for_failed_fit(self) -> None:
        """Check if fit failed to find needed attrs.

        A column is treated as failed when its fitted mapping is empty.

        Raises
        ------
            ValueError: if the mapping for any column has come out empty

        """
        # collect every failed column, so the error reports all of them
        # rather than just the first one encountered
        failed_columns = [col for col in self.columns if len(self.mappings[col]) == 0]

        if failed_columns:
            msg = f"fit has failed for columns {failed_columns}, it is possible that all rows are invalid - check for null/negative weights, all null columns, or other invalid conditions listed in the docstring"
            raise ValueError(msg)

    @beartype
    def fit(self, X: DataFrame, y: Series) -> OrdinalEncoderTransformer:
        """Identify mapping of categorical levels to rank-ordered integer values by target-mean in ascending order.

        If the user specified the weights_column arg when initialising the transformer
        that column is used when calculating the mean response, otherwise a
        unit weights column is created.

        Parameters
        ----------
        X : DataFrame
            Data with categorical variable columns to transform.

        y : Series
            Response column or target.

        Returns
        -------
            OrdinalEncoderTransformer: fitted class instance

        Raises
        ------
            ValueError: if y contains nulls

            ValueError: if a column has more levels than can be encoded as int8

            ValueError: if the fitted mapping for a column is empty

        """
        X = _convert_dataframe_to_narwhals(X)
        y = _convert_series_to_narwhals(y)

        BaseTransformer.fit(self, X, y)

        self.mappings = {}

        weights_column = self.weights_column
        if self.weights_column is None:
            X, weights_column = WeightColumnMixin._create_unit_weights_column(
                X,
                return_native=False,
                verbose=self.verbose,
            )

        WeightColumnMixin.check_weights_column(self, X, weights_column)

        valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr(
            weights_column, self.verbose
        )

        if (response_null_count := y.is_null().sum()) > 0:
            msg = f"{self.classname()}: y has {response_null_count} null values"
            raise ValueError(msg)

        X_y = self._combine_X_y(X, y, return_native_override=False)
        # presumably the name _combine_X_y gives the appended response column
        # - confirm against BaseTransformer
        response_column = "_temporary_response"

        X_y = X_y.filter(valid_weights_filter_expr)

        # the need to sort for each c limits the optimisation we can do here,
        # as it is still necessarily to materialise for each column
        for c in self.columns:
            groupby_sum = X_y.group_by([c]).agg(
                nw.col(response_column).sum(), nw.col(weights_column).sum()
            )

            # NOTE(review): the ratio below is sum(response) / sum(weights),
            # the response is not multiplied by the weights before summing
            encodings = (
                groupby_sum.select(
                    (nw.col(response_column) / nw.col(weights_column)).alias(
                        "encodings"
                    ),
                    nw.col(c),
                )
                .sort(by="encodings", descending=False)
                .to_dict()
            )

            # create a dictionary whose keys are the levels of the categorical variable
            # sorted ascending by their target-mean value
            # and whose values are ascending ordinal integers
            sorted_levels = encodings[c]
            self.mappings[c] = {
                sorted_levels[k]: k + 1 for k in range(len(sorted_levels))
            }

        for col in self.columns:
            # if more levels than int8 type can handle, then error
            if len(self.mappings[col]) > np.iinfo(np.int8).max:
                # report the column being checked, not the last column fit above
                msg = f"{self.classname()}: column {col} has too many levels to encode"
                raise ValueError(
                    msg,
                )

        # use BaseMappingTransformer init to process args
        # extract null_mappings from mappings etc
        base_mapping_transformer = BaseMappingTransformer(
            mappings=self.mappings,
            return_dtypes=self.return_dtypes,
        )

        self.mappings = base_mapping_transformer.mappings
        self.mappings_from_null = base_mapping_transformer.mappings_from_null
        self.return_dtypes = base_mapping_transformer.return_dtypes

        self._check_for_failed_fit()

        return self

    @beartype
    def transform(self, X: DataFrame) -> DataFrame:
        """Apply ordinal encoding stored in the mappings attribute to columns.

        This maps categorical levels to rank-ordered integer values by target-mean in ascending order.

        Parameters
        ----------
        X : DataFrame
            Data with categorical variable columns to transform.

        Returns
        -------
        X : DataFrame
            Transformed data with levels mapped to ordinal encoded values for categorical variables.

        """
        X = BaseTransformer.transform(self, X)

        return BaseMappingTransformMixin.transform(self, X)
[docs]
@deprecated(
    """This transformer has not been selected for conversion to polars/narwhals,
and so has been deprecated. If it is useful to you, please raise an issue
for it to be modernised
""",
)
class NominalToIntegerTransformer(BaseMappingTransformMixin):
    """Transformer to convert columns containing nominal values into integer values.

    The nominal levels that are mapped to integers are not ordered in any way.

    Attributes
    ----------
    start_encoding : int
        Value to start the encoding / mapping of nominal to integer from.

    mappings : dict
        Created in fit. A dict of key (column names) value (mappings between levels and integers for given
        column) pairs.

    return_dtypes : dict
        Dict of key (column names) value (name of dtype that column is cast to) pairs,
        initialised to "Int8" for every column.

    built_from_json: bool
        indicates if transformer was reconstructed from json, which limits its supported
        functionality to .transform

    polars_compatible : bool
        class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework

    jsonable: bool
        class attribute, indicates if transformer supports to/from_json methods

    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    lazyframe_compatible: bool
        class attribute, indicates whether transformer works with lazyframes

    deprecated: bool
        indicates if class has been deprecated

    """

    polars_compatible = False

    lazyframe_compatible = False

    jsonable = False

    FITS = True

    deprecated = True

    def __init__(
        self,
        columns: str | list[str] | None = None,
        start_encoding: int = 0,
        **kwargs: bool,
    ) -> None:
        """Initialise class instance.

        Parameters
        ----------
        columns : None or str or list, default = None
            Columns to transform, if the default of None is supplied all object and category
            columns in X are used.

        start_encoding : int, default = 0
            Value to start the encoding from e.g. if start_encoding = 0 then the encoding would be
            {'A': 0, 'B': 1, 'C': 2} etc.. or if start_encoding = 5 then the same encoding would be
            {'A': 5, 'B': 6, 'C': 7}. Can be positive or negative.

        **kwargs
            Arbitrary keyword arguments passed onto BaseTransformer.init method.

        Raises
        ------
            ValueError: if `start_encoding` is not int

        """
        BaseTransformer.__init__(self, columns=columns, **kwargs)

        # this transformer shouldn't really be used with huge numbers of levels
        # so setup to use int8 type
        # if there are more levels than this, will get a type error
        self.return_dtypes = dict.fromkeys(self.columns, "Int8")

        if not isinstance(start_encoding, int):
            msg = f"{self.classname()}: start_encoding should be an integer"
            raise ValueError(msg)

        self.start_encoding = start_encoding

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series | None = None,
    ) -> NominalToIntegerTransformer:
        """Create mapping between nominal levels and integer values for categorical variables.

        Parameters
        ----------
        X : pd.DataFrame
            Data to fit the transformer on, this sets the nominal levels that can be mapped.

        y : None or pd.DataFrame or pd.Series, default = None
            Optional argument only required for the transformer to work with sklearn pipelines.

        Returns
        -------
            NominalToIntegerTransformer: fitted class instance

        Raises
        ------
            ValueError: if column has more levels than can be encoded as int8

            ValueError: if the encoded values for a column fall outside of the int8 range

        """
        BaseTransformer.fit(self, X, y)

        self.mappings = {}

        int8_limits = np.iinfo(np.int8)

        for c in self.columns:
            col_values = X[c].unique()

            # levels are numbered consecutively from start_encoding,
            # in the order returned by unique
            self.mappings[c] = {
                k: i for i, k in enumerate(col_values, self.start_encoding)
            }

            # if more levels than int8 type can handle, then error
            if len(self.mappings[c]) > int8_limits.max:
                msg = f"{self.classname()}: column {c} has too many levels to encode"
                raise ValueError(
                    msg,
                )

            # the level count alone does not guarantee the values fit, as the
            # encoding is offset by start_encoding - so check the actual values
            encoded_values = self.mappings[c].values()
            if self.mappings[c] and (
                min(encoded_values) < int8_limits.min
                or max(encoded_values) > int8_limits.max
            ):
                msg = f"{self.classname()}: column {c} encodings starting at {self.start_encoding} do not fit in int8"
                raise ValueError(
                    msg,
                )

        # use BaseMappingTransformer init to process args
        # extract null_mappings from mappings etc
        base_mapping_transformer = BaseMappingTransformer(
            mappings=self.mappings,
            return_dtypes=self.return_dtypes,
        )

        self.mappings = base_mapping_transformer.mappings
        self.mappings_from_null = base_mapping_transformer.mappings_from_null
        self.return_dtypes = base_mapping_transformer.return_dtypes

        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Apply integer encoding stored in the mappings attribute to columns.

        Parameters
        ----------
        X : pd.DataFrame
            Data with nominal columns to transform.

        Returns
        -------
        X : pd.DataFrame
            Transformed input X with levels mapped according to mappings dict.

        """
        X = BaseTransformer.transform(self, X)

        return BaseMappingTransformMixin.transform(self, X)