Source code for tubular.nominal

"""Contains transformers that apply encodings to nominal columns."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Literal

import narwhals as nw
import numpy as np
from beartype import beartype
from narwhals.dtypes import DType  # noqa: F401
from typing_extensions import deprecated

from tubular._stats import (
    _get_mean_calculation_expressions,
    _get_median_calculation_expression,
)
from tubular._utils import (
    _collect_frame,
    _collect_series,
    _convert_dataframe_to_narwhals,
    _convert_series_to_narwhals,
    _is_null,
    _return_narwhals_or_native_dataframe,
    block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.mapping import BaseMappingTransformer, BaseMappingTransformMixin
from tubular.mixins import DropOriginalMixin, WeightColumnMixin
from tubular.types import (
    DataFrame,
    FloatBetweenZeroOne,
    LazyFrame,
    ListOfStrs,
    PositiveInt,
    Series,
)

if TYPE_CHECKING:
    import pandas as pd


[docs] @register class GroupRareLevelsTransformer(BaseTransformer, WeightColumnMixin): """Group together rare levels of nominal variables into a new rare level. Rare levels are defined by a cut off percentage, which can either be based on the number of rows or sum of weights. Any levels below this cut off value will be grouped into the rare level. Attributes ---------- cut_off_percent : float Cut off percentage (either in terms of number of rows or sum of weight) for a given nominal level to be considered rare. non_rare_levels : dict Created in fit. A dict of non-rare levels (i.e. levels with more than cut_off_percent weight or rows) that is used to identify rare levels in transform. rare_level_name : any Must be of the same type as columns. Label for the new nominal level that will be added to group together rare levels (as defined by cut_off_percent). record_rare_levels : bool Should the 'rare' levels that will be grouped together be recorded? If not they will be lost after the fit and the only information remaining will be the 'non'rare' levels. rare_levels_record : dict Only created (in fit) if record_rare_levels is True. This is dict containing a list of levels that were grouped into 'rare' for each column the transformer was applied to. weights_column : str Name of weights columns to use if cut_off_percent should be in terms of sum of weight not number of rows. unseen_levels_to_rare : bool If True, unseen levels in new data will be passed to rare, if set to false they will be left unchanged. training_data_levels : dict[set] Dictionary containing the set of values present in the training data for each column in self.columns. It will only exist in if unseen_levels_to_rare is set to False. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Examples -------- ```pycon >>> GroupRareLevelsTransformer( ... columns="a", ... cut_off_percent=0.02, ... rare_level_name="rare_level", ... ) GroupRareLevelsTransformer(columns=['a'], cut_off_percent=0.02, rare_level_name='rare_level') ``` """ polars_compatible = True lazyframe_compatible = True jsonable = True FITS = True @beartype def __init__( # noqa: PLR0917, PLR0913 self, columns: str | ListOfStrs | None = None, cut_off_percent: FloatBetweenZeroOne = 0.01, weights_column: str | None = None, rare_level_name: str | ListOfStrs = "rare", record_rare_levels: bool = True, unseen_levels_to_rare: bool = True, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : None or str or list, default = None Columns to transform, if the default of None is supplied all object and category columns in X are used. cut_off_percent : float, default = 0.01 Cut off for the percent of rows or percent of weight for a level, levels below this value will be grouped. weights_column : None or str, default = None Name of weights column that should be used so cut_off_percent applies to sum of weights rather than number of rows. rare_level_name : any,default = 'rare'. Must be of the same type as columns. Label for the new 'rare' level. record_rare_levels : bool, default = False If True, an attribute called rare_levels_record will be added to the object. This will be a dict of key (column name) value (level from column considered rare according to cut_off_percent) pairs. Care should be taken if working with nominal variables with many levels as this could potentially result in many being stored in this attribute. unseen_levels_to_rare : bool, default = True If True, unseen levels in new data will be passed to rare, if set to false they will be left unchanged. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ super().__init__(columns=columns, **kwargs) self.cut_off_percent = cut_off_percent self.weights_column = weights_column self.rare_level_name = rare_level_name self.record_rare_levels = record_rare_levels self.unseen_levels_to_rare = unseen_levels_to_rare
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> import tests.test_data as d >>> df = d.create_df_8("pandas") >>> x = GroupRareLevelsTransformer( ... columns=["b", "c"], cut_off_percent=0.4, unseen_levels_to_rare=False ... ) >>> x.fit(df) GroupRareLevelsTransformer(columns=['b', 'c'], cut_off_percent=0.4, unseen_levels_to_rare=False) >>> x.to_json() {'tubular_version': ..., 'classname': 'GroupRareLevelsTransformer', 'init': {'columns': ['b', 'c'], 'copy': False, 'verbose': False, 'return_native': True, 'cut_off_percent': 0.4, 'weights_column': None, 'rare_level_name': 'rare', 'record_rare_levels': True, 'unseen_levels_to_rare': False}, 'fit': {'is_fitted_': True, 'non_rare_levels': {'b': ['w'], 'c': ['a']}, 'training_data_levels': {'b': ['w', 'x', 'y', 'z'], 'c': ['a', 'b', 'c']}, 'rare_levels_record': {'b': ['x', 'y', 'z'], 'c': ['b', 'c']}}} ``` """ self.check_is_fitted(["non_rare_levels"]) json_dict = super().to_json() json_dict["init"].update( { "cut_off_percent": self.cut_off_percent, "weights_column": self.weights_column, "rare_level_name": self.rare_level_name, "record_rare_levels": self.record_rare_levels, "unseen_levels_to_rare": self.unseen_levels_to_rare, }, ) json_dict["fit"]["non_rare_levels"] = self.non_rare_levels if not self.unseen_levels_to_rare: self.check_is_fitted(["training_data_levels"]) json_dict["fit"]["training_data_levels"] = self.training_data_levels if self.record_rare_levels: self.check_is_fitted(["rare_levels_record"]) json_dict["fit"]["rare_levels_record"] = self.rare_levels_record return json_dict
@beartype def _check_str_like_columns(self, schema: nw.Schema) -> None: """Check that transformer being called on only str-like columns. Parameters ---------- schema: nw.Schema schema of input data Raises ------ TypeError: if columns are not str-like Examples -------- ```pycon >>> import polars as pl >>> import narwhals as nw >>> transformer = GroupRareLevelsTransformer( ... columns="a", ... cut_off_percent=0.02, ... rare_level_name="rare_level", ... ) >>> # non erroring example >>> test_df = pl.DataFrame({"a": ["w", "x"], "b": ["y", "z"]}) >>> schema = nw.from_native(test_df).schema >>> transformer._check_str_like_columns(schema) >>> # erroring example >>> test_df = pl.DataFrame({"a": [1, 2], "b": ["y", "z"]}) >>> schema = nw.from_native(test_df).schema >>> transformer._check_str_like_columns(schema) Traceback (most recent call last): ... TypeError: ... ``` """ str_like_columns = [ col for col in self.columns if schema[col] in {nw.String, nw.Categorical, nw.Object} ] non_str_like_columns = set(self.columns).difference( set( str_like_columns, ), ) if len(non_str_like_columns) != 0: msg = f"{self.classname()}: transformer must run on str-like columns, but got non str-like {non_str_like_columns}" raise TypeError(msg)
[docs] @block_from_json @beartype def fit( self, X: DataFrame, y: Series | LazyFrame | None = None, ) -> GroupRareLevelsTransformer: """Record non-rare levels for categorical variables. When transform is called, only levels records in non_rare_levels during fit will remain unchanged - all other levels will be grouped. If record_rare_levels is True then the rare levels will also be recorded. The label for the rare levels must be of the same type as the columns. Parameters ---------- X : DataFrame Data to identify non-rare levels from. y : Series or LazyFrame or None, default = None Optional argument only required for the transformer to work with sklearn pipelines. Returns ------- GroupRareLevelsTransformer: fitted class instance Examples -------- ```pycon >>> import polars as pl >>> transformer = GroupRareLevelsTransformer( ... columns="a", ... cut_off_percent=0.02, ... rare_level_name="rare_level", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]}) >>> transformer.fit(test_df) GroupRareLevelsTransformer(columns=['a'], cut_off_percent=0.02, rare_level_name='rare_level') ``` """ X = _convert_dataframe_to_narwhals(X) y = _convert_series_to_narwhals(y) super().fit(X, y) weights_column = self.weights_column if self.weights_column is None: X, weights_column = WeightColumnMixin._create_unit_weights_column( X, return_native=False, verbose=self.verbose, ) WeightColumnMixin.check_weights_column(self, X, weights_column) valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr( weights_column, self.verbose ) X = X.filter(valid_weights_filter_expr) schema = X.collect_schema() self._check_str_like_columns(schema) self.non_rare_levels = {} self.rare_levels_record = {} present_levels = {} total_weight = _collect_frame(X.select(nw.col(weights_column).sum())).item() level_weights_expr = nw.col(weights_column) / total_weight if not self.unseen_levels_to_rare: self.training_data_levels = {} for c in self.columns: group = X.group_by(c).agg(nw.col(weights_column).sum()) non_rare_levels_expr = ( nw.when(level_weights_expr >= self.cut_off_percent) .then(nw.col(c)) .otherwise(None) .alias(f"{c}_non_rare_levels") ) results = group.select(non_rare_levels_expr, nw.col(c)) results = _collect_frame(results).to_dict(as_series=True) self.non_rare_levels[c] = sorted( val for val in results[f"{c}_non_rare_levels"].unique().to_list() if not _is_null(val) ) present_levels[c] = sorted(value for value in results[c].unique().to_list()) if self.record_rare_levels: self.rare_levels_record[c] = sorted( set(present_levels[c]).difference(self.non_rare_levels[c]), ) self.rare_levels_record[c] = sorted( self.rare_levels_record[c], key=str, ) if not self.unseen_levels_to_rare: self.training_data_levels[c] = present_levels[c] self.is_fitted_ = True return self
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Group rare levels together into a new 'rare' level. Parameters ---------- X : DataFrame Data to with catgeorical variables to apply rare level grouping to. Returns ------- X : DataFrame Transformed input X with rare levels grouped for into a new rare level. Examples -------- ```pycon >>> import polars as pl >>> transformer = GroupRareLevelsTransformer( ... columns="a", ... cut_off_percent=0.5, ... rare_level_name="rare_level", ... ) >>> test_df = pl.DataFrame({"a": ["x", "x", "y"], "b": ["w", "z", "z"]}) >>> _ = transformer.fit(test_df) >>> transformer.transform(test_df) shape: (3, 2) ┌────────────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ str ┆ str │ ╞════════════╪═════╡ │ x ┆ w │ │ x ┆ z │ │ rare_level ┆ z │ └────────────┴─────┘ ``` """ X = BaseTransformer.transform(self, X, return_native_override=False) X = _convert_dataframe_to_narwhals(X) schema = X.collect_schema() self._check_str_like_columns(schema) self.check_is_fitted(["non_rare_levels"]) transform_expressions = [] for col in self.columns: non_rare_condition_expression = ( nw.col(col).is_in(self.non_rare_levels[col]) if self.unseen_levels_to_rare # if unseen levels are mapped to rare, # the condition becomes either in # non rare levels OR not in training data # levels (unseen) else ( nw.col(col).is_in(self.non_rare_levels[col]) | ~nw.col(col).is_in(self.training_data_levels[col]) ) ) transform_expression = ( nw.col(col).cast( nw.String, ) if schema[col] in {nw.Categorical, nw.Enum} else nw.col(col) ) transform_expression = ( nw.when(non_rare_condition_expression | nw.col(col).is_null()) .then(transform_expression) .otherwise(nw.lit(self.rare_level_name)) ) transform_expression = ( transform_expression.cast( nw.Enum(self.non_rare_levels[col] + [self.rare_level_name]), ) if (schema[col] in {nw.Categorical, nw.Enum}) else transform_expression ) transform_expressions.append(transform_expression) X = X.with_columns(*transform_expressions) if transform_expressions else X return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs] @register class MeanResponseTransformer( BaseTransformer, WeightColumnMixin, DropOriginalMixin, ): """Convert categorical variables to numeric by mapping levels to the mean response for level. For a continuous or binary response the categorical columns specified will have values replaced with the mean response for each category. For an n > 1 level categorical response, up to n binary responses can be created, which in turn can then be used to encode each categorical column specified. This will generate up to n * len(columns) new columns, of with names of the form {column}_{response_level}. The original columns will be removed from the dataframe. This functionality is controlled using the 'level' parameter. Note that the above only works for a n > 1 level categorical response. Do not use 'level' parameter for a n = 1 level numerical response. In this case, use the standard mean response transformer without the 'level' parameter. If a categorical variable contains null values these will not be transformed. The same weights and prior are applied to each response level in the multi-level case. Attributes ---------- columns : str or list Categorical columns to encode in the input data. weights_column : str or None Weights column to use when calculating the mean response. prior : int, default = 0 Regularisation parameter, can be thought of roughly as the size a category should be in order for its statistics to be considered reliable (hence default value of 0 means no regularisation). level : str, int, float, list or None, default = None Parameter to control encoding against a multi-level categorical response. If None the response will be treated as binary or continuous, if 'all' all response levels will be encoded against and if it is a list of levels then only the levels specified will be encoded against. response_levels : list Only created in the multi-level case. Generated from level, list of all the response levels to encode against. mappings : dict Created in fit. A nested Dict of {column names : column specific mapping dictionary} pairs. Column specific mapping dictionaries contain {initial value : mapped value} pairs. mapped_columns : list Only created in the multi-level case. A list of the new columns produced by encoded the columns in self.columns against multiple response levels, of the form {column}_{level}. transformer_dict : dict Only created in the multi-level case. A dictionary of the form level : transformer containing the mean response transformers for each level to be encoded against. unseen_levels_encoding_dict: dict Dict containing the values (based on chosen unseen_level_handling) derived from the encoded columns to use when handling unseen levels in data passed to transform method. return_type: Literal['float32', 'float64'] What type to cast return column as. Defaults to float32. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Examples -------- ```pycon >>> import polars as pl >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... unseen_level_handling="mean", ... ) >>> transformer MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean') >>> # once fit, transformer can also be dumped to json and reinitialised >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [0, 1]}) >>> _ = transformer.fit(test_df[["a"]], test_df["b"]) >>> json_dump = transformer.to_json() >>> json_dump {'tubular_version': ..., 'classname': 'MeanResponseTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'weights_column': None, 'prior': 1, 'level': None, 'unseen_level_handling': 'mean', 'return_type': 'Float32', 'drop_original': True}, 'fit': {'is_fitted_': True, 'mappings': {'a': {'x': 0.25, 'y': 0.75}}, 'return_dtypes': {'a': 'Float32'}, 'column_to_encoded_columns': {'a': ['a']}, 'encoded_columns': ['a'], 'unseen_levels_encoding_dict': {'a': 0.5}}} >>> MeanResponseTransformer.from_json(json_dump) MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean') ``` """ polars_compatible = True jsonable = True lazyframe_compatible = True FITS = True @beartype def __init__( # noqa: PLR0917, PLR0913 self, columns: str | list[str] | None = None, weights_column: str | None = None, prior: PositiveInt = 0, level: float | int | str | list | None = None, unseen_level_handling: float | int | Literal["mean", "median", "min", "max"] | None = None, return_type: Literal["Float32", "Float64"] = "Float32", drop_original: bool = True, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : None or str or list, default = None Columns to transform, if the default of None is supplied all object and category columns in X are used. weights_column : str or None Weights column to use when calculating the mean response. prior : int, default = 0 Regularisation parameter, can be thought of roughly as the size a category should be in order for its statistics to be considered reliable (hence default value of 0 means no regularisation). level : str, list or None, default = None Parameter to control encoding against a multi-level categorical response. For a continuous or binary response, leave this as None. In the multi-level case, set to 'all' to encode against every response level or provide a list of response levels to encode against. unseen_level_handling : str("mean", "median", "min", "max") or int/float, default = None Parameter to control the logic for handling unseen levels of the categorical features to encode in data when using transform method. Default value of None will output error when attempting to use transform on data with unseen levels in categorical columns to encode. Set this parameter to one of the options above in order to encode unseen levels in each categorical column with the mean, median etc. of each column. One can also pass an arbitrary int/float value to use for encoding unseen levels. return_type: Literal['float32', 'float64'] What type to cast return column as, consider exploring float32 to save memory. Defaults to float32. drop_original: bool controls whether original columns are dropped after encoded columns created. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ self.weights_column = weights_column self.prior = prior self.unseen_level_handling = unseen_level_handling self.return_type = return_type self.drop_original = drop_original self.MULTI_LEVEL = False if level == "all" or (isinstance(level, list)): self.MULTI_LEVEL = True # if working with single level, put into list for easier handling elif isinstance(level, (str, int, float)): level = [level] self.MULTI_LEVEL = True self.level = level BaseTransformer.__init__(self, columns=columns, **kwargs)
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> import polars as pl >>> transformer = MeanResponseTransformer(columns=["a"]) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [0, 1]}) >>> _ = transformer.fit(test_df[["a"]], test_df["b"]) >>> transformer.to_json() {'tubular_version': ..., 'classname': 'MeanResponseTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'weights_column': None, 'prior': 0, 'level': None, 'unseen_level_handling': None, 'return_type': 'Float32', 'drop_original': True}, 'fit': {'is_fitted_': True, 'mappings': {'a': {'x': 0.0, 'y': 1.0}}, 'return_dtypes': {'a': 'Float32'}, 'column_to_encoded_columns': {'a': ['a']}, 'encoded_columns': ['a']}} ``` """ self.check_is_fitted( [ "mappings", "return_dtypes", "column_to_encoded_columns", "encoded_columns", ], ) json_dict = super().to_json() json_dict["init"].update( { "weights_column": self.weights_column, "prior": self.prior, "level": self.level, "unseen_level_handling": self.unseen_level_handling, "return_type": self.return_type, "drop_original": self.drop_original, }, ) # make sure mappings dict is sorted for consistent repr mappings = { key: { value: self.mappings[key][value] for value in sorted(self.mappings[key]) } for key in sorted(self.mappings) } json_dict["fit"].update( { "mappings": mappings, "return_dtypes": self.return_dtypes, "column_to_encoded_columns": self.column_to_encoded_columns, "encoded_columns": self.encoded_columns, }, ) if self.unseen_level_handling: self.check_is_fitted(["unseen_levels_encoding_dict"]) json_dict["fit"]["unseen_levels_encoding_dict"] = ( self.unseen_levels_encoding_dict ) return json_dict
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: list of features modified/created by the transformer Examples -------- ```pycon >>> import polars as pl >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... unseen_level_handling="mean", ... ) >>> transformer.get_feature_names_out() ['a'] >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... level=["x", "y"], ... unseen_level_handling="mean", ... ) >>> transformer.get_feature_names_out() ['a_x', 'a_y'] >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... level="all", ... unseen_level_handling="mean", ... ) >>> transformer.get_feature_names_out() Traceback (most recent call last): ... sklearn.exceptions.NotFittedError: ... >>> test_df = pl.DataFrame({"a": ["x", "y", "x"], "b": ["cat", "dog", "rat"]}) >>> _ = transformer.fit(test_df, test_df["b"]) >>> transformer.get_feature_names_out() ['a_cat', 'a_dog', 'a_rat'] ``` """ # if level is specified as 'all', this function # depends on fit having been called if self.level == "all": self.check_is_fitted("encoded_columns") return self.encoded_columns return ( self.columns if not self.MULTI_LEVEL else [ column + "_" + str(level) for column in self.columns for level in self.level ] )
@block_from_json def _prior_regularisation( self, global_means: dict[str, float], groups: dict[str, nw.DataFrame], ) -> dict[str, nw.Expr]: """Regularise encoding values by pushing encodings of infrequent categories towards the global mean. If prior is zero this will return target_means unaltered. The formula used is: (weight*value + prior*global_mean)/(weight + prior) Parameters ---------- global_means: dict[str, float] dictionary of global means per binary target groups: dict[str, nw.DataFrame] dict of grouped dataframes per input column Returns ------- prior_exprs: dict[str, nw.Expr] dictionary of format col:prior expression for col # this private method is not intended to be used outside # of the fit process, so not including examples """ exprs_dict = { encoded_column + "_mapped": ( ( nw.col( f"{self.encoded_columns_to_response_columns[encoded_column]}_weighted_sum" ) + ( global_means[ self.encoded_columns_to_response_columns[encoded_column] ] * nw.lit(self.prior) ) ) / (nw.col("weight_sum") + nw.lit(self.prior)).cast( getattr(nw, self.return_type), ) ).alias(encoded_column + "_mapped") for encoded_column in self.encoded_columns } return { encoded_column: _collect_frame( groups[self.encoded_columns_to_columns[encoded_column]].select( exprs_dict[encoded_column + "_mapped"], nw.col(self.encoded_columns_to_columns[encoded_column]), ) ) for encoded_column in self.encoded_columns } @block_from_json def _setup_fit_multi_level( self, y_vals: list[int | float], response_column: str, ) -> None: """Set attrs needed for fit, for multi level case. Parameters ---------- y_vals: list[Union[int, float]] y values present in data response_column: str name of response column # this private method is not intended to be used outside # of the fit process, so not including examples Raises ------ ValueError: if user provided levels are not present in y """ self.response_levels = self.level if self.level == "all": self.response_levels = y_vals elif any(level not in y_vals for level in self.level): msg = "Levels contains a level to encode against that is not present in the response." raise ValueError(msg) self.column_to_encoded_columns = { c: [c + "_" + str(level) for level in self.response_levels] for c in self.columns } self.encoded_columns_to_response_columns = { c + "_" + str(level): response_column + "_" + str(level) for c in self.columns for level in self.response_levels } self.response_columns = [ response_column + "_" + level for level in self.response_levels ] def _check_for_failed_fit(self) -> None: """Check if fit failed to find needed attrs. Occurs if mapping values or unseen_level_handling_dict values are null unexpectedly. Raises ------ ValueError: if mapping values or unseen_level_handling_dict values have come out as None unexpectedly """ failed_columns = [] for col in self.encoded_columns: if any(_is_null(value) for value in self.mappings[col].values()): failed_columns.append(col) break if self.unseen_level_handling and _is_null( self.unseen_levels_encoding_dict[col] ): failed_columns.append(col) break if failed_columns: msg = f"fit has failed for columns {failed_columns}, it is possible that all rows are invalid - check for null/negative weights, all null columns, or other invalid conditions listed in the docstring" raise ValueError(msg) @block_from_json def _setup_fit_single_level(self, response_column: str) -> None: """Set attrs needed for fit, for non-multi level case. Parameters ---------- response_column: str name of response column # this private method is not intended to be used outside # of the fit process, so not including examples """ # arbitrary len 1 iterable so logic can be shared with multi level self.response_levels = ["SINGLE_LEVEL"] self.column_to_encoded_columns = {c: [c] for c in self.columns} self.encoded_columns_to_response_columns = dict.fromkeys( self.columns, response_column, ) self.response_columns = [ response_column, ]
[docs] @block_from_json @beartype def fit(self, X: DataFrame, y: Series | LazyFrame) -> MeanResponseTransformer: # noqa:PLR0914, will simplify in future issue """Identify mapping of categorical levels to mean response values. If the user specified the weights_column arg in when initialising the transformer the weighted mean response will be calculated using that column. In the multi-level case this method learns which response levels are present and are to be encoded against. Parameters ---------- X : DataFrame Data to with catgeorical variable columns to transform and also containing response_column column. y : Series or LazyFrame Response variable or target. Returns ------- MeanResponseTransformer: fitted class instance Raises ------ ValueError: if y contains null values Examples -------- ```pycon >>> import polars as pl >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... unseen_level_handling="mean", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]}) >>> transformer.fit(test_df, test_df["target"]) MeanResponseTransformer(columns=['a'], prior=1, unseen_level_handling='mean') ``` """ X = _convert_dataframe_to_narwhals(X) y = _convert_series_to_narwhals(y) # Collect lazy y to enable operations like .unique().to_list() y = _collect_series(y) BaseTransformer.fit(self, X, y) self.mappings = {} self.unseen_levels_encoding_dict = {} weights_column = self.weights_column if self.weights_column is None: X, weights_column = WeightColumnMixin._create_unit_weights_column( X, return_native=False, verbose=self.verbose, ) WeightColumnMixin.check_weights_column(self, X, weights_column) valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr( weights_column, self.verbose ) y_vals = y.unique().to_list() if (response_null_count := y.is_null().sum()) > 0: msg = f"{self.classname()}: y has {response_null_count} null values" raise ValueError(msg) X_y = self._combine_X_y(X, y, return_native_override=False) response_column = "_temporary_response" X_y = X_y.filter(valid_weights_filter_expr) if self.MULTI_LEVEL: self._setup_fit_multi_level(y_vals, response_column) else: self._setup_fit_single_level(response_column) self.encoded_columns_to_columns = { encoded_column: c for c in self.columns for encoded_column in self.column_to_encoded_columns[c] } self.encoded_columns = [ encoded_column for c in self.columns for encoded_column in self.column_to_encoded_columns[c] ] self.encoded_columns.sort() # start by creating new columns as clones encoded_column_exprs = { encoded_column: nw.col( self.encoded_columns_to_columns[encoded_column], ).alias(encoded_column) for encoded_column in self.encoded_columns } # then setup binary response expressions for each level response_exprs = { response_column + "_" + level if self.MULTI_LEVEL else response_column: ( nw.col(response_column) == level ) if self.MULTI_LEVEL else nw.col(response_column) for level in self.response_levels } weighted_response_exprs = { "weighted_" + response_column: response_exprs[response_column] * nw.col(weights_column).alias("weighted_" + response_column) for response_column in self.response_columns } all_response_exprs = {} all_response_exprs.update(response_exprs) all_response_exprs.update(weighted_response_exprs) # materialise these for global mean # calculations to work with X_y = X_y.with_columns(**all_response_exprs) global_means = {} global_mean_exprs = _get_mean_calculation_expressions( self.response_columns, weights_column, ) global_means = _collect_frame(X_y.select(**global_mean_exprs)).to_dict( as_series=False ) global_means = { response_column: global_means[response_column][0] for response_column in self.response_columns } # now get the weighted response per group aggs = { c: [ nw.col(weights_column).sum().alias("weight_sum"), *[ nw.col("weighted_" + binary_response_column) .sum() .alias(f"{binary_response_column}_weighted_sum") for binary_response_column in self.response_columns ], ] for c in self.columns } groups = {c: X_y.group_by(c).agg(aggs[c]) for c in self.columns} # the previous two then make up the inputs for our encoding algorithm prior_encodings = self._prior_regularisation( global_means, groups, ) results_dict = { c: prior_encodings[c].to_dict(as_series=False) for c in prior_encodings } self.mappings.update( { encoded_column: dict( zip( results_dict[encoded_column][ self.encoded_columns_to_columns[encoded_column] ], results_dict[encoded_column][encoded_column + "_mapped"], strict=False, ), ) for encoded_column in self.encoded_columns }, ) # set this attr up for BaseMappingTransformerMixin # this is used to cast the narwhals mapping df, so uses narwhals types self.return_dtypes = dict.fromkeys(self.encoded_columns, self.return_type) # use BaseMappingTransformer init to process args # extract null_mappings from mappings etc base_mapping_transformer = BaseMappingTransformer( mappings=self.mappings, return_dtypes=self.return_dtypes, ) self.mappings = base_mapping_transformer.mappings self.mappings_from_null = base_mapping_transformer.mappings_from_null self.return_dtypes = base_mapping_transformer.return_dtypes self._fit_unseen_level_handling_dict(X_y, encoded_column_exprs, weights_column) self._check_for_failed_fit() self.is_fitted_ = True return self
@beartype @block_from_json def _fit_unseen_level_handling_dict( self, X_y: DataFrame, encoded_column_exprs: dict[str, nw.Expr], weights_column: str, ) -> None: """Learn values for unseen levels to be mapped to. Potential cases depend on unseen_level_handling attr: - if int/float value has been provided, this will cast to the appropriate type and be directly used - if median/mean/min/max, the appropriate weighted statistic is calculated on the mapped data, and cast to the appropriate type Parameters ---------- X_y : DataFrame Data to with categorical variable columns to transform and also containing response_column column. encoded_column_exprs: dict[str, nw.Expr] dict of format str: expression for creating initial encoded columns. Needed for Median unseen level option which requires intermediate materialisations. weights_column : str name of weights column # this private method is not intended to be used outside # of the fit process, so not including examples """ if isinstance(self.unseen_level_handling, (int, float)): self.unseen_levels_encoding_dict.update( dict.fromkeys(self.encoded_columns, self.unseen_level_handling) ) elif isinstance(self.unseen_level_handling, str): unseen_level_exprs = {} mapping_expressions = { encoded_col: nw.col(col) .alias(encoded_col) .replace_strict( self.mappings[encoded_col], return_dtype=getattr(nw, self.return_dtypes[encoded_col]), ) for col in self.columns for encoded_col in self.column_to_encoded_columns[col] } if self.unseen_level_handling in {"mean", "median"}: if self.unseen_level_handling == "mean": # have to call this many times as weights column varies with c unseen_level_exprs.update( _get_mean_calculation_expressions( self.encoded_columns, weights_column, initial_columns_exprs=mapping_expressions, ), ) # else, median else: for c in self.encoded_columns: null_filter_expr = ~nw.col( self.encoded_columns_to_columns[c] ).is_null() X_temp = ( X_y.with_columns(**encoded_column_exprs) .filter(null_filter_expr) .sort(c) ) median_expr = _get_median_calculation_expression( values_column=self.encoded_columns_to_response_columns[c], weights_column=weights_column, ) self.unseen_levels_encoding_dict[c] = _collect_frame( X_temp.select(median_expr) ).item(0, 0) # else, min/max else: unseen_level_exprs.update( { c: getattr(mapping_expressions[c], self.unseen_level_handling)() for c in self.encoded_columns }, ) # median will already have fit as it requires sorting/materialising if self.unseen_level_handling != "median": unseen_level_results = _collect_frame( X_y.select(**unseen_level_exprs) ).to_dict( as_series=True, ) self.unseen_levels_encoding_dict = { c: unseen_level_results[c].item(0) for c in self.encoded_columns }
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Apply mean response encoding stored in the mappings attribute to columns. Parameters ---------- X : DataFrame Data with nominal columns to transform. Returns ------- X : DataFrame Transformed input X with levels mapped according to mappings dict. Examples -------- ```pycon >>> import polars as pl >>> # example with no prior >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=0, ... unseen_level_handling="mean", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]}) >>> _ = transformer.fit(test_df, test_df["target"]) >>> transformer.transform(test_df) shape: (2, 3) ┌─────┬─────┬────────┐ │ a ┆ b ┆ target │ │ --- ┆ --- ┆ --- │ │ f32 ┆ i64 ┆ i64 │ ╞═════╪═════╪════════╡ │ 0.0 ┆ 1 ┆ 0 │ │ 1.0 ┆ 2 ┆ 1 │ └─────┴─────┴────────┘ # example with prior >>> transformer = MeanResponseTransformer( ... columns="a", ... prior=1, ... unseen_level_handling="mean", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2], "target": [0, 1]}) >>> _ = transformer.fit(test_df, test_df["target"]) >>> transformer.transform(test_df) shape: (2, 3) ┌──────┬─────┬────────┐ │ a ┆ b ┆ target │ │ --- ┆ --- ┆ --- │ │ f32 ┆ i64 ┆ i64 │ ╞══════╪═════╪════════╡ │ 0.25 ┆ 1 ┆ 0 │ │ 0.75 ┆ 2 ┆ 1 │ └──────┴─────┴────────┘ ``` """ self.check_is_fitted( [ "mappings", "return_dtypes", "column_to_encoded_columns", "encoded_columns", ], ) X = _convert_dataframe_to_narwhals(X) X = super().transform( X, return_native_override=False, ) transform_expressions = { encoded_col: nw.col(col) .alias(encoded_col) .replace_strict( self.mappings[encoded_col], default=self.unseen_levels_encoding_dict[encoded_col] if self.unseen_level_handling else None, ) .cast(getattr(nw, self.return_dtypes[encoded_col])) for col in self.columns for encoded_col in self.column_to_encoded_columns[col] } X = ( X.with_columns( **transform_expressions, ) if transform_expressions else X ) columns_to_drop = [ col for col in self.columns if col not in self.encoded_columns ] X = DropOriginalMixin.drop_original_column( X, self.drop_original, columns_to_drop, return_native=False, ) return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs] @register class OneHotEncodingTransformer( DropOriginalMixin, BaseTransformer, ): """Transformer to convert categorical variables into dummy columns. Attributes ---------- separator : str Separator used in naming for dummy columns. drop_original : bool Should original columns be dropped after creating dummy fields? built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer( ... columns="a", ... ) >>> transformer OneHotEncodingTransformer(columns=['a']) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]}) >>> _ = transformer.fit(test_df) >>> # transformer can also be dumped to json and reinitialised >>> json_dump = transformer.to_json() >>> json_dump {'tubular_version': ..., 'classname': 'OneHotEncodingTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'wanted_values': None, 'separator': '_', 'drop_original': False}, 'fit': {'is_fitted_': True, 'categories_': {'a': ['x', 'y']}, 'new_feature_names_': {'a': ['a_x', 'a_y']}}} >>> OneHotEncodingTransformer.from_json(json_dump) OneHotEncodingTransformer(columns=['a']) ``` """ polars_compatible = True lazyframe_compatible = True jsonable = True FITS = True MAX_LEVELS = 100 @beartype def __init__( self, columns: str | ListOfStrs | None = None, wanted_values: dict[str, ListOfStrs] | None = None, separator: str = "_", drop_original: bool = False, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : str or list of strings or None, default = None Names of columns to transform. If the default of None is supplied all object and category columns in X are used. wanted_values: dict[str, list[str] or None , default = None Optional parameter to select specific column levels to be transformed. If it is None, all levels in the categorical column will be encoded. It will take the format {col1: [level_1, level_2, ...]}. separator : str Used to create dummy column names, the name will take the format [categorical feature][separator][category level] drop_original : bool, default = False Should original columns be dropped after creating dummy fields? **kwargs Arbitrary keyword arguments passed onto sklearn OneHotEncoder.init method. Raises ------ ValueError: if keys of wanted_values arg are not in columns arg """ BaseTransformer.__init__( self, columns=columns, **kwargs, ) if wanted_values and set(wanted_values.keys()) != set(self.columns): msg = f"{self.classname()}: keys of wanted values should match provided columns" raise ValueError(msg) self.wanted_values = wanted_values self.drop_original = drop_original self.separator = separator
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer(columns=["a"]) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": ["w", "z"]}) >>> _ = transformer.fit(test_df) >>> # version will vary for local vs CI, so use ... as generic match >>> transformer.to_json() {'tubular_version': ..., 'classname': 'OneHotEncodingTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'wanted_values': None, 'separator': '_', 'drop_original': False}, 'fit': {'is_fitted_': True, 'categories_': {'a': ['x', 'y']}, 'new_feature_names_': {'a': ['a_x', 'a_y']}}} ``` """ self.check_is_fitted(["categories_", "new_feature_names_"]) json_dict = super().to_json() json_dict["init"].update( { "wanted_values": self.wanted_values, "separator": self.separator, "drop_original": self.drop_original, }, ) json_dict["fit"].update( { "categories_": self.categories_, "new_feature_names_": self.new_feature_names_, }, ) return json_dict
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: list of features modified/created by the transformer Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer( ... columns="a", ... wanted_values={"a": ["cat", "dog"]}, ... ) >>> transformer.get_feature_names_out() ['a_cat', 'a_dog'] >>> transformer = OneHotEncodingTransformer( ... columns="a", ... ) >>> transformer.get_feature_names_out() Traceback (most recent call last): ... sklearn.exceptions.NotFittedError: ... >>> test_df = pl.DataFrame({"a": ["cat", "dog", "rat"]}) >>> _ = transformer.fit(test_df) >>> transformer.get_feature_names_out() ['a_cat', 'a_dog', 'a_rat'] ``` """ # if wanted values is not provided, this function # depends on fit having been called if not self.wanted_values: self.check_is_fitted("categories_") return [ output_column for column in self.columns for output_column in self._get_feature_names(column) ] return [ column + self.separator + str(level) for column in self.columns for level in self.wanted_values[column] ]
[docs] @block_from_json @beartype def fit( self, X: DataFrame, y: Series | LazyFrame | None = None, ) -> OneHotEncodingTransformer: """Get list of levels for each column to be transformed. This defines which dummy columns will be created in transform. Parameters ---------- X : DataFrame Data to identify levels from. y : None Ignored. This parameter exists only for compatibility with sklearn.pipeline.Pipeline. Returns ------- OneHotEncodingTransformer: fitted class instance Raises ------ ValueError: if column has >100 levels Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer( ... columns="a", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]}) >>> transformer.fit(test_df) OneHotEncodingTransformer(columns=['a']) ``` """ X = _convert_dataframe_to_narwhals(X) y = _convert_series_to_narwhals(y) BaseTransformer.fit(self, X=X, y=y) self.categories_ = {} self.new_feature_names_ = {} results = X.select(nw.col(c) for c in self.columns) results = _collect_frame(results) results_dict = results.to_dict() # Check each field has less than 100 categories/levels for c in self.columns: results_list = results_dict[c].unique().to_list() non_null_results_list = [val for val in results_list if not _is_null(val)] if self.verbose and len(non_null_results_list) < len(results_list): warnings.warn( f"{self.classname()}: Column {c} contains null values which will be ignored during fitting", stacklevel=2, ) # if the user has provided a 'wanted_values' as a list of expected dummies, # then there is actually nothing we need to fit on data here self.categories_[c] = ( sorted(category for category in non_null_results_list) if not self.wanted_values else self.wanted_values[c] ) level_count = len(self.categories_[c]) if level_count > self.MAX_LEVELS: raise ValueError( f"{self.classname()}: column %s has over {self.MAX_LEVELS} unique values - consider another type of encoding" % c, ) self.new_feature_names_[c] = self._get_feature_names(column=c) self.is_fitted_ = True return self
@beartype def _get_feature_names( self, column: str, ) -> list[str]: """Get list of features that will be output by transformer. Parameters ---------- column: str column to get dummy feature names for Returns ------- list[str]: list of output features Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer( ... columns="a", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]}) >>> _ = transformer.fit(test_df) >>> transformer._get_feature_names("a") ['a_x', 'a_y'] ``` """ return [ column + self.separator + str(level) for level in self.categories_[column] ]
[docs] @beartype def transform( self, X: DataFrame, return_native_override: bool | None = None, ) -> DataFrame: """Create new dummy columns from categorical fields. Parameters ---------- X : DataFrame Data to apply one hot encoding to. return_native_override: bool controls whether transformer returns narwhals or native type. return_native_override: Optional[bool] option to override return_native attr in transformer, useful when calling parent methods Returns ------- X_transformed : DataFrame Transformed input X with dummy columns derived from categorical columns added. If drop_original = True then the original categorical columns that the dummies are created from will not be in the output X. Examples -------- ```pycon >>> import polars as pl >>> transformer = OneHotEncodingTransformer( ... columns="a", ... ) >>> test_df = pl.DataFrame({"a": ["x", "y"], "b": [1, 2]}) >>> _ = transformer.fit(test_df) >>> transformer.transform(test_df) shape: (2, 4) ┌─────┬─────┬───────┬───────┐ │ a ┆ b ┆ a_x ┆ a_y │ │ --- ┆ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ bool ┆ bool │ ╞═════╪═════╪═══════╪═══════╡ │ x ┆ 1 ┆ true ┆ false │ │ y ┆ 2 ┆ false ┆ true │ └─────┴─────┴───────┴───────┘ ``` """ return_native = self._process_return_native(return_native_override) # Check that transformer has been fit before calling transform self.check_is_fitted(["categories_", "new_feature_names_"]) X = _convert_dataframe_to_narwhals(X) X = BaseTransformer.transform(self, X, return_native_override=False) transform_expressions = {} for c in self.columns: for level in self.categories_[c]: if c + self.separator + str(level) in self.new_feature_names_[c]: transform_expressions[c + self.separator + str(level)] = ( nw.col(c) == level ) # make column order consistent sorted_keys = sorted(transform_expressions.keys()) X = ( X.with_columns(**{key: transform_expressions[key] for key in sorted_keys}) if transform_expressions else X ) # Drop original columns if self.drop_original is True X = DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, return_native=False, ) return _return_narwhals_or_native_dataframe(X, return_native)
# DEPRECATED TRANSFORMERS
[docs] @deprecated( """This transformer has not been selected for conversion to polars/narwhals, and so has been deprecated. If it is useful to you, please raise an issue for it to be modernised """, ) class OrdinalEncoderTransformer( BaseMappingTransformMixin, WeightColumnMixin, ): """Encode categorical variables into ascending rank-ordered integer values variables. Maps levels to the target-mean response for that level. Values will be sorted in ascending order only i.e. categorical level with lowest target mean response to be encoded as 1, the next highest value as 2 and so on. If a categorical variable contains null values these will not be transformed. Attributes ---------- weights_column : str or None Weights column to use when calculating the mean response. mappings : dict Created in fit. Dict of key (column names) value (mapping of categorical levels to numeric, ordinal encoded response values) pairs. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes deprecated: bool indicates if class has been deprecated """ polars_compatible = False lazyframe_compatible = False jsonable = False FITS = True deprecated = True @beartype def __init__( self, columns: str | list[str], weights_column: str | None = None, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : None or str or list, default = None Columns to transform, if the default of None is supplied all object and category columns in X are used. weights_column : str or None Weights column to use when calculating the mean response. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ self.weights_column = weights_column BaseTransformer.__init__(self, columns=columns, **kwargs) # this transformer shouldn't really be used with huge numbers of levels # so setup to use int8 type # if there are more levels than this, will get a type error self.return_dtypes = dict.fromkeys(self.columns, "Int8") def _check_for_failed_fit(self) -> None: """Check if fit failed to find needed attrs. Occurs if mapping values are null unexpectedly. Raises ------ ValueError: if mapping values have come out as None unexpectedly """ failed_columns = [] for col in self.columns: if len(self.mappings[col]) == 0: failed_columns.append(col) break if failed_columns: msg = f"fit has failed for columns {failed_columns}, it is possible that all rows are invalid - check for null/negative weights, all null columns, or other invalid conditions listed in the docstring" raise ValueError(msg)
[docs] @beartype def fit(self, X: DataFrame, y: Series) -> OrdinalEncoderTransformer: """Identify mapping of categorical levels to rank-ordered integer values by target-mean in ascending order. If the user specified the weights_column arg in when initialising the transformer the weighted mean response will be calculated using that column. Parameters ---------- X : DataFrame Data to with catgeorical variable columns to transform and response_column column specified when object was initialised. y : Series or LazyFrame Response column or target. Returns ------- OrdinalEncoderTransformer: fitted class instance Raises ------ ValueError: if y contains nulls """ X = _convert_dataframe_to_narwhals(X) y = _convert_series_to_narwhals(y) BaseTransformer.fit(self, X, y) self.mappings = {} weights_column = self.weights_column if self.weights_column is None: X, weights_column = WeightColumnMixin._create_unit_weights_column( X, return_native=False, verbose=self.verbose, ) WeightColumnMixin.check_weights_column(self, X, weights_column) valid_weights_filter_expr = WeightColumnMixin.get_valid_weights_filter_expr( weights_column, self.verbose ) if (response_null_count := y.is_null().sum()) > 0: msg = f"{self.classname()}: y has {response_null_count} null values" raise ValueError(msg) X_y = self._combine_X_y(X, y, return_native_override=False) response_column = "_temporary_response" X_y = X_y.filter(valid_weights_filter_expr) # the need to sort for each c limits the optimisation we can do here, # as it is still necessarily to materialise for each column for c in self.columns: groupby_sum = X_y.group_by([c]).agg( nw.col(response_column).sum(), nw.col(weights_column).sum() ) # get the indexes of the sorted target mean-encoded dict encodings = ( groupby_sum.select( (nw.col(response_column) / nw.col(weights_column)).alias( "encodings" ), nw.col(c), ) .sort(by="encodings", descending=False) .to_dict() ) # create a dictionary whose keys are the levels of the categorical variable # sorted ascending by their target-mean value # and whose values are ascending ordinal integers ordinal_encoded_dict = { encodings[c][k]: k + 1 for k in range(len(encodings[c])) } self.mappings[c] = ordinal_encoded_dict for col in self.columns: # if more levels than int8 type can handle, then error if len(self.mappings[col]) > np.iinfo(np.int8).max: msg = f"{self.classname()}: column {c} has too many levels to encode" raise ValueError( msg, ) # use BaseMappingTransformer init to process args # extract null_mappings from mappings etc base_mapping_transformer = BaseMappingTransformer( mappings=self.mappings, return_dtypes=self.return_dtypes, ) self.mappings = base_mapping_transformer.mappings self.mappings_from_null = base_mapping_transformer.mappings_from_null self.return_dtypes = base_mapping_transformer.return_dtypes self._check_for_failed_fit() return self
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Apply ordinal encoding stored in the mappings attribute to columns. This maps categorical levels to rank-ordered integer values by target-mean in ascending order. Parameters ---------- X : DataFrame Data to with catgeorical variable columns to transform. Returns ------- X : DataFrame Transformed data with levels mapped to ordinal encoded values for categorical variables. """ X = BaseTransformer.transform(self, X) return BaseMappingTransformMixin.transform(self, X)
[docs] @deprecated( """This transformer has not been selected for conversion to polars/narwhals, and so has been deprecated. If it is useful to you, please raise an issue for it to be modernised """, ) class NominalToIntegerTransformer(BaseMappingTransformMixin): """Transformer to convert columns containing nominal values into integer values. The nominal levels that are mapped to integers are not ordered in any way. Attributes ---------- start_encoding : int Value to start the encoding / mapping of nominal to integer from. mappings : dict Created in fit. A dict of key (column names) value (mappings between levels and integers for given column) pairs. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes deprecated: bool indicates if class has been deprecated """ polars_compatible = False lazyframe_compatible = False jsonable = False FITS = True deprecated = True def __init__( self, columns: str | list[str] | None = None, start_encoding: int = 0, **kwargs: dict[str, bool], ) -> None: """Initialise class instance. Parameters ---------- columns : None or str or list, default = None Columns to transform, if the default of None is supplied all object and category columns in X are used. start_encoding : int, default = 0 Value to start the encoding from e.g. if start_encoding = 0 then the encoding would be {'A': 0, 'B': 1, 'C': 3} etc.. or if start_encoding = 5 then the same encoding would be {'A': 5, 'B': 6, 'C': 7}. Can be positive or negative. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. Raises ------ ValueError: if `start_encoding` is not int """ BaseTransformer.__init__(self, columns=columns, **kwargs) # this transformer shouldn't really be used with huge numbers of levels # so setup to use int8 type # if there are more levels than this, will get a type error self.return_dtypes = dict.fromkeys(self.columns, "Int8") if not isinstance(start_encoding, int): msg = f"{self.classname()}: start_encoding should be an integer" raise ValueError(msg) self.start_encoding = start_encoding
[docs] def fit(self, X: pd.DataFrame, y: pd.Series | None = None) -> pd.DataFrame: """Create mapping between nominal levels and integer values for categorical variables. Parameters ---------- X : pd.DataFrame Data to fit the transformer on, this sets the nominal levels that can be mapped. y : None or pd.DataFrame or pd.Series, default = None Optional argument only required for the transformer to work with sklearn pipelines. Returns ------- NominalToIntegerTransformer: fitted class instance Raises ------ ValueError: if column has more levels than can be encoded as int8 """ BaseTransformer.fit(self, X, y) self.mappings = {} for c in self.columns: col_values = X[c].unique() self.mappings[c] = { k: i for i, k in enumerate(col_values, self.start_encoding) } # if more levels than int8 type can handle, then error if len(self.mappings[c]) > np.iinfo(np.int8).max: msg = f"{self.classname()}: column {c} has too many levels to encode" raise ValueError( msg, ) # use BaseMappingTransformer init to process args # extract null_mappings from mappings etc base_mapping_transformer = BaseMappingTransformer( mappings=self.mappings, return_dtypes=self.return_dtypes, ) self.mappings = base_mapping_transformer.mappings self.mappings_from_null = base_mapping_transformer.mappings_from_null self.return_dtypes = base_mapping_transformer.return_dtypes return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Apply integer encoding stored in the mappings attribute to columns. Parameters ---------- X : pd.DataFrame Data with nominal columns to transform. Returns ------- X : pd.DataFrame Transformed input X with levels mapped according to mappings dict. """ X = BaseTransformer.transform(self, X) return BaseMappingTransformMixin.transform(self, X)