Source code for tubular.dates

"""Contains transformers for working with date columns."""

from __future__ import annotations

import copy
import datetime
import warnings
from enum import Enum
from typing import TYPE_CHECKING, Annotated, Any, ClassVar

import narwhals as nw
import numpy as np
import pandas as pd
from beartype import beartype
from beartype.vale import Is
from typing_extensions import deprecated

from tubular._utils import (
    _convert_dataframe_to_narwhals,
    _return_narwhals_or_native_dataframe,
    block_from_json,
)
from tubular.base import BaseTransformer, register
from tubular.mixins import DropOriginalMixin
from tubular.types import (
    DataFrame,
    GenericKwargs,
    ListOfOneStr,
    ListOfThreeStrs,
    ListOfTwoStrs,
)

if TYPE_CHECKING:
    from narwhals.typing import FrameT

TIME_UNITS = ["us", "ns", "ms"]


[docs] @register class BaseGenericDateTransformer( DropOriginalMixin, BaseTransformer, ): """Extends BaseTransformer for datetime/date scenarios. Attributes: ---------- built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework return_native: bool, default = True Controls whether transformer returns narwhals or native pandas/polars type jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> BaseGenericDateTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) BaseGenericDateTransformer(columns=['a', 'b'], new_column_name='bla') ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: list[str] | str, new_column_name: str, drop_original: bool = False, **kwargs: bool | None, ) -> None: """Initialise class instance. Parameters ---------- columns : Union[list[str], str] List of 2 columns. First column will be subtracted from second. new_column_name : str Name for the new year column. drop_original : bool Flag for whether to drop the original columns. return_native: bool, default = True Controls whether transformer returns narwhals or native pandas/polars type **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ super().__init__(columns=columns, **kwargs) self.drop_original = drop_original self.new_column_name = new_column_name self.is_fitted_ = True # Does not fit
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> transformer = BaseGenericDateTransformer(columns=["a", "b"], new_column_name="bla") >>> transformer.to_json() {'tubular_version': ..., 'classname': 'BaseGenericDateTransformer', 'init': {'columns': ['a', 'b'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'bla', 'drop_original': False}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"]["new_column_name"] = self.new_column_name json_dict["init"]["drop_original"] = self.drop_original return json_dict
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: list of features modified/created by the transformer Examples -------- ```pycon >>> # base classes just return inputs >>> transformer = BaseGenericDateTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) >>> transformer.get_feature_names_out() ['a', 'b'] >>> # other classes return new columns >>> transformer = DateDifferenceTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) >>> transformer.get_feature_names_out() ['bla'] ``` """ # base classes just return columns, so need special handling return ( [*self.columns] if type(self) in { BaseGenericDateTransformer, BaseDatetimeTransformer, } else [self.new_column_name] )
[docs] @beartype def check_columns_are_date_or_datetime( self, X: DataFrame, datetime_only: bool, ) -> None: """Check types of provided columns. Columns must be datetime or date type, depending on the datetime_only flag. If a column does not meet the expected type criteria, a TypeError is raised. Parameters ---------- X: DataFrame Data to validate datetime_only: bool Indicates whether ONLY datetime types are accepted Raises ------ TypeError: if non date/datetime types are found TypeError: if mismatched date/datetime types are found, types should be consistent Examples -------- ```pycon >>> import polars as pl >>> transformer = BaseGenericDateTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.date(1993, 9, 27), datetime.date(2005, 10, 7)], ... "b": [datetime.date(1991, 5, 22), datetime.date(2001, 12, 10)], ... }, ... ) >>> transformer.check_columns_are_date_or_datetime(test_df, datetime_only=False) ``` """ X = _convert_dataframe_to_narwhals(X) type_msg = ["Datetime"] date_type = nw.Date allowed_types = [nw.Datetime] if not datetime_only: allowed_types = [*allowed_types, date_type] type_msg += ["Date"] schema = X.collect_schema() for col in self.columns: is_datetime = False is_date = False if isinstance(schema[col], nw.Datetime): is_datetime = True elif schema[col] == nw.Date: is_date = True # first check for invalid types (non date/datetime) if (not is_datetime) and (not (not datetime_only and is_date)): msg = f"{self.classname()}: {col} type should be in {type_msg} but got {schema[col]}. Note, Datetime columns should have time_unit in {TIME_UNITS} and time_zones from zoneinfo.available_timezones()" raise TypeError(msg) # process datetime types for more readable error messages present_types = { dtype if not isinstance(dtype, nw.Datetime) else nw.Datetime for name, dtype in schema.items() if name in self.columns } valid_types = present_types.issubset(set(allowed_types)) # convert to list and sort to ensure reproducible order present_types = {str(value) for value in present_types} present_types = list(present_types) present_types.sort() # next check for consistent types (all date or all datetime) if not valid_types or len(present_types) > 1: msg = rf"{self.classname()}: Columns fed to datetime transformers should be {type_msg} and have consistent types, but found {present_types}. Note, Datetime columns should have time_unit in {TIME_UNITS} and time_zones from zoneinfo.available_timezones(). Please use ToDatetimeTransformer to standardise." raise TypeError( msg, )
[docs] @beartype def transform( self, X: DataFrame, datetime_only: bool = False, return_native_override: bool | None = None, ) -> DataFrame: """Validate data pre transform. Parameters ---------- X : DataFrame Data containing self.columns datetime_only: bool Indicates whether ONLY datetime types are accepted return_native_override: Optional[bool] option to override return_native attr in transformer, useful when calling parent methods Returns ------- X : DataFrame Validated data Examples -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = BaseGenericDateTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.date(1993, 9, 27), datetime.date(2005, 10, 7)], ... "b": [datetime.date(1991, 5, 22), datetime.date(2001, 12, 10)], ... }, ... ) >>> # base transform has no effect on data >>> transformer.transform(test_df) shape: (2, 2) ┌────────────┬────────────┐ │ a ┆ b │ │ --- ┆ --- │ │ date ┆ date │ ╞════════════╪════════════╡ │ 1993-09-27 ┆ 1991-05-22 │ │ 2005-10-07 ┆ 2001-12-10 │ └────────────┴────────────┘ ``` """ return_native = self._process_return_native(return_native_override) X = super().transform(X, return_native_override=False) X = _convert_dataframe_to_narwhals(X) self.check_columns_are_date_or_datetime(X, datetime_only=datetime_only) return _return_narwhals_or_native_dataframe(X, return_native)
[docs] @register class BaseDatetimeTransformer(BaseGenericDateTransformer): """Extends BaseTransformer for datetime scenarios. Attributes: ---------- built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> BaseDatetimeTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) BaseDatetimeTransformer(columns=['a', 'b'], new_column_name='bla') ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = False @beartype def __init__( self, columns: list[str] | str, new_column_name: str, drop_original: bool = False, **kwargs: bool | None, ) -> None: """Initialise class instance. Parameters ---------- columns : Union[list[str], str] List of 2 columns. First column will be subtracted from second. new_column_name : str Name for the new year column. drop_original : bool Flag for whether to drop the original columns. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ super().__init__( columns=columns, new_column_name=new_column_name, drop_original=drop_original, **kwargs, ) self.is_fitted_ = True # Does not fit
[docs] @beartype def transform( self, X: DataFrame, return_native_override: bool | None = None, ) -> DataFrame: """Check types of selected columns in provided data. Parameters ---------- X : DataFrame Data containing self.columns return_native_override: Optional[bool] option to override return_native attr in transformer, useful when calling parent methods Returns ------- X : DataFrame Validated data Example: -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = BaseDatetimeTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.datetime(1993, 9, 27), datetime.datetime(2005, 10, 7)], ... "b": [datetime.datetime(1991, 5, 22), datetime.datetime(2001, 12, 10)], ... }, ... ) >>> # base transform has no effect on data >>> transformer.transform(test_df) shape: (2, 2) ┌─────────────────────┬─────────────────────┐ │ a ┆ b │ │ --- ┆ --- │ │ datetime[μs] ┆ datetime[μs] │ ╞═════════════════════╪═════════════════════╡ │ 1993-09-27 00:00:00 ┆ 1991-05-22 00:00:00 │ │ 2005-10-07 00:00:00 ┆ 2001-12-10 00:00:00 │ └─────────────────────┴─────────────────────┘ ``` """ return_native = self._process_return_native(return_native_override) X = _convert_dataframe_to_narwhals(X) X = super().transform(X, datetime_only=True, return_native_override=False) return _return_narwhals_or_native_dataframe(X, return_native)
[docs] class DateDifferenceUnitsOptions(str, Enum): """Options for return units in DateDifferenceTransformer.""" __slots__ = () WEEK = "week" FORTNIGHT = "fortnight" LUNAR_MONTH = "lunar_month" COMMON_YEAR = "common_year" CUSTOM_DAYS = "custom_days" DAYS = "D" HOURS = "h" MINUTES = "m" SECONDS = "s"
DateDifferenceUnitsOptionsStr = Annotated[ str, Is[lambda s: s in DateDifferenceUnitsOptions._value2member_map_], ]
[docs] @register class DateDifferenceTransformer(BaseGenericDateTransformer): """Class to transform calculate the difference between 2 date fields in specified units. Attributes: ---------- built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> transformer = DateDifferenceTransformer( ... columns=["a", "b"], ... new_column_name="bla", ... units="common_year", ... ) >>> transformer DateDifferenceTransformer(columns=['a', 'b'], new_column_name='bla', units='common_year') >>> # transformer can also be dumped to json and reinitialised >>> json_dump = transformer.to_json() >>> json_dump {'tubular_version': ..., 'classname': 'DateDifferenceTransformer', 'init': {'columns': ['a', 'b'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'bla', 'drop_original': False, 'units': 'common_year', 'custom_days_divider': None}, 'fit': {'is_fitted_': True}} >>> DateDifferenceTransformer.from_json(json_dump) DateDifferenceTransformer(columns=['a', 'b'], new_column_name='bla', units='common_year') ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: ListOfTwoStrs, new_column_name: str, units: DateDifferenceUnitsOptionsStr = "D", drop_original: bool = False, custom_days_divider: int | None = None, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : List[str] List of 2 columns. First column will be subtracted from second. new_column_name : str, default = None Name given to calculated datediff column. If None then {column_upper}_{column_lower}_datediff_{units} will be used. units : str, default = 'D' Accepted values are "week", "fortnight", "lunar_month", "common_year", "custom_days", 'D', 'h', 'm', 's' copy : bool, default = False Should X be copied prior to transform? Copy argument no longer used and will be deprecated in a future release verbose: bool, default = False Control level of detail in printouts drop_original: Boolean flag indicating whether to drop original columns. custom_days_divider: Integer value for the "custom_days" unit kwargs: arguments for base class, e.g. verbose """ self.units = units self.custom_days_divider = custom_days_divider super().__init__( columns=columns, new_column_name=new_column_name, drop_original=drop_original, **kwargs, ) # This attribute is not for use in any method, use 'columns' instead. # Here only as a fix to allow string representation of transformer. self.column_lower = columns[0] self.column_upper = columns[1] self.is_fitted_ = True # Does not fit
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> transformer = DateDifferenceTransformer(columns=["a", "b"], new_column_name="a_diff_b") >>> # version will vary for local vs CI, so use ... as generic match >>> transformer.to_json() {'tubular_version': ..., 'classname': 'DateDifferenceTransformer', 'init': {'columns': ['a', 'b'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'a_diff_b', 'drop_original': False, 'units': 'D', 'custom_days_divider': None}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"].update( { "new_column_name": self.new_column_name, "units": self.units, "drop_original": self.drop_original, "custom_days_divider": self.custom_days_divider, }, ) return json_dict
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Calculate the difference between the given fields in the specified units. Parameters ---------- X : DataFrame Data containing self.columns Returns ------- DataFrame: dataframe with added date difference column Examples -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = DateDifferenceTransformer( ... columns=["a", "b"], ... new_column_name="a_b_difference_years", ... units="common_year", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.date(1993, 9, 27), datetime.date(2005, 10, 7)], ... "b": [datetime.date(1991, 5, 22), datetime.date(2001, 12, 10)], ... }, ... ) >>> transformer.transform(test_df) shape: (2, 3) ┌────────────┬────────────┬──────────────────────┐ │ a ┆ b ┆ a_b_difference_years │ │ --- ┆ --- ┆ --- │ │ date ┆ date ┆ f64 │ ╞════════════╪════════════╪══════════════════════╡ │ 1993-09-27 ┆ 1991-05-22 ┆ -2.353425 │ │ 2005-10-07 ┆ 2001-12-10 ┆ -3.827397 │ └────────────┴────────────┴──────────────────────┘ ``` """ X = _convert_dataframe_to_narwhals(X) X = super().transform(X, return_native_override=False) # mapping for units and corresponding timedelta arg values UNITS_TO_TIMEDELTA_PARAMS = { "week": (7, "D"), "fortnight": (14, "D"), "lunar_month": ( int(29.5 * 24), "h", ), # timedelta values need to be whole numbers so (29.5, 'D') cannot be used "common_year": (365, "D"), "D": (1, "D"), "h": (1, "h"), "m": (1, "m"), "s": (1, "s"), } # list of units that require time truncation UNITS_TO_TRUNCATE_TIME_FOR = [ "week", "fortnight", "lunar_month", "common_year", "custom_days", "D", ] start_date_col = nw.col(self.columns[0]) end_date_col = nw.col(self.columns[1]) # truncating time for specific units if self.units in UNITS_TO_TRUNCATE_TIME_FOR: start_date_col = start_date_col.dt.truncate("1d") end_date_col = end_date_col.dt.truncate("1d") if self.units == "custom_days": timedelta_value, timedelta_format = self.custom_days_divider, "D" denominator = np.timedelta64(timedelta_value, timedelta_format) else: timedelta_value, timedelta_format = UNITS_TO_TIMEDELTA_PARAMS[self.units] denominator = np.timedelta64(timedelta_value, timedelta_format) X = X.with_columns( ((end_date_col - start_date_col) / denominator).alias(self.new_column_name), ) # Drop original columns if self.drop_original is True X = DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, return_native=False, ) return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs] @register class ToDatetimeTransformer(BaseTransformer): """Class to transform convert specified columns to datetime. Class simply uses the pd.to_datetime method on the specified columns. Attributes: ---------- built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> transformer = ToDatetimeTransformer( ... columns="a", ... time_format="%d/%m/%Y", ... ) >>> transformer ToDatetimeTransformer(columns=['a'], time_format='%d/%m/%Y') >>> # version will vary for local vs CI, so use ... as generic match >>> json_dump = transformer.to_json() >>> json_dump {'tubular_version': ..., 'classname': 'ToDatetimeTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'time_format': '%d/%m/%Y'}, 'fit': {'is_fitted_': True}} ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: str | list[str], time_format: str | None = None, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : List[str] List of names of the column to convert to datetime. time_format: str str indicating format of time to parse, e.g. '%d/%m/%Y' **kwargs Arbitrary keyword arguments passed onto pd.to_datetime(). """ if not time_format: warnings.warn( "time_format arg has not been provided, so datetime format will be inferred", stacklevel=2, ) self.time_format = time_format super().__init__( columns=columns, **kwargs, ) self.is_fitted_ = True # Does not fit
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> transformer = ToDatetimeTransformer(columns="a", time_format="%d/%m/%Y") >>> # version will vary for local vs CI, so use ... as generic match >>> transformer.to_json() {'tubular_version': ..., 'classname': 'ToDatetimeTransformer', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'time_format': '%d/%m/%Y'}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"].update( { "time_format": self.time_format, } ) return json_dict
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Convert specified column to datetime using pd.to_datetime. Parameters ---------- X : DataFrame Data with column to transform. Returns ------- DataFrame: dataframe with provided columns converted to datetime Examples -------- ```pycon >>> import polars as pl >>> transformer = ToDatetimeTransformer( ... columns="a", ... time_format="%d/%m/%Y", ... ) >>> test_df = pl.DataFrame({"a": ["01/02/2020", "10/12/1996"], "b": [1, 2]}) >>> transformer.transform(test_df) shape: (2, 2) ┌─────────────────────┬─────┐ │ a ┆ b │ │ --- ┆ --- │ │ datetime[μs] ┆ i64 │ ╞═════════════════════╪═════╡ │ 2020-02-01 00:00:00 ┆ 1 │ │ 1996-12-10 00:00:00 ┆ 2 │ └─────────────────────┴─────┘ ``` """ X = _convert_dataframe_to_narwhals(X) X = super().transform(X, return_native_override=False) X = X.with_columns( nw.col(col).str.to_datetime(format=self.time_format) for col in self.columns ) return _return_narwhals_or_native_dataframe(X, return_native=self.return_native)
[docs] @register class BetweenDatesTransformer(BaseGenericDateTransformer): """Transformer to generate a boolean column indicating if one date is between two others. If any row has column_lower greater than column_upper, the output column for that row will be null instead of raising a warning. Attributes: ---------- built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform column_lower : str Name of date column to subtract. This attribute is not for use in any method, use 'columns' instead. Here only as a fix to allow string representation of transformer. column_upper : str Name of date column to subtract from. This attribute is not for use in any method, use 'columns instead. Here only as a fix to allow string representation of transformer. column_between : str Name of column to check if it's values fall between column_lower and column_upper. This attribute is not for use in any method, use 'columns instead. Here only as a fix to allow string representation of transformer. columns : list Contains the names of the columns to compare in the order [column_lower, column_between column_upper]. new_column_name : str new_column_name argument passed when initialising the transformer. lower_inclusive : bool lower_inclusive argument passed when initialising the transformer. upper_inclusive : bool upper_inclusive argument passed when initialising the transformer. drop_original: bool indicates whether to drop original columns. polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> BetweenDatesTransformer( ... columns=["a", "b", "c"], ... new_column_name="b_between_a_c", ... lower_inclusive=True, ... upper_inclusive=True, ... ) BetweenDatesTransformer(columns=['a', 'b', 'c'], new_column_name='b_between_a_c') ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: ListOfThreeStrs, new_column_name: str, drop_original: bool = False, lower_inclusive: bool = True, upper_inclusive: bool = True, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : list[str] List of columns for comparison, in format [lower, to_compare, upper] new_column_name : str Name for new column to be added to X. drop_original: bool indicates whether to drop original columns. lower_inclusive : bool, default = True If lower_inclusive is True the comparison to column_lower will be column_lower <= column_between, otherwise the comparison will be column_lower < column_between. upper_inclusive : bool, default = True If upper_inclusive is True the comparison to column_upper will be column_between <= column_upper, otherwise the comparison will be column_between < column_upper. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.__init__(). """ self.lower_inclusive = lower_inclusive self.upper_inclusive = upper_inclusive super().__init__( columns=columns, new_column_name=new_column_name, drop_original=drop_original, **kwargs, ) # This attribute is not for use in any method, use 'columns' instead. # Here only as a fix to allow string representation of transformer. self.column_lower = columns[0] self.column_upper = columns[2] self.column_between = columns[2] self.is_fitted_ = True # Does not fit
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> transformer = BetweenDatesTransformer( ... columns=["a", "b", "c"], ... new_column_name="b_between_a_c", ... lower_inclusive=True, ... upper_inclusive=False, ... ) >>> transformer.to_json() {'tubular_version': ..., 'classname': 'BetweenDatesTransformer', 'init': {'columns': ['a', 'b', 'c'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'b_between_a_c', 'drop_original': False, 'lower_inclusive': True, 'upper_inclusive': False}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"].update( { "lower_inclusive": self.lower_inclusive, "upper_inclusive": self.upper_inclusive, }, ) return json_dict
[docs] @nw.narwhalify def transform(self, X: FrameT) -> FrameT: """Transform - creates column indicating if middle date is between the other two. Rows where the lower bound is greater than the upper bound will produce null in the resulting output column for that row. Parameters ---------- X : pd/pl/nw.DataFrame Data to transform. Returns ------- X : pd/pl/nw.DataFrame Input X with additional column (self.new_column_name) added. This column is boolean and indicates if the middle column is between the other 2. Example: -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = BetweenDatesTransformer( ... columns=["a", "b", "c"], ... new_column_name="b_between_a_c", ... lower_inclusive=True, ... upper_inclusive=True, ... ) >>> test_df = pl.DataFrame( ... { ... "a": [ ... datetime.date(1990, 9, 27), ... datetime.date(2005, 10, 7), ... datetime.date(2010, 1, 1), ... ], ... "b": [ ... datetime.date(1991, 5, 22), ... datetime.date(2001, 12, 10), ... datetime.date(2009, 1, 1), ... ], ... "c": [ ... datetime.date(1993, 4, 20), ... datetime.date(2007, 11, 8), ... datetime.date(2008, 1, 1), ... ], ... }, ... ) >>> transformer.transform(test_df) shape: (3, 4) ┌────────────┬────────────┬────────────┬───────────────┐ │ a ┆ b ┆ c ┆ b_between_a_c │ │ --- ┆ --- ┆ --- ┆ --- │ │ date ┆ date ┆ date ┆ bool │ ╞════════════╪════════════╪════════════╪═══════════════╡ │ 1990-09-27 ┆ 1991-05-22 ┆ 1993-04-20 ┆ true │ │ 2005-10-07 ┆ 2001-12-10 ┆ 2007-11-08 ┆ false │ │ 2010-01-01 ┆ 2009-01-01 ┆ 2008-01-01 ┆ null │ └────────────┴────────────┴────────────┴───────────────┘ ``` """ X = nw.from_native(super().transform(X)) lower_comparison = ( nw.col(self.columns[0]) <= nw.col(self.columns[1]) if self.lower_inclusive else nw.col(self.columns[0]) < nw.col(self.columns[1]) ) upper_comparison = ( nw.col(self.columns[1]) <= nw.col(self.columns[2]) if self.upper_inclusive else nw.col(self.columns[1]) < nw.col(self.columns[2]) ) X = X.with_columns( nw.when(nw.col(self.columns[0]) > nw.col(self.columns[2])) .then(None) .otherwise(lower_comparison & upper_comparison) .cast(nw.Boolean) .alias(self.new_column_name), ) # Drop original columns if self.drop_original is True return DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, )
[docs] class DatetimeInfoOptions(str, Enum): """Options for what is returned by DatetimeInfoExtractor.""" __slots__ = () TIME_OF_DAY = "timeofday" TIME_OF_MONTH = "timeofmonth" TIME_OF_YEAR = "timeofyear" DAY_OF_WEEK = "dayofweek"
DatetimeInfoOptionStr = Annotated[ str, Is[lambda s: s in DatetimeInfoOptions._value2member_map_], ] DatetimeInfoOptionList = Annotated[ list, Is[ lambda list_value: all( entry in DatetimeInfoOptions._value2member_map_ for entry in list_value ) ], ]
[docs] @register class DatetimeInfoExtractor(BaseDatetimeTransformer): """Transformer to extract various features from datetime var. Attributes: ---------- columns: List[str] List of columns for processing include : list of str, default = ["timeofday", "timeofmonth", "timeofyear", "dayofweek"] Which datetime categorical information to extract datetime_mappings : dict, default = None Optional argument to define custom mappings for datetime values. drop_original: str indicates whether to drop provided columns post transform built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> transformer = DatetimeInfoExtractor( ... columns="a", ... include="timeofday", ... ) >>> transformer DatetimeInfoExtractor(columns=['a'], datetime_mappings={}, include=['timeofday']) >>> transformer.to_json() {'tubular_version': ..., 'classname': 'DatetimeInfoExtractor', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'dummy', 'drop_original': False, 'include': ['timeofday'], 'datetime_mappings': {}}, 'fit': {'is_fitted_': True}} ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True DEFAULT_MAPPINGS: ClassVar[dict[str, dict[int, str]]] = { DatetimeInfoOptions.TIME_OF_DAY.value: { **dict.fromkeys(range(6), "night"), # Midnight - 6am **dict.fromkeys(range(6, 12), "morning"), # 6am - Noon **dict.fromkeys(range(12, 18), "afternoon"), # Noon - 6pm **dict.fromkeys(range(18, 24), "evening"), # 6pm - Midnight }, DatetimeInfoOptions.TIME_OF_MONTH.value: { **dict.fromkeys(range(1, 11), "start"), **dict.fromkeys(range(11, 21), "middle"), **dict.fromkeys(range(21, 32), "end"), }, DatetimeInfoOptions.TIME_OF_YEAR.value: { **dict.fromkeys(range(3, 6), "spring"), # Mar, Apr, May **dict.fromkeys(range(6, 9), "summer"), # Jun, Jul, Aug **dict.fromkeys(range(9, 12), "autumn"), # Sep, Oct, Nov **dict.fromkeys([12, 1, 2], "winter"), # Dec, Jan, Feb }, DatetimeInfoOptions.DAY_OF_WEEK.value: { 1: "monday", 2: "tuesday", 3: "wednesday", 4: "thursday", 5: "friday", 6: "saturday", 7: "sunday", }, } INCLUDE_OPTIONS: ClassVar[list[str]] = list(DEFAULT_MAPPINGS.keys()) RANGE_TO_MAP: ClassVar[dict[str, set[int]]] = { DatetimeInfoOptions.TIME_OF_DAY.value: set(range(24)), DatetimeInfoOptions.TIME_OF_MONTH.value: set(range(1, 32)), DatetimeInfoOptions.TIME_OF_YEAR.value: set(range(1, 13)), DatetimeInfoOptions.DAY_OF_WEEK.value: set(range(1, 8)), } DATETIME_ATTR: ClassVar[dict[str, str]] = { DatetimeInfoOptions.TIME_OF_DAY.value: "hour", DatetimeInfoOptions.TIME_OF_MONTH.value: "day", DatetimeInfoOptions.TIME_OF_YEAR.value: "month", DatetimeInfoOptions.DAY_OF_WEEK.value: "weekday", } @beartype def __init__( self, columns: str | list[str], include: DatetimeInfoOptionList | DatetimeInfoOptionStr | None = None, datetime_mappings: dict[DatetimeInfoOptionStr, dict[int, str]] | None = None, drop_original: bool | None = False, **kwargs: str | bool, ) -> None: """Initialise class instance. Parameters ---------- columns : str or list datetime columns to extract information from include : list of str, default = ["timeofday", "timeofmonth", "timeofyear", "dayofweek"] Which datetime categorical information to extract datetime_mappings : dict, default = {} Optional argument to define custom mappings for datetime values. Keys of the dictionary must be contained in `include`. All possible values of each feature must be included in the mappings, ie, a mapping for `dayofweek` must include all values 1-7; datetime_mappings = { "dayofweek": { **{i: "week" for i in range(1,6)}, **{i: "week" for i in range(6,8)} } } The required ranges for each mapping are: timeofday: 0-23 timeofmonth: 1-31 timeofyear: 1-12 dayofweek: 1-7 If an option is present in 'include' but no mappings are provided, then default values from cls.DEFAULT_MAPPINGS will be used for this option. drop_original: str indicates whether to drop provided columns post transform **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ if include is None: include = self.INCLUDE_OPTIONS if "new_column_name" in kwargs: warnings.warn( f"{self.classname()}: new_column_name argument is not used for this class", stacklevel=2, ) kwargs.pop("new_column_name") super().__init__( columns=columns, drop_original=drop_original, new_column_name="dummy", **kwargs, ) if isinstance(include, str): include = [include] self.include = include self._check_provided_mappings(datetime_mappings=datetime_mappings) self.datetime_mappings = datetime_mappings if self.datetime_mappings is None: self.datetime_mappings = {} self.is_fitted_ = True # Does not fit
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- >>> transformer=DatetimeInfoExtractor(columns='a') >>> transformer.to_json() {'tubular_version': ..., 'classname': 'DatetimeInfoExtractor', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'dummy', 'drop_original': False, 'include': ['timeofday', 'timeofmonth', 'timeofyear', 'dayofweek'], 'datetime_mappings': {}}, 'fit': {'is_fitted_': True}} """ json_dict = super().to_json() json_dict["init"].update( { "include": self.include, "datetime_mappings": self.datetime_mappings, }, ) return json_dict
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: list of features modified/created by the transformer Examples -------- ```pycon >>> transformer = DatetimeInfoExtractor( ... columns=["a", "b"], ... include=["timeofday", "timeofmonth"], ... ) >>> transformer.get_feature_names_out() ['a_timeofday', 'a_timeofmonth', 'b_timeofday', 'b_timeofmonth'] ``` """ return [ col + "_" + include_option for col in self.columns for include_option in self.include ]
def _check_provided_mappings( self, datetime_mappings: dict[DatetimeInfoOptionStr, dict[int, str]] | None, ) -> None: """Process user provided mappings. Sets datetime_mappings attribute, then validates against RANGE_TO_MAP. Raises ------ ValueError: keys in datetime mapping do not match values in include Examples -------- ```pycon >>> transformer = DatetimeInfoExtractor( ... columns="a", ... include="timeofday", ... ) >>> transformer._check_provided_mappings( ... { ... "timeofday": { ... **{i: "start" for i in range(0, 12)}, ... **{i: "end" for i in range(12, 24)}, ... } ... } ... ) ``` """ if datetime_mappings: for key in datetime_mappings: if key not in self.include: msg = f"{self.classname()}: keys in datetime_mappings should be in include" raise ValueError(msg) # check provided mappings fit required format if set(datetime_mappings[key].keys()) != self.RANGE_TO_MAP[key]: msg = f"{self.classname()}: {key} mapping dictionary should contain mapping for all values between {min(self.RANGE_TO_MAP[key])}-{max(self.RANGE_TO_MAP[key])}. {self.RANGE_TO_MAP[key] - set(datetime_mappings[key].keys())} are missing" raise ValueError(msg)
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Transform - Extracts new features from datetime variables. Parameters ---------- X : DataFrame Data with columns to extract info from. Returns ------- X : DataFrame Transformed input X with added columns of extracted information. Example: -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = DatetimeInfoExtractor( ... columns="a", ... include="timeofmonth", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.datetime(1993, 9, 27), datetime.datetime(2005, 10, 7)], ... "b": [datetime.datetime(1991, 5, 22), datetime.datetime(2001, 12, 10)], ... }, ... ) >>> transformer.transform(test_df) shape: (2, 3) ┌─────────────────────┬─────────────────────┬───────────────┐ │ a ┆ b ┆ a_timeofmonth │ │ --- ┆ --- ┆ --- │ │ datetime[μs] ┆ datetime[μs] ┆ enum │ ╞═════════════════════╪═════════════════════╪═══════════════╡ │ 1993-09-27 00:00:00 ┆ 1991-05-22 00:00:00 ┆ end │ │ 2005-10-07 00:00:00 ┆ 2001-12-10 00:00:00 ┆ start │ └─────────────────────┴─────────────────────┴───────────────┘ ``` """ X = super().transform(X, return_native_override=False) # initialise mappings attr with defaults, # and overwrite with user provided mappings # where possible final_datetime_mappings = copy.deepcopy(self.DEFAULT_MAPPINGS) for key in self.datetime_mappings: final_datetime_mappings[key] = copy.deepcopy( self.datetime_mappings[key], ) # this is a situation where we know the values our mappings allow, # so enum type is more appropriate than categorical and we # will cast to this at the end enums = { include_option: nw.Enum( sorted(set(final_datetime_mappings[include_option].values())), ) for include_option in self.include } mappings_dict = { col + "_" + include_option: final_datetime_mappings[include_option] for col in self.columns for include_option in self.include } transform_dict = { col + "_" + include_option: ( getattr( nw.col(col).dt, self.DATETIME_ATTR[include_option], )().replace_strict( mappings_dict[col + "_" + include_option], ) ) for col in self.columns for include_option in self.include } # final casts transform_dict = { col + "_" + include_option: transform_dict[col + "_" + include_option].cast( enums[include_option], ) for col in self.columns for include_option in self.include } X = ( X.with_columns( **transform_dict, ) if transform_dict else X ) # Drop original columns if self.drop_original is True X = DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, return_native=False, ) return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs] class DatetimeComponentOptions(str, Enum): """Contains options for DatetimeComponentExtractor.""" __slots__ = () HOUR = "hour" DAY = "day" MONTH = "month" YEAR = "year"
DatetimeComponentOptionStr = Annotated[ str, Is[lambda s: s in DatetimeComponentOptions._value2member_map_], ] DatetimeComponentOptionList = Annotated[ list, Is[ lambda list_value: all( entry in DatetimeComponentOptions._value2member_map_ for entry in list_value ) ], ]
[docs] class DatetimeComponentExtractor(BaseDatetimeTransformer): """Transformer to extract numeric datetime components. Attributes: ---------- columns: List[str] List of columns for processing include : list of str Which numeric datetime components to extract polars_compatible : bool Indicates whether transformer has been converted to polars/pandas agnostic framework lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes jsonable: bool Indicates if transformer supports to/from_json methods FITS: bool Indicates whether transform requires fit to be run first Example: ------- ```pycon >>> transformer = DatetimeComponentExtractor( ... columns="a", ... include=["hour", "day"], ... ) >>> transformer DatetimeComponentExtractor(columns=['a'], include=['hour', 'day']) >>> # transformer can also be dumped to json and reinitialised >>> json_dump = transformer.to_json() >>> json_dump {'tubular_version': ..., 'classname': 'DatetimeComponentExtractor', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'dummy', 'drop_original': False, 'include': ['hour', 'day']}, 'fit': {'is_fitted_': True}} >>> DatetimeComponentExtractor.from_json(json_dump) DatetimeComponentExtractor(columns=['a'], include=['hour', 'day']) ``` """ INCLUDE_OPTIONS: ClassVar[list[str]] = ["hour", "day", "month", "year"] polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: str | list[str], include: DatetimeComponentOptionList | DatetimeComponentOptionStr, **kwargs: str | bool, ) -> None: """Initialize the DatetimeComponentExtractor. Parameters ---------- columns : str or list datetime columns to extract information from include : list of str Which numeric datetime components to extract new_column_name : str, default = "dummy" Name given to new column created by the transformation. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ if isinstance(include, str): include = [include] if "new_column_name" in kwargs: warnings.warn( f"{self.classname()}: new_column_name arg is unused by this transformer", stacklevel=2, ) kwargs.pop("new_column_name", None) super().__init__( columns=columns, new_column_name="dummy", **kwargs, ) self.include = include self.is_fitted_ = True # Does not fit
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: List of features modified/created by the transformer Examples -------- ```pycon >>> transformer = DatetimeComponentExtractor( ... columns=["a", "b"], ... include=["hour", "day"], ... ) >>> transformer.get_feature_names_out() ['a_hour', 'a_day', 'b_hour', 'b_day'] ``` """ return [ col + "_" + include_option for col in self.columns for include_option in self.include ]
[docs] def to_json(self) -> dict[str, Any]: """Convert transformer to JSON format. Returns ------- dict: JSON representation of the transformer Examples -------- ```pycon >>> transformer = DatetimeComponentExtractor( ... columns="a", ... include=["hour", "day"], ... ) >>> transformer.to_json() {'tubular_version': '...', 'classname': 'DatetimeComponentExtractor', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'dummy', 'drop_original': False, 'include': ['hour', 'day']}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"]["include"] = self.include return json_dict
[docs] @beartype def transform(self, X: DataFrame) -> DataFrame: """Transform - Extracts numeric datetime components. Parameters ---------- X : DataFrame Data with columns to extract info from. Returns ------- X : DataFrame Transformed input X with added columns of extracted information. Examples -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = DatetimeComponentExtractor( ... columns="a", ... include=["hour", "day"], ... ) >>> test_df = pl.DataFrame( ... { ... "a": [ ... datetime.datetime(1993, 9, 27, 14, 30), ... datetime.datetime(2005, 10, 7, 9, 45), ... ], ... "b": [ ... datetime.datetime(1991, 5, 22, 18, 0), ... datetime.datetime(2001, 12, 10, 23, 59), ... ], ... }, ... ) >>> transformer.transform(test_df) shape: (2, 4) ┌─────────────────────┬─────────────────────┬────────┬───────┐ │ a ┆ b ┆ a_hour ┆ a_day │ │ --- ┆ --- ┆ --- ┆ --- │ │ datetime[μs] ┆ datetime[μs] ┆ f32 ┆ f32 │ ╞═════════════════════╪═════════════════════╪════════╪═══════╡ │ 1993-09-27 14:30:00 ┆ 1991-05-22 18:00:00 ┆ 14.0 ┆ 27.0 │ │ 2005-10-07 09:45:00 ┆ 2001-12-10 23:59:00 ┆ 9.0 ┆ 7.0 │ └─────────────────────┴─────────────────────┴────────┴───────┘ ``` """ X = super().transform(X, return_native_override=False) transform_dict = { col + "_" + include_option: ( getattr( nw.col(col).dt, include_option, )().cast(nw.Float32) # can't cast to int as may have nulls ) for col in self.columns for include_option in self.include } X = ( X.with_columns( **transform_dict, ) if transform_dict else X ) return _return_narwhals_or_native_dataframe(X, self.return_native)
[docs] class DatetimeSinusoidUnitsOptions(str, Enum): """Options for units argument of DatetimeSinusoidCalculator.""" __slots__ = () YEAR = "year" MONTH = "month" DAY = "day" HOUR = "hour" MINUTE = "minute" SECOND = "second" MICROSECOND = "microsecond"
DatetimeSinusoidUnitsOptionStr = Annotated[ str, Is[lambda s: s in DatetimeSinusoidUnitsOptions._value2member_map_], ]
[docs] class MethodOptions(str, Enum): """Options for method arg of DatetimeSinusoidCalculator.""" __slots__ = () SIN = "sin" COS = "cos"
MethodOptionStr = Annotated[ str, Is[lambda s: s in MethodOptions._value2member_map_], ] MethodOptionList = Annotated[ list, Is[ lambda list_value: all( entry in MethodOptions._value2member_map_ for entry in list_value ) ], ] NumberNotBool = Annotated[ int | float, Is[ # exclude bools which would pass isinstance(..., (float, int)) lambda value: type(value) in {int, float} ], ]
[docs] @register class DatetimeSinusoidCalculator(BaseDatetimeTransformer): """Calculate the sine or cosine of a datetime column in a given unit (e.g hour). Includes the option to scale period of the sine or cosine to match the natural period of the unit (e.g. 24). Attributes: ---------- columns : str or list Columns to take the sine or cosine of. method : str or list The function to be calculated; either sin, cos or a list containing both. units : str or dict Which time unit the calculation is to be carried out on. Will take any of 'year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond'. Can be a string or a dict containing key-value pairs of column name and units to be used for that column. period : str, float or dict, default = 2*np.pi The period of the output in the units specified above. Can be a string or a dict containing key-value pairs of column name and units to be used for that column. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes Example: ------- ```pycon >>> DatetimeSinusoidCalculator( ... columns="a", ... method="sin", ... units="month", ... ) DatetimeSinusoidCalculator(columns=['a'], method=['sin'], units='month') ``` """ polars_compatible = True lazyframe_compatible = True FITS = False jsonable = True @beartype def __init__( self, columns: str | list[str], method: MethodOptionStr | MethodOptionList, units: DatetimeSinusoidUnitsOptionStr | dict[str, DatetimeSinusoidUnitsOptionStr], period: NumberNotBool | dict[str, NumberNotBool] = 2 * np.pi, drop_original: bool = False, **kwargs: bool | str, ) -> None: """Initialise class instance. Parameters ---------- columns : str or list Columns to take the sine or cosine of. Must be a datetime[64] column. method : str or list Argument to specify which function is to be calculated. Accepted values are 'sin', 'cos' or a list containing both. units : str or dict Which time unit the calculation is to be carried out on. Accepted values are 'year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond'. Can be a string or a dict containing key-value pairs of column name and units to be used for that column. period : int, float or dict, default = 2*np.pi The period of the output in the units specified above. To leave the period of the sinusoid output as 2 pi, specify 2*np.pi (or leave as default). Can be a string or a dict containing key-value pairs of column name and period to be used for that column. drop_original: bool indicates whether to drop original columns kwargs: Union[bool, str] arguments for base classes, e.g. verbose Raises ------ ValueError: if keys in provided period dictionary do match provided columns """ if "new_column_name" in kwargs: warnings.warn( f"{self.classname()}: new_column_name arg is unused by this transformer", stacklevel=2, ) kwargs.pop("new_column_name", None) super().__init__( columns=columns, drop_original=drop_original, new_column_name="dummy", **kwargs, ) method_list = [method] if isinstance(method, str) else method self.method = method_list self.units = units self.period = period if isinstance(units, dict) and sorted(units.keys()) != sorted(self.columns): msg = f"{self.classname()}: unit dictionary keys must be the same as columns but got {set(units.keys())}" raise ValueError(msg) if isinstance(period, dict) and sorted(period.keys()) != sorted(self.columns): msg = f"{self.classname()}: period dictionary keys must be the same as columns but got {set(period.keys())}" raise ValueError(msg) self.units_dict = { column: self.units if not isinstance(self.units, dict) else self.units[column] for column in self.columns } self.period_dict = { column: self.period if not isinstance(self.period, dict) else self.period[column] for column in self.columns } self.is_fitted_ = True # Does not fit
[docs] def get_feature_names_out(self) -> list[str]: """List features modified/created by the transformer. Returns ------- list[str]: list of features modified/created by the transformer Examples -------- ```pycon >>> transformer = DatetimeSinusoidCalculator( ... columns="a", ... method="sin", ... units="month", ... ) >>> transformer.get_feature_names_out() ['sin_6.283185307179586_month_a'] ``` """ return [ f"{method}_{self.period if not isinstance(self.period, dict) else self.period[column]}_{self.units if not isinstance(self.units, dict) else self.units[column]}_{column}" for column in self.columns for method in self.method ]
[docs] @block_from_json def to_json(self) -> dict[str, dict[str, Any]]: """Dump transformer to json dict. Returns ------- dict[str, dict[str, Any]]: jsonified transformer. Nested dict containing levels for attributes set at init and fit. Examples -------- ```pycon >>> transformer = DatetimeSinusoidCalculator( ... columns="a", ... method="sin", ... units="month", ... ) >>> transformer.to_json() {'tubular_version': ..., 'classname': 'DatetimeSinusoidCalculator', 'init': {'columns': ['a'], 'copy': False, 'verbose': False, 'return_native': True, 'new_column_name': 'dummy', 'drop_original': False, 'method': ['sin'], 'units': 'month', 'period': 6.283185307179586}, 'fit': {'is_fitted_': True}} ``` """ json_dict = super().to_json() json_dict["init"].update( { "method": self.method, "units": self.units, "period": self.period, } ) return json_dict
[docs] @beartype def transform( self, X: DataFrame, return_native_override: bool | None = None, ) -> DataFrame: """Transform - creates column containing sine or cosine of another datetime column. Which function is used is stored in the self.method attribute. Parameters ---------- X : pd/pl/nw.DataFrame Data to transform. return_native_override: Optional[bool] Option to override return_native attr in transformer, useful when calling parent methods Returns ------- X : pd/pl/nw.DataFrame Input X with additional columns added, these are named "<method>_<original_column>" Example: -------- ```pycon >>> import polars as pl >>> import datetime >>> transformer = DatetimeSinusoidCalculator( ... columns="a", ... method="sin", ... units="month", ... ) >>> test_df = pl.DataFrame( ... { ... "a": [datetime.datetime(1993, 9, 27), datetime.datetime(2005, 10, 7)], ... "b": [datetime.datetime(1991, 5, 22), datetime.datetime(2001, 12, 10)], ... }, ... ) >>> transformer.transform(test_df) shape: (2, 3) ┌─────────────────────┬─────────────────────┬───────────────────────────────┐ │ a ┆ b ┆ sin_6.283185307179586_month_a │ │ --- ┆ --- ┆ --- │ │ datetime[μs] ┆ datetime[μs] ┆ f64 │ ╞═════════════════════╪═════════════════════╪═══════════════════════════════╡ │ 1993-09-27 00:00:00 ┆ 1991-05-22 00:00:00 ┆ 0.412118 │ │ 2005-10-07 00:00:00 ┆ 2001-12-10 00:00:00 ┆ -0.544021 │ └─────────────────────┴─────────────────────┴───────────────────────────────┘ ``` """ X = _convert_dataframe_to_narwhals(X) return_native = self._process_return_native(return_native_override) X = super().transform(X, return_native_override=False) # first convert to desired units exprs = { f"{method}_{self.period_dict[column]}_{self.units_dict[column]}_{column}": getattr( nw.col(column).dt, self.units_dict[column], )() * (2 * np.pi / self.period_dict[column]) for column in self.columns for method in self.method } # then take sin/cos exprs = { ( new_col_name := f"{method}_{self.period_dict[column]}_{self.units_dict[column]}_{column}" ): getattr(exprs[new_col_name], method)() for column in self.columns for method in self.method } X = X.with_columns(**exprs) if exprs else X # Drop original columns if self.drop_original is True X = DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, return_native=False, ) return _return_narwhals_or_native_dataframe(X, return_native)
# DEPRECATED TRANSFORMERS
[docs] @deprecated( "This Transformer is deprecated, use DateDifferenceTransformer instead. " "If you prefer this transformer to DateDifferenceTransformer, " "let us know through a github issue", ) class DateDiffLeapYearTransformer(BaseGenericDateTransformer): """Transformer to calculate the number of years between two dates. !!! warning "Deprecated" This transformer is now deprecated; use `DateDifferenceTransformer` instead. Attributes ---------- columns : List[str] List of 2 columns. First column will be subtracted from second. new_column_name : str, default = None Name given to calculated datediff column. If None then {column_upper}_{column_lower}_datediff will be used. drop_original : bool Indicator whether to drop old columns during transform method. built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes deprecated: bool indicates if class has been deprecated """ polars_compatible = True lazyframe_compatible = False FITS = False jsonable = False deprecated = True @beartype def __init__( self, columns: ListOfTwoStrs, new_column_name: str, missing_replacement: float | int | str | None = None, drop_original: bool = False, **kwargs: bool, ) -> None: """Initialise class instance. Parameters ---------- columns : List[str] List of 2 columns. First column will be subtracted from second. new_column_name : str Name for the new year column. drop_original : bool Flag for whether to drop the original columns. missing_replacement : int/float/str Value to output if either the lower date value or the upper date value are missing. Default value is None. **kwargs Arbitrary keyword arguments passed onto BaseTransformer.init method. """ super().__init__( columns=columns, new_column_name=new_column_name, drop_original=drop_original, **kwargs, ) self.missing_replacement = missing_replacement # This attribute is not for use in any method, use 'columns' instead. # Here only as a fix to allow string representation of transformer. self.column_lower = columns[0] self.column_upper = columns[1]
[docs] @nw.narwhalify def transform(self, X: FrameT) -> FrameT: """Calculate year gap between the two provided columns. New column is created under the 'new_column_name', and optionally removes the old date columns. Parameters ---------- X : pd/pl/nw.DataFrame Data containing self.columns Returns ------- X : pd/pl/nw.DataFrame Data containing self.columns """ X = nw.from_native(super().transform(X)) # Create a helping column col0 for the first date. This will convert the date into an integer in a format or YYYYMMDD X = X.with_columns( ( nw.col(self.columns[0]).cast(nw.Date).dt.year().cast(nw.Int64) * 10000 + nw.col(self.columns[0]).cast(nw.Date).dt.month().cast(nw.Int64) * 100 + nw.col(self.columns[0]).cast(nw.Date).dt.day().cast(nw.Int64) ).alias("col0"), ) # Create a helping column col1 for the second date. This will convert the date into an integer in a format or YYYYMMDD X = X.with_columns( ( nw.col(self.columns[1]).cast(nw.Date).dt.year().cast(nw.Int64) * 10000 + nw.col(self.columns[1]).cast(nw.Date).dt.month().cast(nw.Int64) * 100 + nw.col(self.columns[1]).cast(nw.Date).dt.day().cast(nw.Int64) ).alias("col1"), ) # Compute difference between integers and if the difference is negative then adjust. # Finally divide by 10000 to get the years. X = X.with_columns( nw.when(nw.col("col1") < nw.col("col0")) .then(((nw.col("col0") - nw.col("col1")) // 10000) * (-1)) .otherwise((nw.col("col1") - nw.col("col0")) // 10000) .cast(nw.Int64) .alias(self.new_column_name), ).drop(["col0", "col1"]) # When we get a missing then replace with missing_replacement otherwise return the above calculation if self.missing_replacement is not None: X = X.with_columns( nw.when( (nw.col(self.columns[0]).is_null()) | (nw.col(self.columns[1]).is_null()), ) .then( self.missing_replacement, ) .otherwise( nw.col(self.new_column_name), ) .cast(nw.Int64) .alias(self.new_column_name), ) # Drop original columns if self.drop_original is True return DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, )
[docs] @deprecated( """This transformer has not been selected for conversion to polars/narwhals, and so has been deprecated. If aspects of it have been useful to you, please raise an issue for it to be replaced with more specific transformers """, ) class SeriesDtMethodTransformer(BaseDatetimeTransformer): """Transformer that applies a pandas.Series.dt method. Transformer assigns the output of the method to a new column. It is possible to supply other key word arguments to the transform method, which will be passed to the pandas.Series.dt method being called. Be aware it is possible to supply incompatible arguments to init that will only be identified when transform is run. This is because there are many combinations of method, input and output sizes. Additionally some methods may only work as expected when called in transform with specific key word arguments. Attributes ---------- column : str Name of column to apply transformer to. This attribute is not for use in any method, use 'columns instead. Here only as a fix to allow string representation of transformer. columns : str Column name for transformation. new_column_name : str The name of the column or columns to be assigned to the output of running the pandas method in transform. pd_method_name : str The name of the pandas.DataFrame method to call. pd_method_kwargs : dict Dictionary of keyword arguments to call the pd.Series.dt method with. drop_original: bool Indicates whether to drop self.column post transform built_from_json: bool indicates if transformer was reconstructed from json, which limits it's supported functionality to .transform polars_compatible : bool class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework jsonable: bool class attribute, indicates if transformer supports to/from_json methods FITS: bool class attribute, indicates whether transform requires fit to be run first lazyframe_compatible: bool class attribute, indicates whether transformer works with lazyframes deprecated: bool indicates if class has been deprecated """ polars_compatible = False lazyframe_compatible = False FITS = False jsonable = False deprecated = True @beartype def __init__( self, new_column_name: str, pd_method_name: str, columns: ListOfOneStr | str, pd_method_kwargs: GenericKwargs | None = None, drop_original: bool = False, **kwargs: bool | None, ) -> None: """Initialise class instance. Parameters ---------- new_column_name : str The name of the column to be assigned to the output of running the pandas method in transform. pd_method_name : str The name of the pandas.Series.dt method to call. columns : str Column to apply the transformer to. If a str is passed this is put into a list. Value passed in columns is saved in the columns attribute on the object. Note this has no default value so the user has to specify the columns when initialising the transformer. This is avoid likely when the user forget to set columns, in this case all columns would be picked up when super transform runs. pd_method_kwargs : dict, default = {} A dictionary of keyword arguments to be passed to the pd.Series.dt method when it is called. drop_original: bool Indicates whether to drop self.column post transform **kwargs Arbitrary keyword arguments passed onto BaseTransformer.__init__(). Raises ------ AttributeError: if requested pd.Series.dt method does not exist """ super().__init__( columns=columns, new_column_name=new_column_name, drop_original=drop_original, **kwargs, ) if pd_method_kwargs is None: pd_method_kwargs = {} self.pd_method_name = pd_method_name self.pd_method_kwargs = pd_method_kwargs try: ser = pd.Series( [datetime.datetime(2020, 12, 21, tzinfo=datetime.timezone.utc)], ) getattr(ser.dt, pd_method_name) except Exception as err: msg = f'{self.classname()}: error accessing "dt.{pd_method_name}" method on pd.Series object - pd_method_name should be a pd.Series.dt method' raise AttributeError(msg) from err if callable(getattr(ser.dt, pd_method_name)): self._callable = True else: self._callable = False # This attribute is not for use in any method, use 'columns' instead. # Here only as a fix to allow string representation of transformer. self.column = self.columns[0]
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Transform specific column on input pandas.DataFrame (X) using the given pandas.Series.dt method. Any keyword arguments set in the pd_method_kwargs attribute are passed onto the pd.Series.dt method when calling it. Parameters ---------- X : pd.DataFrame Data to transform. Returns ------- X : pd.DataFrame Input X with additional column (self.new_column_name) added. These contain the output of running the pd.Series.dt method. """ X = super().transform(X) if self._callable: X[self.new_column_name] = getattr( X[self.columns[0]].dt, self.pd_method_name, )(**self.pd_method_kwargs) else: X[self.new_column_name] = getattr( X[self.columns[0]].dt, self.pd_method_name, ) # Drop original columns if self.drop_original is True return DropOriginalMixin.drop_original_column( X, self.drop_original, self.columns, )