Source code for scipy.stats._kde

#-------------------------------------------------------------------------------
#
#  Define classes for (uni/multi)-variate kernel density estimation.
#
#  Currently, only Gaussian kernels are implemented.
#
#  Written by: Robert Kern
#
#  Date: 2004-08-09
#
#  Modified: 2005-02-10 by Robert Kern.
#              Contributed to SciPy
#            2005-10-07 by Robert Kern.
#              Some fixes to match the new scipy_core
#
#  Copyright 2004-2005 by Enthought, Inc.
#
#-------------------------------------------------------------------------------

# Standard library imports.
import warnings

# SciPy imports.
from scipy import linalg, special
from scipy._lib._util import check_random_state

from numpy import (asarray, atleast_2d, reshape, zeros, newaxis, exp, pi,
                   sqrt, ravel, power, atleast_1d, squeeze, sum, transpose,
                   ones, cov)
import numpy as np

# Local imports.
from . import _mvn
from ._stats import gaussian_kernel_estimate, gaussian_kernel_estimate_log

# deprecated import to be removed in SciPy 1.13.0
from scipy.special import logsumexp  # noqa: F401


__all__ = ['gaussian_kde']


[docs]class gaussian_kde: """Representation of a kernel-density estimate using Gaussian kernels. Kernel density estimation is a way to estimate the probability density function (PDF) of a random variable in a non-parametric way. `gaussian_kde` works for both uni-variate and multi-variate data. It includes automatic bandwidth determination. The estimation works best for a unimodal distribution; bimodal or multi-modal distributions tend to be oversmoothed. Parameters ---------- dataset : array_like Datapoints to estimate from. In case of univariate data this is a 1-D array, otherwise a 2-D array with shape (# of dims, # of data). bw_method : str, scalar or callable, optional The method used to calculate the estimator bandwidth. This can be 'scott', 'silverman', a scalar constant or a callable. If a scalar, this will be used directly as `kde.factor`. If a callable, it should take a `gaussian_kde` instance as only parameter and return a scalar. If None (default), 'scott' is used. See Notes for more details. weights : array_like, optional weights of datapoints. This must be the same shape as dataset. If None (default), the samples are assumed to be equally weighted Attributes ---------- dataset : ndarray The dataset with which `gaussian_kde` was initialized. d : int Number of dimensions. n : int Number of datapoints. neff : int Effective number of datapoints. .. versionadded:: 1.2.0 factor : float The bandwidth factor, obtained from `kde.covariance_factor`. The square of `kde.factor` multiplies the covariance matrix of the data in the kde estimation. covariance : ndarray The covariance matrix of `dataset`, scaled by the calculated bandwidth (`kde.factor`). inv_cov : ndarray The inverse of `covariance`. Methods ------- evaluate __call__ integrate_gaussian integrate_box_1d integrate_box integrate_kde pdf logpdf resample set_bandwidth covariance_factor Notes ----- Bandwidth selection strongly influences the estimate obtained from the KDE (much more so than the actual shape of the kernel). Bandwidth selection can be done by a "rule of thumb", by cross-validation, by "plug-in methods" or by other means; see [3]_, [4]_ for reviews. `gaussian_kde` uses a rule of thumb, the default is Scott's Rule. Scott's Rule [1]_, implemented as `scotts_factor`, is:: n**(-1./(d+4)), with ``n`` the number of data points and ``d`` the number of dimensions. In the case of unequally weighted points, `scotts_factor` becomes:: neff**(-1./(d+4)), with ``neff`` the effective number of datapoints. Silverman's Rule [2]_, implemented as `silverman_factor`, is:: (n * (d + 2) / 4.)**(-1. / (d + 4)). or in the case of unequally weighted points:: (neff * (d + 2) / 4.)**(-1. / (d + 4)). Good general descriptions of kernel density estimation can be found in [1]_ and [2]_, the mathematics for this multi-dimensional implementation can be found in [1]_. With a set of weighted samples, the effective number of datapoints ``neff`` is defined by:: neff = sum(weights)^2 / sum(weights^2) as detailed in [5]_. `gaussian_kde` does not currently support data that lies in a lower-dimensional subspace of the space in which it is expressed. For such data, consider performing principle component analysis / dimensionality reduction and using `gaussian_kde` with the transformed data. References ---------- .. [1] D.W. Scott, "Multivariate Density Estimation: Theory, Practice, and Visualization", John Wiley & Sons, New York, Chicester, 1992. .. [2] B.W. Silverman, "Density Estimation for Statistics and Data Analysis", Vol. 26, Monographs on Statistics and Applied Probability, Chapman and Hall, London, 1986. .. [3] B.A. Turlach, "Bandwidth Selection in Kernel Density Estimation: A Review", CORE and Institut de Statistique, Vol. 19, pp. 1-33, 1993. .. [4] D.M. Bashtannyk and R.J. Hyndman, "Bandwidth selection for kernel conditional density estimation", Computational Statistics & Data Analysis, Vol. 36, pp. 279-298, 2001. .. [5] Gray P. G., 1969, Journal of the Royal Statistical Society. Series A (General), 132, 272 Examples -------- Generate some random two-dimensional data: >>> import numpy as np >>> from scipy import stats >>> def measure(n): ... "Measurement model, return two coupled measurements." ... m1 = np.random.normal(size=n) ... m2 = np.random.normal(scale=0.5, size=n) ... return m1+m2, m1-m2 >>> m1, m2 = measure(2000) >>> xmin = m1.min() >>> xmax = m1.max() >>> ymin = m2.min() >>> ymax = m2.max() Perform a kernel density estimate on the data: >>> X, Y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j] >>> positions = np.vstack([X.ravel(), Y.ravel()]) >>> values = np.vstack([m1, m2]) >>> kernel = stats.gaussian_kde(values) >>> Z = np.reshape(kernel(positions).T, X.shape) Plot the results: >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots() >>> ax.imshow(np.rot90(Z), cmap=plt.cm.gist_earth_r, ... extent=[xmin, xmax, ymin, ymax]) >>> ax.plot(m1, m2, 'k.', markersize=2) >>> ax.set_xlim([xmin, xmax]) >>> ax.set_ylim([ymin, ymax]) >>> plt.show() """ def __init__(self, dataset, bw_method=None, weights=None): self.dataset = atleast_2d(asarray(dataset)) if not self.dataset.size > 1: raise ValueError("`dataset` input should have multiple elements.") self.d, self.n = self.dataset.shape if weights is not None: self._weights = atleast_1d(weights).astype(float) self._weights /= sum(self._weights) if self.weights.ndim != 1: raise ValueError("`weights` input should be one-dimensional.") if len(self._weights) != self.n: raise ValueError("`weights` input should be of length n") self._neff = 1/sum(self._weights**2) # This can be converted to a warning once gh-10205 is resolved if self.d > self.n: msg = ("Number of dimensions is greater than number of samples. " "This results in a singular data covariance matrix, which " "cannot be treated using the algorithms implemented in " "`gaussian_kde`. Note that `gaussian_kde` interprets each " "*column* of `dataset` to be a point; consider transposing " "the input to `dataset`.") raise ValueError(msg) try: self.set_bandwidth(bw_method=bw_method) except linalg.LinAlgError as e: msg = ("The data appears to lie in a lower-dimensional subspace " "of the space in which it is expressed. This has resulted " "in a singular data covariance matrix, which cannot be " "treated using the algorithms implemented in " "`gaussian_kde`. Consider performing principle component " "analysis / dimensionality reduction and using " "`gaussian_kde` with the transformed data.") raise linalg.LinAlgError(msg) from e
[docs] def evaluate(self, points): """Evaluate the estimated pdf on a set of points. Parameters ---------- points : (# of dimensions, # of points)-array Alternatively, a (# of dimensions,) vector can be passed in and treated as a single point. Returns ------- values : (# of points,)-array The values at each point. Raises ------ ValueError : if the dimensionality of the input points is different than the dimensionality of the KDE. """ points = atleast_2d(asarray(points)) d, m = points.shape if d != self.d: if d == 1 and m == self.d: # points was passed in as a row vector points = reshape(points, (self.d, 1)) m = 1 else: msg = (f"points have dimension {d}, " f"dataset has dimension {self.d}") raise ValueError(msg) output_dtype, spec = _get_output_dtype(self.covariance, points) result = gaussian_kernel_estimate[spec]( self.dataset.T, self.weights[:, None], points.T, self.cho_cov, output_dtype) return result[:, 0]
__call__ = evaluate
[docs] def integrate_gaussian(self, mean, cov): """ Multiply estimated density by a multivariate Gaussian and integrate over the whole space. Parameters ---------- mean : aray_like A 1-D array, specifying the mean of the Gaussian. cov : array_like A 2-D array, specifying the covariance matrix of the Gaussian. Returns ------- result : scalar The value of the integral. Raises ------ ValueError If the mean or covariance of the input Gaussian differs from the KDE's dimensionality. """ mean = atleast_1d(squeeze(mean)) cov = atleast_2d(cov) if mean.shape != (self.d,): raise ValueError("mean does not have dimension %s" % self.d) if cov.shape != (self.d, self.d): raise ValueError("covariance does not have dimension %s" % self.d) # make mean a column vector mean = mean[:, newaxis] sum_cov = self.covariance + cov # This will raise LinAlgError if the new cov matrix is not s.p.d # cho_factor returns (ndarray, bool) where bool is a flag for whether # or not ndarray is upper or lower triangular sum_cov_chol = linalg.cho_factor(sum_cov) diff = self.dataset - mean tdiff = linalg.cho_solve(sum_cov_chol, diff) sqrt_det = np.prod(np.diagonal(sum_cov_chol[0])) norm_const = power(2 * pi, sum_cov.shape[0] / 2.0) * sqrt_det energies = sum(diff * tdiff, axis=0) / 2.0 result = sum(exp(-energies)*self.weights, axis=0) / norm_const return result
[docs] def integrate_box_1d(self, low, high): """ Computes the integral of a 1D pdf between two bounds. Parameters ---------- low : scalar Lower bound of integration. high : scalar Upper bound of integration. Returns ------- value : scalar The result of the integral. Raises ------ ValueError If the KDE is over more than one dimension. """ if self.d != 1: raise ValueError("integrate_box_1d() only handles 1D pdfs") stdev = ravel(sqrt(self.covariance))[0] normalized_low = ravel((low - self.dataset) / stdev) normalized_high = ravel((high - self.dataset) / stdev) value = np.sum(self.weights*( special.ndtr(normalized_high) - special.ndtr(normalized_low))) return value
[docs] def integrate_box(self, low_bounds, high_bounds, maxpts=None): """Computes the integral of a pdf over a rectangular interval. Parameters ---------- low_bounds : array_like A 1-D array containing the lower bounds of integration. high_bounds : array_like A 1-D array containing the upper bounds of integration. maxpts : int, optional The maximum number of points to use for integration. Returns ------- value : scalar The result of the integral. """ if maxpts is not None: extra_kwds = {'maxpts': maxpts} else: extra_kwds = {} value, inform = _mvn.mvnun_weighted(low_bounds, high_bounds, self.dataset, self.weights, self.covariance, **extra_kwds) if inform: msg = ('An integral in _mvn.mvnun requires more points than %s' % (self.d * 1000)) warnings.warn(msg, stacklevel=2) return value
[docs] def integrate_kde(self, other): """ Computes the integral of the product of this kernel density estimate with another. Parameters ---------- other : gaussian_kde instance The other kde. Returns ------- value : scalar The result of the integral. Raises ------ ValueError If the KDEs have different dimensionality. """ if other.d != self.d: raise ValueError("KDEs are not the same dimensionality") # we want to iterate over the smallest number of points if other.n < self.n: small = other large = self else: small = self large = other sum_cov = small.covariance + large.covariance sum_cov_chol = linalg.cho_factor(sum_cov) result = 0.0 for i in range(small.n): mean = small.dataset[:, i, newaxis] diff = large.dataset - mean tdiff = linalg.cho_solve(sum_cov_chol, diff) energies = sum(diff * tdiff, axis=0) / 2.0 result += sum(exp(-energies)*large.weights, axis=0)*small.weights[i] sqrt_det = np.prod(np.diagonal(sum_cov_chol[0])) norm_const = power(2 * pi, sum_cov.shape[0] / 2.0) * sqrt_det result /= norm_const return result
[docs] def resample(self, size=None, seed=None): """Randomly sample a dataset from the estimated pdf. Parameters ---------- size : int, optional The number of samples to draw. If not provided, then the size is the same as the effective number of samples in the underlying dataset. seed : {None, int, `numpy.random.Generator`, `numpy.random.RandomState`}, optional If `seed` is None (or `np.random`), the `numpy.random.RandomState` singleton is used. If `seed` is an int, a new ``RandomState`` instance is used, seeded with `seed`. If `seed` is already a ``Generator`` or ``RandomState`` instance then that instance is used. Returns ------- resample : (self.d, `size`) ndarray The sampled dataset. """ # numpy/numpydoc#87 # noqa: E501 if size is None: size = int(self.neff) random_state = check_random_state(seed) norm = transpose(random_state.multivariate_normal( zeros((self.d,), float), self.covariance, size=size )) indices = random_state.choice(self.n, size=size, p=self.weights) means = self.dataset[:, indices] return means + norm
[docs] def scotts_factor(self): """Compute Scott's factor. Returns ------- s : float Scott's factor. """ return power(self.neff, -1./(self.d+4))
[docs] def silverman_factor(self): """Compute the Silverman factor. Returns ------- s : float The silverman factor. """ return power(self.neff*(self.d+2.0)/4.0, -1./(self.d+4))
# Default method to calculate bandwidth, can be overwritten by subclass covariance_factor = scotts_factor covariance_factor.__doc__ = """Computes the coefficient (`kde.factor`) that multiplies the data covariance matrix to obtain the kernel covariance matrix. The default is `scotts_factor`. A subclass can overwrite this method to provide a different method, or set it through a call to `kde.set_bandwidth`."""
[docs] def set_bandwidth(self, bw_method=None): """Compute the estimator bandwidth with given method. The new bandwidth calculated after a call to `set_bandwidth` is used for subsequent evaluations of the estimated density. Parameters ---------- bw_method : str, scalar or callable, optional The method used to calculate the estimator bandwidth. This can be 'scott', 'silverman', a scalar constant or a callable. If a scalar, this will be used directly as `kde.factor`. If a callable, it should take a `gaussian_kde` instance as only parameter and return a scalar. If None (default), nothing happens; the current `kde.covariance_factor` method is kept. Notes ----- .. versionadded:: 0.11 Examples -------- >>> import numpy as np >>> import scipy.stats as stats >>> x1 = np.array([-7, -5, 1, 4, 5.]) >>> kde = stats.gaussian_kde(x1) >>> xs = np.linspace(-10, 10, num=50) >>> y1 = kde(xs) >>> kde.set_bandwidth(bw_method='silverman') >>> y2 = kde(xs) >>> kde.set_bandwidth(bw_method=kde.factor / 3.) >>> y3 = kde(xs) >>> import matplotlib.pyplot as plt >>> fig, ax = plt.subplots() >>> ax.plot(x1, np.full(x1.shape, 1 / (4. * x1.size)), 'bo', ... label='Data points (rescaled)') >>> ax.plot(xs, y1, label='Scott (default)') >>> ax.plot(xs, y2, label='Silverman') >>> ax.plot(xs, y3, label='Const (1/3 * Silverman)') >>> ax.legend() >>> plt.show() """ if bw_method is None: pass elif bw_method == 'scott': self.covariance_factor = self.scotts_factor elif bw_method == 'silverman': self.covariance_factor = self.silverman_factor elif np.isscalar(bw_method) and not isinstance(bw_method, str): self._bw_method = 'use constant' self.covariance_factor = lambda: bw_method elif callable(bw_method): self._bw_method = bw_method self.covariance_factor = lambda: self._bw_method(self) else: msg = "`bw_method` should be 'scott', 'silverman', a scalar " \ "or a callable." raise ValueError(msg) self._compute_covariance()
def _compute_covariance(self): """Computes the covariance matrix for each Gaussian kernel using covariance_factor(). """ self.factor = self.covariance_factor() # Cache covariance and Cholesky decomp of covariance if not hasattr(self, '_data_cho_cov'): self._data_covariance = atleast_2d(cov(self.dataset, rowvar=1, bias=False, aweights=self.weights)) self._data_cho_cov = linalg.cholesky(self._data_covariance, lower=True) self.covariance = self._data_covariance * self.factor**2 self.cho_cov = (self._data_cho_cov * self.factor).astype(np.float64) self.log_det = 2*np.log(np.diag(self.cho_cov * np.sqrt(2*pi))).sum() @property def inv_cov(self): # Re-compute from scratch each time because I'm not sure how this is # used in the wild. (Perhaps users change the `dataset`, since it's # not a private attribute?) `_compute_covariance` used to recalculate # all these, so we'll recalculate everything now that this is a # a property. self.factor = self.covariance_factor() self._data_covariance = atleast_2d(cov(self.dataset, rowvar=1, bias=False, aweights=self.weights)) return linalg.inv(self._data_covariance) / self.factor**2
[docs] def pdf(self, x): """ Evaluate the estimated pdf on a provided set of points. Notes ----- This is an alias for `gaussian_kde.evaluate`. See the ``evaluate`` docstring for more details. """ return self.evaluate(x)
[docs] def logpdf(self, x): """ Evaluate the log of the estimated pdf on a provided set of points. """ points = atleast_2d(x) d, m = points.shape if d != self.d: if d == 1 and m == self.d: # points was passed in as a row vector points = reshape(points, (self.d, 1)) m = 1 else: msg = (f"points have dimension {d}, " f"dataset has dimension {self.d}") raise ValueError(msg) output_dtype, spec = _get_output_dtype(self.covariance, points) result = gaussian_kernel_estimate_log[spec]( self.dataset.T, self.weights[:, None], points.T, self.cho_cov, output_dtype) return result[:, 0]
[docs] def marginal(self, dimensions): """Return a marginal KDE distribution Parameters ---------- dimensions : int or 1-d array_like The dimensions of the multivariate distribution corresponding with the marginal variables, that is, the indices of the dimensions that are being retained. The other dimensions are marginalized out. Returns ------- marginal_kde : gaussian_kde An object representing the marginal distribution. Notes ----- .. versionadded:: 1.10.0 """ dims = np.atleast_1d(dimensions) if not np.issubdtype(dims.dtype, np.integer): msg = ("Elements of `dimensions` must be integers - the indices " "of the marginal variables being retained.") raise ValueError(msg) n = len(self.dataset) # number of dimensions original_dims = dims.copy() dims[dims < 0] = n + dims[dims < 0] if len(np.unique(dims)) != len(dims): msg = ("All elements of `dimensions` must be unique.") raise ValueError(msg) i_invalid = (dims < 0) | (dims >= n) if np.any(i_invalid): msg = (f"Dimensions {original_dims[i_invalid]} are invalid " f"for a distribution in {n} dimensions.") raise ValueError(msg) dataset = self.dataset[dims] weights = self.weights return gaussian_kde(dataset, bw_method=self.covariance_factor(), weights=weights)
@property def weights(self): try: return self._weights except AttributeError: self._weights = ones(self.n)/self.n return self._weights @property def neff(self): try: return self._neff except AttributeError: self._neff = 1/sum(self.weights**2) return self._neff
def _get_output_dtype(covariance, points): """ Calculates the output dtype and the "spec" (=C type name). This was necessary in order to deal with the fused types in the Cython routine `gaussian_kernel_estimate`. See gh-10824 for details. """ output_dtype = np.common_type(covariance, points) itemsize = np.dtype(output_dtype).itemsize if itemsize == 4: spec = 'float' elif itemsize == 8: spec = 'double' elif itemsize in (12, 16): spec = 'long double' else: raise ValueError( f"{output_dtype} has unexpected item size: {itemsize}" ) return output_dtype, spec