"""
Solver dispatch for regression.
Handles both Linear Models (LM) and Generalized Linear Models (GLM).
When family is None (default), the existing LM path is used → LinearSolution.
When family is provided, the IRLS path is used → GLMSolution.
"""
import warnings
from typing import Literal, Union
import numpy as np
from numpy.typing import ArrayLike
from pystatistics.core.compute.device import select_device
from pystatistics.regression.design import Design
from pystatistics.regression.solution import LinearSolution, GLMSolution
from pystatistics.regression.backends.cpu import CPUQRBackend
BackendChoice = Literal['auto', 'cpu', 'gpu', 'cpu_qr', 'cpu_svd', 'gpu_qr']
[docs]
def fit(
X_or_design: ArrayLike | Design,
y: ArrayLike | None = None,
*,
family: 'str | Family | None' = None,
backend: BackendChoice = 'auto',
force: bool = False,
tol: float = 1e-8,
max_iter: int = 25,
names: list[str] | None = None,
) -> Union[LinearSolution, GLMSolution]:
"""
Fit a linear or generalized linear model.
When family is None (default), fits ordinary least squares (LM) via
QR decomposition or GPU Cholesky. When family is specified, fits a
GLM via IRLS (Iteratively Reweighted Least Squares).
Accepts EITHER:
1. A Design object (from DataSource or arrays)
2. Raw X and y arrays (convenience)
Args:
X_or_design: Design object or X matrix
y: Response vector (required if X_or_design is array)
family: GLM family specification. None for OLS, or a string
('gaussian', 'binomial', 'poisson') or Family instance.
backend: 'auto', 'cpu', 'gpu', 'cpu_qr', 'cpu_svd', 'gpu_qr'
force: If True, proceed with GPU Cholesky even on ill-conditioned
matrices. Has no effect on CPU backends.
tol: Convergence tolerance for IRLS (GLM only). Default 1e-8
matches R's glm.control().
max_iter: Maximum IRLS iterations (GLM only). Default 25
matches R's glm.control().
names: Optional list of predictor names for output labeling.
If len(names) == p - 1 (one fewer than columns in X),
"(Intercept)" is prepended automatically.
If len(names) == p, used as-is.
Returns:
LinearSolution when family is None.
GLMSolution when family is specified.
Examples:
# OLS with named output
>>> result = fit(X, y, names=['albumin', 'copper', 'protime'])
>>> result.coef['copper']
0.005255
# Logistic regression
>>> result = fit(X, y, family='binomial')
# Poisson regression
>>> result = fit(X, y, family='poisson')
"""
# Get or build Design
if isinstance(X_or_design, Design):
design = X_or_design
else:
if y is None:
raise ValueError("y required when passing arrays")
design = Design.from_arrays(np.asarray(X_or_design), np.asarray(y))
# Resolve names: auto-prepend "(Intercept)" if needed
resolved_names = _resolve_names(names, design.p)
# Dispatch: GLM path if family specified, otherwise LM path
if family is not None:
return _fit_glm(design, family, backend, tol, max_iter, resolved_names)
else:
return _fit_lm(design, backend, force, resolved_names)
def _resolve_names(
names: list[str] | None,
p: int,
) -> tuple[str, ...] | None:
"""Resolve user-provided names into a tuple matching the number of columns.
If names has p-1 elements, prepend '(Intercept)'.
If names has p elements, use as-is.
If None, return None (Solutions will fall back to generic labels).
"""
if names is None:
return None
if len(names) == p:
return tuple(names)
if len(names) == p - 1:
return ("(Intercept)",) + tuple(names)
raise ValueError(
f"names must have {p} or {p - 1} elements to match X with "
f"{p} columns, got {len(names)}"
)
def _fit_lm(
design: Design,
backend: BackendChoice,
force: bool,
names: tuple[str, ...] | None,
) -> LinearSolution:
"""Fit ordinary least squares (existing LM path, unchanged)."""
backend_impl = _get_lm_backend(backend, design)
# Solve — pass force to GPU backends, CPU ignores it
if hasattr(backend_impl, 'solve') and 'force' in backend_impl.solve.__code__.co_varnames:
result = backend_impl.solve(design, force=force)
else:
result = backend_impl.solve(design)
return LinearSolution(_result=result, _design=design, _names=names)
def _fit_glm(
design: Design,
family: 'str | Family',
backend: BackendChoice,
tol: float,
max_iter: int,
names: tuple[str, ...] | None,
) -> GLMSolution:
"""Fit GLM via IRLS.
For NegativeBinomial with unknown theta (theta=None), runs the
alternating estimation loop matching R's MASS::glm.nb():
1. Fit Poisson GLM for initial μ
2. Estimate θ via profile likelihood
3. Refit NB GLM with new θ
4. Repeat until θ converges
"""
from pystatistics.regression.families import (
Family, NegativeBinomial, Poisson, resolve_family,
)
family_obj = resolve_family(family) if not isinstance(family, Family) else family
backend_impl = _get_glm_backend(backend)
# NB with unknown theta: alternating estimation loop
if isinstance(family_obj, NegativeBinomial) and family_obj.theta is None:
return _fit_nb(design, family_obj, backend_impl, tol, max_iter, names)
result = backend_impl.solve(design, family_obj, tol=tol, max_iter=max_iter)
return GLMSolution(_result=result, _design=design, _names=names)
def _fit_nb(
design: Design,
family: 'NegativeBinomial',
backend_impl: object,
tol: float,
max_iter: int,
names: tuple[str, ...] | None,
theta_max_iter: int = 25,
theta_tol: float = 1e-6,
) -> GLMSolution:
"""Fit negative binomial GLM with theta estimation.
Alternates between GLM fitting (given theta) and theta estimation
(given mu), matching R's MASS::glm.nb() algorithm.
"""
from pystatistics.core.exceptions import ConvergenceError
from pystatistics.regression.families import NegativeBinomial, Poisson
from pystatistics.regression._nb_theta import theta_ml
y = design.y
wt = np.ones(design.n)
# Step 1: Initial Poisson fit for starting mu
poisson_result = backend_impl.solve(
design, Poisson(), tol=tol, max_iter=max_iter,
)
mu = poisson_result.params.fitted_values
# Step 2: Initial theta from Poisson mu
theta = theta_ml(y, mu, wt)
# Step 3: Iterate: refit NB with new theta → re-estimate theta
for iteration in range(theta_max_iter):
nb_family = NegativeBinomial(theta=theta, link=family._link)
result = backend_impl.solve(
design, nb_family, tol=tol, max_iter=max_iter,
)
mu = result.params.fitted_values
theta_new = theta_ml(y, mu, wt)
if abs(theta_new - theta) / (theta + 1e-10) < theta_tol:
# Converged — final result uses the converged theta
nb_final = NegativeBinomial(theta=theta_new, link=family._link)
result = backend_impl.solve(
design, nb_final, tol=tol, max_iter=max_iter,
)
return GLMSolution(_result=result, _design=design, _names=names)
theta = theta_new
raise ConvergenceError(
f"NB theta estimation did not converge after {theta_max_iter} "
f"outer iterations. Last theta = {theta:.4f}.",
details={'theta': theta, 'n_outer_iter': theta_max_iter},
)
# =====================================================================
# Backend selection
# =====================================================================
def _get_lm_backend(choice: BackendChoice, design: Design):
"""Select LM backend based on user choice and hardware availability."""
if choice == 'auto':
device = select_device('auto')
if device.device_type == 'cuda':
try:
from pystatistics.regression.backends.gpu import GPUQRBackend
return GPUQRBackend(device='cuda')
except ImportError:
warnings.warn(
"CUDA detected but PyTorch not available. "
"Falling back to CPU backend.",
RuntimeWarning,
stacklevel=3,
)
return CPUQRBackend()
# auto + MPS -> CPU (MPS is FP32-only, not reliable for auto)
# auto + CPU -> CPU
return CPUQRBackend()
elif choice in ('cpu', 'cpu_qr'):
return CPUQRBackend()
elif choice == 'cpu_svd':
raise NotImplementedError("CPU SVD backend not yet implemented")
elif choice in ('gpu', 'gpu_qr'):
device = select_device('gpu') # raises RuntimeError if no GPU
from pystatistics.regression.backends.gpu import GPUQRBackend
return GPUQRBackend(device=device.device_type)
else:
raise ValueError(f"Unknown backend: {choice!r}")
def _get_glm_backend(choice: BackendChoice):
"""Select GLM backend based on user choice and hardware availability."""
from pystatistics.regression.backends.cpu_glm import CPUIRLSBackend
if choice == 'auto':
device = select_device('auto')
if device.device_type == 'cuda':
try:
from pystatistics.regression.backends.gpu_glm import GPUIRLSBackend
return GPUIRLSBackend(device='cuda')
except ImportError:
warnings.warn(
"CUDA detected but GPU GLM backend not available. "
"Falling back to CPU IRLS.",
RuntimeWarning,
stacklevel=3,
)
return CPUIRLSBackend()
# auto + MPS -> CPU (MPS not auto-selected)
# auto + CPU -> CPU
return CPUIRLSBackend()
elif choice in ('cpu', 'cpu_qr'):
return CPUIRLSBackend()
elif choice in ('gpu', 'gpu_qr'):
device = select_device('gpu') # raises RuntimeError if no GPU
try:
from pystatistics.regression.backends.gpu_glm import GPUIRLSBackend
return GPUIRLSBackend(device=device.device_type)
except ImportError:
raise ImportError(
"GPU GLM backend requires PyTorch. "
"Install with: pip install torch"
)
else:
raise ValueError(f"Unknown backend: {choice!r}")