Source code for ler.lens_galaxy_population.cross_section_interpolator

# -*- coding: utf-8 -*-
"""
Module for gravitational lensing cross section interpolation.

This module provides highly optimized Numba-compiled functions for interpolating
gravitational lensing cross sections in a 5-dimensional parameter space using 
cubic B-spline interpolation on prefiltered coefficient grids.

The interpolation is performed based on: \n
- Ellipticity components (e1, e2) derived from axis ratio q and position angle phi \n
- Density profile slope (gamma) \n
- External shear components (gamma1, gamma2) \n

Key Features: \n
- JIT-compiled with Numba for high performance \n
- Parallel processing support for batch evaluations \n
- Cubic B-spline interpolation matching scipy's map_coordinates behavior \n
- Automatic scaling by Einstein radius and affine calibration \n

Usage:
    Basic workflow example:

    >>> from ler.lens_galaxy_population.cross_section_interpolator import make_cross_section_reinit
    >>> cs_func = make_cross_section_reinit(e1_grid, e2_grid, gamma_grid, ...)
    >>> cross_sections = cs_func(zs, zl, sigma, q, phi, gamma, gamma1, gamma2)

Copyright (C) 2024 Hemanta Kumar Phurailatpam. Distributed under MIT License.
"""

import numpy as np
from numba import njit, prange
from scipy import ndimage

from .lens_functions import phi_q2_ellipticity

[docs] C_LIGHT = 299792.458 # km/s
@njit def _bspline3(t): """ Evaluate the centered cubic B-spline basis function B3(t). Parameters ---------- t : ``float`` Distance from the spline center. \n The function has compact support on [-2, 2]. Returns ------- value : ``float`` The B-spline basis value at position t. \n - Maximum value of 2/3 at t=0 \n - Smoothly decays to 0 at |t|=2 \n - Returns 0 for |t| >= 2 \n """ at = abs(t) if at < 1.0: return (4.0 - 6.0 * at * at + 3.0 * at * at * at) / 6.0 elif at < 2.0: u = 2.0 - at return (u * u * u) / 6.0 else: return 0.0 @njit def _clamp_int(i, n): """ Clamp an index to valid array bounds using 'nearest' boundary mode. Parameters ---------- i : ``int`` The index to clamp. Can be negative or exceed array bounds. n : ``int`` The size of the array dimension (valid indices are 0 to n-1). Returns ------- clamped_index : ``int`` The clamped index in the range [0, n-1]. \n - If i < 0, returns 0 \n - If i > n-1, returns n-1 \n - Otherwise returns i unchanged \n """ if i < 0: return 0 if i > n - 1: return n - 1 return i @njit def _physical_to_index(x, x0, x1, n): """ Convert a physical coordinate to a fractional grid index. Parameters ---------- x : ``float`` The physical coordinate value to convert. x0 : ``float`` The minimum physical coordinate (maps to index 0). x1 : ``float`` The maximum physical coordinate (maps to index n-1). n : ``int`` The number of grid points in this dimension. Returns ------- fractional_index : ``float`` The fractional index in [0, n-1]. \n Non-integer values indicate positions between grid points. """ return (x - x0) * (n - 1) / (x1 - x0) # @njit(parallel=True) @njit def _map_coordinates_5d_cubic_nearest(coeff, ie1, ie2, ig, ig1, ig2): """ Perform 5D cubic B-spline interpolation on prefiltered coefficients. This function evaluates a cubic (order-3) B-spline interpolation in 5 dimensions. It matches the behavior of ``scipy.ndimage.map_coordinates`` with order=3, mode='nearest', and prefilter=False. Parameters ---------- coeff : ``numpy.ndarray`` Prefiltered B-spline coefficients with shape (n1, n2, n3, n4, n5). \n Should be the output of ``scipy.ndimage.spline_filter(data, order=3)``. ie1 : ``numpy.ndarray`` Fractional indices for the first dimension (e1), shape (N,). ie2 : ``numpy.ndarray`` Fractional indices for the second dimension (e2), shape (N,). ig : ``numpy.ndarray`` Fractional indices for the third dimension (gamma), shape (N,). ig1 : ``numpy.ndarray`` Fractional indices for the fourth dimension (gamma1), shape (N,). ig2 : ``numpy.ndarray`` Fractional indices for the fifth dimension (gamma2), shape (N,). Returns ------- out : ``numpy.ndarray`` Interpolated values at the specified fractional indices, shape (N,). """ n1, n2, n3, n4, n5 = coeff.shape N = ie1.size out = np.empty(N, dtype=np.float64) for k in prange(N): x1 = ie1[k] x2 = ie2[k] x3 = ig[k] x4 = ig1[k] x5 = ig2[k] b1 = int(np.floor(x1)) - 1 b2 = int(np.floor(x2)) - 1 b3 = int(np.floor(x3)) - 1 b4 = int(np.floor(x4)) - 1 b5 = int(np.floor(x5)) - 1 acc = 0.0 for a1 in range(4): j1 = _clamp_int(b1 + a1, n1) w1 = _bspline3(x1 - (b1 + a1)) if w1 == 0.0: continue for a2 in range(4): j2 = _clamp_int(b2 + a2, n2) w2 = _bspline3(x2 - (b2 + a2)) if w2 == 0.0: continue for a3 in range(4): j3 = _clamp_int(b3 + a3, n3) w3 = _bspline3(x3 - (b3 + a3)) if w3 == 0.0: continue for a4 in range(4): j4 = _clamp_int(b4 + a4, n4) w4 = _bspline3(x4 - (b4 + a4)) if w4 == 0.0: continue for a5 in range(4): j5 = _clamp_int(b5 + a5, n5) w5 = _bspline3(x5 - (b5 + a5)) if w5 == 0.0: continue acc += (w1 * w2 * w3 * w4 * w5) * coeff[j1, j2, j3, j4, j5] out[k] = acc return out @njit def _cross_section( zs, zl, sigma, q, phi, gamma, gamma1, gamma2, e1_grid, e2_grid, gamma_grid, gamma1_grid, gamma2_grid, cs_spline_coeff_grid, Da_instance, csunit_to_cs_slope, csunit_to_cs_intercept, ): """ Compute lensing cross sections via 5D B-spline interpolation. Parameters ---------- zs : ``numpy.ndarray`` Source redshifts, shape (N,). zl : ``numpy.ndarray`` Lens redshifts, shape (N,). sigma : ``numpy.ndarray`` Velocity dispersions (units: km/s), shape (N,). q : ``numpy.ndarray`` Axis ratios, shape (N,). phi : ``numpy.ndarray`` Position angles (units: radians), shape (N,). gamma : ``numpy.ndarray`` Density profile slopes, shape (N,). gamma1 : ``numpy.ndarray`` External shear component 1, shape (N,). gamma2 : ``numpy.ndarray`` External shear component 2, shape (N,). e1_grid : ``numpy.ndarray`` Grid values for ellipticity component e1, shape (n_e1,). e2_grid : ``numpy.ndarray`` Grid values for ellipticity component e2, shape (n_e2,). gamma_grid : ``numpy.ndarray`` Grid values for density slope gamma, shape (n_g,). gamma1_grid : ``numpy.ndarray`` Grid values for shear component gamma1, shape (n_g1,). gamma2_grid : ``numpy.ndarray`` Grid values for shear component gamma2, shape (n_g2,). cs_spline_coeff_grid : ``numpy.ndarray`` Prefiltered B-spline coefficients for cross section, \n shape (n_e1, n_e2, n_g, n_g1, n_g2). Da_instance : ``callable`` Angular diameter distance function. \n Signature: ``Da_instance(z) -> distance`` csunit_to_cs_slope : ``float`` Slope for affine calibration from unit cross section. csunit_to_cs_intercept : ``float`` Intercept for affine calibration from unit cross section. Returns ------- cs : ``numpy.ndarray`` Computed cross sections (units: radians^2), shape (N,). \n Negative values are clipped to zero. """ # Angular diameter distances Ds = Da_instance(zs) Dl = Da_instance(zl) Dls = (Ds * (1 + zs) - Dl * (1 + zl)) / (1 + zs) # Einstein radius theta_E = 4.0 * np.pi * (sigma / C_LIGHT) ** 2 * (Dls / Ds) # Convert (q, phi) -> (e1, e2) e1, e2 = phi_q2_ellipticity(phi, q) N = zs.size # Compute fractional indices ie1 = np.empty(N, dtype=np.float64) ie2 = np.empty(N, dtype=np.float64) ig = np.empty(N, dtype=np.float64) ig1 = np.empty(N, dtype=np.float64) ig2 = np.empty(N, dtype=np.float64) e1_min, e1_max = e1_grid[0], e1_grid[-1] e2_min, e2_max = e2_grid[0], e2_grid[-1] g_min, g_max = gamma_grid[0], gamma_grid[-1] g1_min, g1_max = gamma1_grid[0], gamma1_grid[-1] g2_min, g2_max = gamma2_grid[0], gamma2_grid[-1] n_e1 = e1_grid.size n_e2 = e2_grid.size n_g = gamma_grid.size n_g1 = gamma1_grid.size n_g2 = gamma2_grid.size for i in range(N): ie1[i] = _physical_to_index(e1[i], e1_min, e1_max, n_e1) ie2[i] = _physical_to_index(e2[i], e2_min, e2_max, n_e2) ig[i] = _physical_to_index(gamma[i], g_min, g_max, n_g) ig1[i] = _physical_to_index(gamma1[i], g1_min, g1_max, n_g1) ig2[i] = _physical_to_index(gamma2[i], g2_min, g2_max, n_g2) # Unit cross-section via cubic spline interpolation cs_unit = _map_coordinates_5d_cubic_nearest(cs_spline_coeff_grid, ie1, ie2, ig, ig1, ig2) # Scale by SIS area and apply affine calibration area_sis = np.pi * theta_E**2 cs = cs_unit * (csunit_to_cs_intercept + csunit_to_cs_slope * area_sis) cs = np.maximum(cs, 0.0) return cs
[docs] def make_cross_section_reinit( e1_grid, e2_grid, gamma_grid, gamma1_grid, gamma2_grid, cs_spline_coeff_grid, Da_instance, csunit_to_cs_slope=0.31830988618379075, csunit_to_cs_intercept=-3.2311742677852644e-27, ): """ Factory function to create a JIT-compiled cross section calculator. This function precomputes B-spline coefficients and creates a closure that captures the grid parameters, returning a fast Numba-compiled function for computing cross sections. Parameters ---------- e1_grid : ``numpy.ndarray`` Grid values for ellipticity component e1, shape (n_e1,). e2_grid : ``numpy.ndarray`` Grid values for ellipticity component e2, shape (n_e2,). gamma_grid : ``numpy.ndarray`` Grid values for density slope gamma, shape (n_g,). gamma1_grid : ``numpy.ndarray`` Grid values for shear component gamma1, shape (n_g1,). gamma2_grid : ``numpy.ndarray`` Grid values for shear component gamma2, shape (n_g2,). cs_spline_coeff_grid : ``numpy.ndarray`` Raw cross section grid data (before spline filtering), \n shape (n_e1, n_e2, n_g, n_g1, n_g2). Da_instance : ``callable`` Angular diameter distance function. \n Signature: ``Da_instance(z) -> distance`` csunit_to_cs_slope : ``float`` Slope for affine calibration from unit cross section. \n default: 0.31830988618379075 csunit_to_cs_intercept : ``float`` Intercept for affine calibration from unit cross section. \n default: -3.2311742677852644e-27 Returns ------- cross_section_reinit : ``callable`` JIT-compiled function with signature: \n ``cross_section_reinit(zs, zl, sigma, q, phi, gamma, gamma1, gamma2)`` \n Returns cross sections as ``numpy.ndarray`` of shape (N,). Examples -------- >>> from ler.lens_galaxy_population.cross_section_interpolator import make_cross_section_reinit >>> cs_func = make_cross_section_reinit( ... e1_grid, e2_grid, gamma_grid, gamma1_grid, gamma2_grid, ... cs_spline_coeff_grid, Da_instance ... ) >>> cross_sections = cs_func(zs, zl, sigma, q, phi, gamma, gamma1, gamma2) """ e1g = np.asarray(e1_grid, dtype=np.float64) e2g = np.asarray(e2_grid, dtype=np.float64) gg = np.asarray(gamma_grid, dtype=np.float64) g1g = np.asarray(gamma1_grid, dtype=np.float64) g2g = np.asarray(gamma2_grid, dtype=np.float64) cs = np.asarray(cs_spline_coeff_grid, dtype=np.float64) # Precompute spline coefficients cs_coeff = ndimage.spline_filter(cs, order=3) slope_given = float(csunit_to_cs_slope) intercept_given = float(csunit_to_cs_intercept) Da_function = njit(lambda z: Da_instance(z)) @njit def cross_section_reinit(zs, zl, sigma, q, phi, gamma, gamma1, gamma2): return _cross_section( zs=zs, zl=zl, sigma=sigma, q=q, phi=phi, gamma=gamma, gamma1=gamma1, gamma2=gamma2, e1_grid=e1g, e2_grid=e2g, gamma_grid=gg, gamma1_grid=g1g, gamma2_grid=g2g, cs_spline_coeff_grid=cs_coeff, Da_instance=Da_function, csunit_to_cs_slope=slope_given, csunit_to_cs_intercept=intercept_given, ) return cross_section_reinit