Source code for odte.Odte

"""
__author__ = "Ricardo Montañana Gómez"
__copyright__ = "Copyright 2020, Ricardo Montañana Gómez"
__license__ = "MIT"
Build a forest of oblique trees based on STree; any other base classifier
can be used as well
"""

from __future__ import annotations
import random
import json
from math import factorial
from typing import Union, Optional, Tuple, List, Set
import numpy as np
from sklearn.utils.multiclass import (  # type: ignore
    check_classification_targets,
)
from sklearn.base import clone, BaseEstimator, ClassifierMixin  # type: ignore
from sklearn.utils import check_random_state  # type: ignore
from sklearn.ensemble import BaseEnsemble  # type: ignore
from sklearn.utils.validation import (  # type: ignore
    check_is_fitted,
    _check_sample_weight,
)
from joblib import Parallel, delayed  # type: ignore
from stree import Stree  # type: ignore
from ._version import __version__


class Odte(BaseEnsemble, ClassifierMixin):
    """Ensemble of classifiers built on bootstraps and random subspaces.

    Every member is a clone of ``estimator`` trained on a bootstrap sample
    of the rows and on a randomly chosen subset of the columns. Predictions
    are obtained by counting the votes of all the members.

    Parameters
    ----------
    n_jobs : int, default=-1
        Number of jobs handed to ``joblib.Parallel``; -1 uses all the
        available cores.
    estimator : BaseEstimator, default=Stree()
        Base classifier, cloned for every member of the ensemble.
    random_state : int or None, default=None
        Seed of the first member; member ``i`` is seeded with
        ``random_state + i``. When None a random starting seed is drawn.
    max_features : {"auto", "sqrt", "log2"}, int, float or None, default=None
        Size of the feature subspace given to each member. None uses all
        the features, a float is a fraction of the features in (0, 1] and
        an int is an absolute number (its sign is ignored).
    max_samples : int, float or None, default=None
        Size of the bootstrap. None uses as many samples as rows in X, a
        float is a fraction of the rows in (0, 1) and an int is an absolute
        number between 1 and the number of rows.
    n_estimators : int, default=100
        Number of members of the ensemble.
    be_hyperparams : str, default="{}"
        JSON object with the hyperparameters applied to every base
        estimator through ``set_params``.

    Attributes
    ----------
    estimators_ : list of fitted base estimators.
    subspaces_ : list with the feature indices used by each estimator.
    classes_ : sorted unique labels seen in ``fit``.
    n_classes_ : number of classes.
    max_features_ : number of features of every subspace.
    depth_, nodes_, leaves_ : totals accumulated over the estimators that
        expose ``nodes_leaves``.
    """

    def __init__(
        self,
        # n_jobs = -1 to use all available cores
        n_jobs: int = -1,
        # NOTE: this default instance is created once, when the class is
        # defined; it is only ever used as a template that gets cloned.
        estimator: BaseEstimator = Stree(),
        random_state: Optional[int] = None,
        max_features: Optional[Union[str, int, float]] = None,
        max_samples: Optional[Union[int, float]] = None,
        n_estimators: int = 100,
        be_hyperparams: str = "{}",
    ):
        super().__init__(
            estimator=estimator,
            n_estimators=n_estimators,
        )
        self.estimator = estimator
        self.n_jobs = n_jobs
        self.n_estimators = n_estimators
        self.random_state = random_state
        self.max_features = max_features
        self.max_samples = max_samples  # size of bootstrap
        self.be_hyperparams = be_hyperparams

    @staticmethod
    def version() -> str:
        """Return the version string of the package."""
        return __version__

    def _validate_estimator(self) -> None:
        """Check the estimator and set the estimator_ attribute."""
        super()._validate_estimator(
            default=Stree(random_state=self.random_state)
        )

    def fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        sample_weight: Optional[np.ndarray] = None,
    ) -> Odte:
        """Train the ensemble.

        Parameters
        ----------
        X : training samples.
        y : class labels of the samples.
        sample_weight : optional weight of every sample; when None the
            weights are generated by ``_check_sample_weight``.

        Returns
        -------
        The fitted estimator itself.

        Raises
        ------
        ValueError
            If ``n_estimators`` is lower than 1 or if ``max_features`` /
            ``max_samples`` hold an invalid value.
        """
        # Check parameters are Ok.
        if self.n_estimators < 1:
            raise ValueError(
                "n_estimators must be greater than 0 but got "
                f"(n_estimators={self.n_estimators})"
            )
        check_classification_targets(y)
        X, y = self._validate_data(X, y)
        # if sample_weight is None return np.ones
        sample_weights = _check_sample_weight(
            sample_weight, X, dtype=np.float64
        )
        # Initialize computed parameters (needs n_features_in_, which is
        # available once the data has been validated)
        self.max_features_ = self._initialize_max_features()
        # build estimator_
        self._validate_estimator()
        # From here on the labels are the indices of classes_
        self.classes_, y = np.unique(y, return_inverse=True)
        self.n_classes_: int = self.classes_.shape[0]
        # Reset the state so a failed refit does not keep stale members
        self.estimators_: List[BaseEstimator] = []
        self.subspaces_: List[Tuple[int, ...]] = []
        result = self._train(X, y, sample_weights)
        estimators, subspaces = zip(*result)
        self.estimators_ = list(estimators)
        self.subspaces_ = list(subspaces)
        self._compute_metrics()
        return self

    def _compute_metrics(self) -> None:
        """Accumulate depth, nodes and leaves of the fitted estimators.

        Only the estimators that provide a ``nodes_leaves`` method are
        taken into account. The totals are stored in ``depth_``,
        ``nodes_`` and ``leaves_``.
        """
        tdepth = tnodes = tleaves = 0
        for estimator in self.estimators_:
            if hasattr(estimator, "nodes_leaves"):
                nodes, leaves = estimator.nodes_leaves()
                tdepth += estimator.depth_
                tnodes += nodes
                tleaves += leaves
        self.depth_ = tdepth
        self.leaves_ = tleaves
        self.nodes_ = tnodes

    def _train(
        self, X: np.ndarray, y: np.ndarray, weights: np.ndarray
    ) -> List[Tuple[BaseEstimator, Tuple[int, ...]]]:
        """Fit all the members of the ensemble in parallel.

        Returns a list with one ``(fitted estimator, feature indices)``
        pair per member, seeded with consecutive integers starting at
        ``random_state`` (or at a random value if it is None).
        """
        n_samples = X.shape[0]
        boot_samples = self._get_bootstrap_n_samples(n_samples)
        estimator = clone(self.estimator_)
        defined_state = (
            random.randint(0, 2**31)
            if self.random_state is None
            else self.random_state
        )
        return Parallel(n_jobs=self.n_jobs, prefer="threads")(  # type: ignore
            delayed(Odte._parallel_build_tree)(
                estimator,
                X,
                y,
                weights,
                random_seed,
                boot_samples,
                self.max_features_,
                self.be_hyperparams,
            )
            for random_seed in range(
                defined_state, defined_state + self.n_estimators
            )
        )

    @staticmethod
    def _parallel_build_tree(
        estimator_: BaseEstimator,
        X: np.ndarray,
        y: np.ndarray,
        weights: np.ndarray,
        random_seed: int,
        boot_samples: int,
        max_features: int,
        hyperparams: str,
    ) -> Tuple[BaseEstimator, Tuple[int, ...]]:
        """Fit one member of the ensemble.

        Parameters
        ----------
        estimator_ : template that is cloned before being fitted.
        X, y : training data, y already encoded as class indices.
        weights : weight of every sample in X.
        random_seed : seed of the bootstrap, of the subspace selection and
            of the classifier (when it has a ``random_state`` parameter).
        boot_samples : number of samples of the bootstrap.
        max_features : number of features of the subspace.
        hyperparams : JSON object with the classifier hyperparameters.

        Returns
        -------
        The fitted classifier and the indices of the features it uses.
        """
        clf = clone(estimator_)
        hyperparams_ = json.loads(hyperparams)
        # Only seed classifiers that can be seeded, otherwise set_params
        # would reject the unknown parameter
        if "random_state" in clf.get_params():
            hyperparams_["random_state"] = random_seed
        clf.set_params(**hyperparams_)
        n_samples = X.shape[0]
        # initialize random boxes; a private generator is used instead of
        # reseeding the global one because members are built in threads
        # and would otherwise interfere with each other
        rng = random.Random(random_seed)
        random_box = check_random_state(random_seed)
        # bootstrap
        indices = random_box.randint(0, n_samples, boot_samples)
        # update weights with the chosen samples
        # NOTE(review): a row drawn k times is both repeated k times in
        # the bootstrap and weighted by k -- confirm this is intended
        weights_update = np.bincount(indices, minlength=n_samples)
        current_weights = weights * weights_update
        # random subspace
        features = Odte._get_random_subspace(X, y, max_features, rng)
        # train the classifier
        bootstrap = X[indices, :]
        clf.fit(bootstrap[:, features], y[indices], current_weights[indices])
        return (clf, features)

    def _get_bootstrap_n_samples(self, n_samples: int) -> int:
        """Compute the number of samples of every bootstrap.

        Raises
        ------
        ValueError
            If ``max_samples`` is an int outside [1, n_samples], a float
            outside (0, 1) or has any other type.
        """
        if self.max_samples is None:
            return n_samples
        if isinstance(self.max_samples, int):
            if not (1 <= self.max_samples <= n_samples):
                raise ValueError(
                    "max_samples should be in the range 1 to "
                    f"{n_samples} but got {self.max_samples}"
                )
            return self.max_samples
        if isinstance(self.max_samples, float):
            if not (0 < self.max_samples < 1):
                raise ValueError(
                    "max_samples should be in the range (0, 1) "
                    f"but got {self.max_samples}"
                )
            # never return an empty bootstrap for very small fractions
            return max(1, int(round(self.max_samples * n_samples)))
        raise ValueError(
            f"Expected values int, float but got {type(self.max_samples)}"
        )

    def _initialize_max_features(self) -> int:
        """Translate ``max_features`` into a number of features.

        Requires ``n_features_in_`` to be already set.

        Raises
        ------
        ValueError
            If ``max_features`` is an unknown string, an int whose absolute
            value is outside [1, n_features_in_] or a float outside (0, 1].
        """
        n_features = self.n_features_in_
        if isinstance(self.max_features, str):
            if self.max_features in ("auto", "sqrt"):
                max_features = max(1, int(np.sqrt(n_features)))
            elif self.max_features == "log2":
                max_features = max(1, int(np.log2(n_features)))
            else:
                raise ValueError(
                    "Invalid value for max_features. "
                    "Allowed string values are 'auto', "
                    "'sqrt' or 'log2'."
                )
        elif self.max_features is None:
            max_features = n_features
        elif isinstance(self.max_features, int):
            max_features = abs(self.max_features)
            # an empty or oversized subspace can not be sampled
            if not (1 <= max_features <= n_features):
                raise ValueError(
                    "Invalid value for max_features. "
                    f"Allowed int must be in range [1, {n_features}] "
                    f"got ({self.max_features})"
                )
        else:  # float
            if 0.0 < self.max_features <= 1.0:
                max_features = max(1, int(self.max_features * n_features))
            else:
                raise ValueError(
                    "Invalid value for max_features. "
                    "Allowed float must be in range (0, 1] "
                    f"got ({self.max_features})"
                )
        return max_features

    @staticmethod
    def _generate_spaces(
        features: int,
        max_features: int,
        rng: Optional[random.Random] = None,
    ) -> list:
        """Generate up to 5 different sorted combinations of features.

        Parameters
        ----------
        features : total number of features available.
        max_features : number of features of every combination.
        rng : generator used to sample; the global ``random`` module
            functions are used when it is None.

        Returns
        -------
        List of tuples with the sorted indices of the chosen features.
        """
        sample = random.sample if rng is None else rng.sample
        comb: Set[Tuple[int, ...]] = set()
        # Generate at most 5 combinations
        if max_features == features:
            set_length = 1
        else:
            # exact integer arithmetic, a float division may overflow
            number = factorial(features) // (
                factorial(max_features) * factorial(features - max_features)
            )
            set_length = min(5, number)
        while len(comb) < set_length:
            comb.add(tuple(sorted(sample(range(features), max_features))))
        return list(comb)

    @staticmethod
    def _get_random_subspace(
        dataset: np.ndarray,
        labels: np.ndarray,
        max_features: int,
        rng: Optional[random.Random] = None,
    ) -> Tuple[int, ...]:
        """Choose at random one of the generated feature combinations.

        Parameters
        ----------
        dataset : samples; only its number of columns is used.
        labels : not used, kept for interface compatibility.
        max_features : number of features of the subspace.
        rng : generator used to sample; the global ``random`` module
            functions are used when it is None.

        Returns
        -------
        Tuple with the sorted indices of the selected features.
        """
        randint = random.randint if rng is None else rng.randint
        features_sets = Odte._generate_spaces(
            dataset.shape[1], max_features, rng
        )
        if len(features_sets) > 1:
            return features_sets[randint(0, len(features_sets) - 1)]
        return features_sets[0]

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the most voted class of every sample in X."""
        proba = self.predict_proba(X)
        return self.classes_[np.argmax(proba, axis=1)]

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the fraction of votes every class gets for each sample.

        The result has one row per sample and one column per class, in
        the order of ``classes_``.
        """
        check_is_fitted(self, "estimators_")
        # Input validation
        X = self._validate_data(X, reset=False)
        n_samples = X.shape[0]
        votes = np.zeros((n_samples, self.n_classes_))
        rows = np.arange(n_samples)
        for tree, features in zip(self.estimators_, self.subspaces_):
            # predictions are class indices as the trees were fitted with
            # encoded labels; every row appears once so += is safe
            predictions = tree.predict(X[:, features])
            votes[rows, predictions] += 1
        # use the number of fitted members, n_estimators may have been
        # changed after fit
        return votes / len(self.estimators_)

    def get_nodes(self) -> int:
        """Return the total number of nodes computed in fit."""
        check_is_fitted(self, "estimators_")
        return self.nodes_

    def get_leaves(self) -> int:
        """Return the total number of leaves computed in fit."""
        check_is_fitted(self, "estimators_")
        return self.leaves_

    def get_depth(self) -> int:
        """Return the accumulated depth computed in fit."""
        check_is_fitted(self, "estimators_")
        return self.depth_

    def nodes_leaves(self) -> Tuple[int, int]:
        """Return the total number of nodes and leaves as a tuple."""
        check_is_fitted(self, "estimators_")
        return (self.get_nodes(), self.get_leaves())