#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""clustering.py: Class Clustering
to test many clustering algorithms
"""
from .corevulpes import CoreVulpes
from ..utils.utils import CUSTOM_SCORER_CLT, METRIC_NAMES, METRICS_TO_REVERSE
import warnings
from time import perf_counter
from typing import List, Dict, Any, Union, Tuple
from collections import defaultdict
from os.path import join as opj
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.pipeline import Pipeline
# define type Array_like: the duck-typed inputs accepted by fit()
# NOTE(review): including Any in this Union makes the alias accept any
# object (Any subsumes the other members); kept as-is for backward
# compatibility — confirm whether Any is intentional.
Array_like = Union[List, pd.DataFrame, pd.Series, np.ndarray, Any]
class Clustering(CoreVulpes):
    """
    Object to train and compare many clustering algorithms.

    All the parameters are optional and can be modified.

    Args:
        models_to_try (Union[str, List[Tuple[str, Any]]], optional):
            List of models to try. Either a string that names a
            predefined list of models ("all", ...) or a list of tuples
            (name of a model, class of a model),
            e.g. ("KMeans", sklearn.cluster.KMeans).
            Defaults to "all" (train all the available clustering
            algorithms).
        custom_scorer (Dict[str, Any], optional): metrics to calculate
            after fitting a model. Dictionary of name:scorer pairs where
            each scorer is created with sklearn's make_scorer.
            Defaults to CUSTOM_SCORER_CLT.
        preprocessing (Union[Pipeline, str], optional): preprocessing
            pipeline to use. It can be None (no preprocessing), a
            predefined preprocessing pipeline ("default", ...) or a
            Pipeline object from sklearn. Defaults to "default":
            it applies a OneHotEncoder to "category" and object features,
            and it applies a SimpleImputer (median strategy) and a
            StandardScaler to numerical features.
        sort_result_by (str, optional): metric on which the final
            dataframe is sorted. Defaults to "Davies–Bouldin Index".
        ascending (bool, optional): sort the final dataframe in
            ascending order? Defaults to True.
        save_results (bool, optional): if True, save the results in a
            csv file. Defaults to False.
        path_results (str, optional): path to use when saving the
            results. Defaults to "".
        additional_model_params (Dict[str, Any], optional): dictionary
            of parameters applied to every element of the pipeline that
            accepts them. E.g. {"n_estimators": 100} is applied to all
            the preprocessing tasks and/or models exposing an
            n_estimators parameter. Defaults to
            {"nb_clusters": 3, "min_samples": 5, "eps": 0.5}.
        random_state (int, optional): random state variable, applied to
            every model and pipeline element that accepts one.
            Defaults to None.
        verbose (int, optional): if greater than or equal to 1, print
            the warnings. Defaults to 0.

    Examples:
        >>> from sklearn.datasets import load_iris
        >>> from vulpes.automl import Clustering
        >>> dataset = load_iris()
        >>> X = pd.DataFrame(dataset["data"],
        ...                  columns=dataset["feature_names"])
        >>> clustering = Clustering()
        >>> df_models = clustering.fit(X)
    """

    def __init__(
        self,
        *,
        models_to_try: Union[str, List[Tuple[str, Any]]] = "all",
        custom_scorer: Dict[str, Any] = CUSTOM_SCORER_CLT,
        preprocessing: Union[Pipeline, str] = "default",
        sort_result_by: str = "Davies–Bouldin Index",
        ascending: bool = True,
        save_results: bool = False,
        path_results: str = "",
        additional_model_params: Dict[str, Any] = None,
        random_state: Union[int, None] = None,
        verbose: int = 0,
    ):
        super().__init__()
        self.task = "clustering"
        self.models_to_try = self.predefined_list_models(models_to_try)
        self.custom_scorer = custom_scorer
        self.preprocessing = preprocessing
        self.sort_result_by = sort_result_by
        self.ascending = ascending
        self.save_results = save_results
        self.path_results = path_results
        # Avoid a shared mutable default argument: the default dict is
        # built per instance (behavior is unchanged for callers).
        if additional_model_params is None:
            additional_model_params = {"nb_clusters": 3,
                                       "min_samples": 5,
                                       "eps": 0.5}
        self.additional_model_params = additional_model_params
        self.random_state = random_state
        self.verbose = verbose
        if not self.verbose:
            # verbose == 0: silence all warnings during training
            warnings.filterwarnings("ignore")

    def fit(
        self,
        X: Array_like,
        *,
        sample_weight: Array_like = None
    ) -> pd.DataFrame:
        """
        Fit many clustering algorithms.

        Args:
            X (Array_like): input dataset.
            sample_weight (Array_like, optional): sample weights to
                apply when fitting; forwarded to every pipeline step
                whose ``fit`` accepts them. Defaults to None.

        Raises:
            RuntimeError: error when fitting a model.

        Returns:
            pd.DataFrame: dataframe with the goodness-of-fit metrics
            evaluated for each clustering algorithm, indexed by model
            name and sorted by ``self.sort_result_by``.
        """
        # Convert X to a dataframe: some preprocessing tasks, models,
        # etc. require this format.
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        # Stores calculated metrics, model names and running times.
        metrics_dic = defaultdict(list)
        # Fit and calculate metrics for all the models.
        for name, model_cls in tqdm(self.models_to_try):
            print("\n" + name)
            top = perf_counter()
            # Create the pipeline (preprocessing steps + model).
            pipe = self.create_pipeline(name.lower(), model_cls())
            # When available on a pipeline element:
            # - fix the random_state,
            # - disable "normalize" (deprecated in sklearn),
            # - maximize n_jobs,
            # - forward the user-supplied additional parameters.
            model_params = {}
            for pipe_name, pipe_elt in pipe.steps:
                available_params = pipe_elt.get_params().keys()
                if "random_state" in available_params:
                    model_params[f"{pipe_name}__random_state"] = \
                        self.random_state
                if "normalize" in available_params:
                    model_params[f"{pipe_name}__normalize"] = False
                if "n_jobs" in available_params:
                    model_params[f"{pipe_name}__n_jobs"] = -1
                for add_name, add_val in \
                        self.additional_model_params.items():
                    if add_name in available_params:
                        model_params[f"{pipe_name}__{add_name}"] = add_val
            if model_params:
                pipe.set_params(**model_params)
            # If sample_weight is a parameter of a step's fit function,
            # forward it through the pipeline fit parameters.
            fit_params = {}
            for pipe_name, pipe_elt in pipe.steps:
                if hasattr(pipe_elt, "fit") and (
                    "sample_weight" in pipe_elt.fit.__code__.co_varnames
                ):
                    fit_params[f"{pipe_name}__sample_weight"] = sample_weight
            try:
                pipe.fit(X, **fit_params)
            except Exception as e:
                # Chain the original exception for easier debugging.
                raise RuntimeError(f"Error when fitting: {e}") from e
            # Evaluate every custom scorer on the fitted labels.
            # NOTE(review): scorers receive the raw X, not the
            # preprocessed X — confirm this is intended.
            res = {}
            for name_scorer, scorer in self.custom_scorer.items():
                try:
                    # pipe[-1] because the (clustering) model is the
                    # last step of the pipeline.
                    score_val = scorer(X, pipe[-1].labels_)
                except Exception as e:
                    print(f"Error when calculating {name_scorer} on {name}")
                    print(e)
                    score_val = np.nan
                res[name_scorer] = score_val
            # Store the fitted pipeline for later inspection.
            self.fitted_models_[name] = pipe
            # nanmean is used because, when folding, a label can be
            # missing in a particular fold, creating nan values.
            for metric_name in self.custom_scorer.keys():
                # print_metric_name (pmn)
                pmn = METRIC_NAMES.get(metric_name, metric_name)
                metrics_dic[pmn].append(np.nanmean(res[metric_name]))
            metrics_dic["Model"].append(name)  # add name
            # Add running time.
            metrics_dic["Running time"].append(perf_counter() - top)
        # Reverse these metrics (because lower is better),
        # e.g. rmse, mae, mape.
        for metric_name in METRICS_TO_REVERSE:
            if metric_name in metrics_dic:
                metrics_dic[metric_name] = \
                    [-x for x in metrics_dic[metric_name]]
        df_models = (
            pd.DataFrame.from_dict(metrics_dic)
            .sort_values(by=self.sort_result_by, ascending=self.ascending)
            .set_index("Model")
        )
        self.df_models = df_models
        if self.save_results:  # save results
            # Bug fix: "opj" is already os.path.join, so the original
            # opj.path.join(...) raised AttributeError.
            df_models.to_csv(
                opj(self.path_results, f"vulpes_results_{self.task}.csv")
            )
        return df_models