import numpy as np # type: ignore
from scipy.optimize import root_scalar # type: ignore
from scipy.integrate import simpson # type: ignore
import camb # type: ignore
from camb import model # type: ignore
import logging
from importlib import resources
from pathlib import Path
from urllib import request
from appdirs import user_cache_dir
import skops.io as skio
# Use relative imports to find the other modules in this package
from .cosmology import Cosmology
from .growth import GrowthCalculator
# Get a logger for this module
log = logging.getLogger(__name__)
class AletheiaEmu:
    """Emulator for the non-linear matter power spectrum.

    This class provides predictions for the non-linear matter power spectrum,
    P_NL(k), for a given cosmology and redshift. It is based on a set of
    Gaussian Process (GP) models trained on high-fidelity N-body simulations.

    The emulation method combines a de-wiggled linear power spectrum with a
    GP prediction for the non-linear boost factor, B(k), and a response
    function, dR/dxtilde, that captures the effects of different growth of
    structure histories.

    Attributes
    ----------
    gp_B : sklearn.gaussian_process.GaussianProcessRegressor
        The trained GP model for the non-linear boost factor.
    gp_dRdx : sklearn.gaussian_process.GaussianProcessRegressor
        The trained GP model for the response to xtilde.
    correction_function : scipy.interpolate.RectBivariateSpline
        A 2D spline object for correcting resolution effects in the prediction.
    planck_means : np.ndarray
        Mean values of [omega_b, omega_c, n_s] from Planck 2018.
    eigenvecs : np.ndarray
        Eigenvectors of the Planck 2018 covariance matrix.
    planck_sigmas : np.ndarray
        Standard deviations along each eigenvector direction.
    """

    # --- Remote locations of the trained models (.skops files) ---
    MODEL_URLS = {
        "gp_B": "https://gitlab.mpcdf.mpg.de/arielsan/aletheia/-/raw/main/src/aletheiacosmo/data/Aletheia_GP_B_skl1.7.skops?ref_type=heads&inline=false",
        "gp_dRdx": "https://gitlab.mpcdf.mpg.de/arielsan/aletheia/-/raw/main/src/aletheiacosmo/data/Aletheia_GP_dRdxt_skl1.7.skops?ref_type=heads&inline=false",
        "correction": "https://gitlab.mpcdf.mpg.de/arielsan/aletheia/-/raw/main/src/aletheiacosmo/data/resolution_correction_skl1.7.skops?ref_type=heads&inline=false"
    }

    # --- Per-user cache directory for downloaded models ---
    CACHE_DIR = Path(user_cache_dir("AletheiaCosmo", "AletheiaTeam"))

    # --- Only these types may be instantiated by skops when deserializing ---
    TRUSTED_TYPES = ['numpy.ndarray',
                     'builtins.dict',
                     'builtins.tuple',
                     'builtins.list',
                     'builtins.str',
                     'numpy.float64',
                     'numpy.random.mtrand.RandomState',
                     'sklearn.gaussian_process.kernels.Matern',
                     'sklearn.gaussian_process._gpr.GaussianProcessRegressor',
                     'scipy.interpolate._fitpack2.RectBivariateSpline'
                     ]

    # --- Planck covariance matrix bundled as package data ---
    PLANCK_FILE = "planck_2018_lcdm_parcov.dat"
def __init__(self):
    """Set up the emulator: download (if needed) and load all models and data.

    The large GP model files are fetched into a per-user cache directory on
    first use; the Planck covariance matrix is read from the package data.

    Raises
    ------
    RuntimeError
        If a required model file is missing and cannot be downloaded.
    FileNotFoundError
        If the bundled Planck covariance matrix file is not found.
    """
    log.info("AletheiaEmu instance created. Checking for models...")
    # Ensure the cache directory exists.
    self.CACHE_DIR.mkdir(parents=True, exist_ok=True)
    # --- Local file paths for the large model files ---
    gp_b_path = self.CACHE_DIR / "Aletheia_GP_B_skl1.7.skops"
    gp_drdx_path = self.CACHE_DIR / "Aletheia_GP_dRdxt_skl1.7.skops"
    correction_path = self.CACHE_DIR / "resolution_correction_skl1.7.skops"
    # --- Download the large models if they are missing ---
    self._download_if_missing(self.MODEL_URLS["gp_B"], gp_b_path)
    self._download_if_missing(self.MODEL_URLS["gp_dRdx"], gp_drdx_path)
    self._download_if_missing(self.MODEL_URLS["correction"], correction_path)
    # --- Deserialize the GP models and the resolution-correction spline.
    # Only the explicitly whitelisted TRUSTED_TYPES may be instantiated.
    self.gp_B = skio.load(gp_b_path, trusted=self.TRUSTED_TYPES)
    self.gp_dRdx = skio.load(gp_drdx_path, trusted=self.TRUSTED_TYPES)
    self.correction_function = skio.load(correction_path, trusted=self.TRUSTED_TYPES)
    # --- Load the bundled Planck covariance matrix ---
    try:
        # importlib.resources locates the file inside the installed package.
        with resources.files('aletheiacosmo.data').joinpath(self.PLANCK_FILE).open('rb') as f:
            covmat = np.loadtxt(f)
    except FileNotFoundError as e:
        # Chain the original exception so the true cause is preserved.
        raise FileNotFoundError(
            "Planck covariance matrix file (planck_2018_lcdm_parcov.dat) was not found. "
            "This file should be bundled with the package. Please reinstall AletheiaCosmo."
        ) from e
    # --- Planck 2018 data used for parameter validation ---
    self.planck_means = np.array([0.02236164, 0.12071002, 0.96479956])
    eigenvals, eigenvecs = np.linalg.eigh(covmat)
    self.eigenvecs = eigenvecs
    self.planck_sigmas = np.sqrt(eigenvals)
    log.info("Emulator models and data loaded successfully.")
def _download_if_missing(self, url, filepath):
"""Helper function to download an emulator skop file if it doesn't exist locally."""
if not filepath.exists():
log.warning(f"Data file not found. Downloading {filepath.name} to cache: {filepath}")
try:
opener = request.build_opener()
opener.addheaders = [('User-agent', 'AletheiaCosmo-Downloader')]
request.install_opener(opener)
request.urlretrieve(url, filepath)
log.info("Download complete.")
except Exception as e:
log.error(f"Failed to download model from {url}. Error: {e}")
raise RuntimeError(f"Could not download model data. Please check your internet connection or the model URL.")
def get_pnl(self, kvec, cospar, z):
    """Calculates the non-linear matter power spectrum.

    This is the main method of the emulator. It takes a set of wavenumbers,
    a cosmology, and a redshift, and returns the emulated P_NL(k).

    Parameters
    ----------
    kvec : array_like
        Array of wavenumbers, k, in units of **1/Mpc**.
        Must be within the emulator's valid range [0.006, 2.0] 1/Mpc.
    cospar : dict
        A dictionary of cosmological parameters such as the one created
        by `create_cosmo_dict`.
    z : float
        The redshift at which to calculate the power spectrum.

    Returns
    -------
    np.ndarray
        The non-linear matter power spectrum, P_NL(k), in units of **Mpc^3**.

    Raises
    ------
    ValueError
        If any k-values in `kvec` are outside the emulator's valid
        training range [0.006, 2.0] 1/Mpc.
    ValueError
        If the input cosmology fails the validation checks (e.g., sigma12
        is out of range [0.2, 1.0] or shape parameters are out of
        the 5-sigma Planck box).

    Notes
    -----
    The calculation involves several steps:

    1. Input parameters are validated (k-range, sigma12, shape).
    2. A `Cosmology` object is created to compute linear spectra.
    3. The non-linear boost `B(k)` and response `dR/dxi` are predicted by GPs.
    4. The parameter `xtilde` is computed for the target and a reference cosmology.
    5. All components are combined and a final resolution correction is applied.
    6. A warning is logged if the resolution correction is > 3% at any scale.
    """
    log.info(f"Received request for P_nl at z={z:.2f}")
    kvec = np.atleast_1d(kvec)
    z = float(z)
    # --- k-vector validation (units are 1/Mpc) ---
    if np.any(kvec < 0.006) or np.any(kvec > 2.0):
        msg = (f"k-values are outside the valid emulator range [0.006, 2.0] 1/Mpc. "
               f"Found min={np.min(kvec):.4f}, max={np.max(kvec):.4f}")
        log.error(msg)
        raise ValueError(msg)
    log.info("Initializing cosmology engine for target cosmology.")
    cosmology_engine = Cosmology(cospar)
    sigma12 = cosmology_engine.get_sigma12(z)
    self._validate_params(cospar, sigma12)
    # --- Correction factor for resolution effects ---
    correction = self.correction_function(kvec, sigma12).flatten()
    correction_factors = 1.0 / correction
    max_correction = np.max(correction_factors)
    # Warn when the resolution correction exceeds 3% anywhere in k.
    if max_correction > 1.03:
        k_at_max_corr = kvec[np.argmax(correction_factors)]
        log.warning(f"Resolution correction is > 3% (max: {max_correction:.3f}x "
                    f"at k={k_at_max_corr:.2f} 1/Mpc). ")
    cosmology_engine.compute_all_spectra(sigma12)
    # De-wiggled linear spectrum at the requested wavenumbers.
    pdw_use = cosmology_engine.get_pdw(kvec)
    log.info("Computing emulator predictions from GPs.")
    x_combined = self._build_gp_input(kvec, cospar, sigma12)
    y_pred_B = self.gp_B.predict(x_combined)
    y_pred_dRdx = self.gp_dRdx.predict(x_combined)
    # The boost GP was trained in log-space.
    B = np.exp(y_pred_B)
    dRdx = y_pred_dRdx
    log.info("Computing xtilde for target and reference cosmologies.")
    xtilde = self._get_xtilde(z, cosmology_engine.growth)
    # Reference LCDM cosmology sharing the target's shape parameters.
    cospar_ref = cospar.copy()
    cospar_ref.update({'w_0': -1.0, 'w_a': 0.0, 'A_s': 2.101e-9, 'h': 0.673})
    growth_ref = GrowthCalculator(cospar_ref)
    # Redshift in the reference cosmology that matches the target's sigma12.
    z0 = self._get_redshift(
        sigma12,            # The value to match
        growth_ref,         # The reference growth engine
        cosmology_engine,   # The target cosmology engine
        cospar_ref          # The reference parameter dict
    )
    xtilde_0 = self._get_xtilde(z0, growth_ref)
    log.debug(f"Target xtilde={xtilde:.4f}. Reference z0={z0:.4f}, xtilde_0={xtilde_0:.4f}")
    log.info("Combining components for final prediction.")
    dxtilde = xtilde - xtilde_0
    Pnl_uncorrected = pdw_use * B * (1. + dRdx * dxtilde)
    # Apply final resolution correction
    Pnl_final = Pnl_uncorrected * correction_factors
    log.info("P_nl calculation complete.")
    return Pnl_final
def _build_gp_input(self, kvec, cospar, sigma12):
"""Constructs the 2D input array for the Gaussian Process models."""
lkvec = np.log(kvec)
# Reshape all inputs to be (N, 1) column vectors
x1 = lkvec.reshape(-1, 1)
x2 = np.full_like(x1, cospar['omega_b'])
x3 = np.full_like(x1, cospar['omega_c'])
x4 = np.full_like(x1, cospar['n_s'])
x5 = np.full_like(x1, sigma12)
return np.concatenate((x1, x2, x3, x4, x5), axis=1)
def _validate_params(self, cospar, sigma12):
    """Ensure the requested cosmology lies inside the emulator's domain.

    Two independent checks are applied:

    1. sigma12 must lie in the emulator's trained range [0.2, 1.0].
    2. The shape parameters (omega_b, omega_c, n_s) are projected onto the
       eigenvectors of the Planck 2018 covariance matrix, and each projected
       component must sit within a +/- 5-sigma box.

    Args:
        cospar (dict): The input cosmology dictionary.
        sigma12 (array_like): The input value of sigma12 (as a 1-element array).

    Raises:
        ValueError: If any parameter falls outside the valid range.
    """
    log.info("Validating input parameters...")
    # Work with the scalar value for checks and message formatting.
    s12 = sigma12[0]
    # --- Check 1: sigma12 range ---
    log.debug(f"Checking sigma12 = {s12:.4f}")
    if not (0.2 <= s12 <= 1.0):
        raise ValueError(
            f"Validation failed: Calculated sigma12 ({s12:.4f}) is outside "
            f"the valid emulator range of [0.2, 1.0]."
        )
    log.info(f" sigma12 = {s12:.4f} (OK)")
    # --- Check 2: shape parameters in the Planck eigenbasis ---
    log.debug("Checking shape parameters against Planck prior box...")
    shape_params = np.array([cospar['omega_b'], cospar['omega_c'], cospar['n_s']])
    offsets = shape_params - self.planck_means
    projections = self.eigenvecs.T @ offsets
    deviations = np.abs(projections) / self.planck_sigmas
    log.debug(f"Parameter deviations (in sigmas): {deviations}")
    if np.any(deviations > 5.0):
        worst_axis = np.argmax(deviations)
        msg = (
            f"Validation failed: Shape parameters are outside the 5-sigma Planck prior box.\n"
            f" - Problem is in eigenvector direction {worst_axis}.\n"
            f" - Deviation is {deviations[worst_axis]:.2f} sigma (limit is 5.0 sigma)."
        )
        log.error(msg)
        raise ValueError(msg)
    log.info("Shape parameters are within 5-sigma box (OK)")
def _get_xtilde(self, z, growth_obj):
    """Return the smoothed growth-dependent parameter xtilde at redshift z.

    X(tau) is averaged over the recent growth history, weighting with a
    Gaussian kernel of width 0.12 in eta = ln D(z), integrated over the
    half-interval [eta - 0.5, eta].
    """
    eta_now = np.log(growth_obj.Dgrowth(z))
    # Sample the recent growth history on a fine grid ending at eta_now.
    eta_grid = np.linspace(eta_now - 0.5, eta_now, 300)
    weights = AletheiaEmu.gaussian_kernel(eta_now, eta_grid, 0.12)
    return simpson(weights * growth_obj.X_tau(eta_grid), eta_grid)
def _get_redshift(self, target_sigma12, growth_ref, cosmo_engine_target, cospar_ref):
"""Finds the redshift z0 in a reference cosmology with the same sigma12.
This is an internal root-finding method. It finds the redshift `z0`
at which a standard reference LCDM cosmology has a sigma_12 value
equal to the `target_sigma12` of the user's input cosmology.
Parameters
----------
target_sigma12 : float
The sigma_12 value of the target cosmology that we want to match.
growth_ref : GrowthCalculator
An initialized GrowthCalculator instance for the reference LCDM cosmology.
cosmo_engine_target : Cosmology
The initialized Cosmology instance for the target cosmology.
cospar_ref : dict
The cosmology dictionary for the reference LCDM cosmology.
Returns
-------
float
The redshift, z0, in the reference cosmology.
"""
# Get D(z=0) for the reference cosmology
D_ref_0 = growth_ref.Dgrowth(0.)
# Calculate the expected sigma12 at z=0 for the reference cosmology.
# This is done by taking the sigma12(z=0) from the target cosmology's CAMB run
# and rescaling it by the ratio of sqrt(A_s) values.
# The ratio of D(0) values is a small correction for different normalizations.
sigma12_ref_0 = (cosmo_engine_target.sigma12_0 *
np.sqrt(cospar_ref['A_s'] / cosmo_engine_target.parameters.InitPower.As) *
(D_ref_0 / cosmo_engine_target.D0))
# This nested function describes the evolution of sigma12 in the reference model
def sig12_in_ref_cosmology(z):
return sigma12_ref_0 * growth_ref.Dgrowth(z) / D_ref_0
# The function whose root we want to find: f(z) = sigma12_ref(z) - target = 0
def delta_sigma12(z):
return sig12_in_ref_cosmology(z) - target_sigma12
# Use a robust root-finding algorithm to find z0
try:
result = root_scalar(delta_sigma12, bracket=[-0.9, 4.4], method='brentq')
if not result.converged:
raise RuntimeError("Root-finding for z0 did not converge.")
return result.root
except ValueError as e:
# This can happen if delta_sigma12 has the same sign at both ends of the bracket
raise ValueError(f"Could not find a valid z0 for target sigma12={target_sigma12}. "
f"The value may be outside the solvable range. Original error: {e}")
@staticmethod
def gaussian_kernel(tau, tau_prime, tau_s):
    """Gaussian weight in ``tau_prime``, centred at ``tau`` with width ``tau_s``.

    The factor of 2 normalizes the kernel for one-sided integration over
    ``tau_prime <= tau`` (as done in ``_get_xtilde``), where a plain
    Gaussian would integrate to 1/2.
    """
    amplitude = 2. / (np.sqrt(2 * np.pi) * tau_s)
    return amplitude * np.exp(-((tau - tau_prime) ** 2) / (2 * tau_s ** 2))
@staticmethod
def create_cosmo_dict(h, omega_b=None, omega_c=None, omega_k=0., Omega_b=None, Omega_c=None,
                      Omega_k=0., A_s=2.1e-9, n_s=0.96, w_0=-1.0, w_a=0.0,
                      model='LCDM', density_type='physical'):
    """
    Creates a standardized cosmology dictionary for the Aletheia Emulator.

    Args:
        h (float): The Hubble parameter. Required for all conversions.
        omega_b, omega_c, omega_k (float, optional): Physical baryon/CDM/curvature densities.
        Omega_b, Omega_c, Omega_k (float, optional): Fractional baryon/CDM/curvature densities.
        A_s (float, optional): Primordial scalar amplitude.
        n_s (float, optional): Scalar spectral index.
        w_0, w_a (float, optional): Dark-energy equation-of-state parameters
            (only used when model is 'w0waCDM'; ignored for 'LCDM').
        model (str, optional): Cosmological model, e.g., 'LCDM' or 'w0waCDM'
            (case-insensitive).
        density_type (str, optional): 'physical' (little omega) or 'fractional' (big Omega).

    Returns:
        dict: A validated dictionary ready for the emulator.

    Raises:
        ValueError: If required densities are missing, or `density_type` /
            `model` is not recognized.
    """
    cospar = {}
    # --- Handle density parameter conversion ---
    if density_type == 'physical':
        if omega_b is None or omega_c is None:
            raise ValueError("For 'physical' density_type, 'omega_b' and 'omega_c' must be provided.")
        cospar['omega_b'] = omega_b
        cospar['omega_c'] = omega_c
        cospar['omega_k'] = omega_k
        cospar['omega_nu'] = 0.  # The current version assumes massless neutrinos
    elif density_type == 'fractional':
        if Omega_b is None or Omega_c is None:
            raise ValueError("For 'fractional' density_type, 'Omega_b' and 'Omega_c' must be provided.")
        # Convert fractional densities to physical: omega = Omega * h^2.
        cospar['omega_b'] = Omega_b * h**2
        cospar['omega_c'] = Omega_c * h**2
        cospar['omega_k'] = Omega_k * h**2
        cospar['omega_nu'] = 0.  # The current version assumes massless neutrinos
    else:
        raise ValueError(f"Unknown density_type: '{density_type}'")
    # --- Compute derived parameters (flatness closes the budget with DE) ---
    cospar['omega_de'] = h**2 - cospar['omega_c'] - cospar['omega_b'] - cospar['omega_k'] - cospar['omega_nu']
    # --- Set remaining parameters ---
    cospar['h'] = h
    cospar['A_s'] = A_s
    cospar['n_s'] = n_s
    # --- Handle model-specific defaults (case-insensitive match) ---
    # NOTE: comparing against the *uppercased* literal; the previous
    # `model.upper() == 'w0waCDM'` could never match, making the
    # w0waCDM branch unreachable.
    model_name = model.upper()
    if model_name == 'LCDM':
        cospar['w_0'] = -1.0
        cospar['w_a'] = 0.0
    elif model_name == 'W0WACDM':
        cospar['w_0'] = w_0
        cospar['w_a'] = w_a
    else:
        raise ValueError(f"Unknown model: '{model}'")
    return cospar