Source code for vulpes.automl.regressions

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""regressions.py: Class Regressions
to test many regression models
"""

from .corevulpes import CoreVulpes
from ..utils.utils import (
    CUSTOM_SCORER_REG,
    METRIC_NAMES,
    METRICS_TO_REVERSE,
    r2_score_adj,
)

import warnings
import numbers
from time import perf_counter
from typing import Any, Dict, List, Optional, Tuple, Union
from collections import defaultdict
from collections.abc import Iterable
from os.path import join as opj

import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.pipeline import Pipeline
from sklearn.model_selection import (
    cross_validate,
    RepeatedKFold,
    _validation,  # private sklearn module; its _score helper is used in fit()
    train_test_split,
)
from sklearn.metrics import make_scorer

# define type Array_like
Array_like = Union[List, pd.DataFrame, pd.Series, np.ndarray, Any]


class Regressions(CoreVulpes):
    """Object to train many regression models.

    All the parameters are optional and can be modified.

    Args:
        models_to_try (Union[str, List[Tuple[str, Any]]], optional):
            List of models to try. It can be either a string that
            corresponds to a predefined list of models ("all", ...) or a
            list of tuples (name of a model, class of a model), e.g.
            ("RandomForestRegressor", sklearn.ensemble.RandomForestRegressor).
            Defaults to "all" (train all the available regression
            algorithms).
        custom_scorer (Dict[str, Any], optional): metrics to calculate
            after fitting a model. Dictionary of name:scorer pairs, where
            each scorer is created with sklearn's make_scorer function.
            Defaults to CUSTOM_SCORER_REG.
        preprocessing (Union[Pipeline, str], optional): preprocessing
            pipeline to use. It can be None (no preprocessing), a
            predefined preprocessing pipeline ("default", ...) or a
            sklearn Pipeline object. Defaults to "default": it applies a
            OneHotEncoder to "category" and object features, and a
            SimpleImputer (median strategy) followed by a StandardScaler
            to numerical features.
        use_cross_validation (bool, optional): whether to apply
            cross-validation. Defaults to True.
        cv (Any, optional): cross-validation object. It can be a
            predefined cross-validation setting ("default", "timeseries",
            ...), an iterable, a cross-validation object from sklearn,
            etc. Defaults to "default": it applies a StratifiedShuffleSplit
            if a groups object is given to the fit method, otherwise a
            RepeatedKFold. In both cases, n_splits is set to 5.
        test_size (float, optional): size of the test set when splitting.
            Defaults to 0.2.
        shuffle (bool, optional): whether to shuffle the samples when
            splitting the given dataset. Defaults to True.
        sort_result_by (str, optional): metric used to sort the final
            dataframe. Defaults to "MAPE".
        ascending (bool, optional): whether to sort the final dataframe
            in ascending order. Defaults to True.
        save_results (bool, optional): if True, save the results in a csv
            file. Defaults to False.
        path_results (str, optional): path to use when saving the
            results. Defaults to "".
        additional_model_params (Dict[str, Any], optional): dictionary of
            parameters to apply to each element of the pipeline. E.g.
            {"n_estimators": 100} sets n_estimators to 100 for every
            preprocessing task and/or model that exposes this parameter.
            Defaults to {}.
        random_state (int, optional): random state. Applied to every
            model and element of the pipeline that supports it. Defaults
            to None.
        verbose (int, optional): if greater than 0, print the warnings.
            Defaults to 0.

    Examples:
        >>> import pandas as pd
        >>> from sklearn.datasets import make_regression
        >>> from vulpes.automl import Regressions
        >>> X, y = make_regression(
        ...     n_samples=20, n_features=1, random_state=0,
        ...     noise=4.0, bias=100.0)
        >>> regressions = Regressions()
        >>> df_models = regressions.fit(X, y)
        >>> df_models
        | Model                      | R2       | RMSE      | ...
        |----------------------------|----------|-----------|-----
        | LassoCV                    | 0.997649 | 19.818497 | ...
        | HuberRegressor             | 0.997776 | 19.881912 | ...
        | Lars                       | 0.997694 | 19.555598 | ...
        | TransformedTargetRegressor | 0.997748 | 19.298391 | ...
        | ...                        | ...      | ...       | ...
    """

    def __init__(
        self,
        *,
        models_to_try: Union[str, List[Tuple[str, Any]]] = "all",
        custom_scorer: Dict[str, Any] = CUSTOM_SCORER_REG,
        preprocessing: Union[Pipeline, str] = "default",
        use_cross_validation: bool = True,
        cv: Any = "default",
        test_size: float = 0.2,
        shuffle: bool = True,
        sort_result_by: str = "MAPE",
        ascending: bool = True,
        save_results: bool = False,
        path_results: str = "",
        additional_model_params: Dict[str, Any] = {},
        random_state: Optional[int] = None,
        verbose: int = 0,
    ):
        super().__init__()
        self.task = "regression"
        self.models_to_try = self.predefined_list_models(models_to_try)
        self.custom_scorer = custom_scorer
        self.preprocessing = preprocessing
        self.use_cross_validation = use_cross_validation
        self.cv = cv
        self.test_size = test_size
        self.shuffle = shuffle
        self.sort_result_by = sort_result_by
        self.ascending = ascending
        self.save_results = save_results
        self.path_results = path_results
        self.additional_model_params = additional_model_params
        self.random_state = random_state
        self.verbose = verbose
        if not self.verbose:
            warnings.filterwarnings("ignore")
    def fit(
        self,
        X: Array_like,
        y: Array_like,
        *,
        sample_weight: Array_like = None,
        groups: Array_like = None,
    ) -> pd.DataFrame:
        """Fit many models.

        Args:
            X (Array_like): input dataset
            y (Array_like): target variable
            sample_weight (Array_like, optional): sample weights to apply
                when fitting. Defaults to None.
            groups (Array_like, optional): groups used to stratify during
                cross-validation. Defaults to None.

        Raises:
            ValueError: cross-validation has a wrong type, or failed.
            RuntimeError: error when fitting a model.

        Returns:
            pd.DataFrame: dataframe with the goodness-of-fit metrics
            evaluated for each model.
        """
        # Convert X to a dataframe
        # (some preprocessing tasks, models, etc. require this format)
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        # dictionary to store calculated values, model info, etc.
        # for each model
        metrics_dic = defaultdict(list)

        # add adjusted r2 to the custom_scorer
        # (fit_intercept is fixed to True)
        if "adj_r2" in self.custom_scorer:
            self.custom_scorer["adj_r2"] = make_scorer(
                r2_score_adj,
                n=X.shape[0],
                p=X.shape[1],
                fit_intercept=True,
                greater_is_better=True,
            )

        # Fit and calculate metrics for all the models
        for name, model in tqdm(self.models_to_try):
            print("\n" + name)
            top = perf_counter()
            model = model()
            model_name = name.lower()

            # Create the pipeline
            pipe = self.create_pipeline(model_name, model)

            # When available:
            # - maximize n_jobs
            # - fix a random_state
            # - set normalize to False
            #   (normalization should be handled by the preprocessing part)
            model_params = {}
            for pipe_name, pipe_elt in pipe.steps:
                pipe_elt_available_params = pipe_elt.get_params().keys()
                if "random_state" in pipe_elt_available_params:
                    model_params[f"{pipe_name}__random_state"] = self.random_state
                if "normalize" in pipe_elt_available_params:
                    model_params[f"{pipe_name}__normalize"] = False
                if "n_jobs" in pipe_elt_available_params:
                    model_params[f"{pipe_name}__n_jobs"] = -1
                for (
                    add_param_name,
                    add_param_val,
                ) in self.additional_model_params.items():
                    if add_param_name in pipe_elt_available_params:
                        model_params[
                            f"{pipe_name}__{add_param_name}"
                        ] = add_param_val
            if model_params != {}:
                pipe.set_params(**model_params)

            # if sample_weight is a parameter of the fit function,
            # update fit parameters
            fit_params = {}
            for pipe_name, pipe_elt in pipe.steps:
                if hasattr(pipe_elt, "fit") and (
                    "sample_weight" in pipe_elt.fit.__code__.co_varnames
                ):
                    fit_params[f"{pipe_name}__sample_weight"] = sample_weight

            if self.use_cross_validation:
                if not (
                    hasattr(self.cv, "split")
                    or isinstance(self.cv, numbers.Integral)
                    or isinstance(self.cv, Iterable)
                    or isinstance(self.cv, str)
                ):
                    raise ValueError(
                        "Expected cv as an integer, cross-validation "
                        "object (from sklearn.model_selection), "
                        "iterable or valid string predefined cv"
                    )
                if isinstance(self.cv, str):
                    cv = self.predefined_cv(X, groups)
                else:
                    cv = self.cv
                try:
                    cv_model = cross_validate(
                        pipe,
                        X,
                        y,
                        cv=cv,
                        return_estimator=True,
                        n_jobs=-1,
                        fit_params=fit_params,
                        scoring=self.custom_scorer,
                    )
                except ValueError as e:
                    print(e)
                    print(
                        "Cross-validation failed. If groups were provided, "
                        "stratification may have failed because some groups "
                        "in the dataset don't contain enough samples."
                    )
                    print("Using RepeatedKFold instead.")
                    cv = RepeatedKFold(
                        n_splits=5, n_repeats=5, random_state=self.random_state
                    )
                    cv_model = cross_validate(
                        pipe,
                        X,
                        y,
                        cv=cv,
                        return_estimator=True,
                        n_jobs=-1,
                        fit_params=fit_params,
                        scoring=self.custom_scorer,
                    )
                except Exception as e:
                    raise RuntimeError(str(e))

                # store fitted models
                self.fitted_models_[name] = cv_model["estimator"]

                # add metrics to the lists
                # (nan-mean is used because, when folding, a label can be
                # missing in a particular fold, creating NaN values)
                for metric_name in self.custom_scorer.keys():
                    print_metric_name = METRIC_NAMES.get(metric_name, metric_name)
                    metrics_dic[print_metric_name].append(
                        np.nanmean(cv_model[f"test_{metric_name}"])
                    )
                metrics_dic["Model"].append(name)  # add name
                # add running time
                metrics_dic["Running time"].append(perf_counter() - top)
            else:
                try:
                    X_train, X_test, y_train, y_test = train_test_split(
                        X,
                        y,
                        test_size=self.test_size,
                        shuffle=self.shuffle,
                        stratify=groups,
                        random_state=self.random_state,
                    )
                    pipe.fit(X_train, y_train, **fit_params)
                    res = _validation._score(
                        pipe, X_test, y_test, self.custom_scorer,
                        error_score="raise"
                    )
                except Exception as e:
                    raise RuntimeError(f"Error when fitting: {e}")

                # store fitted models
                self.fitted_models_[name] = pipe

                # add metrics to the lists
                for metric_name in self.custom_scorer.keys():
                    print_metric_name = METRIC_NAMES.get(metric_name, metric_name)
                    metrics_dic[print_metric_name].append(
                        np.nanmean(res[metric_name])
                    )
                metrics_dic["Model"].append(name)  # add name
                # add running time
                metrics_dic["Running time"].append(perf_counter() - top)

        # reverse these metrics (because lower is better)
        # e.g. rmse, mae, mape
        for metric_name in METRICS_TO_REVERSE:
            if metric_name in metrics_dic:
                metrics_dic[metric_name] = [-x for x in metrics_dic[metric_name]]

        df_models = pd.DataFrame.from_dict(metrics_dic)
        df_models = df_models.sort_values(
            by=self.sort_result_by, ascending=self.ascending
        ).set_index("Model")
        self.df_models = df_models
        if self.save_results:  # save results
            df_models.to_csv(
                opj(self.path_results, f"vulpes_results_{self.task}.csv")
            )
        return df_models
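

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module), illustrating the
# constructor options documented in the class docstring above. The explicit
# (name, class) tuples for ``models_to_try`` and the parameter values below
# are illustrative assumptions, not fixtures of the library.
if __name__ == "__main__":
    from sklearn.datasets import make_regression
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.linear_model import Ridge

    X, y = make_regression(
        n_samples=200, n_features=5, noise=10.0, random_state=0
    )

    # ``models_to_try`` accepts explicit (name, class) tuples instead of a
    # predefined string such as "all".
    regressions = Regressions(
        models_to_try=[
            ("RandomForestRegressor", RandomForestRegressor),
            ("Ridge", Ridge),
        ],
        use_cross_validation=False,  # single train/test split instead of CV
        test_size=0.25,
        random_state=42,
    )
    df_models = regressions.fit(X, y)
    print(df_models.head())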