Source code for anlearn.loda

from typing import Optional, Union

import numpy as np
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.utils import check_array, check_random_state
from sklearn.utils.validation import check_is_fitted

from ._typing import ArrayLike


# There is no DensityEstimationMixin in scikit-learn
[docs]class Histogram(BaseEstimator): """Histogram model Histogram model based on :obj:`scipy.stats.rv_histogram`. Parameters ---------- bins : Union[int, str], optional * :obj:`int` - number of equal-width bins in the given range. * :obj:`str` - method used to calculate bin width (:obj:`numpy.histogram_bin_edges`). See :obj:`numpy.histogram_bin_edges` bins for more details, by default "auto" return_min : bool, optional Return minimal float value instead of 0, by default True Attributes ---------- hist : numpy.ndarray Value of histogram bin_edges : numpy.ndarray Edges of histogram pdf : numpy.ndarray Probability density function """ def __init__(self, bins: Union[int, str] = "auto", return_min: bool = True) -> None: self.bins = bins self.return_min = return_min
[docs] def fit(self, X: np.ndarray) -> "Histogram": """Fit estimator Parameters ---------- X : numpy.ndarray Input data, shape (n_samples,) Returns ------- Histogram Fitted estimator """ self.hist, self.bin_edges = np.histogram(X, bins=self.bins) widths = self.bin_edges[1:] - self.bin_edges[:-1] pdf = self.hist / np.sum(self.hist * widths) self.pdf = np.hstack([0.0, pdf, 0.0]) if self.return_min: self.pdf[self.pdf <= 0] = np.finfo(np.float_).eps return self
[docs] def predict_proba(self, X: np.ndarray) -> np.ndarray: """Predict probability Predict probability of input data X. Parameters ---------- X : numpy.ndarray Input data, shape (n_samples,) Returns ------- numpy.ndarray Probability estimated from histogram, shape (n_samples,) """ return self.pdf[np.searchsorted(self.bin_edges, X, side="right")]
[docs]class LODA(BaseEstimator, OutlierMixin): """LODA: Lightweight on-line detector of anomalies [1]_ LODA is an ensemble of histograms on random projections. See Pevný, T. Loda [1]_ for more details. Parameters ---------- n_estimators : int, optional number of histograms, by default 1000 bins : Union[int, str], optional * :obj:`int` - number of equal-width bins in the given range. * :obj:`str` - method used to calculate bin width (:obj:`numpy.histogram_bin_edges`). See :obj:`numpy.histogram_bin_edges` bins for more details, by default "auto" q : float, optional Quantile for compution threshold from training data scores. This threshold is used for `predict` method, by default 0.05 random_state : Optional[int], optional Random seed used for stochastic parts., by default None n_jobs : Optional[int], optional Not implemented yet, by default None verbose : int, optional Verbosity of logging, by default 0 Attributes ---------- projections_ : numpy.ndarray Random projections, shape (n_estimators, n_features) hists_ : List[Histogram] Histograms on random projections, shape (n_estimators,) anomaly_threshold_ : float Treshold for :meth:`predict` function Examples -------- >>> import numpy as np >>> from anlearn.loda import LODA >>> X = np.array([[0, 0], [0.1, -0.2], [0.3, 0.2], [0.2, 0.2], [-5, -5], [0.6, 0.7]]) >>> loda = LODA(n_estimators=10, bins=10, random_state=42) >>> loda.fit(X) LODA(bins=10, n_estimators=10, random_state=42) >>> loda.predict(X) array([ 1, 1, 1, 1, -1, 1]) References ---------- .. [1] Pevný, T. Loda: Lightweight on-line detector of anomalies. Mach Learn 102, 275–304 (2016). <https://doi.org/10.1007/s10994-015-5521-0> """ def __init__( self, n_estimators: int = 1000, bins: Union[int, str] = "auto", q: float = 0.05, random_state: Optional[int] = None, n_jobs: Optional[int] = None, verbose: int = 0, ) -> None: self.n_estimators = n_estimators self.bins = bins self.q = q self.random_state = random_state self.verbose = verbose self.n_jobs = n_jobs # TODO self._validate() def _validate(self) -> None: if not isinstance(self.n_estimators, int) or self.n_estimators < 1: raise ValueError("LODA: n_estimators must be > 0") if self.q < 0 or self.q > 1: raise ValueError("LODA: q must be in [0; 1]") def _init_projections(self) -> None: self.projections_ = np.zeros((self.n_estimators, self._shape[1])) non_zero_w = np.rint(np.sqrt(self._shape[1])).astype(int) rnd = check_random_state(self.random_state) indexes = rnd.rand(self.n_estimators, self._shape[1]).argpartition( non_zero_w, axis=1 )[:, :non_zero_w] rand_values = rnd.normal(size=indexes.shape) for projection, chosen_d, values in zip( self.projections_, indexes, rand_values ): projection[chosen_d] = values
[docs] def fit(self, X: ArrayLike, y: Optional[ArrayLike] = None) -> "LODA": """Fit estimator Parameters ---------- X : ArrayLike Input data, shape (n_samples, n_features) y : Optional[ArrayLike], optional Present for API consistency by convention, by default None Returns ------- LODA Fitted estimator """ raw_data = self.__check_array(X) self._shape = raw_data.shape self._init_projections() w_X = raw_data @ self.projections_.T self.hists_ = [] X_prob = [] for w_x in w_X.T: new_hist = Histogram(bins=self.bins, return_min=True).fit(w_x) self.hists_.append(new_hist) prob = new_hist.predict_proba(w_x) X_prob.append(prob) X_scores = np.mean(np.log(np.array(X_prob)), axis=0) self.anomaly_threshold_ = np.quantile(X_scores, self.q) return self
def __check_array(self, X: ArrayLike) -> np.ndarray: return check_array( X, accept_sparse=True, dtype="numeric", force_all_finite=True ) def __log_prob(self, X: ArrayLike) -> np.ndarray: check_is_fitted(self, attributes=["projections_", "hists_"]) raw_data = self.__check_array(X) w_X = raw_data @ self.projections_.T X_prob = np.array( [hist.predict_proba(w_x) for hist, w_x in zip(self.hists_, w_X.T)] ) return np.log(X_prob)
[docs] def score_samples(self, X: ArrayLike) -> np.ndarray: """Anomaly scores for samples Average of the logarithm probabilities estimated of individual projections. Output is proportional to the negative log-likelihood of the sample, that means the less likely a sample is, the higher the anomaly value it receives [1]_. This score is reversed for scikit-learn compatibility. Parameters ---------- X : ArrayLike Input data, shape (n_samples, n_features) Returns ------- numpy.ndarray The anomaly score of the input samples. The lower, the more abnormal. Shape (n_samples,) """ X_log_prob = self.__log_prob(X) X_scores = np.mean(X_log_prob, axis=0) return X_scores
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict if samples are outliers or not Samples with a score lower than :attr:`anomaly_threshold_` are considered to be outliers. Parameters ---------- X : ArrayLike Input data, shape (n_samples, n_features) Returns ------- numpy.ndarray 1 for inlineres, -1 for outliers, shape (n_samples,) """ check_is_fitted(self, attributes=["anomaly_threshold_"]) scores = self.score_samples(X) return np.where(scores < self.anomaly_threshold_, -1, 1)
[docs] def score_features(self, X: ArrayLike) -> np.ndarray: r"""Feature importance Feature importance is computed as a one-tailed two-sample t-test between :math:`-log(\hat{p}_i)` from histograms on projections with and without a specific feature. The higher the value is, the more important feature is. See full description in **3.3 Explaining the cause of an anomaly** [1]_ for more details. Parameters ---------- X : ArrayLike input data, shape (n_samples, n_features) Returns ------- numpy.ndarray Feature importance in anomaly detection. Notes ----- .. math:: t_j = \frac{\mu_j - \bar{\mu}_j}{ \sqrt{\frac{s^2_j}{|I_j|} + \frac{\bar{s}^2_j}{|\bar{I_j}|}}} """ X_neg_log_prob = -self.__log_prob(X) zero_projections = self.projections_ == 0 results = [] # t-test for every feature for j_feature in range(self._shape[1]): i_with_feature = X_neg_log_prob[~zero_projections[:, j_feature]] i_wo_feature = X_neg_log_prob[zero_projections[:, j_feature]] t_j = ( np.mean(i_with_feature, axis=0) - np.mean(i_wo_feature, axis=0) ) / np.sqrt( np.var(i_with_feature, axis=0) / i_with_feature.shape[0] + np.var(i_wo_feature, axis=0) / i_wo_feature.shape[0] ) results.append(t_j) return np.vstack(results).T