from abc import ABC, abstractmethod
import warnings
import numpy as np
from numpy.typing import NDArray
from ._utils import unimodal_reg_l2, simplex_proj, simplex_proj_using_axis
_EPSILON = np.finfo(np.float64).eps
class _BaseNMF(ABC):
"""
Base class for Non-negative Matrix Factorization (NMF) algorithms.
This abstract class provides the foundation for various NMF implementations.
It handles common functionality such as convergence checking, error
computation, and projection operations for enforcing constraints.
NMF decomposes a non-negative matrix X into two lower-rank non-negative
matrices: X ≈ W * H, where W and H are the factor matrices.
Parameters
----------
rank : int
The rank of the factorization (number of components).
constraint_kind : int, optional
The type of constraint to apply (default is 0):
- 0: Default, Non-negativity constraints only (W ≥ 0, H ≥ 0)
- 1: Simplex constraint (H^T e ≤ e)
- 2: Row simplex constraint (H e = e)
- 3: Column simplex constraint on W (W^T e = e)
- 4: Column simplex constraint on H (H^T e = e)
unimodal : dict or None, default=None
Specifies unimodality constraints for W and H matrices.
If None, no unimodality constraints are applied.
If dict, format: {'W': bool | list of bool, 'H': bool | list of bool}
* Must contain at least one key ('W' or 'H')
* No other keys besides 'W' and 'H' are allowed
* For 'W': Controls unimodality of columns in W
* For 'H': Controls unimodality of rows in H
Each value can be:
* A boolean: applies to all components
* A list of booleans: selectively applies to specific components
Examples:
* {'W': True, 'H': True} - All :math:`W` and :math:`H` components are unimodal
* {'H': [True, False, True]} - Only components 0 and 2 of :math:`H` have
unimodal behavior
* {'W': [False, True, False]} - Only component 1 of :math:`W` has unimodal
behavior
iter_max : int, optional
Maximum number of outer iterations for the algorithm (default is 500).
tol : float, optional
Convergence tolerance for the relative loss (default is 1e-4).
Methods
-------
fit(X, Wi, Hi, known_W=None, known_H=None)
Fit the NMF model to the provided data matrix.
Raises
------
TypeError
If input parameters are not of the correct type.
ValueError
If input parameters have invalid values.
Notes
-----
This is an abstract base class. Concrete implementations should override
the `fit`, `_update_H`, and `_update_W` methods.
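Examples
--------
A minimal sketch of equivalent ``unimodal`` specifications for ``rank=3``,
shown with the concrete subclass ``FroALS`` since this class is abstract:
>>> from mcrnmf.models import FroALS
>>> m1 = FroALS(rank=3, unimodal={'H': True})
>>> m2 = FroALS(rank=3, unimodal={'H': [True, True, True]})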
"""
def __init__(
self,
rank: int,
constraint_kind: int = 0,
unimodal: dict | None = None,
iter_max: int = 500,
tol: float = 1e-4,
):
# validate type
if not isinstance(rank, (int, np.integer)):
raise TypeError(
f"`rank` must be of type {int}, currrent type is {type(rank)}"
)
if not isinstance(constraint_kind, (int, np.integer)):
raise TypeError(
f"`constraint_kind` must be of type {int}, current type is "
f"{type(constraint_kind)}"
)
if unimodal is not None:
if not isinstance(unimodal, dict):
raise TypeError(f"`unimodal` must be None or a dictionary")
if not isinstance(iter_max, (int, np.integer)):
raise TypeError(
f"`iter_max` must be of type {int}, current type is {type(iter_max)}"
)
if not isinstance(tol, float):
raise TypeError(
f"`tol` must be of type {float}, current type is {type(tol)}"
)
# validate values
if rank <= 0:
raise ValueError("`rank` must be > 0")
if constraint_kind not in [0, 1, 2, 3, 4]:
raise ValueError("`constraint_kind` must be 0, 1, 2, 3, or 4")
# default unimodal constraints if None
unimodal_W = [False] * rank
unimodal_H = [False] * rank
if unimodal is not None:
# empty dictionary check
if not unimodal:
raise ValueError(
"If not None, unimodal dictionary cannot be empty; must contain "
"'W' and/or 'H' keys."
)
# check for required keys
valid_keys = ["W", "H"]
provided_keys = list(unimodal.keys())
for provided_key in provided_keys:
if provided_key not in valid_keys:
raise KeyError(
f"`unimodal` dictionary can contain only keys in: {valid_keys} "
f"got {provided_key}."
)
if "W" in unimodal:
unimodal_W = self._process_unimodal_arg(unimodal["W"], rank)
if "H" in unimodal:
unimodal_H = self._process_unimodal_arg(unimodal["H"], rank)
if iter_max < 10:
raise ValueError("`iter_max` must be >= 10")
if not (0 < tol < 1):
raise ValueError("`tol` must be in the open interval (0, 1)")
self.rank_ = rank
self.constraint_kind_ = constraint_kind
self.unimodal_W_ = unimodal_W
self.unimodal_H_ = unimodal_H
self.iter_max_ = iter_max
self.tol_ = tol
# initialize attributes
self._W = None
self._H = None
self._rel_loss_ls = None
self._is_converged = None
@property
def W(self):
"""
The basis matrix :math:`W` obtained after fitting the model.
Returns
-------
ndarray of shape (n_features, rank)
The obtained :math:`W` after fitting the model.
"""
if self._W is None:
raise AttributeError(
"`W` is not available. Model has not been fitted yet. Call 'fit' first."
)
return self._W
@property
def H(self):
"""
The coefficient matrix :math:`H` obtained after fitting the model.
Returns
-------
ndarray of shape (rank, n_samples)
The obtained :math:`H` after fitting the model.
"""
if self._H is None:
raise AttributeError(
"`H` is not available. Model has not been fitted yet. Call 'fit' first."
)
return self._H
@property
def rel_loss_ls(self):
"""
List of relative loss values from each iteration during model fitting.
It is defined as:
.. math::
\\dfrac{\\sqrt{f_{\\textrm{obj}}^{i}}}{||X||_F}\\ \\textrm{,}
where:
* :math:`||\\cdot||_F` denotes the Frobenius norm
* :math:`X` is the original data matrix
* :math:`f_{\\textrm{obj}}^{i}` is the value of objective function after
iteration :math:`i`
Returns
-------
list of float
Relative loss value from each iteration.
"""
if self._rel_loss_ls is None:
raise AttributeError(
"`rel_loss_ls` is not available. Model has not been fitted yet. Call "
"`fit` first."
)
return self._rel_loss_ls
@property
def is_converged(self):
"""
The convergence status.
Returns
-------
bool
Whether the algorithm converged within ``iter_max`` iterations.
* ``True`` if convergence was reached based on ``tol`` criterion
* ``False`` if maximum iterations were reached without convergence
"""
if self._is_converged is None:
raise AttributeError(
"`is_converged` is not available. Model has not been fitted yet. Call "
"`fit` first."
)
return self._is_converged
@abstractmethod
def fit(
self,
X: NDArray[np.float64],
Wi: NDArray[np.float64],
Hi: NDArray[np.float64],
) -> None:
"""
Fit the NMF model to the provided data matrix.
This method must be implemented by derived classes. Different NMF
algorithms implement this method differently.
"""
self._W = Wi.copy()
self._H = Hi.copy()
self._rel_loss_ls = []
self._is_converged = False
@abstractmethod
def _update_H(self) -> None:
"""
Update the H matrix based on the current W matrix.
This method must be implemented by derived classes. Different NMF
algorithms implement this method differently.
"""
pass
@abstractmethod
def _update_W(self) -> None:
"""
Update the W matrix based on the current H matrix.
This method must be implemented by derived classes. Different NMF
algorithms implement this method differently.
"""
pass
def _process_unimodal_arg(self, constraint: bool | list[bool], rank: int):
"""
Process unimodal constraint for either H or W key into a boolean list of length
rank.
"""
if isinstance(constraint, bool):
return [constraint] * rank
elif isinstance(constraint, list):
if len(constraint) != rank:
raise ValueError(f"Unimodal constraint list must have length {rank}.")
if not all(isinstance(c, bool) for c in constraint):
raise TypeError("Unimodal constraint list must contain only booleans.")
return constraint
else:
raise TypeError("Unimodal constraint must be bool or list of booleans")
def _apply_unimodal_H_rows(self) -> None:
"""Apply unimodal constraints to rows of H."""
nrows, ncols = self._H.shape
w = np.ones(ncols)
for row in range(nrows):
if self.unimodal_H_[row]:
self._H[row, :] = unimodal_reg_l2(y=self._H[row, :], w=w)
def _apply_unimodal_W_cols(self) -> None:
"""Apply unimodal constraints to columns of W."""
nrows, ncols = self._W.shape
w = np.ones(nrows)
for col in range(ncols):
if self.unimodal_W_[col]:
self._W[:, col] = unimodal_reg_l2(y=self._W[:, col], w=w)
def _check_convergence(self, iter: int) -> bool:
"""
Check if the algorithm has converged.
Parameters
----------
iter : int
Current iteration number.
Returns
-------
bool
True if the algorithm has converged, False otherwise.
"""
if iter < 10:
is_converged = False
else:
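# relative change of the loss over a 10-iteration window; the max() with
# machine epsilon guards against division by zero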
is_converged = (
np.abs(self._rel_loss_ls[iter - 10] - self._rel_loss_ls[iter])
/ np.maximum(_EPSILON, self._rel_loss_ls[iter])
< self.tol_
)
return is_converged
def _fro_err_sq_after_W_update(
self, norm_X: float, HHt: NDArray[np.float64], XHt: NDArray[np.float64]
) -> float:
"""
Calculate the squared Frobenius error after W update.
Computes ||X - W*H||_F^2 efficiently using precomputed matrices.
Parameters
----------
norm_X : float
Frobenius norm of X.
HHt : ndarray
Precomputed H * H^T matrix.
XHt : ndarray
Precomputed X * H^T matrix.
Returns
-------
float
Squared Frobenius error.
"""
fro_error_sq = np.maximum(
_EPSILON,
norm_X**2 - 2 * np.sum(XHt * self._W) + np.sum(HHt * (self._W.T @ self._W)),
)
return fro_error_sq
def _fro_err_sq_after_H_update(
self, norm_X: float, WtW: NDArray[np.float64], WtX: NDArray[np.float64]
) -> float:
"""
Calculate the squared Frobenius error after H update.
Computes ||X - W*H||_F^2 efficiently using precomputed matrices.
Parameters
----------
norm_X : float
Frobenius norm of X.
WtW : ndarray
Precomputed W^T * W matrix.
WtX : ndarray
Precomputed W^T * X matrix.
Returns
-------
float
Squared Frobenius error.
"""
fro_error_sq = np.maximum(
_EPSILON,
norm_X**2 - 2 * np.sum(WtX * self._H) + np.sum(WtW * (self._H @ self._H.T)),
)
return fro_error_sq
def _project_H(self):
"""
Project the coefficient matrix H onto the constraint set.
Applies different projections to the H matrix based on the specified
constraint type. The projection ensures that H satisfies the constraints
required for the non-negative matrix factorization problem.
The following constraints are implemented:
- constraint_kind = 0: Non-negativity constraint (H >= 0)
- constraint_kind = 1: Simplex constraint (H^T e <= e)
- constraint_kind = 2: Row simplex constraint (H e = e)
- constraint_kind = 3: Non-negativity constraint (H >= 0, same as 0)
- constraint_kind = 4: Column simplex constraint (H^T e = e)
"""
if self.constraint_kind_ in [0, 3]:
self._H[:] = np.maximum(self._H, 0.0)
elif self.constraint_kind_ == 1:
simplex_proj(self._H)
elif self.constraint_kind_ == 2:
self._H[:] = simplex_proj_using_axis(B=self._H, is_col=False)
elif self.constraint_kind_ == 4:
self._H[:] = simplex_proj_using_axis(B=self._H, is_col=True)
def _project_W(self):
"""
Project the basis matrix W onto the constraint set.
Applies different projections to the W matrix based on the specified
constraint type. The projection ensures that W satisfies the constraints
required for the non-negative matrix factorization problem.
The following constraints are implemented:
- constraint_kind = 3: Column simplex constraint (W >= 0, W^T e = e)
- Any other value: Non-negativity constraint (W >= 0)
"""
if self.constraint_kind_ == 3:
self._W[:] = simplex_proj_using_axis(B=self._W, is_col=True)
else:
self._W[:] = np.maximum(self._W, 0.0)
def _validate_fit_args(
self,
X: NDArray,
Wi: NDArray,
Hi: NDArray,
known_W: NDArray | None,
known_H: NDArray | None,
):
"""
Validate input arguments for matrix factorization.
This function checks the dimensions and values of the input matrices
to ensure they meet the requirements for non-negative matrix factorization.
Parameters
----------
X : ndarray
The input data matrix to be factorized.
Wi : ndarray
Initial guess for the W matrix.
Hi : ndarray
Initial guess for the H matrix.
known_W : ndarray or None
Matrix with known values of W.
known_H : ndarray or None
Matrix with known values of H.
Raises
------
TypeError
If any input matrix is not a numpy array or has incorrect dtype.
ValueError
If any dimension compatibility check fails or if any matrix
contains negative values.
Warns
-----
UserWarning
If known_W or known_H contains no finite values.
"""
# check type
if not isinstance(X, np.ndarray):
raise TypeError("`X` must be a numpy array.")
if not isinstance(Wi, np.ndarray):
raise TypeError("`Wi` must be a numpy array.")
if not isinstance(Hi, np.ndarray):
raise TypeError("`Hi` must be a numpy array.")
if not ((known_W is None) or (isinstance(known_W, np.ndarray))):
raise TypeError("`known_W` must be None or a numpy array.")
if not ((known_H is None) or (isinstance(known_H, np.ndarray))):
raise TypeError("`known_H` must be None or a numpy array.")
# check shape matches
rows_X, cols_X = X.shape
if Wi.shape[0] != rows_X:
raise ValueError(
"Shape mismatch: number of rows of `Wi` not equal to number of rows of "
"`X`"
)
if Wi.shape[1] != self.rank_:
raise ValueError(
"Shape mismatch: number of columns of `Wi` not equal to `rank`"
)
if Hi.shape[1] != cols_X:
raise ValueError(
"Shape mismatch: number of columns of `Hi` not equal to number of "
"columns of `X`"
)
if Hi.shape[0] != self.rank_:
raise ValueError(
f"Shape mismatch: number of rows of `Hi` not equal to `rank`"
)
# check shape matches between known_W, known_H, initial_W, initial_H
if (known_W is not None) and (known_W.shape != Wi.shape):
raise ValueError(f"Shape mismatch: `known_W` shape not equal to `Wi` shape")
if (known_H is not None) and (known_H.shape != Hi.shape):
raise ValueError(f"Shape mismatch: `known_H` shape not equal to `Hi` shape")
# check dtype is np.float64
if X.dtype != np.float64:
raise TypeError(
f"The dtype of `X` elements should be {np.float64}, current dtype is "
f"{X.dtype}"
)
if Wi.dtype != np.float64:
raise TypeError(
f"The dtype of `Wi` elements should be {np.float64}, current dtype is "
f"{Wi.dtype}"
)
if Hi.dtype != np.float64:
raise TypeError(
f"The dtype of `Hi` elements should be {np.float64}, current dtype is "
f"{Hi.dtype}"
)
# check if non-negative
if X.min() < 0:
raise ValueError("All elements of `X` must be >= 0")
if Wi.min() < 0:
raise ValueError("All elements of `Wi` must be >= 0")
if Hi.min() < 0:
raise ValueError("All elements of `Hi` must be >= 0")
# check the specific validation arguments
if known_W is not None:
mask_fvals_knw_W = np.isfinite(known_W)
tot_fvals_knw_W = np.sum(mask_fvals_knw_W)
if tot_fvals_knw_W == 0: # if all elements are NaN
warnings.warn(
"`known_W` contains no finite values, setting it to None."
)
known_W = None
elif tot_fvals_knw_W == known_W.size: # if all elements are finite
raise ValueError(
"All values of `W` are known, can't perform NMF. Try `NNLS solver` "
"to compute H."
)
else:
# finite elems are non-neg
if known_W[mask_fvals_knw_W].min() < 0:
raise ValueError("All finite elements of `known_W` must be >= 0.")
if known_H is not None:
mask_fvals_knw_H = np.isfinite(known_H)
tot_fvals_knw_H = np.sum(mask_fvals_knw_H)
if tot_fvals_knw_H == 0:
warnings.warn(
"`known_H` contains no finite values, setting it to None."
)
known_H = None
elif tot_fvals_knw_H == known_H.size:
raise ValueError(
"All values of `H` are known, can't perform NMF. Try `NNLS solver` "
"to compute W."
)
else:
# finite elems are non-neg
if known_H[mask_fvals_knw_H].min() < 0:
raise ValueError("All finite elements of `known_H` must be >= 0.")
class FroALS(_BaseNMF):
"""
Nonnegative Matrix Factorization (NMF) using the Alternating Least Squares (ALS)
method.
Factorizes a nonnegative matrix :math:`X` into two nonnegative matrices :math:`W`
and :math:`H` by minimizing the squared Frobenius norm between :math:`X` and the
product :math:`WH` via alternating least squares. The following objective function
is minimized
subject to nonnegativity and other optional constraints on :math:`W` and :math:`H`:
.. math::
f_{\\textrm{obj}} = ||X - WH||_F^2
where :math:`||\\cdot||_{F}` is the Frobenius norm.
Parameters
----------
rank : int
The number of components for the factorization.
constraint_kind : integer-like {0, 1, 2, 3, 4}, default=0
The following constraints are applied based on the integer value specified:
* If ``0``: Only :math:`W \\geq 0`, :math:`H \\geq 0`.
* If ``1``: Closure constraint :math:`H^T e \\leq e`.
* If ``2``: Closure constraint :math:`H e = e`.
* If ``3``: Constraint :math:`W^T e = e`.
* If ``4``: Closure constraint :math:`H^T e = e`.
Note that for ``constraint_kind`` values 1, 2, 3, and 4, nonnegativity
constraints are applied in addition to the constraint specified above.
unimodal : dict or None, default=None
Specifies unimodality constraints for :math:`W` and :math:`H` matrices. If
``None``, no unimodality constraints are applied.
If ``dict``, Format: {'W': bool | list of bool, 'H': bool | list of bool}:
* Must contain at least one key ('W' or 'H')
* No other keys besides 'W' and 'H' are allowed
* For 'W': Controls unimodality of columns in W
* For 'H': Controls unimodality of rows in H
Each value can be:
* A boolean: applies to all components
* A list of booleans: selectively applies to specific components
Examples:
* ``{'H': True}``: All :math:`H` components are unimodal
* ``{'W': True, 'H': True}``: All :math:`W` and :math:`H` components are
unimodal
* ``{'H': [True, False, True]}``: Only components 0 and 2 of :math:`H` have
unimodal behavior
* ``{'W': [False, True, False]}``: Only component 1 of :math:`W` has
unimodal behavior
iter_max : int, default=500
Maximum number of iterations. It must be :math:`\\geq 10`.
tol : float, default=1e-4
Tolerance for convergence. Must be in the interval :math:`(0, 1)`.
Convergence is reached when:
.. math::
{|e[i] - e[i-10]| \\over e[i]} \\leq \\textrm{tol}
where:
* iteration :math:`i \\geq 10`
* :math:`e[i]` is the relative loss after iteration :math:`i`, which is
defined as
.. math::
e[i] = {||X - W^{i}H^{i}||_{F} \\over ||X||_{F}}
where :math:`W^{i}` and :math:`H^{i}` are the values of :math:`W` and
:math:`H`, respectively, after iteration :math:`i`.
References
----------
.. [1] Gillis, Nicolas. Nonnegative matrix factorization. Society for Industrial and
Applied Mathematics, 2020.
.. [2] Van Benthem, Mark H., and Michael R. Keenan. "Fast algorithm for the solution
of large‐scale non‐negativity‐constrained least squares problems." Journal of
Chemometrics: A Journal of the Chemometrics Society 18.10 (2004): 441-450.
Examples
--------
>>> from mcrnmf.models import FroALS, SNPA
>>> from mcrnmf.datasets import load_rxn_spectra
>>>
>>> # load the example dataset from mcrnmf
>>> X, wv, time = load_rxn_spectra()
>>>
>>> # generate initial guess using SNPA
>>> snpa = SNPA(rank=4)
>>> snpa.fit(X)
>>> Wi = snpa.W # Initial estimate for W
>>> Hi = snpa.H # Initial estimate for H
>>>
>>> # create an instance of FroALS and fit the model
>>> model = FroALS(rank=4, constraint_kind=1, iter_max=2000, tol=1e-4)
>>> model.fit(X, Wi, Hi)
>>> # access decomposed factors
>>> W, H = model.W, model.H
>>> # check convergence status
>>> converged = model.is_converged
>>> # access rel. reconstruction error after each iteration
>>> rel_recon_err = model.rel_reconstruction_error_ls
"""
def __init__(
self,
rank: int,
constraint_kind: int = 0,
unimodal: dict | None = None,
iter_max: int = 500,
tol: float = 1e-4,
):
super().__init__(
rank=rank,
constraint_kind=constraint_kind,
unimodal=unimodal,
iter_max=iter_max,
tol=tol,
)
@property
def rel_reconstruction_error_ls(self):
"""
List of relative reconstruction errors from each iteration during model fitting.
The relative reconstruction error measures how well the current factors
approximate the original data. It is the ratio:
.. math::
\\dfrac{||X - W^{i}H^{i}||_F}{||X||_F}\\ \\textrm{,}
where:
* :math:`||\\cdot||_F` denotes the Frobenius norm
* :math:`X` is the original data matrix
* :math:`W^{i}` and :math:`H^{i}` are values of :math:`W` and :math:`H` after
iteration :math:`i`
Returns
-------
list of float
Relative reconstruction error from each iteration.
"""
if self._rel_loss_ls is None:
raise AttributeError(
"`rel_reconstruction_error_ls` is not available. Model has not been "
"fitted yet. Call 'fit' first."
)
return self._rel_loss_ls
def fit(
self,
X: NDArray[np.float64],
Wi: NDArray[np.float64],
Hi: NDArray[np.float64],
known_W: NDArray[np.float64] | None = None,
known_H: NDArray[np.float64] | None = None,
preprocess_scale_WH: bool = False,
):
"""
Fit the FroALS model to the provided data.
Parameters
----------
X : ndarray of shape (n_features, n_samples)
Data array to be factorized.
Wi : ndarray of shape (n_features, rank)
Initial guess for the factor :math:`W`.
Hi : ndarray of shape (rank, n_samples)
Initial guess for the factor :math:`H`.
known_W : ndarray of shape (n_features, rank), default=None
Array containing known values of :math:`W`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`W` which do not
correspond to ``np.nan`` entries in ``known_W``.
known_H : ndarray of shape (rank, n_samples), default=None
Array containing known values of :math:`H`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`H` which do not
correspond to ``np.nan`` entries in ``known_H``.
preprocess_scale_WH : bool, default=False
If ``True``, ``Wi`` and ``Hi`` are scaled before optimization.
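Examples
--------
A minimal sketch of the ``known_H`` convention, reusing ``X``, ``Wi``, ``Hi``,
and ``model`` from the class example; entries left as ``np.nan`` are unknown:
>>> import numpy as np
>>> known_H = np.full(Hi.shape, np.nan)  # everything unknown ...
>>> known_H[0, 0] = 1.0  # ... except H[0, 0], which is fixed to 1.0
>>> model.fit(X, Wi, Hi, known_H=known_H)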
"""
self._validate_fit_args(X, Wi, Hi, known_W, known_H)
self._W = Wi.copy()
self._H = Hi.copy()
self._rel_loss_ls = []
self._is_converged = False
# Check if known values are passed, if so compute a mask.
if known_W is not None:
mask_known_W = np.isfinite(known_W)
else:
mask_known_W = None
if known_H is not None:
mask_known_H = np.isfinite(known_H)
else:
mask_known_H = None
# preallocate arrays for H updates
WtW = np.zeros((self.rank_, self.rank_))
WtX = np.zeros((self.rank_, X.shape[1]))
# preallocate arrays for W updates
XHt = X @ self._H.T
HHt = self._H @ self._H.T
if preprocess_scale_WH is True:
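# scale W by the globally optimal factor <XHt, W> / <W^T W, HHt>, then
# balance the column norms of W against the row norms of H; HHt and XHt
# are rescaled so they remain consistent with the scaled H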
self._W[:] = self._W * (
np.sum(XHt * self._W) / (np.sum((self._W.T @ self._W) * HHt))
)
norm_Wcol = np.linalg.norm(self._W, axis=0) + _EPSILON
norm_Hrow = np.linalg.norm(self._H, axis=1) + _EPSILON
scaling_vec = np.sqrt(norm_Wcol / norm_Hrow)
self._W[:] = self._W / scaling_vec
self._H[:] = self._H * scaling_vec[:, np.newaxis]
HHt[:] = HHt * scaling_vec[np.newaxis, :]
HHt[:] = scaling_vec[:, np.newaxis] * HHt
XHt[:] = XHt * scaling_vec[np.newaxis, :]
norm_X = np.linalg.norm(X, ord="fro")
rel_loss = np.sqrt(self._fro_err_sq_after_W_update(norm_X, HHt, XHt)) / norm_X
self._rel_loss_ls.append(rel_loss)
iter = 0
while iter < self.iter_max_:
self._update_H(X, WtW, WtX, known_H, mask_known_H)
self._update_W(X, HHt, XHt, known_W, mask_known_W)
rel_loss = (
np.sqrt(self._fro_err_sq_after_W_update(norm_X, HHt, XHt)) / norm_X
)
self._rel_loss_ls.append(rel_loss)
self._is_converged = self._check_convergence(iter)
if self._is_converged:
break
iter += 1
def _update_H(
self,
X: NDArray[np.float64],
WtW: NDArray[np.float64],
WtX: NDArray[np.float64],
known_H: NDArray[np.float64] | None,
mask_known_H: NDArray[np.bool_] | None,
):
"""
Update the H matrix based on the current W matrix using ALS.
Solves the least squares problem H = argmin ||X - WH||_F^2 subject to
constraints. Uses efficient matrix operations and handles potential
ill-conditioning.
Parameters
----------
X : ndarray
The input matrix to be factorized.
WtW : ndarray
Precomputed W^T * W matrix for efficiency.
WtX : ndarray
Precomputed W^T * X matrix for efficiency.
known_H : ndarray or None
Matrix of known values in H (NaN elements are treated as unknown).
mask_known_H : ndarray or None
Boolean mask indicating which values in H are known.
Returns
-------
None
The H matrix is updated in-place.
"""
WtW[:] = self._W.T @ self._W
WtX[:] = self._W.T @ X
# regularize ill-conditioned normal equations with a small ridge term
if np.linalg.cond(WtW) > 1e6:
delta = np.trace(WtW) / self.rank_
WtW[:] += 1e-6 * delta * np.eye(self.rank_)
self._H[:] = np.linalg.solve(WtW, WtX)
# apply known values
if mask_known_H is not None:
self._H[mask_known_H] = known_H[mask_known_H]
self._apply_unimodal_H_rows()
self._project_H()
def _update_W(
self,
X: NDArray[np.float64],
HHt: NDArray[np.float64],
XHt: NDArray[np.float64],
known_W: NDArray[np.float64] | None,
mask_known_W: NDArray[np.bool_] | None,
):
"""
Update the W matrix based on the current H matrix using ALS.
Solves the least squares problem W = argmin ||X - WH||_F^2 subject to
constraints. Uses efficient matrix operations and handles potential
ill-conditioning.
Parameters
----------
X : ndarray
The input matrix to be factorized.
HHt : ndarray
Precomputed H * H^T matrix for efficiency.
XHt : ndarray
Precomputed X * H^T matrix for efficiency.
known_W : ndarray or None
Matrix of known values in W (NaN elements are treated as unknown).
mask_known_W : ndarray or None
Boolean mask indicating which values in W are known.
Returns
-------
None
The W matrix is updated in-place.
"""
HHt[:] = self._H @ self._H.T
XHt[:] = X @ self._H.T
# regularize ill-conditioned normal equations with a small ridge term
if np.linalg.cond(HHt) > 1e6:
delta = np.trace(HHt) / self.rank_
HHt[:] += 1e-6 * delta * np.eye(self.rank_)
self._W[:] = np.linalg.solve(HHt, XHt.T).T
# apply known values
if mask_known_W is not None:
self._W[mask_known_W] = known_W[mask_known_W]
self._apply_unimodal_W_cols()
self._project_W()
class FroFPGM(_BaseNMF):
"""
Nonnegative Matrix Factorization (NMF) using the Fast Projected Gradient Method (FPGM).
Factorizes a nonnegative matrix :math:`X` into two nonnegative matrices :math:`W`
and :math:`H` by minimizing the squared Frobenius norm between :math:`X` and the
product :math:`WH` using the FPGM algorithm. The following objective function is
minimized
subject to nonnegativity and other optional constraints on :math:`W` and :math:`H`:
.. math::
f_{\\textrm{obj}} = ||X - WH||_F^2
where :math:`||\\cdot||_{F}` is the Frobenius norm.
Parameters
----------
rank : int
The number of components for the factorization.
constraint_kind : integer-like {0, 1, 2, 3, 4}, default=0
The following constraints are applied based on the integer value specified:
* If ``0``: Only :math:`W \\geq 0`, :math:`H \\geq 0`.
* If ``1``: Closure constraint :math:`H^T e \\leq e`.
* If ``2``: Closure constraint :math:`H e = e`.
* If ``3``: Constraint :math:`W^T e = e`.
* If ``4``: Closure constraint :math:`H^T e = e`.
Note that for ``constraint_kind`` values 1, 2, 3, and 4, nonnegativity
constraints are applied in addition to the constraint specified above.
unimodal : dict or None, default=None
Specifies unimodality constraints for :math:`W` and :math:`H` matrices. If
``None``, no unimodality constraints are applied.
If ``dict``, Format: {'W': bool | list of bool, 'H': bool | list of bool}:
* Must contain at least one key ('W' or 'H')
* No other keys besides 'W' and 'H' are allowed
* For 'W': Controls unimodality of columns in W
* For 'H': Controls unimodality of rows in H
Each value can be:
* A boolean: applies to all components
* A list of booleans: selectively applies to specific components
Examples:
* ``{'H': True}``: All :math:`H` components are unimodal
* ``{'W': True, 'H': True}``: All :math:`W` and :math:`H` components are
unimodal
* ``{'H': [True, False, True]}``: Only components 0 and 2 of :math:`H` have
unimodal behavior
* ``{'W': [False, True, False]}``: Only component 1 of :math:`W` has
unimodal behavior
iter_max : int, default=500
Maximum number of iterations. It must be :math:`\\geq 10`.
tol : float, default=1e-4
Tolerance for convergence. Must be in the interval :math:`(0, 1)`.
Convergence is reached when:
.. math::
{|e[i] - e[i-10]| \\over e[i]} \\leq \\textrm{tol}
where:
* iteration :math:`i \\geq 10`
* :math:`e[i]` is the relative loss after iteration :math:`i`, which is
defined as
.. math::
e[i] = {||X - W^{i}H^{i}||_{F} \\over ||X||_{F}}
where :math:`W^{i}` and :math:`H^{i}` are the values of :math:`W` and
:math:`H`, respectively, after iteration :math:`i`.
inner_iter_max : int, default=20
Maximum number of inner iterations performed during each single update of either
:math:`W` or :math:`H` while the other is held fixed.
inner_iter_tol : float, default=0.1
Tolerance for the convergence of the inner loop. The inner loop convergence is
reached when:
.. math::
{||A^{k} - A^{k-1}||_{F} \\over ||A^{1} - A^{0}||_{F}} \\leq
\\textrm{inner_iter_tol}
where:
* :math:`A` is either :math:`H` or :math:`W`.
* :math:`A^{k}` is value of :math:`A` after inner-iteration :math:`k`.
* :math:`A^{0}` is the value of :math:`A` before the first inner-iteration.
References
----------
.. [1] Gillis, N. (2020). Nonnegative matrix factorization. Society for Industrial
and Applied Mathematics.
.. [2] Lin, Chih-Jen. "Projected gradient methods for nonnegative matrix
factorization." Neural computation 19.10 (2007): 2756-2779.
Examples
--------
>>> from mcrnmf.models import FroFPGM, SNPA
>>> from mcrnmf.datasets import load_rxn_spectra
>>>
>>> # load the example dataset from mcrnmf
>>> X, wv, time = load_rxn_spectra()
>>>
>>> # generate initial guess using SNPA
>>> snpa = SNPA(rank=4)
>>> snpa.fit(X)
>>> Wi = snpa.W # Initial estimate for W
>>> Hi = snpa.H # Initial estimate for H
>>>
>>> # create an instance of FroFPGM and fit the model
>>> model = FroFPGM(rank=4, constraint_kind=1, iter_max=2000, tol=1e-4)
>>> model.fit(X, Wi, Hi)
>>> # access decomposed factors
>>> W, H = model.W, model.H
>>> # check convergence status
>>> converged = model.is_converged
>>> # access rel. reconstruction error after each iteration
>>> rel_recon_err = model.rel_reconstruction_error_ls
"""
def __init__(
self,
rank: int,
constraint_kind: int = 0,
unimodal: dict | None = None,
iter_max: int = 500,
tol: float = 1e-4,
inner_iter_max: int = 20,
inner_iter_tol: float = 0.1,
):
super().__init__(
rank=rank,
constraint_kind=constraint_kind,
unimodal=unimodal,
iter_max=iter_max,
tol=tol,
)
if not isinstance(inner_iter_max, (int, np.integer)):
raise TypeError(
f"`inner_iter_max` must be of type {int}, current type is "
f"{type(inner_iter_max)}"
)
if not isinstance(inner_iter_tol, float):
raise TypeError(
f"`inner_iter_tol` must be of type {float}, current type is "
f"{type(inner_iter_tol)}"
)
if inner_iter_max <= 0:
raise ValueError("`inner_iter_max` must be > 0")
if not (0 < inner_iter_tol < 1):
raise ValueError("`inner_iter_tol` must be in the open interval (0, 1)")
self.inner_iter_max_ = inner_iter_max
self.inner_iter_tol_ = inner_iter_tol
@property
def rel_reconstruction_error_ls(self):
"""
List of relative reconstruction errors from each iteration during model fitting.
It is defined as:
.. math::
\\dfrac{||X - W^{i}H^{i}||_F}{||X||_F}\\ \\textrm{,}
where:
* :math:`||\\cdot||_F` denotes the Frobenius norm
* :math:`X` is the original data matrix
* :math:`W^{i}` and :math:`H^{i}` are values of :math:`W` and :math:`H` after
iteration :math:`i`
Returns
-------
list of float
Relative reconstruction error from each iteration.
"""
if self._rel_loss_ls is None:
raise AttributeError(
"`rel_reconstruction_error_ls` is not available. Model has not been "
"fitted yet. Call `fit` first."
)
return self._rel_loss_ls
def fit(
self,
X: NDArray[np.float64],
Wi: NDArray[np.float64],
Hi: NDArray[np.float64],
known_W: NDArray[np.float64] | None = None,
known_H: NDArray[np.float64] | None = None,
preprocess_scale_WH: bool = False,
):
"""
Fit the FroFPGM model to the provided data.
Parameters
----------
X : ndarray of shape (n_features, n_samples)
Data array to be factorized.
Wi : ndarray of shape (n_features, rank)
Initial guess for the factor :math:`W`.
Hi : ndarray of shape (rank, n_samples)
Initial guess for the factor :math:`H`.
known_W : ndarray of shape (n_features, rank), default=None
Array containing known values of :math:`W`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`W` which do not
correspond to ``np.nan`` entries in ``known_W``.
known_H : ndarray of shape (rank, n_samples), default=None
Array containing known values of :math:`H`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`H` which do not
correspond to ``np.nan`` entries in ``known_H``.
preprocess_scale_WH : bool, default=False
If ``True``, ``Wi`` and ``Hi`` are scaled before optimization.
"""
self._validate_fit_args(X, Wi, Hi, known_W, known_H)
self._W = Wi.copy()
self._H = Hi.copy()
self._rel_loss_ls = []
self._is_converged = False
# Check if known values are passed, if so compute a mask.
if known_W is not None:
mask_known_W = np.isfinite(known_W)
else:
mask_known_W = None
if known_H is not None:
mask_known_H = np.isfinite(known_H)
else:
mask_known_H = None
# preallocate arrays for H updates
WtW = np.zeros((self.rank_, self.rank_))
WtX = np.zeros((self.rank_, X.shape[1]))
Y_H = np.zeros_like(self._H)
H_diff = np.zeros_like(self._H)
H_prev = np.zeros_like(self._H)
# preallocate arrays for W updates
XHt = X @ self._H.T
HHt = self._H @ self._H.T
Y_W = np.zeros_like(self._W)
W_diff = np.zeros_like(self._W)
W_prev = np.zeros_like(self._W)
if preprocess_scale_WH is True:
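# same scaling as in FroALS.fit: optimal global factor for W, then
# norm-balancing of W's columns against H's rows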
self._W[:] = self._W * (
np.sum(XHt * self._W) / (np.sum((self._W.T @ self._W) * HHt))
)
norm_Wcol = np.linalg.norm(self._W, axis=0) + _EPSILON
norm_Hrow = np.linalg.norm(self._H, axis=1) + _EPSILON
scaling_vec = np.sqrt(norm_Wcol / norm_Hrow)
self._W[:] = self._W / scaling_vec
self._H[:] = self._H * scaling_vec[:, np.newaxis]
HHt[:] = HHt * scaling_vec[np.newaxis, :]
HHt[:] = scaling_vec[:, np.newaxis] * HHt
XHt[:] = XHt * scaling_vec[np.newaxis, :]
norm_X = np.linalg.norm(X, ord="fro")
rel_loss = np.sqrt(self._fro_err_sq_after_W_update(norm_X, HHt, XHt)) / norm_X
self._rel_loss_ls.append(rel_loss)
iter = 0
while iter < self.iter_max_:
self._update_H(X, WtW, WtX, Y_H, H_diff, H_prev, known_H, mask_known_H)
self._update_W(X, HHt, XHt, Y_W, W_diff, W_prev, known_W, mask_known_W)
rel_loss = (
np.sqrt(self._fro_err_sq_after_W_update(norm_X, HHt, XHt)) / norm_X
)
self._rel_loss_ls.append(rel_loss)
self._is_converged = self._check_convergence(iter)
if self._is_converged:
break
iter += 1
def _update_H(
self,
X: NDArray[np.float64],
WtW: NDArray[np.float64],
WtX: NDArray[np.float64],
Y_H: NDArray[np.float64],
H_diff: NDArray[np.float64],
H_prev: NDArray[np.float64],
known_H: None | NDArray[np.float64],
mask_known_H: None | NDArray[np.bool_],
) -> None:
"""
Update the H matrix based on the current W matrix.
This method implements the FPGM update for the H matrix.
Parameters
----------
X : ndarray
The input matrix to be factorized.
WtW : ndarray
Precomputed W^T * W matrix.
WtX : ndarray
Precomputed W^T * X matrix.
Y_H : ndarray
Auxiliary variable for momentum updates.
H_diff : ndarray
Difference between consecutive H updates.
H_prev : ndarray
Previous H matrix for momentum calculation.
known_H : ndarray or None
Matrix with known values of H.
mask_known_H : ndarray or None
Mask indicating which values in H are known.
Returns
-------
None
The H matrix is updated in-place.
"""
self._project_H()
WtW[:] = self._W.T @ self._W
WtX[:] = self._W.T @ X
# hyperparameters for optimization
l = np.linalg.norm(WtW, 2) # lipschitz constant
theta_prev = 0.05 # first value of theta, heuristic
Y_H[:] = self._H # look-ahead variable (reuses the preallocated buffer)
inner_iter = 0
while inner_iter < self.inner_iter_max_:
H_prev[:] = self._H.copy()
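# projected gradient step from the look-ahead point Y_H with step size 1/l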
self._H[:] = Y_H - ((WtW @ Y_H) - WtX) / l
# apply known values
if mask_known_H is not None:
self._H[mask_known_H] = known_H[mask_known_H]
self._apply_unimodal_H_rows()
self._project_H()
H_diff[:] = self._H - H_prev
norm_H_diff = np.linalg.norm(H_diff, "fro")
if inner_iter == 0:
norm_H_diff_0 = norm_H_diff
# convergence check
if norm_H_diff <= (self.inner_iter_tol_ * norm_H_diff_0):
break
# do the momentum update only if convergence criteria is not satisfied
theta_prev_sq = theta_prev**2
# theta and beta computation based on scheme 2.2.19 in Introductory lectures
# on convex optimization by Nesterov
theta = (np.sqrt(theta_prev_sq**2 + 4 * theta_prev_sq) - theta_prev_sq) / 2
beta = theta_prev * (1 - theta_prev) / (theta_prev_sq + theta)
# momentum step
H_diff[:] = H_diff * beta
Y_H[:] = self._H + H_diff
theta_prev = theta
inner_iter += 1
def _update_W(
self,
X: NDArray[np.float64],
HHt: NDArray[np.float64],
XHt: NDArray[np.float64],
Y_W: NDArray[np.float64],
W_diff: NDArray[np.float64],
W_prev: NDArray[np.float64],
known_W: None | NDArray[np.float64],
mask_known_W: None | NDArray[np.bool_],
) -> None:
"""
Update the W matrix based on the current H matrix.
This method implements the FPGM update for the W matrix.
Parameters
----------
X : ndarray
The input matrix to be factorized.
HHt : ndarray
Precomputed H * H^T matrix.
XHt : ndarray
Precomputed X * H^T matrix.
Y_W : ndarray
Auxiliary variable for momentum updates.
W_diff : ndarray
Difference between consecutive W updates.
W_prev : ndarray
Previous W matrix for momentum calculation.
known_W : ndarray or None
Matrix with known values of W.
mask_known_W : ndarray or None
Mask indicating which values in W are known.
Returns
-------
None
The W matrix is updated in-place.
"""
self._project_W()
HHt[:] = self._H @ self._H.T
XHt[:] = X @ self._H.T
# hyperparameters for optimization
l = np.linalg.norm(HHt, 2) # lipschitz constant
theta_prev = 0.05 # first value of theta, heuristic
Y_W[:] = self._W # look-ahead variable (reuses the preallocated buffer)
inner_iter = 0
while inner_iter < self.inner_iter_max_:
W_prev[:] = self._W.copy()
self._W[:] = Y_W - ((Y_W @ HHt) - XHt) / l
# apply known values
if mask_known_W is not None:
self._W[mask_known_W] = known_W[mask_known_W]
self._apply_unimodal_W_cols()
self._project_W()
W_diff[:] = self._W - W_prev
norm_W_diff = np.linalg.norm(W_diff, "fro")
if inner_iter == 0:
norm_W_diff_0 = norm_W_diff
# convergence check
if norm_W_diff <= (self.inner_iter_tol_ * norm_W_diff_0):
break
# do the momentum update only if convergence criteria is not satisfied
theta_prev_sq = theta_prev**2
# theta and beta computation based on scheme 2.2.19 in Introductory lectures
# on convex optimization by Nesterov
theta = (np.sqrt(theta_prev_sq**2 + 4 * theta_prev_sq) - theta_prev_sq) / 2
beta = theta_prev * (1 - theta_prev) / (theta_prev_sq + theta)
# momentum step
W_diff[:] = W_diff * beta
Y_W[:] = self._W + W_diff
theta_prev = theta
inner_iter += 1
class MinVol(FroFPGM):
"""
Minimum-Volume Nonnegative Matrix Factorization (NMF) implementation using the
Fast Projected Gradient Method (FPGM).
This class implements the FPGM to perform Minimum-Volume NMF, minimizing the
following objective function subject to non-negativity and other optional
constraints.
.. math::
f_{\\textrm{obj}} = ||X - WH||_F^2 + \\lambda \\times \\log(\\det(W^TW + \\delta
\\times I))
where:
* :math:`||\\cdot||_{F}` is the Frobenius norm.
* :math:`X` the nonnegative matrix to be factorized
* :math:`W` and :math:`H` are the nonnegative factors
* :math:`\\lambda` and :math:`\\delta` are model parameters. See definition in the
Parameters section
* :math:`I` is an identity matrix
Parameters
----------
rank : int
The number of components for the factorization.
constraint_kind : integer-like {0, 1, 2, 3, 4}, default=0
The following constraints are applied based on the integer value specified:
* If ``0``: Only :math:`W \\geq 0`, :math:`H \\geq 0`.
* If ``1``: Closure constraint :math:`H^T e \\leq e`.
* If ``2``: Closure constraint :math:`H e = e`.
* If ``3``: Constraint :math:`W^T e = e`.
* If ``4``: Closure constraint :math:`H^T e = e`.
Note that for ``constraint_kind`` values 1, 2, 3, and 4, nonnegativity
constraints are applied in addition to the constraint specified above.
unimodal : dict or None, default=None
Specifies unimodality constraints for :math:`W` and :math:`H` matrices. If
``None``, no unimodality constraints are applied.
If ``dict``, Format: {'W': bool | list of bool, 'H': bool | list of bool}:
* Must contain at least one key ('W' or 'H')
* No other keys besides 'W' and 'H' are allowed
* For 'W': Controls unimodality of columns in W
* For 'H': Controls unimodality of rows in H
Each value can be:
* A boolean: applies to all components
* A list of booleans: selectively applies to specific components
Examples:
* ``{'H': True}``: All :math:`H` components are unimodal
* ``{'W': True, 'H': True}``: All :math:`W` and :math:`H` components are
unimodal
* ``{'H': [True, False, True]}``: Only components 0 and 2 of :math:`H` have
unimodal behavior
* ``{'W': [False, True, False]}``: Only component 1 of :math:`W` has
unimodal behavior
iter_max : int, default=500
Maximum number of iterations. It must be :math:`\\geq 10`.
tol : float, default=1e-4
Tolerance for convergence. Must be in the interval :math:`(0, 1)`.
Convergence is reached when:
.. math::
{|e[i] - e[i-10]| \\over e[i]} \\leq \\textrm{tol}
where:
* iteration :math:`i \\geq 10`
* :math:`e[i]` is the relative loss after iteration :math:`i`, which is
defined as
.. math::
e[i] = \\dfrac{\\sqrt{f_{\\textrm{obj}}^{i}}}{||X||_{F}}
where :math:`f_{\\textrm{obj}}^{i}` is the value of the objective function
(including the volume term) after iteration :math:`i`.
inner_iter_max : int, default=20
Maximum number of inner iterations performed during each single update of either
:math:`W` or :math:`H` while the other is held fixed.
inner_iter_tol : float, default=0.1
Tolerance for the convergence of the inner loop while updating :math:`H`.
The inner loop convergence is reached when:
.. math::
{||H^{k} - H^{k-1}||_{F} \\over ||H^{1} - H^{0}||_{F}} \\leq
\\textrm{inner_iter_tol}
where:
* :math:`H^{k}` is value of :math:`H` after inner-iteration :math:`k`.
* :math:`H^{0}` is the value of :math:`H` before the first inner-iteration.
delta : float, default=0.1
Value of parameter :math:`\\delta`. It ensures numerical stability when
:math:`W` is rank-deficient.
lambdaa : float, default=1e-3
The initial weight, :math:`\\lambda`, of the volume-regularization term.
References
----------
.. [1] Gillis, N. (2020). Nonnegative matrix factorization. Society for Industrial
and Applied Mathematics.
.. [2] Leplat, Valentin, Andersen MS Ang, and Nicolas Gillis. "Minimum-volume rank-
deficient nonnegative matrix factorizations." ICASSP 2019-2019 IEEE
International Conference on Acoustics, Speech and Signal Processing (ICASSP).
IEEE, 2019.
Examples
--------
>>> from mcrnmf.models import MinVol, SNPA
>>> from mcrnmf.datasets import load_rxn_spectra
>>>
>>> # load the example dataset from mcrnmf
>>> X, wv, time = load_rxn_spectra()
>>>
>>> # generate initial guess using SNPA
>>> snpa = SNPA(rank=4)
>>> snpa.fit(X)
>>> Wi = snpa.W # Initial estimate for W
>>> Hi = snpa.H # Initial estimate for H
>>>
>>> # create an instance of MinVol and fit the model
>>> model = MinVol(rank=4, constraint_kind=1, iter_max=2000, tol=1e-4, lambdaa=1e-3)
>>> model.fit(X, Wi, Hi)
>>> # access decomposed factors
>>> W, H = model.W, model.H
>>> # check convergence status
>>> converged = model.is_converged
>>> # access rel. reconstruction error after each iteration
>>> rel_recon_err = model.rel_reconstruction_error_ls
"""
def __init__(
self,
rank: int,
constraint_kind: int = 0,
unimodal: dict | None = None,
iter_max: int = 500,
tol: float = 0.0001,
inner_iter_max: int = 20,
inner_iter_tol: float = 0.1,
delta: float | int = 0.1,
lambdaa: float = 0.001,
):
super().__init__(
rank=rank,
constraint_kind=constraint_kind,
unimodal=unimodal,
iter_max=iter_max,
tol=tol,
inner_iter_max=inner_iter_max,
inner_iter_tol=inner_iter_tol,
)
if not isinstance(delta, (int, np.integer, float)):
raise TypeError(
f"`delta` must be of type {float} or {int}, current type is "
f"{type(delta)}"
)
if not isinstance(lambdaa, (float, int, np.integer)):
raise TypeError(
f"`lambdaa` must be of type {float} or {int}, current type is "
f"{type(lambdaa)}"
)
if delta <= 0:
raise ValueError("`delta` must be > 0")
if lambdaa <= 0:
raise ValueError("`lambdaa` must be > 0")
self.delta_ = delta
self.lambdaa_ = lambdaa
# initialize the additional attribute specific to MinVol
self._rel_reconstruction_error_ls = None
@property
def rel_reconstruction_error_ls(self):
"""
List of relative reconstruction errors from each iteration during model fitting.
The relative reconstruction error measures how well the current factors
approximate the original data. It is the ratio:
.. math::
\\dfrac{||X - W^{i}H^{i}||_F}{||X||_F}\\ \\textrm{,}
where:
* :math:`||\\cdot||_F` denotes the Frobenius norm
* :math:`X` is the original data matrix
* :math:`W^{i}` and :math:`H^{i}` are values of :math:`W` and :math:`H` after
iteration :math:`i`
Returns
-------
list of float
Relative reconstruction error from each iteration.
"""
if self._rel_reconstruction_error_ls is None:
raise AttributeError(
"`rel_reconstruction_error_ls` is not available. Model has not been "
"fitted yet. Call `fit` first."
)
return self._rel_reconstruction_error_ls
def fit(
self,
X: NDArray[np.floating],
Wi: NDArray[np.floating],
Hi: NDArray[np.floating],
known_W: None | NDArray[np.floating] = None,
known_H: None | NDArray[np.floating] = None,
preprocess_scale_WH: bool = True,
):
"""
Fit the MinVol model to the provided data.
Parameters
----------
X : ndarray of shape (n_features, n_samples)
Data array to be factorized.
Wi : ndarray of shape (n_features, rank)
Initial guess for the factor :math:`W`.
Hi : ndarray of shape (rank, n_samples)
Initial guess for the factor :math:`H`.
known_W : ndarray of shape (n_features, rank), default=None
Array containing known values of :math:`W`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`W` which do not
correspond to ``np.nan`` entries in ``known_W``.
known_H : ndarray of shape (rank, n_samples), default=None
Array containing known values of :math:`H`.
* The ``np.nan`` elements of the array are treated as unknown.
* Equality constraint is applied at those indices of :math:`H` which do not
correspond to ``np.nan`` entries in ``known_H``.
preprocess_scale_WH : bool, default=True
If ``True``, ``Wi`` and ``Hi`` are scaled before optimization.
"""
self._validate_fit_args(X, Wi, Hi, known_W, known_H)
self._W = Wi.copy()
self._H = Hi.copy()
self._rel_loss_ls = []
self._rel_reconstruction_error_ls = []
self._is_converged = False
# Check if known values are passed, if so compute a mask.
if known_W is not None:
mask_known_W = np.isfinite(known_W)
else:
mask_known_W = None
if known_H is not None:
mask_known_H = np.isfinite(known_H)
else:
mask_known_H = None
# pre-allocate matrices used in update H
Y_H = np.zeros_like(self._H)
H_diff = np.zeros_like(self._H)
H_prev = np.zeros_like(self._H)
WtW = np.zeros((self.rank_, self.rank_))
WtX = np.zeros((self.rank_, X.shape[1]))
# preallocate matrices used in update W
Y_W = np.zeros_like(self._W)
W_diff = np.zeros_like(self._W)
W_prev = np.zeros_like(self._W)
HHt = self._H @ self._H.T
XHt = X @ self._H.T
# scale W and H based on the constraint
if (self.constraint_kind_ != 0) and preprocess_scale_WH:
self._scale_WH(
X, H_prev, Y_W, W_diff, W_prev, HHt, XHt, known_W, mask_known_W
)
norm_X = np.sqrt(np.sum(X**2))
WtW[:] = self._W.T @ self._W
WtX[:] = self._W.T @ X
I = np.eye(self.rank_, dtype=X.dtype)
fro_error_sq = self._fro_err_sq_after_H_update(norm_X, WtW, WtX)
self._rel_reconstruction_error_ls.append(np.sqrt(fro_error_sq) / norm_X)
vol_error = self._calc_vol_error(WtW=WtW, I=I)
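# rescale lambda so the volume penalty starts as a fixed fraction of the
# initial squared fit error, making the default value scale-independent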
lambdaa_updt = self.lambdaa_ * max(1e-6, fro_error_sq) / np.abs(vol_error)
rel_loss = np.sqrt(fro_error_sq + lambdaa_updt * vol_error) / norm_X
self._rel_loss_ls.append(rel_loss)
# pre-allocate memory for frequently used matrices
A = np.zeros((self.rank_, self.rank_))
Y = np.zeros((self.rank_, self.rank_))
iter = 0
while iter < self.iter_max_:
XHt[:] = X @ self._H.T
HHt[:] = self._H @ self._H.T
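# Y = (W^T W + delta*I)^{-1} appears in the gradient of the logdet term;
# A = lambdaa_updt * Y + HHt is the quadratic surrogate used to update W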
Y[:] = np.linalg.inv(WtW + self.delta_ * I)
A[:] = lambdaa_updt * Y + HHt
self._update_W_minvol(A, XHt, Y_W, W_diff, W_prev, known_W, mask_known_W)
self._update_H(X, WtW, WtX, Y_H, H_diff, H_prev, known_H, mask_known_H)
fro_error_sq = self._fro_err_sq_after_H_update(norm_X, WtW, WtX)
self._rel_reconstruction_error_ls.append(np.sqrt(fro_error_sq) / norm_X)
vol_error = self._calc_vol_error(WtW=WtW, I=I)
rel_loss = np.sqrt(fro_error_sq + lambdaa_updt * vol_error) / norm_X
self._rel_loss_ls.append(rel_loss)
self._is_converged = self._check_convergence(iter)
if self._is_converged:
break
iter += 1
def _calc_vol_error(
self, WtW: NDArray[np.float64], I: NDArray[np.float64]
) -> np.float64:
"""
Calculate the volume error term in the objective function.
Computes log(det(W^T * W + delta * I)) which quantifies the volume of the
simplex defined by the columns of W.
Parameters
----------
WtW : ndarray
Precomputed W^T·W matrix.
I : ndarray
Identity matrix of appropriate size.
Returns
-------
float
The volume error term.
"""
return np.log(np.linalg.det(WtW + self.delta_ * I))
def _scale_WH(
self,
X: NDArray[np.float64],
H_prev: NDArray[np.float64],
Y_W: NDArray[np.float64],
W_diff: NDArray[np.float64],
W_prev: NDArray[np.float64],
HHt: NDArray[np.float64],
XHt: NDArray[np.float64],
known_W: None | NDArray[np.float64],
mask_known_W: None | NDArray[np.bool_],
) -> None:
"""
Scale W and H matrices to maintain constraints while improving conditioning.
Depending on the constraint type, this method applies appropriate scaling
transformations to W and H that preserve their product.
Parameters
----------
X : ndarray
The input matrix.
H_prev : ndarray
Previous H matrix for comparison.
Y_W : ndarray
Auxiliary variable for W updates.
W_diff : ndarray
Difference between consecutive W updates.
W_prev : ndarray
Previous W matrix.
HHt : ndarray
Precomputed H·H^T matrix.
XHt : ndarray
Precomputed X·H^T matrix.
known_W : ndarray or None
Matrix with known values of W.
mask_known_W : ndarray or None
Mask indicating which values in W are known.
Returns
-------
None
The W and H matrices are updated in-place.
"""
if self.constraint_kind_ in [1, 4]:
H_prev = self._H.copy()
if self.constraint_kind_ == 1:
simplex_proj(self._H)
else:
simplex_proj_using_axis(self._H, True)
# update W if the H projection has changed H significantly
if np.linalg.norm(self._H - H_prev, 2) > (
1e-3 * np.linalg.norm(self._H, 2)
):
self._update_W(X, HHt, XHt, Y_W, W_diff, W_prev, known_W, mask_known_W)
elif self.constraint_kind_ == 2:
H_row_sum = np.sum(self._H, axis=1)
self._H[:] = self._H / H_row_sum[:, np.newaxis]
self._W[:] = self._W * H_row_sum
elif self.constraint_kind_ == 3:
W_col_sum = np.sum(self._W, axis=0)
self._H[:] = self._H * W_col_sum[:, np.newaxis]
self._W[:] = self._W / W_col_sum
def _update_W_minvol(
self,
A: NDArray[np.float64],
XHt: NDArray[np.float64],
Y_W: NDArray[np.float64],
W_diff: NDArray[np.float64],
W_prev: NDArray[np.float64],
known_W: NDArray[np.float64] | None,
mask_known_W: NDArray[np.bool_] | None,
) -> None:
"""
Update W with minimum volume regularization.
Performs gradient descent on W with the objective function that includes
both data fidelity and volume regularization terms. The update optimizes:
||X - WH||_F^2 + lambdaa * log(det(W^T*W + delta*I))
Parameters
----------
A : ndarray
Precomputed regularized Hessian matrix.
XHt : ndarray
Precomputed X*H^T matrix.
Y_W : ndarray
Look-ahead matrix for momentum updates.
W_diff : ndarray
Matrix to store differences between iterations.
W_prev : ndarray
Matrix to store previous W values.
known_W : ndarray or None
Matrix containing known values for W.
mask_known_W : ndarray or None
Boolean mask indicating which W values are known.
Returns
-------
None
The W matrix is updated in-place.
"""
self._project_W()
# hyperparameters for optimization
s = np.linalg.svd(A, compute_uv=False)
l = s[0] # lipschitz constant
kappa_inv = s[-1] / s[0] # inverse of the condition number
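# constant momentum parameter that is optimal for a strongly convex
# quadratic with condition number kappa = 1 / kappa_inv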
beta = (1 - np.sqrt(kappa_inv)) / (1 + np.sqrt(kappa_inv))
Y_W[:] = self._W # look-ahead variable (reuses the preallocated buffer)
inner_iter = 0
error, error_prev = 1, 0
while inner_iter < self.inner_iter_max_:
W_prev[:] = self._W.copy()
self._W[:] = Y_W - ((Y_W @ A) - XHt) / l
# apply known values
if mask_known_W is not None:
self._W[mask_known_W] = known_W[mask_known_W]
self._apply_unimodal_W_cols()
self._project_W()
W_diff[:] = self._W - W_prev
norm_W_diff = np.linalg.norm(W_diff, "fro")
if inner_iter == 0:
norm_W_diff_0 = norm_W_diff
# convergence check
if norm_W_diff <= (1e-6 * norm_W_diff_0):
break
error = np.sum((self._W.T @ self._W) * A) - 2 * np.sum(self._W * XHt)
if (inner_iter == 0) or (error <= error_prev):
W_diff[:] = W_diff * beta
Y_W[:] = self._W + W_diff
else:
# If current error is greater than previous error, replace look ahead
# variable with current value of W
Y_W[:] = self._W.copy()
error_prev = error
inner_iter += 1
class SNPA:
"""
Successive Nonnegative Projection Algorithm (SNPA) for NMF initialization.
The algorithm sequentially selects columns from :math:`X` with maximum residual norm
after projecting onto the convex cone generated by previously selected columns,
providing a robust initialization for :math:`W` which can be used by other NMF
models. The corresponding :math:`H` matrix is then computed by minimizing
:math:`||X - WH||_F^2` subject to nonnegativity and simplex constraints on
:math:`H`.
Parameters
----------
rank : int
The number of components to extract.
iter_max : int, default=500
Maximum number of iterations for updating :math:`H` after extraction of
:math:`W`.
tol : float, default=1e-6
Tolerance for early stopping. The algorithm stops extracting basis components
when:
.. math::
\\dfrac{||X - WH||_F}{||X||_{F}} \\leq \\textrm{tol}
where :math:`||\\cdot||_{F}` is the Frobenius norm.
Examples
--------
>>> from mcrnmf.models import SNPA
>>> from mcrnmf.datasets import load_rxn_spectra
>>>
>>> # load the example dataset from mcrnmf
>>> X, wv, time = load_rxn_spectra()
>>>
>>> # create an instance of SNPA and fit to the data
>>> snpa = SNPA(rank=4)
>>> snpa.fit(X)
>>> Wi = snpa.W # Estimate for W
>>> Hi = snpa.H # Estimate for H
>>> selected_indices = snpa.col_indices_ls
"""
def __init__(
self,
rank: int,
iter_max: int = 500,
tol: float = 1e-6,
):
# validate types of input arguments
if not isinstance(rank, (int, np.integer)):
raise TypeError(
f"`rank` must be of type {int}, currrent type is {type(rank)}"
)
if not isinstance(iter_max, (np.integer, int)):
raise TypeError(
f"`iter_max` must be of type {int}, current type is {type(iter_max)}"
)
if not isinstance(tol, float):
raise TypeError(
f"`tol` must be of type {float}, current type is {type(tol)}"
)
# validate values of input arguments
if rank < 1:
raise ValueError("`rank` must be > 0")
if iter_max < 1:
raise ValueError("`iter_max` must be > 0")
if not (0 < tol < 1):
raise ValueError("`tol` must be in the open interval (0, 1)")
self.rank_ = rank
self.iter_max_ = iter_max
self.tol_ = tol
# Initialize the attributes
self._W = None
self._H = None
self._col_indices_ls = None
@property
def W(self):
"""
The basis matrix containing selected columns from :math:`X`.
Returns
-------
ndarray of shape (n_features, rank)
The basis matrix containing selected columns from :math:`X`.
"""
if self._W is None:
raise AttributeError(
"'W' is not available. Model has not been fitted yet. Call 'fit' first."
)
return self._W
@property
def H(self):
"""
The coefficient matrix derived from :math:`W`.
Returns
-------
ndarray of shape (rank, n_samples)
The coefficient matrix derived from :math:`W`.
"""
if self._H is None:
raise AttributeError(
"'H' is not available. Model has not been fitted yet. Call 'fit' first."
)
return self._H
@property
def col_indices_ls(self):
"""
The indices of columns selected from :math:`X` for :math:`W`.
Returns
-------
list of int
The indices of columns selected from :math:`X` for :math:`W` after fitting
the model.
"""
if self._col_indices_ls is None:
raise AttributeError(
"'col_indices_ls' is not available. Model has not been fitted yet. "
"Call 'fit' first."
)
return self._col_indices_ls
def fit(
self,
X: NDArray[np.float64],
):
"""
Fit the SNPA model to the provided data.
Parameters
----------
X : ndarray of shape (n_features, n_samples)
Data array to be factorized.
Warns
-----
UserWarning
If the desired rank is not reached due to early convergence. If this happens
try specifying a lower value for ``tol``.
"""
# validate X
if not isinstance(X, np.ndarray):
raise TypeError("`X` must be a numpy array")
if X.dtype != np.float64:
raise TypeError(
f"The dtype of `X` elements should be {np.float64}, current dtype is "
f"{X.dtype}"
)
if self.rank_ > X.shape[1]:
raise ValueError(
f"`rank` cannot be greater than columns in `X` {X.shape[1]}."
)
if X.min() < 0:
raise ValueError("All elements of `X` must be >= 0")
X_col_norm = np.sum(X**2, axis=0)
X_col_norm_max = np.max(X_col_norm)
# initialize Residuals column norm
R_col_norm = X_col_norm.copy()
self._col_indices_ls = [] # a list to store the column indices
XtW = np.empty(0)
self._W = np.empty(0)
self._H = np.empty(0)
idx_count = 0
while idx_count < self.rank_:
R_col_norm_max = np.max(R_col_norm)
# check whether multiple columns are (numerically) tied for the max value
idx = np.where((R_col_norm_max - R_col_norm) / R_col_norm_max <= 1e-6)[0]
if idx.size > 1:
# in case of ties, select the one with the largest column norm in X
self._col_indices_ls.append(idx[np.argmax(X_col_norm[idx])])
else:
self._col_indices_ls.append(idx[0])
self._W = X[:, self._col_indices_ls]
Wi = self._W[:, idx_count].reshape(-1, 1)
if XtW.size == 0:
XtW = X.T @ self._W
else:
XtW = np.hstack((XtW, X.T @ Wi))
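# grow the Gram matrix W^T W one row/column at a time as columns are added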
if idx_count == 0:
WiTWi = self._W.T @ self._W
else:
WtWi = self._W[:, :idx_count].T @ Wi
WiTWi = np.vstack(
(np.hstack((WiTWi, WtWi)), np.hstack((WtWi.T, Wi.T @ Wi)))
)
if idx_count == 0:
self._update_H(X)
else:
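# the newly selected column reconstructs itself exactly: zero its existing
# coefficients and append a unit row for the new basis column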
self._H[:, self._col_indices_ls[-1]] = 0
h = np.zeros((1, X.shape[1]))
h[0, self._col_indices_ls[-1]] = 1
self._H = np.vstack([self._H, h])
self._update_H(X)
# squared residual column norms via the expansion
# ||x_j - W h_j||^2 = ||x_j||^2 - 2 (X^T W)_j^T h_j + h_j^T (W^T W) h_j;
# the summed form also covers idx_count == 0, where H has a single row
R_col_norm[:] = (
X_col_norm
- 2 * np.sum(XtW.T * self._H, axis=0)
+ np.sum(self._H * (WiTWi @ self._H), axis=0)
)
np.maximum(R_col_norm, 0, out=R_col_norm)
if np.sqrt(np.max(R_col_norm) / X_col_norm_max) <= self.tol_:
if len(self._col_indices_ls) != self.rank_:
warnings.warn(
f"Extracted only {len(self._col_indices_ls)} of {self.rank_} "
"indices"
)
break
idx_count += 1
def _update_H(self, X: NDArray[np.float64]):
"""
Update the coefficient matrix H given data X and basis W.
This method solves the optimization problem to find H that minimizes
||X - WH||_F subject to simplex constraints on H. It uses an accelerated
projected gradient descent approach with momentum updates for faster
convergence.
Parameters
----------
X : ndarray
The input data matrix of shape (n_features, n_samples).
Notes
-----
This is an internal method used by the `fit` method and should not be
called directly by users.
"""
# initialization of H while doing rank-1 factorization
if self._H.size == 0:
if np.linalg.cond(self._W) > 1e6:
warnings.warn(
"`W` has condition number > 1e6. Initial guess of H generated might"
" be poor."
)
self._H = np.linalg.lstsq(self._W, X, rcond=None)[0]
self._H[:] = np.maximum(self._H, 0.0)
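# closed-form optimal scalar alpha minimizing ||X - alpha*W*H||_F^2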
alpha = np.sum(self._H * (self._W.T @ X)) / np.sum(
(self._W.T @ self._W) * (self._H @ self._H.T)
)
self._H[:] = self._H * alpha
# replace rows that sum to zero with small random values
zero_sum_rows = np.where(np.sum(self._H, axis=1) == 0)[0]
if len(zero_sum_rows) > 0:
self._H[zero_sum_rows, :] = (
0.001
* np.max(self._H)
* np.random.rand(len(zero_sum_rows), self._H.shape[1])
)
simplex_proj(self._H)
# precompute WtW, and WtX since it doesn't change in the loop
WtW = self._W.T @ self._W
WtX = self._W.T @ X
# hyperparameters for optimization
l = np.linalg.norm(WtW, 2) # lipschitz constant
alpha_prev = 0.05 # first value of alpha, heuristic
delta = 1e-6
# pre-allocate memory for matrices computed or used repeatedly in the loop
Y = self._H.copy() # look ahead variable
H_diff = np.zeros(self._H.shape)
H_prev = np.zeros(self._H.shape)
iter = 0
while iter < self.iter_max_:
H_prev[:] = self._H.copy()
self._H[:] = Y - ((WtW @ Y) - WtX) / l
simplex_proj(self._H)
H_diff[:] = self._H - H_prev
norm_H_diff = np.linalg.norm(H_diff, "fro")
if iter == 0:
norm_H_diff_0 = norm_H_diff
# convergence check
if norm_H_diff <= (delta * norm_H_diff_0):
break
# do the momentum update only if convergence criteria is not satisfied
alpha_prev_sq = alpha_prev**2
alpha = (np.sqrt(alpha_prev_sq**2 + 4 * alpha_prev_sq) - alpha_prev_sq) / 2
beta = alpha_prev * (1 - alpha_prev) / (alpha_prev_sq + alpha)
# momentum step
H_diff[:] = H_diff * beta
Y[:] = self._H + H_diff
alpha_prev = alpha
iter += 1