"""
Module for optical depth and lens parameter distribution calculations.
This module provides the ``OpticalDepth`` class for computing strong lensing
optical depth, cross-section, sampling velocity dispersion, axis ratio, and other lens galaxy
population parameters. It supports multiple lens models including SIS, SIE,
and EPL + external shear.
Key features: \n
- Optical depth computation for strong gravitational lensing \n
- Velocity dispersion sampling with multiple models \n
- Lens redshift distribution sampling \n
- Cross-section calculations for various lens models \n
Copyright (C) 2024 Hemantakumar Phurailatpam. Distributed under MIT License.
"""
from numba import njit, get_num_threads, set_num_threads
from multiprocessing import Pool
import numpy as np
from scipy.integrate import quad
from scipy.interpolate import CubicSpline
from astropy.cosmology import LambdaCDM
from tqdm import tqdm
from ..utils import (
cubic_spline_interpolator,
inverse_transform_sampler,
cubic_spline_interpolator2d,
save_json,
load_json,
interpolator_json_path,
FunctionConditioning,
inverse_transform_sampler2d,
pdf_cubic_spline_interpolator2d,
comoving_distance,
angular_diameter_distance,
angular_diameter_distance_z1z2,
differential_comoving_volume,
generate_mixed_grid,
)
from .lens_functions import (
phi_cut_SIE,
cross_section,
)
from ..image_properties.cross_section_njit import phi_q2_ellipticity
from .mp import cross_section_mp
@njit()
def _seed_numba_rng(seed):
"""Seed Numba's RNG so njit samplers are reproducible across calls."""
np.random.seed(seed)
[docs]
class OpticalDepth:
"""
Class for computing optical depth and lens galaxy population parameters.
This class calculates strong lensing optical depth, velocity dispersion,
axis ratio, and other parameters for a lens galaxy population. It supports
SIS, SIE, and EPL + external shear lens models with customizable samplers
and interpolators for efficient computation.
Key Features: \n
- Multiple lens model support (SIS, SIE, EPL + shear) \n
- Configurable velocity dispersion distributions \n
- Cached interpolators for fast optical depth computation \n
- Flexible parameter sampling with user-defined priors \n
Parameters
----------
npool : ``int``
Number of processors for multiprocessing. \n
default: 4
z_min : ``float``
Minimum redshift of the lens galaxy population. \n
default: 0.0
z_max : ``float``
Maximum redshift of the lens galaxy population. \n
default: 10.0
cosmology : ``astropy.cosmology`` or ``None``
Cosmology object for distance calculations. \n
default: LambdaCDM(H0=70, Om0=0.3, Ode0=0.7)
lens_type : ``str``
Type of lens galaxy model. \n
Options: \n
- 'epl_shear_galaxy': Elliptical power-law with external shear \n
- 'sie_galaxy': Singular isothermal ellipsoid \n
- 'sis_galaxy': Singular isothermal sphere \n
default: 'epl_shear_galaxy'
lens_functions : ``dict`` or ``None``
Dictionary with lens-related functions. \n
default: None (uses defaults for lens_type)
lens_functions_params : ``dict`` or ``None``
Dictionary with parameters for lens-related functions. \n
default: None
lens_priors : ``dict`` or ``None``
Dictionary of sampler functions for lens parameters. \n
default: None (uses defaults for lens_type)
lens_priors_params : ``dict`` or ``None``
Dictionary with parameters for the samplers. \n
default: None
directory : ``str``
Directory where interpolators are saved. \n
default: './interpolator_json'
create_new_interpolator : ``bool`` or ``dict``
Whether to create new interpolators. \n
default: False
verbose : ``bool``
If True, prints additional information. \n
default: False
Examples
--------
Basic usage:
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> tau = od.optical_depth(zs=np.array([1.0, 2.0]))
"""
def __init__(
self,
npool=4,
z_min=0.0,
z_max=10.0,
cosmology=None,
lens_type="epl_shear_galaxy",
lens_functions=None,
lens_functions_params=None,
lens_priors=None,
lens_priors_params=None,
directory="./interpolator_json",
create_new_interpolator=False,
verbose=False,
):
print("\nInitializing OpticalDepth class\n")
if lens_type not in ["sie_galaxy", "epl_shear_galaxy", "sis_galaxy"]:
raise ValueError(
"lens_type not in ['sie_galaxy', 'epl_shear_galaxy', 'sis_galaxy']"
)
self.lens_type = lens_type
self.npool = npool
self.z_min = z_min
self.z_max = z_max
self.cosmo = (
cosmology
if cosmology
else LambdaCDM(
H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0
)
)
self.directory = directory
# Initialize decision dictionary for creating interpolators
self._initialize_decision_dictionary(create_new_interpolator, lens_type)
# Update lens functions and samplers with user input
self._lens_functions_and_sampler_categorization(
lens_type,
lens_priors,
lens_priors_params,
lens_functions,
lens_functions_params,
)
# Initialize cosmological functions
[docs]
self.comoving_distance = comoving_distance(
z_min=self.z_min,
z_max=self.z_max,
cosmo=self.cosmo,
directory=self.directory,
create_new=self.create_new_interpolator["comoving_distance"]["create_new"],
resolution=self.create_new_interpolator["comoving_distance"]["resolution"],
get_attribute=True,
)
[docs]
self.angular_diameter_distance = angular_diameter_distance(
z_min=self.z_min,
z_max=self.z_max,
cosmo=self.cosmo,
directory=self.directory,
create_new=self.create_new_interpolator["angular_diameter_distance"][
"create_new"
],
resolution=self.create_new_interpolator["angular_diameter_distance"][
"resolution"
],
get_attribute=True,
)
[docs]
self.angular_diameter_distance_z1z2 = angular_diameter_distance_z1z2(
z_min=self.z_min,
z_max=self.z_max,
cosmo=self.cosmo,
directory=self.directory,
create_new=self.create_new_interpolator["angular_diameter_distance_z1z2"][
"create_new"
],
resolution=self.create_new_interpolator["angular_diameter_distance_z1z2"][
"resolution"
],
get_attribute=True,
)
[docs]
self.differential_comoving_volume = differential_comoving_volume(
z_min=self.z_min,
z_max=self.z_max,
cosmo=self.cosmo,
directory=self.directory,
create_new=self.create_new_interpolator["differential_comoving_volume"][
"create_new"
],
resolution=self.create_new_interpolator["differential_comoving_volume"][
"resolution"
],
get_attribute=True,
)
# lens sampler initialization
self.velocity_dispersion = self.lens_priors["velocity_dispersion"]
self.axis_ratio = self.lens_priors["axis_ratio"]
self.axis_rotation_angle = self.lens_priors["axis_rotation_angle"]
self.density_profile_slope = self.lens_priors["density_profile_slope"]
self.external_shear1 = self.lens_priors["external_shear1"]
self.external_shear2 = self.lens_priors["external_shear2"]
# cross section initialization
self.cross_section = self.lens_functions["cross_section"]
# # lens redshift initialization
self.lens_redshift_sl = self.lens_priors["lens_redshift_sl"]
self.lens_redshift = self.lens_priors["lens_redshift"]
# lens function initialization
self.optical_depth = self.lens_functions["optical_depth"]
# --------------------
# Initialization helpers
# --------------------
def _default_lens_priors_and_functions(self, lens_type):
"""
Helper to get default lens samplers and functions based on lens type.
Parameters
----------
lens_type : ``str``
Type of lens galaxy model.
Returns
-------
lens_priors : ``dict``
Dictionary of sampler names.
lens_priors_params : ``dict``
Dictionary of sampler parameters.
lens_functions : ``dict``
Dictionary of lens function names.
lens_functions_params : ``dict``
Dictionary of lens function parameters.
"""
if lens_type == "epl_shear_galaxy":
lens_priors = dict(
zs_sl="strongly_lensed_source_redshift",
lens_redshift_sl="lens_redshift_strongly_lensed_numerical",
lens_redshift="lens_redshift_intrinsic_numerical",
velocity_dispersion="velocity_dispersion_ewoud",
axis_ratio="rayleigh",
axis_rotation_angle="uniform",
external_shear1="normal",
external_shear2="normal",
density_profile_slope="normal",
)
lens_priors_params = dict(
zs_sl=None,
lens_redshift_sl=dict(
param_name = "lens_redshift_sl",
sampler_type = "lens_redshift_strongly_lensed_numerical",
lens_type = lens_type,
integration_size=50000,
use_multiprocessing=False,
cross_section_epl_shear_interpolation=False,
),
lens_redshift=None,
velocity_dispersion=dict(
param_name='velocity_dispersion', sampler_type='velocity_dispersion_ewoud',
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
axis_ratio=dict(
param_name='axis_ratio', sampler_type='rayleigh',
q_min=0.2, q_max=1.0
),
axis_rotation_angle=dict(
param_name='axis_rotation_angle', sampler_type='uniform',
x_min=0.0, x_max=2 * np.pi
),
external_shear1=dict(
param_name='external_shear1', sampler_type='normal',
mu=0.0, sigma=0.05
),
external_shear2=dict(
param_name='external_shear2', sampler_type='normal',
mu=0.0, sigma=0.05
),
density_profile_slope=dict(
param_name='density_profile_slope', sampler_type='normal',
mu=1.99, sigma=0.149
),
)
lens_functions = dict(
param_sampler_type="epl_shear_sl_parameters_rvs",
cross_section_based_sampler="importance_sampler_partial",
optical_depth="optical_depth_numerical",
cross_section="cross_section_epl_shear_njit",
)
lens_functions_params = dict(
param_sampler_type=None,
cross_section_based_sampler=dict(
n_prop=50,
threshold_factor=1e-4,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=1.4,
gamma_max=2.7,
shear_min=-0.22,
shear_max=0.20,
),
optical_depth=dict(param_name="optical_depth", function_type="optical_depth_numerical"),
cross_section=dict(
num_th=500, maginf=-100.0
),
)
elif lens_type == "sie_galaxy":
lens_priors = dict(
zs_sl="strongly_lensed_source_redshift",
lens_redshift_sl="lens_redshift_strongly_lensed_numerical",
lens_redshift="lens_redshift_intrinsic_numerical",
velocity_dispersion="velocity_dispersion_ewoud",
axis_ratio="rayleigh",
axis_rotation_angle="uniform",
external_shear1="constant_values_n_size",
external_shear2="constant_values_n_size",
density_profile_slope="constant_values_n_size",
)
lens_priors_params = dict(
zs_sl=None,
lens_redshift_sl=dict(
param_name = "lens_redshift_sl",
sampler_type = "lens_redshift_strongly_lensed_numerical",
lens_type = lens_type,
integration_size=50000,
use_multiprocessing=False,
cross_section_epl_shear_interpolation=False,
),
lens_redshift=None,
velocity_dispersion=dict(
param_name='velocity_dispersion', sampler_type='velocity_dispersion_ewoud',
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
axis_ratio=dict(
param_name='axis_ratio', sampler_type='rayleigh',
q_min=0.2, q_max=1.0
),
axis_rotation_angle=dict(
param_name='axis_rotation_angle', sampler_type='uniform',
x_min=0.0, x_max=2 * np.pi
),
external_shear1=dict(param_name='external_shear1', sampler_type= 'constant_values_n_size', value=0.0),
external_shear2=dict(param_name='external_shear2', sampler_type= 'constant_values_n_size', value=0.0),
density_profile_slope=dict(param_name='density_profile_slope', sampler_type='constant_values_n_size', value=2.0),
)
lens_functions = dict(
param_sampler_type="epl_shear_sl_parameters_rvs",
cross_section_based_sampler="importance_sampler_partial",
optical_depth="optical_depth_numerical",
cross_section="cross_section_sie_feixu",
)
lens_functions_params = dict(
param_sampler_type=None,
cross_section_based_sampler=dict(
n_prop=50,
threshold_factor=0.0,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=2.0,
gamma_max=2.0,
shear_min=0.0,
shear_max=0.0,
),
optical_depth=dict(param_name="optical_depth", function_type="optical_depth_numerical"),
cross_section=None,
)
elif lens_type == "sis_galaxy":
lens_priors = dict(
zs_sl="strongly_lensed_source_redshift",
lens_redshift_sl="lens_redshift_strongly_lensed_numerical",
lens_redshift="lens_redshift_intrinsic_numerical",
velocity_dispersion="velocity_dispersion_ewoud",
axis_ratio="constant_values_n_size",
axis_rotation_angle="constant_values_n_size",
external_shear1="constant_values_n_size",
external_shear2="constant_values_n_size",
density_profile_slope="constant_values_n_size",
)
lens_priors_params = dict(
zs_sl=None,
lens_redshift_sl=dict(
param_name = "lens_redshift_sl",
sampler_type = "lens_redshift_strongly_lensed_numerical",
lens_type = lens_type,
integration_size=50000,
use_multiprocessing=False,
cross_section_epl_shear_interpolation=False,
),
lens_redshift=None,
velocity_dispersion=dict(
param_name='velocity_dispersion', sampler_type='velocity_dispersion_ewoud',
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
axis_ratio=dict(
param_name='axis_ratio', sampler_type='constant_values_n_size',
value=1.0
),
axis_rotation_angle=dict(
param_name='axis_rotation_angle', sampler_type='constant_values_n_size',
value=0.0
),
external_shear1=dict(
param_name='external_shear1', sampler_type='constant_values_n_size',
value=0.0
),
external_shear2=dict(
param_name='external_shear2', sampler_type='constant_values_n_size',
value=0.0
),
density_profile_slope=dict(
param_name='density_profile_slope', sampler_type='constant_values_n_size',
value=2.0
),
)
lens_functions = dict(
param_sampler_type="epl_shear_sl_parameters_rvs",
cross_section_based_sampler="importance_sampler_partial",
optical_depth="optical_depth_numerical",
cross_section="cross_section_sis",
)
lens_functions_params = dict(
param_sampler_type=None,
cross_section_based_sampler=dict(
n_prop=50,
threshold_factor=0.0,
sigma_min=100.0,
sigma_max=400.0,
q_min=1.0,
q_max=1.0,
phi_min=0.0,
phi_max=0.0,
gamma_min=2.0,
gamma_max=2.0,
shear_min=0.0,
shear_max=0.0,
),
optical_depth=dict(param_name="optical_depth", function_type="optical_depth_numerical"),
cross_section=None, # if interpolation, you should provide dict(num_th=500, maginf=-1000.0)
)
else:
raise ValueError(
"lens_type should be 'epl_shear_galaxy' or 'sie_galaxy' or 'sis_galaxy'"
)
return (
lens_priors,
lens_priors_params,
lens_functions,
lens_functions_params,
)
def _initialize_decision_dictionary(self, create_new_interpolator, lens_type):
"""
Helper to initialize decision dictionary for creating interpolators.
Parameters
----------
create_new_interpolator : ``dict`` or ``bool``
Configuration for creating new interpolators.
lens_type : ``str``
Type of lens galaxy model.
"""
# preserve user input and initialize defaults
user_create_new_interpolator = create_new_interpolator
create_new_interpolator = dict(
velocity_dispersion=dict(
create_new=False, resolution=200, zl_resolution=48, cdf_size=400
),
axis_ratio=dict(
create_new=False, resolution=200, sigma_resolution=48, cdf_size=400
),
lens_redshift_sl=dict(
create_new=False, resolution=16, zl_resolution=48, cdf_size=400
),
lens_redshift=dict(
create_new=False, resolution=16, zl_resolution=48, cdf_size=400
),
optical_depth=dict(create_new=False, resolution=16, cdf_size=500),
comoving_distance=dict(create_new=False, resolution=500),
angular_diameter_distance=dict(create_new=False, resolution=500),
angular_diameter_distance_z1z2=dict(create_new=False, resolution=500),
differential_comoving_volume=dict(create_new=False, resolution=500),
# density_profile_slope=dict(create_new=False, resolution=100),
zs_sl=dict(create_new=False, resolution=200, cdf_size=500),
)
# the following resolution elements are for the cross section
# interpolator, [25, 25, 45, 15, 15] corresponds to the resolution for
# e1, e2, gamma, gamma1, gamma2
spacing_config = {
"e1": {
"mode": "two_sided_mixed_grid",
"power_law_part": "lower",
"spacing_trend": "increasing",
"power": 2.5,
"value_transition_fraction": 0.7,
"num_transition_fraction": 0.9,
"auto_match_slope": True
},
"e2": {
"mode": "two_sided_mixed_grid",
"power_law_part": "lower",
"spacing_trend": "increasing",
"power": 2.5,
"value_transition_fraction": 0.7,
"num_transition_fraction": 0.9,
"auto_match_slope": True
},
"gamma": {
"mode": "two_sided_mixed_grid",
"power_law_part": "lower",
"spacing_trend": "increasing",
"power": 2.5,
"value_transition_fraction": 0.7,
"num_transition_fraction": 0.9,
"auto_match_slope": True
},
"gamma1": {
"mode": "two_sided_mixed_grid",
"power_law_part": "lower",
"spacing_trend": "increasing",
"power": 2.5,
"value_transition_fraction": 0.7,
"num_transition_fraction": 0.9,
"auto_match_slope": True
},
"gamma2": {
"mode": "two_sided_mixed_grid",
"power_law_part": "lower",
"spacing_trend": "increasing",
"power": 2.5,
"value_transition_fraction": 0.7,
"num_transition_fraction": 0.9,
"auto_match_slope": True
},
}
if lens_type == "sis_galaxy":
create_new_interpolator.update(
cross_section=dict(create_new=False, resolution=[5, 5, 5, 5, 5], spacing_config=spacing_config),
)
elif lens_type == "sie_galaxy":
create_new_interpolator.update(
cross_section=dict(create_new=False, resolution=[25, 25, 5, 5, 5], spacing_config=spacing_config ),
)
elif lens_type == "epl_shear_galaxy":
create_new_interpolator.update(
cross_section=dict(create_new=False, resolution=[25, 25, 45, 15, 15], spacing_config=spacing_config),
)
if isinstance(user_create_new_interpolator, dict):
for key, value in user_create_new_interpolator.items():
if (
key in create_new_interpolator
and isinstance(create_new_interpolator[key], dict)
and isinstance(value, dict)
):
create_new_interpolator[key].update(value)
else:
create_new_interpolator[key] = value
# if create_new_interpolator is True, create new interpolator for all
elif user_create_new_interpolator:
for key in create_new_interpolator.keys():
create_new_interpolator[key]["create_new"] = True
# update the create_new_interpolator
try:
if self.create_new_interpolator is None:
self.create_new_interpolator = create_new_interpolator
else:
self.create_new_interpolator.update(create_new_interpolator)
except AttributeError:
self.create_new_interpolator = create_new_interpolator
def _lens_functions_and_sampler_categorization(
self,
lens_type,
lens_priors,
lens_priors_params,
lens_functions,
lens_functions_params,
):
"""
Helper to categorize, set and update lens functions and samplers.
There are three levels.
1. Set default lens functions and samplers based on lens type. \n
2. If there is user input, update the lens functions and samplers with user input. \n
3. Check the validity of user input and update the lens functions and samplers. User input can be different from the level (1). It can either be one of the internal functions or a user-defined function. If it is one of the internal functions, the parameters will be updated (missing parameters will be taken care of) with user input if given, otherwise it will use the default parameters. If it is a user-defined function, it will be directly used without any parameter update. \n
Parameters
----------
lens_priors : ``dict`` or ``None``
User-provided sampler names or sampler instances. For custom samplers,
pass an instance that provides both ``.rvs(...)`` and ``.pdf(...)``
(e.g. a ``ler.utils.FunctionConditioning`` object). Plain callable
sampler functions are not supported.
lens_priors_params : ``dict`` or ``None``
User-provided sampler parameters.
Nested dicts merge into existing defaults key-by-key (outer keys unchanged), so overriding e.g.\ ``cross_section_epl_shear_interpolation`` does not drop other fields for that sampler.
lens_functions : ``dict`` or ``None``
User-provided lens function names or functions.
lens_functions_params : ``dict`` or ``None``
User-provided lens function parameters.
"""
# 1. Set default lens functions and samplers based on lens type.
(
self.lens_priors,
self.lens_priors_params,
self.lens_functions,
self.lens_functions_params,
) = self._default_lens_priors_and_functions(lens_type)
# 2. If there is user input, update the lens functions and samplers with user input.
# update the priors if input is given
if lens_priors:
self.lens_priors.update(lens_priors)
if lens_priors_params:
# Nested merge per sampler category: shallow dict.update replaces whole
# inner dicts and would silently drop unrelated keys unless the caller
# repeats every default field (easy to miss ``cross_section_epl_shear_*``).
for key, value in lens_priors_params.items():
if isinstance(value, dict) and isinstance(
self.lens_priors_params.get(key), dict
):
self.lens_priors_params[key].update(value)
else:
self.lens_priors_params[key] = value
if lens_functions:
self.lens_functions.update(lens_functions)
if lens_functions_params:
self.lens_functions_params.update(lens_functions_params)
# 3. Check the validity of user input and update the lens functions and samplers.
# EPL + Shear default maginf=-100.0
if self.lens_functions['cross_section'] == 'cross_section_epl_shear_interpolation':
if self.lens_type == 'sis_galaxy':
self.lens_functions_params["cross_section"].update(dict(maginf=-10000.0)) # maginf for SIS is set wrt to analytic cross section
elif self.lens_type == 'sie_galaxy':
self.lens_functions_params['cross_section'].update(dict(maginf=-10000.0)) # maginf for SIE is set wrt to analytic cross section
sampler_prior_names = [
"zs_sl",
"lens_redshift_sl",
"lens_redshift",
"velocity_dispersion",
"axis_ratio",
"axis_rotation_angle",
"external_shear1",
"external_shear2",
"density_profile_slope",
]
def _has_rvs_and_pdf(obj):
return (
obj is not None
and hasattr(obj, "rvs")
and callable(getattr(obj, "rvs"))
and hasattr(obj, "pdf")
and callable(getattr(obj, "pdf"))
)
# Reconcile sampler parameter dicts: ``available_lens_priors`` defaults merged with ``self``.
# ``lens_priors`` selects optional overrides; callers may pass only nested ``lens_priors_params``.
for name in sampler_prior_names: # e.g. name='axis_ratio'
if lens_priors is not None and name in lens_priors:
sampler_candidate = lens_priors[name]
else:
sampler_candidate = self.lens_priors.get(name)
# Custom sampler object: no template lookup from ``available_lens_priors``.
if isinstance(sampler_candidate, FunctionConditioning) or _has_rvs_and_pdf(
sampler_candidate
):
continue
if sampler_candidate is None:
continue
if not isinstance(sampler_candidate, str):
raise ValueError(
f"Given sampler for `{name}` must be (a) a string name from "
"``available_lens_priors``, or (b) a ``FunctionConditioning`` "
"(or comparable) providing ``.rvs`` and ``.pdf``."
)
sampler_type = sampler_candidate
dict_ = self.available_lens_priors[
name
] # e.g. {'axis_ratio_padilla_strauss': {'q_min': 0.2, 'q_max': 1.0}, ....}
if sampler_type in dict_: # e.g. 'axis_ratio_padilla_strauss'
template = dict_[sampler_type]
param_dict = template.copy() if isinstance(template, dict) else template
stored = self.lens_priors_params.get(name)
# Merge template defaults with values already accumulated on ``self``
# (nested ``lens_priors_params`` update in step 2).
if isinstance(param_dict, dict):
if isinstance(stored, dict):
param_dict.update(stored)
self.lens_priors_params[name] = param_dict
elif stored is not None:
self.lens_priors_params[name] = stored
else:
self.lens_priors_params[name] = param_dict
else:
raise ValueError(
f"{name} sampler {sampler_type} not available.\n Available {name} samplers and its parameters are: {dict_[name]}"
)
lens_function_names = ["cross_section_based_sampler", "optical_depth", "cross_section"]
# if there is user input function, update the sampler priors
for name in lens_function_names:
if (lens_functions is not None) and (name in lens_functions):
function_name = lens_functions[name]
if isinstance(function_name, str):
# available lens functions for name e.g. 'optical_depth'
dict_ = self.available_lens_functions[name]
if function_name in dict_:
param_dict = dict_[function_name]
param_dict = param_dict.copy() if isinstance(param_dict, dict) else param_dict
if (lens_functions_params is None) or (
lens_functions_params.get(name) is None
): # not a dictionary
self.lens_functions_params[name] = param_dict
else: # if there is user input lens_functions_params
param_dict.update(lens_functions_params[name])
self.lens_functions_params[name] = (
param_dict
) # user inputs override default values
else:
raise ValueError(
f"{name} function {function_name} not available.\n Available {name} functions and its parameters are: {dict_[name]}"
)
elif not callable(lens_functions[name]):
raise ValueError(
f"Given {name} function should be either a string name of available function or a function"
)
# ---------------------------------
# Lens redshift sampler functions
# ---------------------------------
[docs]
def lens_redshift_strongly_lensed_numerical(
self,
size=1000,
zs=None,
get_attribute=False,
**kwargs,
):
"""
Sample lens redshifts conditioned on strong lensing (numerical method).
This method computes the lens redshift distribution by numerically
integrating over the velocity-dispersion number density, lensing
cross-section, and differential comoving volume:
.. math::
\\frac{d\\tau}{dz_l}(z_s) \\propto
\\frac{dV_c}{dz_l}\\int d\\sigma\\,
n(\\sigma, z_l)\\,\\sigma_{\\rm SL}(\\sigma, z_l, z_s).
Parameters
----------
size : ``int``
Number of samples to generate. \\n
default: 1000
zs : ``numpy.ndarray``
Source redshifts.
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \\n
default: False
**kwargs : ``dict``
Additional parameters forwarded from ``lens_priors_params`` (typically merged onto a fresh
copy of the sampler defaults listed in ``available_lens_priors`` so shared templates are
never mutated).
cross_section_epl_shear_interpolation : ``bool``
If True, uses :meth:`~cross_section_epl_shear_interpolation` as the cross-section
function (with a lens-type-dependent interpolation grid) during the numerical
integration for lens redshift distribution.
default: False
Returns
-------
zl : ``numpy.ndarray`` or ``FunctionConditioning``
Lens redshift samples or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> zl = od.lens_redshift(size=100, zs=np.ones(100)*2.0)
"""
identifier_dict = dict(
param_name = "lens_redshift_sl",
sampler_type = "lens_redshift_strongly_lensed_numerical",
lens_type = self.lens_type,
)
identifier_dict["resolution"] = self.create_new_interpolator[identifier_dict["param_name"]][
"resolution"
]
identifier_dict["zl_resolution"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["zl_resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["cdf_size"]
identifier_dict["integration_size"] = self.lens_priors_params[
identifier_dict["param_name"]
]["integration_size"]
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
name_list = [
"z_min",
"z_max",
"cosmology",
"velocity_dispersion",
"axis_ratio",
"axis_rotation_angle",
"density_profile_slope",
"external_shear1",
"external_shear2",
]
# cross_section cannot be FunctionConditioning object
identifier_dict["cross_section"] = {}
identifier_dict["cross_section"]["name"] = self.lens_functions["cross_section"]
if self.lens_functions_params["cross_section"] is not None:
for key, value in self.lens_functions_params["cross_section"].items():
identifier_dict["cross_section"][key] = value
identifier_dict["cross_section"]["resolution"] = self.create_new_interpolator[
"cross_section"
]["resolution"]
# All other parameters can be FunctionConditioning objects
identifier_dict["velocity_dispersion"] = {}
if (
self.velocity_dispersion.info is not None
): # if velocity_dispersion is not None
for key, value in self.velocity_dispersion.info.items():
if key not in name_list:
identifier_dict["velocity_dispersion"][key] = value
identifier_dict["axis_ratio"] = {}
if self.axis_ratio.info is not None: # if axis_ratio is not None
for key, value in self.axis_ratio.info.items():
if key not in name_list:
identifier_dict["axis_ratio"][key] = value
identifier_dict["axis_rotation_angle"] = {}
if (
self.axis_rotation_angle.info is not None
): # if axis_rotation_angle is not None
for key, value in self.axis_rotation_angle.info.items():
if key not in name_list:
identifier_dict["axis_rotation_angle"][key] = value
identifier_dict["density_profile_slope"] = {}
if (
self.density_profile_slope.info is not None
): # if density_profile_slope is not None
for key, value in self.density_profile_slope.info.items():
if key not in name_list:
identifier_dict["density_profile_slope"][key] = value
identifier_dict["external_shear1"] = {}
identifier_dict["external_shear2"] = {}
if self.external_shear1.info is not None: # if external_shear1 is not None
for key, value in self.external_shear1.info.items():
if key not in name_list:
identifier_dict["external_shear1"][key] = value
if self.external_shear2.info is not None: # if external_shear2 is not None
for key, value in self.external_shear2.info.items():
if key not in name_list:
identifier_dict["external_shear2"][key] = value
# Do not mutate shared entries inside ``available_lens_priors``: copy then merge kwargs.
template = self.available_lens_priors[identifier_dict["param_name"]][
identifier_dict["sampler_type"]
]
if isinstance(template, dict):
if template:
merged_sampler_params = dict(template)
merged_sampler_params.update(kwargs)
else:
merged_sampler_params = dict(kwargs)
elif template is None:
merged_sampler_params = dict(kwargs)
else:
merged_sampler_params = dict(kwargs)
identifier_dict.update(merged_sampler_params)
print("Numerically solving the lens redshift distribution...")
zs_resolution = identifier_dict["resolution"]
zs_min = self.z_min + 0.001 if self.z_min == 0.0 else self.z_min
zs_max = self.z_max
zs_array = generate_mixed_grid(zs_min, zs_max, zs_resolution)
_, it_exist = interpolator_json_path(
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
interpolator_name=identifier_dict["sampler_type"],
)
# zl dependent velocity dispersion distribution
zl_scaled = []
zl_min = 0.0001
for i, zs_ in enumerate(zs_array):
zl_resolution = identifier_dict["zl_resolution"]
buffer_ = np.linspace(zl_min, zs_ - zl_min, zl_resolution)
zl_scaled.append(buffer_ / zs_)
zl_scaled = np.array(zl_scaled)
create_new = self.create_new_interpolator[identifier_dict["param_name"]]["create_new"]
if not it_exist or create_new:
number_density = self._helper_number_density_calculation(
zl_scaled,
zs_array,
cross_section_epl_shear_interpolation=identifier_dict.get(
"cross_section_epl_shear_interpolation", False
),
)
# number density is zero for zl=0 and infinite for zl=zs
# Adding zero at the first element of each row
zl_scaled = np.hstack((np.zeros((zl_scaled.shape[0], 1)), zl_scaled))
number_density = np.hstack(
(np.zeros((number_density.shape[0], 1)), number_density)
)
# Adding one at the last element of each row of zl_scaled
zl_scaled = np.hstack((zl_scaled, np.ones((zl_scaled.shape[0], 1))))
# Adding zero at the last element of each row of density
number_density = np.hstack(
(number_density, np.zeros((number_density.shape[0], 1)))
)
else:
# if it_exist, it will use the saved interpolator
number_density = None
zl_object = FunctionConditioning(
function=number_density,
x_array=zl_scaled,
conditioned_y_array=zs_array,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=create_new,
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
# un-scaled lens redshift
cdf_values = zl_object.cdf_values
x_array = zl_object.x_array
x_array_cdf = zl_object.x_array_cdf
y_array = zl_object.conditioned_y_array
function_spline = zl_object.function_spline
pdf_norm_const = zl_object.pdf_norm_const
@njit
def zl_function(x, y):
result = cubic_spline_interpolator2d(
x / y, y, function_spline, x_array, y_array
)
# The differential cross-section is non-negative. Clamp tiny
# negative values introduced by spline interpolation/roundoff.
idx = result < 0.0
result[idx] = 0.0
return result
zl_object.function = zl_function
@njit
def zl_pdf(x, y):
result = (
pdf_cubic_spline_interpolator2d(
x / y, y, pdf_norm_const, function_spline, x_array, y_array
)
/ y
)
# PDF is non-negative; clamp tiny interpolation artifacts.
idx = result < 0.0
result[idx] = 0.0
return result
zl_object.pdf = zl_pdf
@njit
def zl_rvs(size, y):
return (
inverse_transform_sampler2d(size, y, cdf_values, x_array_cdf, y_array) * y
)
zl_object.rvs = zl_rvs
return zl_object if get_attribute else zl_object(size, zs)
def _helper_number_density_calculation(
self, zl_scaled, zs, cross_section_epl_shear_interpolation=False
):
"""
Helper to compute lens redshift distribution using njitted functions.
Parameters
----------
zl_scaled : ``numpy.ndarray``
2D array of lens redshifts scaled by source redshift.
zs : ``numpy.ndarray``
1D array of source redshifts.
Returns
-------
density_array : ``numpy.ndarray``
2D array of the lens redshift distribution.
"""
# Check if the function can be JIT compiled
# Get random variable samplers from initialized objects
sigma_rvs_ = self.velocity_dispersion.rvs
q_rvs_ = self.axis_ratio.rvs
phi_rvs = self.axis_rotation_angle.rvs
gamma_rvs = self.density_profile_slope.rvs
shear1_rvs = self.external_shear1.rvs
shear2_rvs = self.external_shear2.rvs
# sigma_pdf_ = self.velocity_dispersion.pdf
number_density_ = self.velocity_dispersion.function
if cross_section_epl_shear_interpolation:
# Choose interpolation grid resolution based on lens model type
if self.lens_type == "sis_galaxy":
size_list = [5, 5, 5, 5, 5]
elif self.lens_type == "sie_galaxy":
size_list = [25, 25, 5, 5, 5]
else: # "epl_shear_galaxy" (default)
size_list = [25, 25, 45, 15, 15]
cross_section_ = self.cross_section_epl_shear_interpolation(
get_attribute=True,
size_list=size_list,
)
else:
cross_section_ = self.cross_section
dVcdz_function = self.differential_comoving_volume.function
from .sampler_functions import _njit_checks
use_njit_sampler, sampler_dict = _njit_checks(
sigma_rvs=sigma_rvs_,
q_rvs=q_rvs_,
phi_rvs=phi_rvs,
gamma_rvs=gamma_rvs,
shear1_rvs=shear1_rvs,
shear2_rvs=shear2_rvs,
# sigma_pdf=sigma_pdf_,
number_density_function=number_density_,
cross_section_function=cross_section_,
dVcdz_function=dVcdz_function,
create_njit_sampler=True,
)
use_multiprocessing = self.lens_priors_params["lens_redshift_sl"][
"use_multiprocessing"
]
if use_njit_sampler:
try:
from numba import threading_layer
if threading_layer() == "workqueue":
# The workqueue backend can abort on this prange-heavy path.
# Use the multiprocessing implementation instead.
use_njit_sampler = False
except ValueError:
# Threading layer not initialized yet; keep the normal path.
pass
if use_multiprocessing or not use_njit_sampler:
print("Using multiprocessing")
density_array = self._lens_redshift_multiprocessing(
zl_scaled, zs, sampler_dict
)
else:
print("Using multithreaded njit")
density_array = self._lens_redshift_multithreaded_njit(
zl_scaled, zs, sampler_dict
)
return density_array
def _lens_redshift_multithreaded_njit(self, zl_scaled, zs, sampler_dict):
q_rvs = sampler_dict["q_rvs"]
phi_rvs = sampler_dict["phi_rvs"]
gamma_rvs = sampler_dict["gamma_rvs"]
shear1_rvs = sampler_dict["shear1_rvs"]
shear2_rvs = sampler_dict["shear2_rvs"]
number_density_function = sampler_dict["number_density_function"]
cross_section_function = sampler_dict["cross_section_function"]
sigma_min = self.lens_priors_params["velocity_dispersion"]["sigma_min"]
sigma_max = self.lens_priors_params["velocity_dispersion"]["sigma_max"]
# integration_size
integration_size = (
self.lens_priors_params["lens_redshift_sl"]["integration_size"]
if "integration_size" in self.lens_priors_params["lens_redshift_sl"]
else 20000
)
# dVcdz_function
dVcdz_function = self.differential_comoving_volume.function
from .mp import lens_redshift_strongly_lensed_njit
from numba import set_num_threads
# Set Numba threads to npool before entering prange
set_num_threads(self.npool)
density_array = lens_redshift_strongly_lensed_njit(
zs, # 1D
zl_scaled, # 2D
sigma_min,
sigma_max,
q_rvs,
phi_rvs,
gamma_rvs,
shear1_rvs,
shear2_rvs,
number_density_function,
cross_section_function,
dVcdz_function,
integration_size,
)
return density_array
def _lens_redshift_multiprocessing(self, zl_scaled, zs, sampler_dict):
"""
Compute the lens redshift distribution using multiprocessing.
Parameters
----------
zl_scaled : ``numpy.ndarray``
2D array of lens redshifts scaled by source redshift.
zs : ``numpy.ndarray``
1D array of source redshifts.
Returns
-------
density_array : ``numpy.ndarray``
2D array of the lens redshift distribution.
"""
# 0. zs
# 1. zl_scaled
number_density = sampler_dict["number_density_function"]
q_rvs = sampler_dict["q_rvs"]
phi_rvs = sampler_dict["phi_rvs"]
gamma_rvs = sampler_dict["gamma_rvs"]
shear1_rvs = sampler_dict["shear1_rvs"]
shear2_rvs = sampler_dict["shear2_rvs"]
cross_section_function = sampler_dict["cross_section_function"]
sigma_min = self.velocity_dispersion.info["sigma_min"]
sigma_max = self.velocity_dispersion.info["sigma_max"]
dVcdz_function = self.differential_comoving_volume.function
integration_size = (
self.lens_priors_params["lens_redshift_sl"]["integration_size"]
if "integration_size" in self.lens_priors_params["lens_redshift_sl"]
else 20000
)
# setup input params for multiprocessing (zs, zl_scaled, worker_index)
input_params = np.array(
[
(zs[i], zl_scaled[i], i) # worker index for result ordering
for i in range(len(zs))
],
dtype=object,
)
print("Computing lens redshift distribution with multiprocessing...")
from .mp import lens_redshift_strongly_lensed_mp, _init_lens_redshift_worker
density_array = np.zeros_like(zl_scaled)
with Pool(
processes=self.npool,
initializer=_init_lens_redshift_worker,
initargs=(
sigma_min,
sigma_max,
number_density,
q_rvs,
phi_rvs,
gamma_rvs,
shear1_rvs,
shear2_rvs,
dVcdz_function,
cross_section_function,
integration_size,
),
) as pool:
for result in tqdm(
pool.imap_unordered(lens_redshift_strongly_lensed_mp, input_params),
total=len(zs),
ncols=100,
disable=False,
):
(
iter_i,
density_,
) = result
density_array[iter_i] = density_
# # simple for loop check
# for i in tqdm(range(len(zs)), ncols=100):
# result = lens_redshift_strongly_lensed_mp(input_params[i])
# iter_i, density_ = result
# density_array[iter_i] = density_
return density_array
[docs]
def lens_redshift_intrinsic_numerical(
self, size=1000, zs=None, get_attribute=False, **kwargs
):
"""
Sample intrinsic lens redshifts (numerical method).
This method computes the intrinsic lens redshift distribution by
numerically integrating the velocity-dispersion number density and
differential comoving volume.
Parameters
----------
size : ``int``
Number of samples to generate. \n
default: 1000
zs : ``numpy.ndarray`` or ``None``
Source redshifts (not used, kept for API compatibility).
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
Returns
-------
zl : ``numpy.ndarray`` or ``FunctionConditioning``
Lens redshift samples or sampler object.
"""
identifier_dict = dict(
param_name="lens_redshift",
sampler_type="lens_redshift_intrinsic_numerical",
)
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
identifier_dict["resolution"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["cdf_size"]
create_new = self.create_new_interpolator[identifier_dict["param_name"]][
"create_new"
]
name_list = ["z_min", "z_max", "cosmology", "velocity_dispersion"]
identifier_dict["velocity_dispersion"] = {}
if (
self.velocity_dispersion.info is not None
): # if velocity_dispersion is not None
for key, value in self.velocity_dispersion.info.items():
if key not in name_list:
identifier_dict["velocity_dispersion"][key] = value
_, it_exist = interpolator_json_path(
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict['param_name'],
interpolator_name=identifier_dict["sampler_type"],
)
if not it_exist or create_new:
# calculate for zs=zs_max
z_min = 0.0001 if self.z_min == 0 else self.z_min
z_max = self.z_max
resolution = identifier_dict["resolution"]
zl_array = generate_mixed_grid(z_min, z_max, resolution)
# create number density
if self.velocity_dispersion.conditioned_y_array is None:
def integrand(sigma, z):
return (
self.velocity_dispersion.function(np.array([sigma]))[0]
* self.differential_comoving_volume.function(np.array([z]))[0]
)
else:
def integrand(sigma, z):
return (
self.velocity_dispersion.function(
np.array([sigma]), np.array([z])
)[0]
* self.differential_comoving_volume.function(np.array([z]))[0]
)
# fix zl and integrate over sigma
integral = [
quad(
integrand,
identifier_dict["velocity_dispersion"]["sigma_min"],
identifier_dict["velocity_dispersion"]["sigma_max"],
args=(zl),
)[0]
for zl in zl_array
]
# create spline fit for lens redshift, with zs=zs_max
number_density_spline = CubicSpline(zl_array, integral)
# zl dependent velocity dispersion distribution
zl_scaled = []
number_density = []
zl_min = 0.0001
zl_size = identifier_dict["resolution"] = self.create_new_interpolator[identifier_dict["param_name"]]["zl_resolution"]
for i, zl_ in enumerate(zl_array):
buffer_ = np.linspace(zl_min, zl_ - zl_min, zl_size)
zl_scaled.append(buffer_ / zl_)
number_density.append(number_density_spline(buffer_))
zl_scaled = np.array(zl_scaled)
number_density = np.array(number_density)
else:
zl_array = None
number_density = None
zl_scaled = None
zl_object = FunctionConditioning(
function=number_density,
x_array=zl_scaled,
conditioned_y_array=zl_array,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict['param_name'],
name=identifier_dict["sampler_type"],
create_new=create_new,
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
# un-scaled lens redshift
cdf_values = zl_object.cdf_values
x_array = zl_object.x_array
x_array_cdf = zl_object.x_array_cdf
y_array = zl_object.conditioned_y_array
function_spline = zl_object.function_spline
pdf_norm_const = zl_object.pdf_norm_const
@njit
def zl_function(x, y):
return cubic_spline_interpolator2d(
x / y, y, function_spline, x_array, y_array
)
zl_object.function = zl_function
@njit
def zl_pdf(x, y):
return (
pdf_cubic_spline_interpolator2d(
x / y, y, pdf_norm_const, function_spline, x_array, y_array
)
/ y
)
zl_object.pdf = zl_pdf
@njit
def zl_rvs(size, y):
return (
inverse_transform_sampler2d(size, y, cdf_values, x_array_cdf, y_array) * y
)
zl_object.rvs = zl_rvs
return zl_object if get_attribute else zl_object(size, zs)
[docs]
def lens_redshift_strongly_lensed_sis_analytical(
self, size, zs, get_attribute=False, **kwargs
):
"""
Sample SIS lens redshifts using Haris et al. (2018) distribution.
Parameters
----------
size : ``int``
Number of samples to generate.
zs : ``numpy.ndarray``
Source redshifts.
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters.
Returns
-------
zl : ``numpy.ndarray`` or ``FunctionConditioning``
Lens redshift samples or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth(lens_type='sis_galaxy')
>>> zl = od.lens_redshift(size=100, zs=np.ones(100)*2.0)
"""
# # Old way
# splineDc = self.comoving_distance.function_spline # spline coefficients for the comoving distance and redshifts
# splineDcInv = self.comoving_distance.function_inverse_spline # spline coefficients for the redshifts and comoving distance
# u = np.linspace(0, 1, 500)
# cdf = (10 * u**3 - 15 * u**4 + 6 * u**5) # See the integral of Eq. A7 of https://arxiv.org/pdf/1807.07062.pdf (cdf)
# zs = np.array([zs]).reshape(-1)
# New way
# identifier_dict = {"name": "lens_redshift_strongly_lensed_sis_analytical"}
identifier_dict = dict(
param_name="lens_redshift_sl",
sampler_type="lens_redshift_strongly_lensed_sis_analytical",
lens_type=self.lens_type,
)
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = LambdaCDM(H0=70, Om0=0.3, Ode0=0.7)
# Harris et al. (2018) choice of velocity dispersion is from Choi et al. (2007)
identifier_dict["velocity_dispersion"] = dict(
sigma_min=100.0,
sigma_max=400.0,
alpha=2.32,
beta=2.67,
phistar=8.0e-3 * self.cosmo.h**3,
sigmastar=161.0,
)
identifier_dict["resolution"] = self.create_new_interpolator[identifier_dict["param_name"]][
"resolution"
]
identifier_dict["cdf_size"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["cdf_size"]
param_dict = self.available_lens_priors[identifier_dict["param_name"]][
"lens_redshift_strongly_lensed_sis_analytical"
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
zl_resolution = identifier_dict["resolution"]
x_array = np.linspace(0.0, 1.0, zl_resolution)
def pdf_(x):
return 30 * x**2 * (1 - x) ** 2 # Haris et al. 2018 (A7)
zl_object = FunctionConditioning(
function=pdf_,
x_array=x_array,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator[identifier_dict["param_name"]]["create_new"],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
cdf_values_zl = zl_object.cdf_values
x_array_cdf_zl = zl_object.x_array_cdf
function_spline_zl = zl_object.function_spline
# pdf_norm_const = zl_object.pdf_norm_const
x_array_zl = zl_object.x_array
spline_Dc = self.comoving_distance.function_spline
x_array_Dc = self.comoving_distance.x_array
inverse_spline_Dc = self.comoving_distance.function_inverse_spline
z_array_Dc = self.comoving_distance.z_array
@njit
def zl_rvs(size, zs):
r = inverse_transform_sampler(size, cdf_values_zl, x_array_cdf_zl)
zs_Dc = cubic_spline_interpolator(zs, spline_Dc, x_array_Dc)
zl_Dc = zs_Dc * r
return cubic_spline_interpolator(zl_Dc, inverse_spline_Dc, z_array_Dc)
@njit
def zl_pdf(zl, zs):
zs_Dc = cubic_spline_interpolator(zs, spline_Dc, x_array_Dc)
zl_Dc = cubic_spline_interpolator(zl, spline_Dc, x_array_Dc)
r = zl_Dc / zs_Dc
return (
cubic_spline_interpolator(r, function_spline_zl, x_array_zl)
# / pdf_norm_const # pdf_norm_const is 1
)
# @njit
# def zl_function(zl, zs):
# r = zl / zs
# return cubic_spline_interpolator(r, function_spline_zl, x_array_zl)
# zl_object.function = zl_function
zl_object.function = None
zl_object.pdf = zl_pdf
zl_object.rvs = zl_rvs
return zl_object if get_attribute else zl_object.rvs(size, zs)
# ---------------------------------------
# Velocity dispersion sampler functions
# ---------------------------------------
[docs]
def gengamma(self, size, get_attribute=False, **kwargs):
"""
Sample velocity dispersion from generalized gamma distribution.
Parameters
----------
size : ``int``
Number of samples to generate.
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters. \n
+-------------------+-------------------+---------------------------------------+
| Parameter | Unit | Description |
+===================+===================+=======================================+
| param_name | | Name of the parameter |
+-------------------+-------------------+---------------------------------------+
| sampler_type | | Type of the sample |
+-------------------+-------------------+---------------------------------------+
| sigma_min | km/s | minimum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| sigma_max | km/s | maximum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| alpha | dimensionless | Power-law index governing the slope |
| | | of the distribution at low velocities |
+-------------------+-------------------+---------------------------------------+
| beta | dimensionless | Exponential parameter determining the |
| | | sharpness of the high-velocity cutoff |
+-------------------+-------------------+---------------------------------------+
| phistar | h^3 Mpc^-3 | Normalization constant representing |
| | | the comoving number density of galaxy |
+-------------------+-------------------+---------------------------------------+
| sigmastar | km/s | Characteristic velocity scale marking |
| | | the "knee" transition in the VDF |
+-------------------+-------------------+---------------------------------------+
Returns
-------
sigma : ``numpy.ndarray`` or ``FunctionConditioning``
Velocity dispersion samples (km/s) or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth(lens_priors=dict(
... velocity_dispersion="gengamma"))
>>> sigma = od.velocity_dispersion(size=100)
"""
identifier_dict = {
"param_name": "velocity_dispersion",
"sampler_type": "gengamma",
}
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
identifier_dict["resolution"] = self.create_new_interpolator[
"velocity_dispersion"
]["resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator[
"velocity_dispersion"
]["cdf_size"]
param_dict = self.available_lens_priors["velocity_dispersion"][
"gengamma"
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
# setting up inputs for the interpolator
sigma_array = np.linspace(
identifier_dict["sigma_min"],
identifier_dict["sigma_max"],
identifier_dict["resolution"],
)
from .sampler_functions import gengamma_function
def density_func_(sigma_):
return gengamma_function(
sigma=sigma_,
sigma_min=identifier_dict["sigma_min"],
sigma_max=identifier_dict["sigma_max"],
alpha=identifier_dict["alpha"],
beta=identifier_dict["beta"],
phistar=identifier_dict["phistar"],
sigmastar=identifier_dict["sigmastar"],
)
sigma_object = FunctionConditioning(
function=density_func_,
x_array=sigma_array,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator["velocity_dispersion"][
"create_new"
],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
return sigma_object if get_attribute else sigma_object.rvs(size)
[docs]
def velocity_dispersion_bernardi(self, size, get_attribute=False, **kwargs):
"""
Sample velocity dispersion from Bernardi et al. (2010) distribution.
Uses inverse transform sampling on the velocity dispersion function.
Parameters
----------
size : ``int``
Number of samples to generate.
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters. \n
+-------------------+-------------------+---------------------------------------+
| Parameter | Unit | Description |
+===================+===================+=======================================+
| param_name | | Name of the parameter |
+-------------------+-------------------+---------------------------------------+
| sampler_type | | Type of the sample |
+-------------------+-------------------+---------------------------------------+
| sigma_min | km/s | minimum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| sigma_max | km/s | maximum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| alpha | dimensionless | Power-law index governing the slope |
| | | of the distribution at low velocities |
+-------------------+-------------------+---------------------------------------+
| beta | dimensionless | Exponential parameter determining the |
| | | sharpness of the high-velocity cutoff |
+-------------------+-------------------+---------------------------------------+
| phistar | h^3 Mpc^-3 | Normalization constant representing |
| | | the comoving number density of galaxy |
+-------------------+-------------------+---------------------------------------+
| sigmastar | km/s | Characteristic velocity scale marking |
| | | the "knee" transition in the VDF |
+-------------------+-------------------+---------------------------------------+
Returns
-------
sigma : ``numpy.ndarray`` or ``FunctionConditioning``
Velocity dispersion samples (km/s) or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth(lens_priors=dict(
... velocity_dispersion="velocity_dispersion_bernardi"))
>>> sigma = od.velocity_dispersion(size=100)
"""
identifier_dict = {
"param_name": "velocity_dispersion",
"sampler_type": "velocity_dispersion_bernardi",
}
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
identifier_dict["resolution"] = self.create_new_interpolator[
"velocity_dispersion"
]["resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator[
"velocity_dispersion"
]["cdf_size"]
param_dict = self.available_lens_priors["velocity_dispersion"][
"velocity_dispersion_bernardi"
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
# setting up inputs for the interpolator
sigma_array = np.linspace(
identifier_dict["sigma_min"],
identifier_dict["sigma_max"],
identifier_dict["resolution"],
)
from .sampler_functions import velocity_dispersion_bernardi_denisty_function
def number_density_function(sigma):
return velocity_dispersion_bernardi_denisty_function(
sigma,
alpha=self.lens_priors_params["velocity_dispersion"]["alpha"],
beta=self.lens_priors_params["velocity_dispersion"]["beta"],
phistar=self.lens_priors_params["velocity_dispersion"][
"phistar"
],
sigmastar=self.lens_priors_params["velocity_dispersion"][
"sigmastar"
],
)
sigma_object = FunctionConditioning(
function=number_density_function,
x_array=sigma_array,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator["velocity_dispersion"][
"create_new"
],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
return sigma_object if get_attribute else sigma_object.rvs(size)
[docs]
def velocity_dispersion_ewoud(self, size, zl, get_attribute=False, **kwargs):
"""
Sample redshift-dependent velocity dispersion from Wempe et al. (2022).
Uses inverse transform sampling with redshift-dependent velocity
dispersion function.
Parameters
----------
size : ``int``
Number of samples to generate.
zl : ``numpy.ndarray``
Lens redshifts.
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters. \n
+-------------------+-------------------+---------------------------------------+
| Parameter | Unit | Description |
+===================+===================+=======================================+
| param_name | | Name of the parameter |
+-------------------+-------------------+---------------------------------------+
| sampler_type | | Type of the sample |
+-------------------+-------------------+---------------------------------------+
| sigma_min | km/s | minimum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| sigma_max | km/s | maximum velocity dispersion |
+-------------------+-------------------+---------------------------------------+
| alpha | dimensionless | Power-law index governing the slope |
| | | of the distribution at low velocities |
+-------------------+-------------------+---------------------------------------+
| beta | dimensionless | Exponential parameter determining the |
| | | sharpness of the high-velocity cutoff |
+-------------------+-------------------+---------------------------------------+
| phistar | h^3 Mpc^-3 | Normalization constant representing |
| | | the comoving number density of galaxy |
+-------------------+-------------------+---------------------------------------+
| sigmastar | km/s | Characteristic velocity scale marking |
| | | the "knee" transition in the VDF |
+-------------------+-------------------+---------------------------------------+
Returns
-------
sigma : ``numpy.ndarray`` or ``FunctionConditioning``
Velocity dispersion samples (km/s) or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> sigma = od.velocity_dispersion(size=100, zl=np.ones(100)*0.5)
"""
identifier_dict = {"param_name": "velocity_dispersion", "sampler_type": "velocity_dispersion_ewoud"}
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
identifier_dict["resolution"] = self.create_new_interpolator[
"velocity_dispersion"
]["resolution"]
identifier_dict["zl_resolution"] = self.create_new_interpolator[
"velocity_dispersion"
]["zl_resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator[
"velocity_dispersion"
]["cdf_size"]
param_dict = self.available_lens_priors["velocity_dispersion"][
"velocity_dispersion_ewoud"
].copy()
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
# setting up inputs for the interpolator
sigma_array = np.linspace(
identifier_dict["sigma_min"],
identifier_dict["sigma_max"],
identifier_dict["resolution"],
)
from .sampler_functions import velocity_dispersion_ewoud_denisty_function
def number_density_function(sigma, zl):
return velocity_dispersion_ewoud_denisty_function(
sigma,
zl,
alpha=self.lens_priors_params["velocity_dispersion"]["alpha"],
beta=self.lens_priors_params["velocity_dispersion"]["beta"],
phistar=self.lens_priors_params["velocity_dispersion"][
"phistar"
],
sigmastar=self.lens_priors_params["velocity_dispersion"][
"sigmastar"
],
)
z_min = self.z_min + 0.001 if self.z_min == 0.0 else self.z_min
z_max = self.z_max
z_resolution = identifier_dict["zl_resolution"]
zl_array = generate_mixed_grid(z_min, z_max, z_resolution)
sigma_object = FunctionConditioning(
function=number_density_function,
x_array=sigma_array,
non_negative_function=True,
conditioned_y_array=zl_array,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator["velocity_dispersion"][
"create_new"
],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
return sigma_object if get_attribute else sigma_object.rvs(size, zl)
# ------------------------------
# Axis ratio sampler functions
# ------------------------------
[docs]
def rayleigh(self, size, sigma, get_attribute=False, **kwargs):
"""
Sample axis ratio from Rayleigh distribution conditioned on velocity dispersion.
Parameters
----------
size : ``int``
Number of samples to generate.
sigma : ``numpy.ndarray``
Velocity dispersion of the lens galaxy (km/s).
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters (param_name, sampler_type, q_min, q_max).
Returns
-------
q : ``numpy.ndarray`` or ``FunctionConditioning``
Axis ratio samples or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth(lens_priors=dict(axis_ratio="rayleigh"))
>>> q = od.axis_ratio(size=100, sigma=np.ones(100)*200.)
"""
identifier_dict = {"param_name": "axis_ratio", "sampler_type": "rayleigh"}
identifier_dict["velocity_dispersion"] = {}
if (
self.velocity_dispersion.info is not None
): # if velocity_dispersion is not None
for key, value in self.velocity_dispersion.info.items():
identifier_dict["velocity_dispersion"][key] = value
identifier_dict["resolution"] = self.create_new_interpolator["axis_ratio"][
"resolution"
]
identifier_dict["sigma_resolution"] = self.create_new_interpolator[
"axis_ratio"
]["sigma_resolution"]
identifier_dict["cdf_size"] = self.create_new_interpolator["axis_ratio"][
"cdf_size"
]
param_dict = self.available_lens_priors["axis_ratio"]["rayleigh"]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
q_array = np.linspace(
identifier_dict["q_min"],
identifier_dict["q_max"],
identifier_dict["resolution"],
)
sigma_array = np.linspace(
identifier_dict["velocity_dispersion"]["sigma_min"],
identifier_dict["velocity_dispersion"]["sigma_max"],
identifier_dict["sigma_resolution"],
)
from .sampler_functions import rayleigh_pdf
def q_pdf(q, sigma):
return rayleigh_pdf(
q=q,
sigma=sigma,
q_min=identifier_dict["q_min"],
q_max=identifier_dict["q_max"],
)
q_object = FunctionConditioning(
function=q_pdf,
x_array=q_array,
non_negative_function=True,
conditioned_y_array=sigma_array,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator["axis_ratio"]["create_new"],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
return q_object if get_attribute else q_object(size, sigma)
[docs]
def axis_ratio_padilla_strauss(self, size=1000, get_attribute=False, **kwargs):
"""
Sample axis ratio from Padilla & Strauss (2008) distribution.
Parameters
----------
size : ``int``
Number of samples to generate. \n
default: 1000
get_attribute : ``bool``
If True, returns the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Additional parameters (param_name, sampler_type, q_min, q_max).
Returns
-------
q : ``numpy.ndarray`` or ``FunctionConditioning``
Axis ratio samples or sampler object.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth(lens_priors=dict(axis_ratio="axis_ratio_padilla_strauss"))
>>> q = od.axis_ratio(size=100)
"""
identifier_dict = {"param_name": "axis_ratio", "sampler_type": "axis_ratio_padilla_strauss"}
identifier_dict["resolution"] = self.create_new_interpolator["axis_ratio"][
"resolution"
]
param_dict = self.available_lens_priors["axis_ratio"][
"axis_ratio_padilla_strauss"
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
# Using Padilla and Strauss 2008 distribution for axis ratio
from .sampler_functions import axis_ratio_padilla_strauss_pdf
q_array = np.linspace(
identifier_dict["q_min"],
identifier_dict["q_max"],
identifier_dict["resolution"],
)
pdf = axis_ratio_padilla_strauss_pdf(q_array)
q_object = FunctionConditioning(
function=pdf,
x_array=q_array,
conditioned_y_array=None,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_new=self.create_new_interpolator["axis_ratio"]["create_new"],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="rvs",
cdf_size=identifier_dict["cdf_size"],
)
return q_object if get_attribute else q_object(size)
[docs]
def constant_values_n_size(self, size=100, get_attribute=False, **kwargs):
"""
Return array of constant values.
Parameters
----------
size : ``int``
Number of values to return. \n
default: 100
get_attribute : ``bool``
If True, return the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Model parameters: \n
- value: Constant value to return, default: 0.0
Returns
-------
values : ``numpy.ndarray``
Array of constant values.
"""
identifier_dict = dict(
param_name = "unknown_parameter",
sampler_type = "constant_values_n_size",
)
param_dict = dict(value=0.0)
param_dict.update(kwargs)
identifier_dict.update(param_dict)
value = identifier_dict["value"]
# pdf_, zero everywhere except at value
@njit
def pdf_(x):
return np.where(x == value, 1.0, 0.0)
# rvs_, return value
@njit
def rvs_(size):
return np.full(size, value)
object_ = FunctionConditioning(
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_function_inverse=False,
create_function=False,
create_pdf=pdf_,
create_rvs=rvs_,
callback="rvs",
)
if get_attribute:
return object_
else:
return object_.rvs(size)
[docs]
def normal(self, size, get_attribute=False, **kwargs):
"""
Sample with a normal distribution.
Parameters
----------
size : ``int``
Number of samples to draw.
get_attribute : ``bool``
If True, return the sampler object instead of samples. \n
default: False
**kwargs : ``dict``
Model parameters: \n
- mu: Mean of the normal distribution, default: 1.99 \n
- sigma: Standard deviation of the normal distribution, default: 0.149 \n
Returns
-------
x : ``numpy.ndarray``
Array of values drawn from the normal distribution.
"""
# Get parameters
identifier_dict = dict(
param_name = "unknown_parameter",
sampler_type = "normal",
)
param_dict = dict(mu=1.99, sigma=0.149)
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
mu = identifier_dict["mu"]
sigma = identifier_dict["sigma"]
@njit
def normal_pdf_(x):
return (1.0 / (sigma * np.sqrt(2.0 * np.pi))) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
@njit
def normal_rvs_(size):
return np.random.normal(mu, sigma, size)
object_ = FunctionConditioning(
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["sampler_type"],
create_function=False,
create_pdf=normal_pdf_,
create_rvs=normal_rvs_,
callback='rvs',
)
return object_ if get_attribute else object_.rvs(size)
# --------------------
# Lens functions
# --------------------
[docs]
def optical_depth_numerical(self, zs, get_attribute=False, **kwargs):
"""
Helper to compute optical depth numerically by integrating lens redshift.
Parameters
----------
zs : ``numpy.ndarray``
Source redshifts.
get_attribute : ``bool``
If True, returns the function object instead of values. \n
default: False
**kwargs : ``dict``
Additional parameters.
Returns
-------
tau : ``numpy.ndarray`` or ``FunctionConditioning``
Optical depth values or function object.
Notes
-----
The optical depth is a cumulative integral of a differential cross-section
and is therefore non-negative. When optical depth values are cached via
spline interpolation, tiny negative values can appear near boundaries due
to numerical interpolation artifacts. This routine clamps such values to 0
by constructing the cached interpolator with ``non_negative_function=True``.
"""
identifier_dict = dict(
param_name="optical_depth",
function_type="optical_depth_numerical",
)
identifier_dict["resolution"] = self.create_new_interpolator[identifier_dict["param_name"]][
"resolution"
]
identifier_dict["cdf_size"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["cdf_size"]
identifier_dict["z_min"] = self.z_min
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
name_list = [
"z_min",
"z_max",
"cosmology",
]
identifier_dict["lens_redshift_sl"] = {}
if self.lens_redshift_sl.info is not None:
for key, value in self.lens_redshift_sl.info.items():
if key not in name_list:
identifier_dict["lens_redshift_sl"][key] = value
param_dict = self.available_lens_functions[identifier_dict["param_name"]][
identifier_dict["function_type"]
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
z_min = self.z_min if self.z_min > 0.0 else 0.0001
z_max = self.z_max
resolution = identifier_dict["resolution"]
# zs_array = np.geomspace(z_min, z_max, resolution)
# z_min = self.z_min + 0.001 if self.z_min == 0.0 else self.z_min
# z_max = self.z_max
# z_resolution = identifier_dict["resolution"]
zs_array = generate_mixed_grid(z_min, z_max, resolution)
def tau(zs):
# self.lens_redshift_sl.function gives differential cross-section
def integrand(zl_, zs_):
return self.lens_redshift_sl.function(np.array([zl_]), np.array([zs_]))[0]
integral = [quad(integrand, 0.0, z, args=(z))[0] for z in zs]
return integral
tau_object = FunctionConditioning(
function=tau,
x_array=zs_array,
conditioned_y_array=None,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["function_type"],
create_new=self.create_new_interpolator[identifier_dict["param_name"]]["create_new"],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="function",
cdf_size=identifier_dict["cdf_size"],
)
return tau_object if get_attribute else tau_object.function(zs)
[docs]
def compute_einstein_radii(self, sigma, zl, zs):
"""
Function to compute the Einstein radii of the lens galaxies
Parameters
----------
sigma : `float`
velocity dispersion of the lens galaxy
zl : `float`
lens redshifts
zs : `float`
source redshifts
Returns
-------
theta_E : `float`
Einstein radii of the lens galaxies in radians. Multiply by
Examples
--------
>>> from ler.lens_galaxy_population import LensGalaxyParameterDistribution
>>> lens = LensGalaxyParameterDistribution()
>>> sigma = 200.0
>>> zl = 0.5
>>> zs = 1.0
>>> lens.compute_einstein_radii(sigma, zl, zs)
"""
# Compute the angular diameter distances
Ds = self.angular_diameter_distance.function(zs)
Dls = self.angular_diameter_distance_z1z2.function(zl, zs)
# Compute the Einstein radii
theta_E = (
4.0 * np.pi * (sigma / 299792.458) ** 2 * Dls / (Ds)
) # Note: km/s for sigma; Dls, Ds are in Mpc
return theta_E
[docs]
def optical_depth_sis_analytic(self, zs, get_attribute=False, **kwargs):
"""
Function to compute the strong lensing optical depth (SIS).
LambdaCDM(H0=70, Om0=0.3, Ode0=0.7) was used to derive the following equation. This is the analytic version of optical depth from z=0 to z=zs.
Parameters
----------
zs : ``numpy.ndarray``
Source redshifts.
get_attribute : ``bool``
If True, returns the function object instead of values. \\n
default: False
**kwargs : ``dict``
Additional parameters.
Returns
-------
tau : ``numpy.ndarray`` or ``FunctionConditioning``
Optical depth values or function object.
Notes
-----
The analytic SIS optical depth is non-negative. When cached via spline
interpolation, tiny negative values can occur at the edges due to
interpolation/roundoff. The cached interpolator is therefore created with
``non_negative_function=True`` to clamp such artifacts to 0.
"""
identifier_dict = dict(
param_name="optical_depth",
function_type="optical_depth_sis_analytic",
)
identifier_dict["z_min"] = self.z_min if self.z_min > 0.0 else 0.0001
identifier_dict["z_max"] = self.z_max
identifier_dict["cosmology"] = self.cosmo
identifier_dict["resolution"] = self.create_new_interpolator[identifier_dict["param_name"]][
"resolution"
]
identifier_dict["cdf_size"] = self.create_new_interpolator[
identifier_dict["param_name"]
]["cdf_size"]
param_dict = self.available_lens_functions[identifier_dict["param_name"]][
identifier_dict["function_type"]
]
if param_dict:
param_dict.update(kwargs)
else:
param_dict = kwargs
identifier_dict.update(param_dict)
z_min = identifier_dict["z_min"]
z_max = identifier_dict["z_max"]
z_resolution = identifier_dict["resolution"]
zs_arr = generate_mixed_grid(z_min, z_max, z_resolution)
def tau(zs):
Dc = self.comoving_distance.function(zs)
return 4.192e-15 * Dc**3
tau_object = FunctionConditioning(
function=tau,
x_array=zs_arr,
conditioned_y_array=None,
non_negative_function=True,
identifier_dict=identifier_dict,
directory=self.directory,
sub_directory=identifier_dict["param_name"],
name=identifier_dict["function_type"],
create_new=self.create_new_interpolator[identifier_dict["param_name"]]["create_new"],
create_function_inverse=False,
create_function=True,
create_pdf=True,
create_rvs=True,
callback="function",
cdf_size=identifier_dict["cdf_size"],
)
return tau_object if get_attribute else tau_object.function(zs)
# ------------------------
# cross section functions
# ------------------------
[docs]
def cross_section_sis(
self, zs=None, zl=None, sigma=None, get_attribute=False, **kwargs
):
"""
Function to compute the SIS cross-section
Parameters
----------
sigma : `float`
velocity dispersion of the lens galaxy
zl : `float`
redshift of the lens galaxy
zs : `float`
redshift of the source galaxy
Returns
-------
cross_section : `float`
SIS cross-section
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> print(od.cross_section_sis(sigma=200., zl=0.5, zs=1.0))
"""
# theta_E = self.compute_einstein_radii(sigma, zl, zs)
# Compute the angular diameter distances
_Da = self.angular_diameter_distance.function
_Da_z1z2 = self.angular_diameter_distance_z1z2.function
# Compute the Einstein radii
if get_attribute:
@njit
def _cross_section_sis(zs, zl, sigma):
Ds = _Da(zs)
Dls = _Da_z1z2(zl, zs)
theta_E = 4.0 * np.pi * (sigma / 299792.458) ** 2 * Dls / (Ds)
return np.pi * theta_E**2
return _cross_section_sis
else:
Ds = _Da(zs)
Dls = _Da_z1z2(zl, zs)
theta_E = 4.0 * np.pi * (sigma / 299792.458) ** 2 * Dls / (Ds)
return np.pi * theta_E**2
[docs]
def cross_section_sie_feixu(
self, zs=None, zl=None, sigma=None, q=None, get_attribute=False, **kwargs
):
"""
Function to compute the SIE cross-section from Fei Xu et al. (2021)
Parameters
----------
sigma : `float`
velocity dispersion of the lens galaxy
zl : `float`
redshift of the lens galaxy
zs : `float`
redshift of the source galaxy
Returns
-------
cross_section : `float`
SIE cross-section
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> print(od.cross_section_sie_feixu(sigma=200., zl=0.5, zs=1.0, q=1.0))
"""
if get_attribute:
_cross_section_sis = self.cross_section_sis(
sigma=None, zl=None, zs=None, get_attribute=True
)
@njit
def _cross_section_sie_feixu(zs, zl, sigma, q):
return phi_cut_SIE(q) * _cross_section_sis(zs, zl, sigma)
return _cross_section_sie_feixu
else:
_cross_section_sis = self.cross_section_sis(
zs=zs, zl=zl, sigma=sigma, get_attribute=False
)
return phi_cut_SIE(q) * _cross_section_sis
[docs]
def cross_section_epl_shear_numerical(
self,
zs,
zl,
sigma,
q,
phi,
gamma,
gamma1,
gamma2,
):
"""
Function to compute the strong lensing cross-section numerically for EPL + external shear lenses.
Parameters
----------
zs : `numpy.ndarray`
redshift of the source galaxies
zl : `numpy.ndarray`
redshift of the lens galaxies
sigma : `numpy.ndarray`
velocity dispersion of the lens galaxies
q : `numpy.ndarray`
axis ratios of the lens galaxies
phi : `numpy.ndarray`
axis rotation angles of the lens galaxies in radians
gamma : `numpy.ndarray`
external shear magnitudes of the lens galaxies
gamma1 : `numpy.ndarray`
external shear component 1 of the lens galaxies
gamma2 : `numpy.ndarray`
external shear component 2 of the lens galaxies
Returns
-------
cross_section : `numpy.ndarray`
strong lensing cross-section of the lens galaxies in square radians
"""
size = len(zs)
e1, e2 = phi_q2_ellipticity(phi, q)
theta_E = self.compute_einstein_radii(sigma, zl, zs)
area_list = np.ones(size)
for i in range(size):
area_list[i] = cross_section(
theta_E[i], e1[i], e2[i], gamma[i], gamma1[i], gamma2[i]
)
return area_list
[docs]
def cross_section_epl_shear_numerical_mp(
self,
theta_E,
gamma,
gamma1,
gamma2,
q=None,
phi=None,
e1=None,
e2=None,
verbose=False,
**kwargs,
):
"""
Function to compute the strong lensing cross-section numerically for EPL + external shear lenses.
Parameters
----------
theta_E : `numpy.ndarray`
Einstein radii of the lens galaxies in radians
gamma : `numpy.ndarray`
external shear magnitudes of the lens galaxies
gamma1 : `numpy.ndarray`
external shear component 1 of the lens galaxies
gamma2 : `numpy.ndarray`
external shear component 2 of the lens galaxies
q : `numpy.ndarray`
axis ratios of the lens galaxies
phi : `numpy.ndarray`
axis rotation angles of the lens galaxies in radians
e1 : `numpy.ndarray`
ellipticity component 1 of the lens galaxies
e2 : `numpy.ndarray`
ellipticity component 2 of the lens galaxies
Returns
-------
cross_section : `numpy.ndarray`
strong lensing cross-section of the lens galaxies in square radians
"""
# Transform the axis ratio and the axis rotation angle, to ellipticities e1, e2, using lenstronomy
if e1 is None or e2 is None:
if q is None or phi is None:
raise ValueError("Either (e1, e2) or (q, phi) must be provided.")
e1, e2 = phi_q2_ellipticity(phi, q)
size = len(theta_E)
iter_ = np.arange(size)
points = np.column_stack((theta_E, e1, e2, gamma, gamma1, gamma2, iter_))
area_list = np.ones(size)
with Pool(processes=self.npool) as pool:
if verbose:
for result in tqdm(
pool.imap_unordered(cross_section_mp, points),
total=size,
ncols=100,
disable=False,
):
(
iter_i,
area,
) = result
area_list[int(iter_i)] = area
else:
for result in pool.map(cross_section_mp, points):
(
iter_i,
area,
) = result
area_list[int(iter_i)] = area
return np.array(area_list)
def _two_sided_mixed_grid(
self,
x_min,
x_max,
resolution,
power_law_part='lower',
spacing_trend='increasing',
power=2.3,
value_transition_fraction=0.3,
num_transition_fraction=0.6,
auto_match_slope=True,
):
# n1 chosen so that n1 + (n1-1) = 2*n1-1 >= resolution.
# The -1 is because x2 drops the shared midpoint (0.5) via [1:].
n1 = (resolution + 2) // 2
# generate grid from 0 to 0.5
x1 = generate_mixed_grid(
x_min=0.,
x_max=0.5,
resolution=n1,
power_law_part=power_law_part,
spacing_trend=spacing_trend,
power=power,
value_transition_fraction=value_transition_fraction,
num_transition_fraction=num_transition_fraction,
auto_match_slope=auto_match_slope
)
# Mirror x1 onto [0.5, 1.0]; [1:] removes the shared endpoint at 0.5.
x2 = np.sort(1 - x1)[1:]
u_grid = np.concatenate([x1, x2])[:resolution]
x = x_min + u_grid * (x_max - x_min)
return x
def _parameter_spacing(self, x_min, x_max, resolution, config=None):
"""
Create 1D grid axis based on spacing config.
Supported modes:
- ``linear`` (default)
- ``geom_mixed`` (power-law + linear)
- ``gaussian_mixed`` (Gaussian concentration + linear remainder)
"""
if config is None:
return np.linspace(x_min, x_max, resolution)
mode = str(config.get("mode", "linear")).lower()
if mode == "powerlaw_mixed":
return generate_mixed_grid(
x_min=x_min,
x_max=x_max,
resolution=resolution,
power_law_part=config.get("power_law_part", "lower"),
spacing_trend=config.get("spacing_trend", "increasing"),
power=config.get("power", 2.5),
value_transition_fraction=config.get("value_transition_fraction", 0.6),
num_transition_fraction=config.get("num_transition_fraction", 0.3),
auto_match_slope=config.get("auto_match_slope", True)
)
if mode == "two_sided_mixed_grid":
return self._two_sided_mixed_grid(
x_min=x_min,
x_max=x_max,
resolution=resolution,
power_law_part=config.get("power_law_part", "lower"),
spacing_trend=config.get("spacing_trend", "increasing"),
power=config.get("power", 2.5),
value_transition_fraction=config.get("value_transition_fraction", 0.6),
num_transition_fraction=config.get("num_transition_fraction", 0.3),
auto_match_slope=config.get("auto_match_slope", True)
)
return np.linspace(x_min, x_max, resolution)
def _compute_parameter_bounds(self, size_list=[25, 25, 45, 15, 15]):
"""
Sample lens parameters with a fixed internal seed and return min/max
bounds used for grid construction.
The global NumPy random state is saved before seeding and restored
afterwards, so no external draws are affected.
Parameters
----------
size_list : list
Grid axis sizes ``[n_e1, n_e2, n_gamma, n_gamma1, n_gamma2]``.
Used to set the padding around the sampled extremes.
Returns
-------
bounds : dict
Keys: ``q_min``, ``q_max``, ``phi_min``, ``phi_max``,
``e1_min``, ``e1_max``, ``e2_min``, ``e2_max``,
``gamma_min``, ``gamma_max``,
``gamma1_min``, ``gamma1_max``,
``gamma2_min``, ``gamma2_max``.
"""
size = 1000000
_rng_state = np.random.get_state()
_numba_threads = get_num_threads()
try:
np.random.seed(42)
_seed_numba_rng(42)
# Keep njit+prange samplers deterministic while computing bounds.
set_num_threads(1)
q_array = getattr(self.axis_ratio, "x_array", None)
if q_array is not None:
q_min = np.min(q_array)
q_max = np.max(q_array)
else:
zl = np.random.uniform(0.1, 10.0, size)
if self.velocity_dispersion.conditioned_y_array is not None:
sigma = self.velocity_dispersion.rvs(size, zl)
else:
sigma = self.velocity_dispersion.rvs(size)
if self.axis_ratio.conditioned_y_array is not None:
q = self.axis_ratio.rvs(size, sigma)
else:
q = self.axis_ratio.rvs(size)
q_min, q_max = q.min(), q.max()
del_q = 2 * (q_max - q_min) / size_list[0]
if (q_min - del_q) > 0:
q_min = q_min - del_q
if (q_max + del_q) < 1:
q_max = q_max + del_q
if del_q == 0:
q_min = max(0.2, q_min - 0.01)
q_max = min(1.0, q_max + 0.01)
phi_info = getattr(self.axis_rotation_angle, "info", None)
if isinstance(phi_info, dict):
phi_min = phi_info.get("x_min", 0.0)
phi_max = phi_info.get("x_max", 2.0 * np.pi)
else:
phi_min = 0.0
phi_max = 2.0 * np.pi
if phi_min == phi_max:
phi = self.axis_rotation_angle.rvs(size)
phi_min, phi_max = phi.min(), phi.max()
del_phi = 2 * (phi_max - phi_min) / size_list[1]
if del_phi == 0:
phi_min = max(0.0, phi_min - 0.01)
phi_max = min(2.0 * np.pi, phi_max + 0.01)
# Build deterministic e1/e2 bounds from a fixed q-phi grid.
n_q = max(256, int(size_list[0]) * 16)
n_phi = max(256, int(size_list[1]) * 16)
q_grid = np.linspace(q_min, q_max, n_q)
phi_grid = np.linspace(phi_min, phi_max, n_phi)
q_eval = np.repeat(q_grid, n_phi)
phi_eval = np.tile(phi_grid, n_q)
e1, e2 = phi_q2_ellipticity(phi_eval, q_eval)
e1_min, e1_max = e1.min(), e1.max()
e2_min, e2_max = e2.min(), e2.max()
del_e1 = 2 * (e1_max - e1_min) / size_list[0]
del_e2 = 2 * (e2_max - e2_min) / size_list[1]
e1_min = max(-0.9999, e1_min - del_e1)
e1_max = min(0.9999, e1_max + del_e1)
e2_min = max(-0.9999, e2_min - del_e2)
e2_max = min(0.9999, e2_max + del_e2)
if del_e1 == 0:
e1_min = max(-0.9999, e1_min - 0.01)
e1_max = min(0.9999, e1_max + 0.01)
if del_e2 == 0:
e2_min = max(-0.9999, e2_min - 0.01)
e2_max = min(0.9999, e2_max + 0.01)
gamma = self.density_profile_slope.rvs(size)
gamma_min, gamma_max = gamma.min(), gamma.max()
del_gamma = 2 * (gamma_max - gamma_min) / size_list[2]
gamma_min = gamma_min - del_gamma
gamma_max = gamma_max + del_gamma
if del_gamma == 0:
gamma_min = gamma_min - 0.01
gamma_max = gamma_max + 0.01
gamma1 = self.external_shear1.rvs(size)
gamma2 = self.external_shear2.rvs(size)
gamma1_min, gamma1_max = gamma1.min(), gamma1.max()
gamma2_min, gamma2_max = gamma2.min(), gamma2.max()
del_gamma1 = 2 * (gamma1_max - gamma1_min) / size_list[3]
del_gamma2 = 2 * (gamma2_max - gamma2_min) / size_list[4]
gamma1_min = gamma1_min - del_gamma1
gamma1_max = gamma1_max + del_gamma1
gamma2_min = gamma2_min - del_gamma2
gamma2_max = gamma2_max + del_gamma2
if del_gamma1 == 0:
gamma1_min = gamma1_min - 0.0001
gamma1_max = gamma1_max + 0.0001
if del_gamma2 == 0:
gamma2_min = gamma2_min - 0.0001
gamma2_max = gamma2_max + 0.0001
finally:
np.random.set_state(_rng_state)
set_num_threads(_numba_threads)
bounds = dict(
q_min=q_min, q_max=q_max,
phi_min=phi_min, phi_max=phi_max,
e1_min=e1_min, e1_max=e1_max,
e2_min=e2_min, e2_max=e2_max,
gamma_min=gamma_min, gamma_max=gamma_max,
gamma1_min=gamma1_min, gamma1_max=gamma1_max,
gamma2_min=gamma2_min, gamma2_max=gamma2_max,
)
return {key: float(np.round(value, 4)) for key, value in bounds.items()}
[docs]
def create_parameter_grid(self, size_list=[25, 25, 45, 15, 15], spacing_config=None, bounds=None):
"""
Create a parameter grid for lens galaxies.
Parameters
----------
size_list : list
List of sizes for each parameter grid.
spacing_config : dict or None
Optional per-parameter spacing configuration. Keys can include
``q``, ``phi``, ``e1``, ``e2``, ``gamma``, ``gamma1``, ``gamma2`` and values are
dictionaries, e.g. ``{"mode": "two_sided_mixed_grid", "power_law_part": "lower", "spacing_trend": "increasing", "power": 2.5, "value_transition_fraction": 0.6, "num_transition_fraction": 0.3, "auto_match_slope": True}``.
bounds : dict or None
Pre-computed parameter bounds from ``_compute_parameter_bounds``. If
None, bounds are computed internally.
Returns
-------
zl : numpy.ndarray
Lens redshifts.
sigma : numpy.ndarray
Velocity dispersions.
q : numpy.ndarray
Axis ratios.
"""
if bounds is None:
bounds = self._compute_parameter_bounds(size_list)
e1_min = bounds["e1_min"]
e1_max = bounds["e1_max"]
e2_min = bounds["e2_min"]
e2_max = bounds["e2_max"]
gamma_min = bounds["gamma_min"]
gamma_max = bounds["gamma_max"]
gamma1_min = bounds["gamma1_min"]
gamma1_max = bounds["gamma1_max"]
gamma2_min = bounds["gamma2_min"]
gamma2_max = bounds["gamma2_max"]
if spacing_config is None:
spacing_config = {}
###########################
# sampling the parameters #
###########################
# q = self._parameter_spacing(
# q_min, q_max, size_list[0], spacing_config.get("q")
# )
# phi = self._parameter_spacing(
# phi_min, phi_max, size_list[1], spacing_config.get("phi")
# )
# e1, e2 = phi_q2_ellipticity(phi, q)
# Build interpolation axes directly in e1/e2 space.
# This avoids distortions introduced by elementwise (q, phi) -> (e1, e2)
# pairing when nonuniform spacing is used.
e1 = self._parameter_spacing(
e1_min,
e1_max,
size_list[0],
spacing_config.get("e1", spacing_config.get("e1")),
)
e2 = self._parameter_spacing(
e2_min,
e2_max,
size_list[1],
spacing_config.get("e2", spacing_config.get("e2")),
)
e1 = np.sort(e1)
e2 = np.sort(e2)
# print(f"e1 : {e1}")
# print(f"e2 : {e2}")
gamma = self._parameter_spacing(
gamma_min, gamma_max, size_list[2], spacing_config.get("gamma")
)
gamma1 = self._parameter_spacing(
gamma1_min, gamma1_max, size_list[3], spacing_config.get("gamma1")
)
gamma2 = self._parameter_spacing(
gamma2_min, gamma2_max, size_list[4], spacing_config.get("gamma2")
)
# return q, phi, e1, e2, gamma, gamma1, gamma2
return e1, e2, gamma, gamma1, gamma2
[docs]
def cross_section_epl_shear_interpolation_init(
self, file_path, size_list, spacing_config=None, batch_size=50000, bounds=None
):
print(f"Cross section interpolation data points will be created at {file_path}")
# ----------------------------------
# cross section unit to cross section ratio fitting
# ----------------------------------
# # --------------------------
# # warm up if cross_section_epl_shear_numerical_mp is used
# # --------------------------
# size = 2 # small size for warm up
# zs = np.random.uniform(self.z_min, self.z_max, size)
# zl = np.zeros(size)
# for i in range(size):
# zl[i] = np.random.uniform(0.001, zs[i] - 0.001)
# sigma = np.random.uniform(
# self.lens_priors_params["velocity_dispersion"]["sigma_min"],
# self.lens_priors_params["velocity_dispersion"]["sigma_max"],
# size,
# )
# # Sample individual parameter values (not grid)
# try:
# q = self.axis_ratio.rvs(size, sigma)
# except TypeError:
# q = self.axis_ratio.rvs(size)
# phi = self.axis_rotation_angle.rvs(size)
# e1, e2 = phi_q2_ellipticity(phi, q)
# gamma = self.density_profile_slope.rvs(size)
# gamma1 = self.external_shear1.rvs(size)
# gamma2 = self.external_shear2.rvs(size)
# size = np.prod(size_list)
# # --------------------------
# cs data points for interpolation
e1, e2, gamma, gamma1, gamma2 = self.create_parameter_grid(
size_list=size_list, spacing_config=spacing_config, bounds=bounds
)
e1_arr, e2_arr, gamma_arr, gamma1_arr, gamma2_arr = np.meshgrid(
e1, e2, gamma, gamma1, gamma2, indexing="ij"
)
e1_arr = e1_arr.flatten()
e2_arr = e2_arr.flatten()
gamma_arr = gamma_arr.flatten()
gamma1_arr = gamma1_arr.flatten()
gamma2_arr = gamma2_arr.flatten()
# # ---------
# cs_unit = self.cross_section_epl_shear_numerical_mp(
# theta_E=np.ones(size),
# gamma=gamma_arr,
# gamma1=gamma1_arr,
# gamma2=gamma2_arr,
# e1=e1_arr,
# e2=e2_arr,
# verbose=True,
# )
# # ---------
# --------------------------
# cross_section_epl_shear_unit njit
# --------------------------
from ..image_properties.cross_section_njit import cross_section_epl_shear_unit
# Set Numba threads to npool before entering prange
from numba import set_num_threads
set_num_threads(self.npool)
args = self.lens_functions_params["cross_section"]
if args is not None and 'num_th' in args:
num_th = args['num_th']
else:
num_th = 500
if args is not None and 'maginf' in args:
maginf = args['maginf']
else:
maginf = 100.0
# print(f"\n num_th: {num_th}, maginf: {maginf} \n")
# warm up the Numba function
cross_section_epl_shear_unit(
e1=e1_arr[:2],
e2=e2_arr[:2],
gamma=gamma_arr[:2],
gamma1=gamma1_arr[:2],
gamma2=gamma2_arr[:2],
num_th=num_th,
maginf=maginf,
)
"""Memory-efficient wrapper - reuses the same small work arrays."""
n = len(e1_arr)
cs_unit = np.empty(n)
print(
f"Computing n={int(n)} cross-section data points for interpolation with ler.image_properties.cross_section_njit.cross_section_epl_shear_unit..."
)
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
cs_unit[start:end] = cross_section_epl_shear_unit(
e1=e1_arr[start:end], e2=e2_arr[start:end], gamma=gamma_arr[start:end],
gamma1=gamma1_arr[start:end], gamma2=gamma2_arr[start:end],
num_th=num_th, maginf=maginf
)
# --------------------------
# # find slope and intercept for cs_unit to cs conversion
# print("Finding the conversion factor from cross-section unit to actual cross-section...")
# # choose random gamma, gamma1, gamma2 for testing cs_unit to cs conversion
# size = min(10000, len(gamma_arr))
# idx = np.random.choice(len(gamma_arr), size, replace=False)
# theta_E = np.random.uniform(1.0e-10, 1.0e-8, size)
# if self.lens_type in ["epl_shear_galaxy", "sie_galaxy"]:
# cs = cross_section_epl_shear_unit(
# e1=e1_arr[idx],
# e2=e2_arr[idx],
# gamma=gamma_arr[idx],
# gamma1=gamma1_arr[idx],
# gamma2=gamma2_arr[idx],
# theta_E=theta_E,
# )
# elif self.lens_type == "sis_galaxy":
# cs = np.pi * theta_E * theta_E * np.ones(size)
# else:
# raise ValueError(f"Unsupported lens type {self.lens_type} for cross-section interpolation.")
# # cs = self.cross_section_epl_shear_numerical_mp(
# # theta_E=theta_E,
# # gamma=gamma_arr[idx],
# # gamma1=gamma1_arr[idx],
# # gamma2=gamma2_arr[idx],
# # e1=e1_arr[idx],
# # e2=e2_arr[idx],
# # verbose=False,
# # )
# y_ = cs / cs_unit[idx]
# x_ = np.pi * theta_E * theta_E # SIS cross-section as reference
# valid = np.isfinite(y_) & np.isfinite(x_) & (y_ > 0.0) & (cs_unit[idx] > 0.0)
# if np.count_nonzero(valid) >= 2:
# cs_csunit_slope, cs_csunit_intercept = np.polyfit(x_[valid], y_[valid], 1)
# else:
# cs_csunit_slope = 0.31830988618379075
# cs_csunit_intercept = -3.2311742677852644e-27
cs_csunit_slope = 0.31830988618379075
cs_csunit_intercept = -3.2311742677852644e-27
# # calculate relative error and adjust the offset
# size = min(100, len(gamma_arr))
# idx = np.random.choice(len(gamma_arr), size, replace=False)
# theta_E = np.random.uniform(1.0e-12, 1.0e-5, size)
# cs_pred_unit = cross_section_epl_shear_unit(
# e1=e1_arr[idx],
# e2=e2_arr[idx],
# gamma=gamma_arr[idx],
# gamma1=gamma1_arr[idx],
# gamma2=gamma2_arr[idx],
# )
# cs_sis = np.pi * theta_E * theta_E
# cs_pred = cs_pred_unit * (cs_csunit_intercept + cs_csunit_slope * cs_sis)
# if self.lens_type in ["epl_shear_galaxy", "sie_galaxy"]:
# cs_true = self.cross_section_epl_shear_numerical_mp(
# theta_E=theta_E,
# gamma=gamma_arr[idx],
# gamma1=gamma1_arr[idx],
# gamma2=gamma2_arr[idx],
# e1=e1_arr[idx],
# e2=e2_arr[idx],
# verbose=False,
# )
# elif self.lens_type == "sis_galaxy":
# cs_true = np.pi * theta_E * theta_E * np.ones(size)
# idx_nonzero = cs_true > 0.0
# relative_error = np.median((cs_pred[idx_nonzero] - cs_true[idx_nonzero]) / cs_true[idx_nonzero])
# # correct the intercept to minimize the median relative error
# cs_csunit_slope = cs_csunit_slope / (1 + relative_error)
# cs_csunit_intercept = cs_csunit_intercept / (1 + relative_error)
# ----------
cs_unit_grid = cs_unit.reshape(size_list)
# save cs data
save_json(
file_path,
[
e1,
e2,
gamma,
gamma1,
gamma2,
cs_unit_grid,
cs_csunit_slope,
cs_csunit_intercept,
],
)
return e1, e2, gamma, gamma1, gamma2, cs_unit_grid, cs_csunit_slope, cs_csunit_intercept
[docs]
def cross_section_epl_shear_interpolation(
self,
zs=None,
zl=None,
sigma=None,
q=None,
phi=None,
gamma=None,
gamma1=None,
gamma2=None,
get_attribute=False,
size_list=[25, 25, 45, 15, 15],
spacing_config=None,
**kwargs,
):
"""
Function to compute the cross-section correction factor
"""
from .cross_section_interpolator import make_cross_section_area_reinit
# spacing_config = kwargs.get("grid_spacing_config", None)
if spacing_config is None:
# spacing_config = {'e1': {'mode': 'mixed', 'geom_fraction': 0.6, 'transition_fraction': 0.3}, 'e2': {'mode': 'mixed', 'geom_fraction': 0.6, 'transition_fraction': 0.3}, 'gamma': {'mode': 'linear'}, 'gamma1': {'mode': 'linear'}, 'gamma2': {'mode': 'linear'}}
spacing_config = self.create_new_interpolator["cross_section"]["spacing_config"]
bounds = self._compute_parameter_bounds(size_list)
param_dict_given = dict(
z_min=self.z_min,
z_max=self.z_max,
resolution=self.create_new_interpolator["cross_section"]["resolution"],
size_list=size_list,
velocity_dispersion=dict(
sigma_min=self.lens_priors_params["velocity_dispersion"]["sigma_min"],
sigma_max=self.lens_priors_params["velocity_dispersion"]["sigma_max"],
),
axis_ratio=dict(
q_min=bounds["q_min"],
q_max=bounds["q_max"],
),
axis_rotation_angle=dict(
phi_min=bounds["phi_min"],
phi_max=bounds["phi_max"],
),
density_profile_slope=dict(
gamma_min=bounds["gamma_min"],
gamma_max=bounds["gamma_max"],
),
external_shear1=dict(
gamma1_min=bounds["gamma1_min"],
gamma1_max=bounds["gamma1_max"],
),
external_shear2=dict(
gamma2_min=bounds["gamma2_min"],
gamma2_max=bounds["gamma2_max"],
),
grid_spacing_config=spacing_config,
)
file_path, it_exist = interpolator_json_path(
identifier_dict=param_dict_given,
directory=self.directory,
sub_directory="cross_section_function",
interpolator_name="cross_section_function",
)
if (self.create_new_interpolator["cross_section"]["create_new"] is True) or (
it_exist is False
):
(
e1_grid,
e2_grid,
gamma_grid,
gamma1_grid,
gamma2_grid,
cs_unit_grid,
cs_csunit_slope,
cs_csunit_intercept,
) = self.cross_section_epl_shear_interpolation_init(
file_path, size_list, spacing_config=spacing_config, bounds=bounds
)
else:
print(f"Cross section interpolation data points loaded from {file_path}")
(
e1_grid,
e2_grid,
gamma_grid,
gamma1_grid,
gamma2_grid,
cs_unit_grid,
cs_csunit_slope,
cs_csunit_intercept,
) = load_json(file_path)
self.cs_data_points_path = file_path
# Create the interpolator instance
cs_caculator = make_cross_section_area_reinit(
e1_grid=np.array(e1_grid),
e2_grid=np.array(e2_grid),
gamma_grid=np.array(gamma_grid),
gamma1_grid=np.array(gamma1_grid),
gamma2_grid=np.array(gamma2_grid),
cs_unit_grid=np.array(cs_unit_grid),
Da_instance=self.angular_diameter_distance.function,
csunit_to_cs_slope=cs_csunit_slope,
csunit_to_cs_intercept=cs_csunit_intercept,
)
# # define conversion function: cs_unit to cs
# self.cs_unit_to_cs = lambda area_sis: cs_csunit_slope * area_sis + cs_csunit_intercept
return (
cs_caculator
if get_attribute
else cs_caculator(
zs=zs,
zl=zl,
sigma=sigma,
q=q,
phi=phi,
gamma=gamma,
gamma1=gamma1,
gamma2=gamma2,
)
)
# ----------------------------------------------------
# EPL + external shear cross section using Numba njit
# ----------------------------------------------------
[docs]
def cross_section_epl_shear_njit(
self,
zs,
zl,
sigma,
q,
phi,
gamma,
gamma1,
gamma2,
get_attribute=False,
num_th=500,
maginf=-100.0,
**kwargs,
):
"""
Compute the lensing cross-section for EPL+shear lens model using Numba njit.
Parameters
----------
zs : ``float`` or ``numpy.ndarray``
Source redshift(s).
zl : ``float`` or ``numpy.ndarray``
Lens redshift(s).
sigma : ``float`` or ``numpy.ndarray``
Velocity dispersion(s) in km/s.
q : ``float`` or ``numpy.ndarray``
Axis ratio(s) (b/a).
phi : ``float`` or ``numpy.ndarray``
Axis rotation angle(s) in radians.
gamma : ``float`` or ``numpy.ndarray``
Density profile slope(s).
gamma1 : ``float`` or ``numpy.ndarray``
External shear component 1.
gamma2 : ``float`` or ``numpy.ndarray``
External shear component 2.
get_attribute : ``bool``, optional
If True, return the interpolator instance instead of the cross-section.
Default is False.
num_th : ``int``, optional
Number of theta values to use for the spline interpolation.
Default is 500.
maginf : ``float``, optional
Magnitude limit for the cross-section calculation.
Default is -100.0.
**kwargs
Additional keyword arguments to pass to the interpolator.
Returns
-------
cs_caculator : ``ler.image_properties.cross_section_njit.CrossSectionNjit``
The interpolator instance (if get_attribute=True).
cross_section : ``float`` or ``numpy.ndarray``
Lensing cross-section in arcsec^2.
"""
from ..image_properties.cross_section_njit import make_cross_section_area_reinit
# Create the interpolator instance
Da_instance = self.angular_diameter_distance.function
cs_caculator = make_cross_section_area_reinit(
Da_instance=Da_instance,
num_th=num_th,
maginf=maginf,
)
return (
cs_caculator
if get_attribute
else cs_caculator(
zs=zs,
zl=zl,
sigma=sigma,
q=q,
phi=phi,
gamma=gamma,
gamma1=gamma1,
gamma2=gamma2,
)
)
# -------------------
# properties
# -------------------
@property
[docs]
def optical_depth(self):
"""
Strong lensing optical depth calculator.
Returns
-------
optical_depth : ``FunctionConditioning``
Function object with `.function(zs)` method that returns \\n
optical depth for given source redshifts.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> tau = od.optical_depth.function(np.array([1.0, 2.0]))
"""
return self._optical_depth
@optical_depth.setter
def optical_depth(self, input_function):
# Handle available lens functions (need to map to private method)
available_functions_mapping = {
"optical_depth_numerical": "optical_depth_numerical",
}
if input_function in self.available_lens_functions["optical_depth"]:
print(f"using ler available optical depth function : {input_function}")
# Map to private method if it exists
method_name = available_functions_mapping.get(
input_function, input_function
)
args = self.lens_functions_params["optical_depth"]
if args is None:
self._optical_depth = getattr(self, method_name)(
zs=None, get_attribute=True
)
else:
self._optical_depth = getattr(self, method_name)(
zs=None, get_attribute=True, **args
)
elif isinstance(input_function, FunctionConditioning):
print(
"using user provided custom optical depth class/object of type ler.utils.FunctionConditioning"
)
self._optical_depth = input_function
elif callable(input_function):
print("using user provided custom optical depth function")
self._optical_depth = FunctionConditioning(
function=None, x_array=None, create_function=input_function
)
else:
raise ValueError(
"optical depth function should be string in available_lens_functions['optical_depth'] or callable or class object of 'ler.utils.FunctionConditioning'"
)
@property
[docs]
def velocity_dispersion(self):
"""
Velocity dispersion sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size, zl)``: Sample velocity dispersion values \n
- ``pdf(sigma, zl)``: Get probability density \n
- ``function(sigma, zl)``: Get number density function \n
Returns
-------
velocity_dispersion : ``FunctionConditioning``
Sampler object for velocity dispersion (km/s).
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> sigma = od.velocity_dispersion(size=100, zl=np.ones(100)*0.5)
"""
return self._velocity_dispersion
@velocity_dispersion.setter
def velocity_dispersion(self, prior):
if prior in self.available_lens_priors["velocity_dispersion"]:
print(f"using ler available velocity dispersion function : {prior}")
args = self.lens_priors_params["velocity_dispersion"]
if prior == "velocity_dispersion_choi":
prior = "velocity_dispersion_bernardi"
if args is None:
self._velocity_dispersion = getattr(self, prior)(
size=None, zl=None, get_attribute=True
)
else:
self._velocity_dispersion = getattr(self, prior)(
size=None, zl=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom velocity_dispersion class/object of type ler.utils.FunctionConditioning"
)
self._velocity_dispersion = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
and hasattr(prior, "function")
and callable(getattr(prior, "function"))
):
self._velocity_dispersion = prior
else:
raise ValueError(
"velocity_dispersion must be either a sampler name from available_lens_priors['velocity_dispersion'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def axis_ratio(self):
"""
Axis ratio sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size, sigma)``: Sample axis ratio values \n
- ``pdf(q, sigma)``: Get probability density \n
- ``function(q, sigma)``: Get distribution function \n
Returns
-------
axis_ratio : ``FunctionConditioning``
Sampler object for axis ratio.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> q = od.axis_ratio(size=100, sigma=np.ones(100)*200.)
"""
return self._axis_ratio
@axis_ratio.setter
def axis_ratio(self, prior):
if prior in self.available_lens_priors["axis_ratio"]:
print(f"using ler available axis_ratio function : {prior}")
args = self.lens_priors_params["axis_ratio"]
if args is None:
self._axis_ratio = getattr(self, prior)(
size=None, sigma=None, get_attribute=True
)
else:
self._axis_ratio = getattr(self, prior)(
size=None, sigma=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom axis_ratio class/object of type ler.utils.FunctionConditioning"
)
self._axis_ratio = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._axis_ratio = prior
else:
raise ValueError(
"axis_ratio must be either a string in available_lens_priors['axis_ratio'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def zs_sl(self):
"""
Strongly-lensed source redshift sampler object.
Returns
-------
zs_sl : ``FunctionConditioning``
Sampler object for source redshift conditioned on strong lensing.
Notes
-----
The default strongly-lensed source redshift sampler,
``strongly_lensed_source_redshift``, is implemented in
:class:`~ler.lens_galaxy_population.LensGalaxyParameterDistribution`.
This property exists in ``OpticalDepth`` so that derived classes can
configure/override the prior using the common lens-prior mechanism.
"""
return self._zs_sl
@zs_sl.setter
def zs_sl(self, prior):
if prior in self.available_lens_priors["zs_sl"]:
print(f"using ler available zs_sl function : {prior}")
args = self.lens_priors_params["zs_sl"]
if args is None:
self._zs_sl = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._zs_sl = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom zs_sl class/object of type ler.utils.FunctionConditioning"
)
self._zs_sl = prior
else:
raise ValueError(
"zs_sl should be a string in available_lens_priors['zs_sl'] "
"or an instance of `ler.utils.FunctionConditioning`."
)
@property
[docs]
def lens_redshift_sl(self):
"""
Strongly lensed lens redshift sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size, zs)``: Sample lens redshifts given source redshifts \n
- ``pdf(zl, zs)``: Get probability density \n
- ``function(zl, zs)``: Get effective lensing cross-section \n
Returns
-------
lens_redshift_sl : ``FunctionConditioning``
Sampler object for strongly lensed lens redshift.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> zl = od.lens_redshift_sl(size=100, zs=np.ones(100)*2.0)
"""
return self._lens_redshift_sl
@lens_redshift_sl.setter
def lens_redshift_sl(self, prior):
if prior in self.available_lens_priors["lens_redshift_sl"]:
print(f"using ler available lens_redshift_sl function : {prior}")
args = self.lens_priors_params["lens_redshift_sl"]
if args is None:
self._lens_redshift_sl = getattr(self, prior)(
size=None, zs=None, get_attribute=True
)
else:
self._lens_redshift_sl = getattr(self, prior)(
size=None, zs=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom lens_redshift_sl class/object of type ler.utils.FunctionConditioning"
)
self._lens_redshift_sl = prior
else:
raise ValueError(
"lens_redshift_sl should be string in available_lens_priors['lens_redshift_sl'] or class object of 'ler.utils.FunctionConditioning'"
)
@property
[docs]
def lens_redshift(self):
"""
Lens redshift (intrinsic) sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size)``: Sample lens redshifts \n
- ``pdf(zl)``: Get probability density \n
Returns
-------
lens_redshift : ``FunctionConditioning``
Sampler object for lens redshift.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> zl = od.lens_redshift(size=100)
"""
return self._lens_redshift
@lens_redshift.setter
def lens_redshift(self, prior):
if prior in self.available_lens_priors["lens_redshift"]:
print(f"using ler available lens_redshift function : {prior}")
args = self.lens_priors_params["lens_redshift"]
if args is None:
self._lens_redshift = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._lens_redshift = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom lens_redshift class/object of type ler.utils.FunctionConditioning"
)
self._lens_redshift = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._lens_redshift = prior
else:
raise ValueError(
"lens_redshift must be either a string in available_lens_priors['lens_redshift'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def axis_rotation_angle(self):
"""
Axis rotation angle sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size)``: Sample axis rotation angles \n
- ``pdf(phi)``: Get probability density \n
Returns
-------
axis_rotation_angle : ``FunctionConditioning``
Sampler object for axis rotation angle (rad).
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> phi = od.axis_rotation_angle(size=100)
"""
return self._axis_rotation_angle
@axis_rotation_angle.setter
def axis_rotation_angle(self, prior):
if prior in self.available_lens_priors["axis_rotation_angle"]:
print(f"using ler available axis_rotation_angle function : {prior}")
args = self.lens_priors_params["axis_rotation_angle"]
if args is None:
self._axis_rotation_angle = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._axis_rotation_angle = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom axis_rotation_angle class/object of type ler.utils.FunctionConditioning"
)
self._axis_rotation_angle = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._axis_rotation_angle = prior
else:
raise ValueError(
"axis_rotation_angle must be either a string in available_lens_priors['axis_rotation_angle'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def external_shear1(self):
"""
External shear sampler object (component 1).
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size)``: Sample shear component 1 (gamma1) \n
- ``pdf(gamma1)``: Get probability density \n
Returns
-------
external_shear1 : ``FunctionConditioning``
Sampler object for external shear (component 1).
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> gamma1 = od.external_shear1(size=100)
"""
return self._external_shear1
@external_shear1.setter
def external_shear1(self, prior):
if prior in self.available_lens_priors["external_shear1"]:
print(f"using ler available external_shear1 function : {prior}")
args = self.lens_priors_params["external_shear1"]
if args is None:
self._external_shear1 = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._external_shear1 = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom external_shear1 class/object of type ler.utils.FunctionConditioning"
)
self._external_shear1 = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._external_shear1 = prior
else:
raise ValueError(
"external_shear1 must be either a string in available_lens_priors['external_shear1'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def external_shear2(self):
"""
External shear sampler object (component 2).
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size)``: Sample shear component 2 (gamma2) \n
- ``pdf(gamma2)``: Get probability density \n
Returns
-------
external_shear2 : ``FunctionConditioning``
Sampler object for external shear (component 2).
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> gamma2 = od.external_shear2(size=100)
"""
return self._external_shear2
@external_shear2.setter
def external_shear2(self, prior):
if prior in self.available_lens_priors["external_shear2"]:
print(f"using ler available external_shear2 function : {prior}")
args = self.lens_priors_params["external_shear2"]
if args is None:
self._external_shear2 = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._external_shear2 = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom external_shear2 class/object of type ler.utils.FunctionConditioning"
)
self._external_shear2 = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._external_shear2 = prior
else:
raise ValueError(
"external_shear2 must be either a string in available_lens_priors['external_shear2'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def density_profile_slope(self):
"""
Density profile slope sampler object.
Returns a ``FunctionConditioning`` object with methods: \n
- ``rvs(size)``: Sample density profile slope values \n
- ``pdf(gamma)``: Get probability density \n
Returns
-------
density_profile_slope : ``FunctionConditioning``
Sampler object for density profile slope.
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> gamma = od.density_profile_slope(size=100)
"""
return self._density_profile_slope
@density_profile_slope.setter
def density_profile_slope(self, prior):
if prior in self.available_lens_priors["density_profile_slope"]:
print(f"using ler available density_profile_slope function : {prior}")
args = self.lens_priors_params["density_profile_slope"]
if args is None:
self._density_profile_slope = getattr(self, prior)(
size=None, get_attribute=True
)
else:
self._density_profile_slope = getattr(self, prior)(
size=None, get_attribute=True, **args
)
elif isinstance(prior, FunctionConditioning):
print(
"using user provided custom density_profile_slope class/object of type ler.utils.FunctionConditioning"
)
self._density_profile_slope = prior
elif (
hasattr(prior, "rvs")
and callable(getattr(prior, "rvs"))
and hasattr(prior, "pdf")
and callable(getattr(prior, "pdf"))
):
self._density_profile_slope = prior
else:
raise ValueError(
"density_profile_slope must be either a string in available_lens_priors['density_profile_slope'] "
"or an instance providing both `.rvs(...)` and `.pdf(...)` (e.g. `ler.utils.FunctionConditioning`)."
)
@property
[docs]
def cross_section(self):
"""
Lensing cross-section calculator.
Returns a callable that computes lensing cross-section for individual \n
lensing events. Input parameters depend on lens type: \n
- EPL+shear: zs, zl, sigma, q, phi, gamma, gamma1, gamma2 \n
- SIE: zs, zl, sigma, q \n
- SIS: zs, zl, sigma \n
Returns
-------
cross_section : ``callable``
Cross-section function (rad² units).
Examples
--------
>>> from ler.lens_galaxy_population import OpticalDepth
>>> od = OpticalDepth()
>>> cs = od.cross_section(zs=zs, zl=zl, sigma=sigma, ...)
"""
return self._cross_section
@cross_section.setter
def cross_section(self, cross_section):
if cross_section == "cross_section_epl_shear_interpolation":
print(f"using ler available cross_section function : {cross_section}")
# this will initialize the cross section interpolator
size_list = self.create_new_interpolator["cross_section"]["resolution"]
self._cross_section = self.cross_section_epl_shear_interpolation(
zs=None,
zl=None,
sigma=None,
q=None,
phi=None,
gamma=None,
gamma1=None,
gamma2=None,
get_attribute=True,
size_list=size_list,
)
elif cross_section == "cross_section_epl_shear_njit":
print(f"using ler available cross_section function : {cross_section}")
args = self.lens_functions_params["cross_section"]
if args is None:
self._cross_section = self.cross_section_epl_shear_njit(
zs=None,
zl=None,
sigma=None,
q=None,
phi=None,
gamma=None,
gamma1=None,
gamma2=None,
get_attribute=True,
)
else:
self._cross_section = self.cross_section_epl_shear_njit(
zs=None,
zl=None,
sigma=None,
q=None,
phi=None,
gamma=None,
gamma1=None,
gamma2=None,
get_attribute=True,
**args,
)
elif cross_section in ["cross_section_sie_feixu", "cross_section_sis"]:
print(f"using ler available cross_section function : {cross_section}")
self._cross_section = getattr(self, cross_section)(get_attribute=True)
elif callable(cross_section):
self._cross_section = cross_section
elif isinstance(cross_section, str) and hasattr(self, cross_section):
self._cross_section = getattr(self, cross_section)
else:
raise ValueError(
"cross_section should be a string, callable, or an instance of a class"
)
# --------------------
# Available samplers and functions
# --------------------
@property
[docs]
def available_lens_priors(self):
"""
Dictionary of available lens parameter samplers and their default parameters.
Returns
-------
available_lens_priors : ``dict``
Dictionary with sampler names and default parameters.
"""
self._available_lens_priors = dict(
zs_sl=dict(
strongly_lensed_source_redshift=dict(
tau_approximation=True,
),
),
lens_redshift_sl=dict(
lens_redshift_strongly_lensed_sis_analytical=None,
lens_redshift_strongly_lensed_numerical=dict(
param_name = "lens_redshift_sl",
sampler_type = "lens_redshift_strongly_lensed_numerical",
lens_type = "epl_shear_galaxy",
integration_size=50000,
use_multiprocessing=False,
cross_section_epl_shear_interpolation=False,
),
),
lens_redshift=dict(
lens_redshift_intrinsic_numerical=None,
),
velocity_dispersion=dict(
gengamma=dict(
param_name="velocity_dispersion",
sampler_type="gengamma",
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
velocity_dispersion_choi=dict(
param_name="velocity_dispersion",
sampler_type="velocity_dispersion_choi",
sigma_min=100.0,
sigma_max=400.0,
alpha=2.32,
beta=2.67,
phistar=8.0e-3 * self.cosmo.h**3,
sigmastar=161.0,
),
velocity_dispersion_bernardi=dict(
param_name="velocity_dispersion",
sampler_type="velocity_dispersion_bernardi",
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
velocity_dispersion_ewoud=dict(
param_name="velocity_dispersion",
sampler_type="velocity_dispersion_ewoud",
sigma_min=100.0,
sigma_max=400.0,
alpha=0.94,
beta=1.85,
phistar=2.099e-2 * (self.cosmo.h/0.7)**3,
sigmastar=113.78,
),
),
axis_ratio=dict(
rayleigh=dict(param_name="axis_ratio", sampler_type="rayleigh", q_min=0.2, q_max=1.0),
axis_ratio_padilla_strauss=dict(param_name="axis_ratio", sampler_type="axis_ratio_padilla_strauss", q_min=0.2, q_max=1.0),
uniform=dict(param_name="axis_ratio", sampler_type="uniform", x_min=0.2, x_max=1.0),
constant_values_n_size=dict(param_name="axis_ratio", sampler_type="constant_values_n_size", value=1.0),
),
axis_rotation_angle=dict(
uniform=dict(param_name="axis_rotation_angle", sampler_type="uniform", x_min=0.0, x_max=2 * np.pi),
constant_values_n_size=dict(param_name="axis_rotation_angle", sampler_type="constant_values_n_size", value=0.0)
),
density_profile_slope=dict[str, dict[str, str | float]](
normal=dict(param_name="density_profile_slope", sampler_type="normal", mu=1.99, sigma=0.149),
constant_values_n_size=dict(param_name="density_profile_slope", sampler_type="constant_values_n_size", value=2.0),
),
external_shear1=dict(
normal=dict(param_name="external_shear1", sampler_type="normal", mu=0.0, sigma=0.05),
constant_values_n_size=dict(param_name="external_shear1", sampler_type="constant_values_n_size", value=0.0)
),
external_shear2=dict(
normal=dict(param_name="external_shear2", sampler_type="normal", mu=0.0, sigma=0.05),
constant_values_n_size=dict(param_name="external_shear2", sampler_type="constant_values_n_size", value=0.0)
),
# source_parameters=dict(gw_parameters_rvs=None),
)
return self._available_lens_priors
@property
[docs]
def available_lens_functions(self):
"""
Dictionary of available lens functions and their default parameters.
Returns
-------
available_lens_functions : ``dict``
Dictionary with function names and default parameters.
"""
self._available_lens_functions = dict(
cross_section_based_sampler=dict(
rejection_sampler_full=dict(
n_prop=10000,
threshold_factor=1e-4,
zs_min=0.001,
zs_max=10.0,
zl_min=0.0001,
zl_max=None,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=1.4,
gamma_max=2.7,
shear_min=-0.2,
shear_max=0.2,
),
importance_sampler_full=dict(
n_prop=500,
threshold_factor=1e-4,
zs_min=0.001,
zs_max=10.0,
zl_min=0.0001,
zl_max=None,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=1.4,
gamma_max=2.7,
shear_min=-0.2,
shear_max=0.2,
),
rejection_sampler_partial=dict(
n_prop=10000,
threshold_factor=1e-4,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=1.4,
gamma_max=2.7,
shear_min=-0.2,
shear_max=0.2,
),
importance_sampler_partial=dict(
n_prop=400,
threshold_factor=1e-4,
sigma_min=100.0,
sigma_max=400.0,
q_min=0.2,
q_max=1.0,
phi_min=0.0,
phi_max=2 * np.pi,
gamma_min=1.4,
gamma_max=2.7,
shear_min=-0.2,
shear_max=0.2,
),
),
optical_depth=dict(
optical_depth_sis_analytic=dict(param_name="optical_depth", function_type="optical_depth_sis_analytic"),
optical_depth_numerical=dict(param_name="optical_depth", function_type="optical_depth_numerical"),
),
param_sampler_type=dict(
epl_shear_sl_parameters_rvs=None,
),
cross_section=dict(
cross_section_sie_feixu=None,
cross_section_sis=None,
cross_section_epl_shear_numerical=None,
cross_section_epl_shear_interpolation=dict(
num_th=500, maginf=-100.0
),
cross_section_epl_shear_njit=dict(
num_th=500, maginf=-100.0
),
),
)
return self._available_lens_functions
# -------------------------------------
# Simple instance attribute properties
# -------------------------------------
@property
[docs]
def npool(self):
"""
Number of processors for multiprocessing.
Returns
-------
npool : ``int``
Number of parallel processors.
"""
return self._npool
@npool.setter
def npool(self, value):
self._npool = value
from numba import set_num_threads
set_num_threads(value)
@property
[docs]
def z_min(self):
"""
Minimum redshift of the lens galaxy population.
Returns
-------
z_min : ``float``
Minimum redshift.
"""
return self._z_min
@z_min.setter
def z_min(self, value):
self._z_min = value
@property
[docs]
def z_max(self):
"""
Maximum redshift of the lens galaxy population.
Returns
-------
z_max : ``float``
Maximum redshift.
"""
return self._z_max
@z_max.setter
def z_max(self, value):
self._z_max = value
@property
[docs]
def cosmo(self):
"""
Cosmology object for distance calculations.
Returns
-------
cosmo : ``astropy.cosmology``
Cosmology object.
"""
return self._cosmo
@cosmo.setter
def cosmo(self, value):
self._cosmo = value
@property
[docs]
def lens_type(self):
"""
Type of lens galaxy model.
Returns
-------
lens_type : ``str``
Lens type ('epl_shear_galaxy', 'sie_galaxy', or 'sis_galaxy').
"""
return self._lens_type
@lens_type.setter
def lens_type(self, value):
self._lens_type = value
@property
[docs]
def directory(self):
"""
Directory for interpolator storage.
Returns
-------
directory : ``str``
Path to interpolator JSON files.
"""
return self._directory
@directory.setter
def directory(self, value):
self._directory = value