# -*- coding: utf-8 -*-
"""
Module for computing image properties of strongly lensed gravitational wave events.
This module contains the ImageProperties class, which computes image positions,
magnifications, time delays, and other lensing-related quantities for gravitational
wave sources that are strongly lensed by intervening galaxies. The class uses
multiprocessing to efficiently solve lens equations for large samples.
Usage:
Basic workflow example:
>>> from ler.image_properties import ImageProperties
>>> ip = ImageProperties()
>>> lens_parameters = dict(zs=np.array([2.0]), zl=np.array([0.5]), ...)
>>> result = ip.image_properties(lens_parameters)
Copyright (C) 2026 Phurailatpam Hemantakumar. Distributed under MIT License.
"""
import warnings
warnings.filterwarnings("ignore")
import logging
import multiprocessing as mp
from tqdm import tqdm
import numpy as np
from astropy.cosmology import LambdaCDM
logging.getLogger("numexpr.utils").setLevel(logging.ERROR)
# the following .py file will be called if they are not given in the class initialization
from .multiprocessing_routine_epl_shear import (
solve_lens_equation,
_init_worker_multiprocessing,
)
from ..lens_galaxy_population.lens_functions import phi_q2_ellipticity
[docs]
Mpc_to_m = 3.085677581491367e22
[docs]
class ImageProperties:
"""
Class to compute image properties of strongly lensed gravitational wave events.
This class solves the lens equation to find image positions, magnifications,
time delays, and image types (morse phase) for strongly lensed sources. It uses
multiprocessing for efficient computation of large samples.
Key Features: \n
- Solves lens equations using multiprocessing for efficiency \n
- Computes image positions, magnifications, and time delays \n
- Classifies image types using morse phase \n
- Calculates detection probabilities for lensed images \n
Parameters
----------
npool : ``int``
Number of processes for multiprocessing. \n
default: 4
n_min_images : ``int``
Minimum number of images required for a valid lensing event. \n
default: 2
n_max_images : ``int``
Maximum number of images to consider per event. \n
default: 4
time_window : ``float``
Time window for lensed events from min(geocent_time) (units: seconds). \n
default: 365*24*3600*2 (2 years)
include_effective_parameters : ``bool``
Whether to include effective parameters (effective_phase, effective_ra, effective_dec) in the output. \n
default: True
lens_model_list : ``list``
List of lens models to use. \n
default: ['EPL_NUMBA', 'SHEAR']
cosmology : ``astropy.cosmology`` or ``None``
Cosmology for distance calculations. \n
If None, uses default LambdaCDM. \n
default: LambdaCDM(H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0)
spin_zero : ``bool``
If True, spin parameters are set to zero (no spin sampling). \n
default: False
spin_precession : ``bool``
If True (and spin_zero=False), sample precessing spin parameters. \n
If False (and spin_zero=False), sample aligned/anti-aligned spins. \n
default: False
multiprocessing_verbose : ``bool``
If True, shows a progress bar for multiprocessing tasks. \n
default: True
include_redundant_parameters : ``bool``
If True, removes redundant parameters (e.g., theta_E, n_images, mass_1, mass_2, luminosity_distance) from output to save memory. \n
Examples
--------
Basic usage:
>>> from ler.image_properties import ImageProperties
>>> ip = ImageProperties()
>>> lens_parameters = dict(
... zs=np.array([2.0]),
... zl=np.array([0.5]),
... gamma1=np.array([0.0]),
... gamma2=np.array([0.0]),
... phi=np.array([0.0]),
... q=np.array([0.8]),
... gamma=np.array([2.0]),
... theta_E=np.array([1.0])
... )
>>> result = ip.image_properties(lens_parameters)
>>> print(result.keys())
Instance Methods
----------
ImageProperties has the following methods: \n
+-----------------------------------------------------+------------------------------------------------+
| Method | Description |
+=====================================================+================================================+
| :meth:`~image_properties_epl_shear` | Compute image properties for lensed events |
+-----------------------------------------------------+------------------------------------------------+
| :meth:`~get_lensed_snrs` | Compute detection probability for lensed images|
+-----------------------------------------------------+------------------------------------------------+
Instance Attributes
----------
ImageProperties has the following attributes: \n
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| Attribute | Type | Unit | Description |
+=====================================================+===========================+==========+==================================================================+
| :attr:`~npool` | ``int`` | | Number of multiprocessing workers |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~multiprocessing_verbose` | ``bool`` | | If True, shows a progress bar for multiprocessing tasks |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~n_min_images` | ``int`` | | Minimum number of images required |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~n_max_images` | ``int`` | | Maximum number of images per event |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~time_window` | ``float`` | s | Time window for lensed events |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~include_effective_parameters` | ``bool`` | | To include effective parameters in output |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~include_redundant_parameters` | ``bool`` | | If True, removes redundant parameters from output to save memory |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~lens_model_list` | ``list`` | | List of lens models |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~cosmo` | ``astropy.cosmology`` | | Cosmology for calculations |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~spin_zero` | ``bool`` | | Flag for zero spin assumption |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~spin_precession` | ``bool`` | | Flag for spin precession |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~pdet_finder` | ``callable`` | | Probability of detection calculator |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
| :attr:`~pdet_finder_output_keys` | ``list`` | | Keys for probability of detection outputs |
+-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+
"""
def __init__(
self,
npool=4,
n_min_images=2,
n_max_images=4,
lens_model_list=["EPL_NUMBA", "SHEAR"],
image_properties_function="image_properties_epl_shear",
image_properties_function_params=None,
cosmology=None,
time_window=365 * 24 * 3600 * 2, # 2 years
spin_zero=True,
spin_precession=False,
pdet_finder=None,
include_effective_parameters=False,
multiprocessing_verbose=True,
include_redundant_parameters=False,
):
self.npool = npool
self.n_min_images = n_min_images
self.n_max_images = n_max_images
self.lens_model_list = lens_model_list # list of lens models
[docs]
self.image_properties_function = getattr(self, image_properties_function)
self.spin_zero = spin_zero
self.spin_precession = spin_precession
[docs]
self.multiprocessing_verbose = multiprocessing_verbose
self.time_window = time_window
self.cosmo = (
cosmology
if cosmology
else LambdaCDM(
H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0
)
)
self.pdet_finder = pdet_finder
self.pdet_finder_output_keys = None
self.include_effective_parameters = include_effective_parameters
[docs]
self.include_redundant_parameters = include_redundant_parameters
[docs]
self.image_properties_function_params = self.available_image_properties_functions[image_properties_function]
if image_properties_function_params is not None:
self.image_properties_function_params.update(image_properties_function_params)
if image_properties_function == "image_properties_epl_shear_njit":
from .epl_shear_njit import create_epl_shear_solver
print("initializing epl shear njit solver")
self.epl_solver = create_epl_shear_solver(
max_img=self.n_max_images,
num_th=self.image_properties_function_params["num_th"],
maginf=self.image_properties_function_params["maginf"],
max_tries=self.image_properties_function_params["max_tries"],
alpha_scaling=self.image_properties_function_params["alpha_scaling"],
magnification_limit=self.image_properties_function_params["magnification_limit"],
Nmeas=self.image_properties_function_params["Nmeas"],
Nmeas_extra=self.image_properties_function_params["Nmeas_extra"],
)
[docs]
def image_properties_epl_shear_njit(self, lens_parameters):
"""
Compute image properties for strongly lensed events. This use functions similar to lenstronomy but rewritten in numba njit for speed.
Solves the lens equation using multiprocessing to find image positions,
magnifications, time delays, and image types for each lensing event.
Parameters
----------
lens_parameters : ``dict``
Dictionary containing lens and source parameters shown in the table: \n
+------------------------------+-----------+-------------------------------------------------------+
| Parameter | Units | Description |
+==============================+===========+=======================================================+
| zl | | redshift of the lens |
+------------------------------+-----------+-------------------------------------------------------+
| zs | | redshift of the source |
+------------------------------+-----------+-------------------------------------------------------+
| sigma | km s^-1 | velocity dispersion |
+------------------------------+-----------+-------------------------------------------------------+
| q | | axis ratio |
+------------------------------+-----------+-------------------------------------------------------+
| theta_E | radian | Einstein radius |
+------------------------------+-----------+-------------------------------------------------------+
| phi | rad | axis rotation angle. counter-clockwise from the |
| | | positive x-axis (RA-like axis) to the major axis of |
| | | the projected mass distribution. |
+------------------------------+-----------+-------------------------------------------------------+
| gamma | | density profile slope of EPL galaxy |
+------------------------------+-----------+-------------------------------------------------------+
| gamma1 | | external shear component in the x-direction |
| | | (RA-like axis) |
+------------------------------+-----------+-------------------------------------------------------+
| gamma2 | | external shear component in the y-direction |
| | | (Dec-like axis) |
+------------------------------+-----------+-------------------------------------------------------+
Returns
-------
lens_parameters : ``dict``
Updated dictionary with additional image properties with the following description: \n
+------------------------------+-----------+-------------------------------------------------------+
| x0_image_positions | radian | x-coordinate (RA-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| x1_image_positions | radian | y-coordinate (Dec-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| magnifications | | magnifications |
+------------------------------+-----------+-------------------------------------------------------+
| time_delays | | time delays |
+------------------------------+-----------+-------------------------------------------------------+
| image_type | | image type |
+------------------------------+-----------+-------------------------------------------------------+
| n_images | | number of images |
+------------------------------+-----------+-------------------------------------------------------+
| x_source | radian | x-coordinate (RA-like axis) of the source |
+------------------------------+-----------+-------------------------------------------------------+
| y_source | radian | y-coordinate (Dec-like axis) of the source |
+------------------------------+-----------+-------------------------------------------------------+
"""
zs = lens_parameters["zs"]
zl = lens_parameters["zl"]
gamma1, gamma2 = lens_parameters["gamma1"], lens_parameters["gamma2"]
phi = lens_parameters["phi"]
q = lens_parameters["q"]
gamma = lens_parameters["gamma"]
theta_E = lens_parameters["theta_E"]
if not self.include_redundant_parameters:
del lens_parameters["theta_E"]
# time-delay distance
Ds = self.angular_diameter_distance.function(zs)
Dl = self.angular_diameter_distance.function(zl)
Dls = self.angular_diameter_distance_z1z2.function(zl, zs)
D_dt = (1.0 + zl) * (Dl * Ds / Dls) * Mpc_to_m # in meters
# 0.beta_x_arr, 1.beta_y_arr, 2.x_img, 3.y_img, 4.mu_arr, 5.tau_arr, 6.nimg, 7.itype
from numba import set_num_threads
set_num_threads(self.npool) # match Numba threads to npool
result = self.epl_solver(theta_E, D_dt, q, phi, gamma, gamma1, gamma2)
# time-delays: convert to positive values
# time-delays will be relative to the first arrived signal of an lensed event
time_delays= result[5]
time_delays = (
time_delays - np.array([np.sort(time_delays, axis=1)[:, 0]]).T
) # this is alright if time delays are already sorted
# Return a dictionary with all of the lens information but also the BBH parameters from gw_param
image_parameters = {
"x0_image_positions": result[2],
"x1_image_positions": result[3],
"magnifications": result[4],
"time_delays": time_delays,
"image_type": result[7],
}
idx_sort = np.argsort(time_delays, axis=1) # sort each row
# idx_sort has the shape (size, n_max_images)
for key, value in image_parameters.items():
image_parameters[key] = np.take_along_axis(value, idx_sort, axis=1)
lens_parameters.update(image_parameters)
lens_parameters["n_images"] = result[6]
lens_parameters["x_source"] = result[0]
lens_parameters["y_source"] = result[1]
return lens_parameters
[docs]
def image_properties_epl_shear(self, lens_parameters):
"""
Compute image properties for strongly lensed events. This use functions from lenstronomy.
Solves the lens equation using multiprocessing to find image positions,
magnifications, time delays, and image types for each lensing event.
Parameters
----------
lens_parameters : ``dict``
Dictionary containing lens and source parameters shown in the table: \n
+------------------------------+-----------+-------------------------------------------------------+
| Parameter | Units | Description |
+==============================+===========+=======================================================+
| zl | | redshift of the lens |
+------------------------------+-----------+-------------------------------------------------------+
| zs | | redshift of the source |
+------------------------------+-----------+-------------------------------------------------------+
| sigma | km s^-1 | velocity dispersion |
+------------------------------+-----------+-------------------------------------------------------+
| q | | axis ratio |
+------------------------------+-----------+-------------------------------------------------------+
| theta_E | radian | Einstein radius |
+------------------------------+-----------+-------------------------------------------------------+
| phi | rad | axis rotation angle. counter-clockwise from the |
| | | positive x-axis (RA-like axis) to the major axis of |
| | | the projected mass distribution. |
+------------------------------+-----------+-------------------------------------------------------+
| gamma | | density profile slope of EPL galaxy |
+------------------------------+-----------+-------------------------------------------------------+
| gamma1 | | external shear component in the x-direction |
| | | (RA-like axis) |
+------------------------------+-----------+-------------------------------------------------------+
| gamma2 | | external shear component in the y-direction |
| | | (Dec-like axis) |
+------------------------------+-----------+-------------------------------------------------------+
Returns
-------
lens_parameters : ``dict``
Updated dictionary with additional image properties with the following description: \n
+------------------------------+-----------+-------------------------------------------------------+
| x0_image_positions | radian | x-coordinate (RA-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| x1_image_positions | radian | y-coordinate (Dec-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| magnifications | | magnifications |
+------------------------------+-----------+-------------------------------------------------------+
| time_delays | | time delays |
+------------------------------+-----------+-------------------------------------------------------+
| image_type | | image type |
+------------------------------+-----------+-------------------------------------------------------+
| n_images | | number of images |
+------------------------------+-----------+-------------------------------------------------------+
| x_source | radian | x-coordinate (RA-like axis) of the source |
+------------------------------+-----------+-------------------------------------------------------+
| y_source | radian | y-coordinate (Dec-like axis) of the source |
+------------------------------+-----------+-------------------------------------------------------+
"""
zs = lens_parameters["zs"]
size = len(zs)
zl = lens_parameters["zl"]
# external shear params to the 'PEMD' galaxy lens
gamma1, gamma2 = lens_parameters["gamma1"], lens_parameters["gamma2"]
# ellipticity of the galaxy lens
phi = lens_parameters["phi"]
q = lens_parameters["q"]
e1, e2 = phi_q2_ellipticity(phi, q)
gamma = lens_parameters["gamma"]
einstein_radius = lens_parameters["theta_E"]
if not self.include_redundant_parameters:
del lens_parameters["theta_E"]
# get image properties (with Multiprocessing)
iterations = np.arange(size)
input_arguments = [
(
e1_i,
e2_i,
gamma_i,
gamma1_i,
gamma2_i,
zl_i,
zs_i,
einstein_radius_i,
iterations_i,
)
for (
e1_i,
e2_i,
gamma_i,
gamma1_i,
gamma2_i,
zl_i,
zs_i,
einstein_radius_i,
iterations_i,
) in zip(e1, e2, gamma, gamma1, gamma2, zl, zs, einstein_radius, iterations)
]
# For output
# Initialize the image positions and lens argument list here.
x0_image_positions = np.ones((size, self.n_max_images)) * np.nan
x1_image_positions = np.ones((size, self.n_max_images)) * np.nan
magnifications = np.ones((size, self.n_max_images)) * np.nan
time_delays = np.ones((size, self.n_max_images)) * np.nan
determinants = np.ones((size, self.n_max_images)) * np.nan
traces = np.ones((size, self.n_max_images)) * np.nan
n_images = np.ones(size, dtype=int)
x_source, y_source = np.ones(size) * np.nan, np.ones(size) * np.nan
# Solve the lens equation
print("solving lens equations...")
# if self.n_min_images == 2:
# solve_lens_equation_ = solve_lens_equation
# elif self.n_min_images > 2:
# print("n_min_images > 2 is not supported yet")
# raise NotImplementedError
# else:
# raise ValueError("n_min_images should be greater than 1")
self._multiprocessing_error()
with mp.Pool(
processes=self.npool,
initializer=_init_worker_multiprocessing,
initargs=(
self.n_min_images,
self.lens_model_list,
self.cosmo,
),
) as pool:
if self.multiprocessing_verbose:
for result in tqdm(
pool.imap_unordered(solve_lens_equation, input_arguments),
total=len(input_arguments),
ncols=100,
):
(
x_source_i,
y_source_i,
x0_image_position_i,
x1_image_position_i,
magnifications_i,
time_delays_i,
n_image_i,
determinant_i,
trace_i,
iter_i,
) = result
n_image_i = min(n_image_i, self.n_max_images)
n_images[iter_i] = n_image_i
x0_image_position = np.ones(self.n_max_images) * np.nan
x1_image_position = np.ones(self.n_max_images) * np.nan
x0_image_position[:n_image_i] = x0_image_position_i[:n_image_i]
x1_image_position[:n_image_i] = x1_image_position_i[:n_image_i]
x0_image_positions[iter_i] = (
x0_image_position # shape = (size, n_max_images)
)
x1_image_positions[iter_i] = (
x1_image_position # shape = (size, n_max_images)
)
magnification = np.ones(self.n_max_images) * np.nan
time_delay = np.ones(self.n_max_images) * np.nan
determinant = np.ones(self.n_max_images) * np.nan
trace = np.ones(self.n_max_images) * np.nan
magnification[:n_image_i] = magnifications_i[:n_image_i]
time_delay[:n_image_i] = time_delays_i[:n_image_i]
determinant[:n_image_i] = determinant_i[:n_image_i]
trace[:n_image_i] = trace_i[:n_image_i]
# Add the magnifications, time delays, determinants, and traces to their respective arrays
magnifications[iter_i] = magnification
time_delays[iter_i] = time_delay
determinants[iter_i] = determinant
traces[iter_i] = trace
x_source[iter_i] = x_source_i
y_source[iter_i] = y_source_i
else:
for result in pool.map(solve_lens_equation, input_arguments):
(
x_source_i,
y_source_i,
x0_image_position_i,
x1_image_position_i,
magnifications_i,
time_delays_i,
n_image_i,
determinant_i,
trace_i,
iter_i,
) = result
n_image_i = min(n_image_i, self.n_max_images)
n_images[iter_i] = n_image_i
x0_image_position = np.ones(self.n_max_images) * np.nan
x1_image_position = np.ones(self.n_max_images) * np.nan
x0_image_position[:n_image_i] = x0_image_position_i[:n_image_i]
x1_image_position[:n_image_i] = x1_image_position_i[:n_image_i]
x0_image_positions[iter_i] = (
x0_image_position # shape = (size, n_max_images)
)
x1_image_positions[iter_i] = (
x1_image_position # shape = (size, n_max_images)
)
magnification = np.ones(self.n_max_images) * np.nan
time_delay = np.ones(self.n_max_images) * np.nan
determinant = np.ones(self.n_max_images) * np.nan
trace = np.ones(self.n_max_images) * np.nan
magnification[:n_image_i] = magnifications_i[:n_image_i]
time_delay[:n_image_i] = time_delays_i[:n_image_i]
determinant[:n_image_i] = determinant_i[:n_image_i]
trace[:n_image_i] = trace_i[:n_image_i]
# Add the magnifications, time delays, determinants, and traces to their respective arrays
magnifications[iter_i] = magnification
time_delays[iter_i] = time_delay
determinants[iter_i] = determinant
traces[iter_i] = trace
x_source[iter_i] = x_source_i
y_source[iter_i] = y_source_i
# time-delays: convert to positive values
# time-delays will be relative to the first arrived signal of an lensed event
time_delays = (
time_delays - np.array([np.sort(time_delays, axis=1)[:, 0]]).T
) # this is alright if time delays are already sorted
# image type classification (morse phase)
image_type = np.zeros((size, self.n_max_images))
for i in range(size):
for j in range(self.n_max_images):
if determinants[i, j] < 0:
image_type[i, j] = 2
elif traces[i, j] > 0:
image_type[i, j] = 1
elif traces[i, j] < 0:
image_type[i, j] = 3
else:
image_type[i, j] = np.nan
# Return a dictionary with all of the lens information but also the BBH parameters from gw_param
image_parameters = {
"x0_image_positions": x0_image_positions,
"x1_image_positions": x1_image_positions,
"magnifications": magnifications,
"time_delays": time_delays,
"image_type": image_type,
}
# sorting wrt time delays
idx_sort = np.argsort(time_delays, axis=1) # sort each row
# idx_sort has the shape (size, n_max_images)
for key, value in image_parameters.items():
# sort each row
image_parameters[key] = np.take_along_axis(value, idx_sort, axis=1)
lens_parameters.update(image_parameters)
lens_parameters["n_images"] = n_images
lens_parameters["x_source"] = x_source
lens_parameters["y_source"] = y_source
return lens_parameters
def _multiprocessing_error(self):
"""
Check multiprocessing guard and raise error if not in main process.
Raises
------
RuntimeError
If called from a subprocess instead of the main process, indicating
the code is not properly wrapped in ``if __name__ == "__main__":``.
"""
# to access multi-cores instead of multithreading
if mp.current_process().name != "MainProcess":
print(
"\n\n[ERROR] This multiprocessing code must be run under 'if __name__ == \"__main__\":'.\n"
"Please wrap your script entry point in this guard.\n"
"See: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming\n"
)
raise RuntimeError(
"\nMultiprocessing code must be run under 'if __name__ == \"__main__\":'.\n\n"
)
[docs]
def get_lensed_snrs(
self, lensed_param, pdet_finder=None, include_effective_parameters=False
):
"""
Compute detection probability for each lensed image.
Calculates the effective luminosity distance, geocent time, and phase
for each image accounting for magnification and morse phase, then
computes detection probabilities using the provided calculator.
Parameters
----------
lensed_param : ``dict``
Dictionary containing lensed source and image parameters given below: \n
+------------------------------+-----------+-------------------------------------------------------+
| Parameter | Units | Description |
+==============================+===========+=======================================================+
| geocent_time | s | geocent time |
+------------------------------+-----------+-------------------------------------------------------+
| ra | rad | right ascension |
+------------------------------+-----------+-------------------------------------------------------+
| dec | rad | declination |
+------------------------------+-----------+-------------------------------------------------------+
| phase | rad | phase of GW at reference freq |
+------------------------------+-----------+-------------------------------------------------------+
| psi | rad | polarization angle |
+------------------------------+-----------+-------------------------------------------------------+
| theta_jn | rad | inclination angle |
+------------------------------+-----------+-------------------------------------------------------+
| a_1 | | spin of the primary compact binary |
+------------------------------+-----------+-------------------------------------------------------+
| a_2 | | spin of the secondary compact binary |
+------------------------------+-----------+-------------------------------------------------------+
| luminosity_distance | Mpc | luminosity distance of the source |
+------------------------------+-----------+-------------------------------------------------------+
| mass_1 | Msun | mass of the primary compact binary (detector frame) |
+------------------------------+-----------+-------------------------------------------------------+
| mass_2 | Msun | mass of the secondary compact binary (detector frame) |
+------------------------------+-----------+-------------------------------------------------------+
| x0_image_positions | radian | x-coordinate (RA-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| x1_image_positions | radian | y-coordinate (Dec-like axis) of the images |
+------------------------------+-----------+-------------------------------------------------------+
| magnifications | | magnifications |
+------------------------------+-----------+-------------------------------------------------------+
| time_delays | | time delays |
+------------------------------+-----------+-------------------------------------------------------+
| image_type | | image type |
+------------------------------+-----------+-------------------------------------------------------+
pdet_finder : ``callable``
Function that computes detection probability given GW parameters.
include_effective_parameters : ``bool``
If True, includes effective parameters in output lensed_param. \n
Returns
-------
result_dict : ``dict``
Dictionary containing: \n
- 'pdet_net': network detection probability (shape: size x n_max_images) \n
- Individual detector probabilities if pdet_finder outputs them \n
lensed_param : ``dict``
Updated dictionary with effective parameters shown below: \n
+----------------------------------+-----------+------------------------------------------------+
| Parameter | Units | Description |
+==================================+===========+================================================+
| effective_luminosity_distance | Mpc | magnification-corrected distance |
| | | luminosity_distance / sqrt(|magnifications_i|) |
+----------------------------------+-----------+------------------------------------------------|
| effective_geocent_time | s | time-delay-corrected GPS time |
| | | geocent_time + time_delays_i |
+----------------------------------+-----------+------------------------------------------------|
| effective_phase | rad | morse-phase-corrected phase |
| | | phi - morse_phase_i |
+----------------------------------+-----------+------------------------------------------------+
| effective_ra | rad | RA of the image |
| | | ra + (x0_image_positions_i - x_source)/cos(dec)|
+----------------------------------+-----------+------------------------------------------------+
| effective_dec | rad | Dec of the image |
| | | dec + (x1_image_positions_i - y_source) |
+----------------------------------+-----------+------------------------------------------------+
"""
include_effective_parameters = (
include_effective_parameters or self.include_effective_parameters
)
if not self.include_redundant_parameters:
# remove redundant parameters to save memory
del (
lensed_param["n_images"],
lensed_param["mass_1_source"],
lensed_param["mass_2_source"],
)
# needed to calculate effective luminosity distance and effective time delay
magnifications = lensed_param["magnifications"]
time_delays = lensed_param["time_delays"]
image_type = lensed_param[
"image_type"
].copy() # copy to avoid modifying original
x_source = lensed_param["x_source"]
y_source = lensed_param["y_source"]
x0_image_positions = lensed_param["x0_image_positions"]
x1_image_positions = lensed_param["x1_image_positions"]
size = len(magnifications)
# image type to morse phase
image_type[image_type == 1.0] = 0.0
image_type[image_type == 2.0] = np.pi / 2
image_type[image_type == 3.0] = np.pi
# Get the binary parameters
(
mass_1,
mass_2,
luminosity_distance,
theta_jn,
psi,
ra,
dec,
phase,
geocent_time,
a_1,
a_2,
tilt_1,
tilt_2,
phi_12,
phi_jl,
) = (
lensed_param["mass_1"],
lensed_param["mass_2"],
lensed_param["luminosity_distance"],
lensed_param["theta_jn"],
lensed_param["psi"],
lensed_param["ra"],
lensed_param["dec"],
lensed_param["phase"],
lensed_param["geocent_time"],
np.zeros(size),
np.zeros(size),
np.zeros(size),
np.zeros(size),
np.zeros(size),
np.zeros(size),
)
if not self.spin_zero:
a_1, a_2 = (lensed_param["a_1"], lensed_param["a_2"])
if self.spin_precession:
tilt_1, tilt_2, phi_12, phi_jl = (
lensed_param["tilt_1"],
lensed_param["tilt_2"],
lensed_param["phi_12"],
lensed_param["phi_jl"],
)
# checking pdet_finder output keys
result_dict = self._check_pdet_finder_output_keys(
size, lensed_param, pdet_finder
)
if not self.include_redundant_parameters:
del lensed_param["luminosity_distance"]
# Get the optimal signal to noise ratios for each image
# iterate over the image type (column)
geocent_time_min = np.min(geocent_time)
geocent_time_max = geocent_time_min + self.time_window
pdet_finder_output_keys = (
self.pdet_finder_output_keys.copy()
) # copy to avoid modifying original
for i in range(self.n_max_images):
# get the effective time for each image type
effective_geocent_time = geocent_time + time_delays[:, i]
# choose only the events that are within the time range and also not nan
idx = (effective_geocent_time <= geocent_time_max) & (
effective_geocent_time >= geocent_time_min
)
# get the effective luminosity distance for each image type
effective_luminosity_distance = luminosity_distance / np.sqrt(
np.abs(magnifications[:, i])
)
# get the effective phase for each image type
# morse phase correction
effective_phase = phase - image_type[:, i]
# get the effective sky location for each image type
# flat sky location assumption
effective_dec = dec + (x1_image_positions[:, i] - y_source)
effective_ra = ra + (x0_image_positions[:, i] - x_source) / np.cos(dec)
# check for nan values
idx = (
idx
& ~np.isnan(effective_luminosity_distance)
& ~np.isnan(effective_geocent_time)
& ~np.isnan(effective_phase)
& ~np.isnan(effective_ra)
& ~np.isnan(effective_dec)
)
# Each image has their own effective luminosity distance and effective geocent time
if sum(idx) != 0:
# Returns a dictionary
pdet = pdet_finder(
gw_param_dict=dict(
mass_1=mass_1[idx],
mass_2=mass_2[idx],
luminosity_distance=effective_luminosity_distance[idx],
theta_jn=theta_jn[idx],
psi=psi[idx],
phase=effective_phase[idx],
geocent_time=effective_geocent_time[idx],
ra=effective_ra[idx],
dec=effective_dec[idx],
a_1=a_1[idx],
a_2=a_2[idx],
tilt_1=tilt_1[idx],
tilt_2=tilt_2[idx],
phi_12=phi_12[idx],
phi_jl=phi_jl[idx],
),
)
for keys in pdet_finder_output_keys:
result_dict[keys][idx, i] = pdet[keys]
if include_effective_parameters:
lensed_param = self.produce_effective_params(lensed_param)
return result_dict, lensed_param
[docs]
def recover_redundant_parameters(self, lensed_param):
"""
Recover redundant parameters in lensed_param, i.e. theta_E, n_images, mass_1, mass_2, luminosity_distance.
"""
zs = lensed_param["zs"]
# find theta_E
if "theta_E" not in lensed_param:
lensed_param["theta_E"] = self.compute_einstein_radii(
lensed_param["sigma"], lensed_param["zl"], zs
)
else:
print("theta_E is already in lensed_param, skipping computation of theta_E")
if "n_images" not in lensed_param:
x0_image_positions = lensed_param["x0_image_positions"]
n_images = np.sum(~np.isnan(x0_image_positions), axis=1)
lensed_param["n_images"] = n_images.astype(int)
else:
print(
"n_images is already in lensed_param, skipping computation of n_images"
)
if "mass_1_source" not in lensed_param or "mass_2_source" not in lensed_param:
lensed_param["mass_1_source"] = lensed_param["mass_1"] / (1 + zs)
lensed_param["mass_2_source"] = lensed_param["mass_2"] / (1 + zs)
else:
print(
"mass_1_source and mass_2_source are already in lensed_param, skipping computation of source frame masses"
)
if "luminosity_distance" not in lensed_param:
lensed_param["luminosity_distance"] = self.luminosity_distance(zs)
else:
print(
"luminosity_distance is already in lensed_param, skipping computation of luminosity_distance"
)
return lensed_param
[docs]
def produce_effective_params(self, lensed_param):
"""
Produce effective parameters for each lensed image.
Calculates the effective luminosity distance, geocent time, phase,
RA, and Dec for each image accounting for magnification and morse phase.
Parameters
----------
lensed_param : ``dict``
Dictionary containing lensed source and image parameters.
Returns
-------
lensed_param : ``dict``
Updated dictionary with effective parameters shown below: \n
+----------------------------------+-----------+------------------------------------------------+
| Parameter | Units | Description |
+==================================+===========+================================================+
| effective_luminosity_distance | Mpc | magnification-corrected distance |
| | | luminosity_distance / sqrt(|magnifications_i|) |
+----------------------------------+-----------+------------------------------------------------|
| effective_geocent_time | s | time-delay-corrected GPS time |
| | | geocent_time + time_delays_i |
+----------------------------------+-----------+------------------------------------------------|
| effective_phase | rad | morse-phase-corrected phase |
| | | phi - morse_phase_i |
+----------------------------------+-----------+------------------------------------------------+
| effective_ra | rad | RA of the image |
| | | ra + (x0_image_positions_i - x_source)/cos(dec)|
+----------------------------------+-----------+------------------------------------------------+
| effective_dec | rad | Dec of the image |
| | | dec + (x1_image_positions_i - y_source) |
+----------------------------------+-----------+------------------------------------------------+
"""
# needed to calculate effective luminosity distance and effective time delay
magnifications = lensed_param["magnifications"]
time_delays = lensed_param["time_delays"]
image_type = lensed_param[
"image_type"
].copy() # copy to avoid modifying original
x_source = lensed_param["x_source"]
y_source = lensed_param["y_source"]
x0_image_positions = lensed_param["x0_image_positions"]
x1_image_positions = lensed_param["x1_image_positions"]
ra = lensed_param["ra"]
dec = lensed_param["dec"]
# Update lensed_param with effective values only if they weren't already present
lensed_param["effective_luminosity_distance"] = lensed_param[
"luminosity_distance"
][:, np.newaxis] / np.sqrt(np.abs(magnifications))
lensed_param["effective_geocent_time"] = (
lensed_param["geocent_time"][:, np.newaxis] + time_delays
)
# image type to morse phase
image_type[image_type == 1.0] = 0.0
image_type[image_type == 2.0] = np.pi / 2.0
image_type[image_type == 3.0] = np.pi
lensed_param["effective_phase"] = (
lensed_param["phase"][:, np.newaxis] - image_type
)
# flat sky location assumption
lensed_param["effective_ra"] = ra[:, np.newaxis] + (
x0_image_positions - x_source[:, np.newaxis]
) / np.cos(dec[:, np.newaxis])
lensed_param["effective_dec"] = dec[:, np.newaxis] + (
x1_image_positions - y_source[:, np.newaxis]
)
return lensed_param
def _check_pdet_finder_output_keys(self, size, lensed_param, pdet_finder):
"""
Helper function to check and set pdet_finder output keys.
Parameters
----------
size : ``int``
Number of lensed events.
lensed_param : ``dict``
Dictionary containing lensed source and image parameters.
pdet_finder : ``callable``
Function that computes detection probability given GW parameters.
Returns
-------
result_dict : ``dict``
Initialized dictionary to hold pdet_finder outputs.
"""
# setting up result dictionary
result_dict = dict()
# checking pdet_finder
self.pdet_finder = pdet_finder if pdet_finder else self.pdet_finder
if self.pdet_finder is None:
raise ValueError("pdet_finder function must be provided")
if self.pdet_finder_output_keys is None:
# setting up pdet dictionary
# get the keys of the output dictionary of pdet_finder
gw_param_dict = dict(
mass_1=lensed_param["mass_1"][:1],
mass_2=lensed_param["mass_2"][:1],
luminosity_distance=lensed_param["luminosity_distance"][:1],
theta_jn=lensed_param["theta_jn"][:1],
psi=lensed_param["psi"][:1],
phase=lensed_param["phase"][:1],
geocent_time=lensed_param["geocent_time"][:1],
ra=lensed_param["ra"][:1],
dec=lensed_param["dec"][:1],
)
if not self.spin_zero:
gw_param_dict["a_1"] = lensed_param["a_1"][:1]
gw_param_dict["a_2"] = lensed_param["a_2"][:1]
if self.spin_precession:
gw_param_dict["tilt_1"] = lensed_param["tilt_1"][:1]
gw_param_dict["tilt_2"] = lensed_param["tilt_2"][:1]
gw_param_dict["phi_12"] = lensed_param["phi_12"][:1]
gw_param_dict["phi_jl"] = lensed_param["phi_jl"][:1]
pdet = pdet_finder(gw_param_dict=gw_param_dict)
if not isinstance(pdet, dict) or "pdet_net" not in pdet:
raise ValueError(
"pdet_finder must return a dictionary with key 'pdet_net'"
)
else:
self.pdet_finder_output_keys = list(pdet.keys())
for keys in pdet.keys():
result_dict[keys] = np.ones((size, self.n_max_images)) * np.nan
else:
for keys in self.pdet_finder_output_keys:
result_dict[keys] = np.ones((size, self.n_max_images)) * np.nan
return result_dict
# -------------
# Properties
# -------------
@property
[docs]
def npool(self):
"""
Number of multiprocessing workers.
Returns
-------
npool : ``int``
Number of processes for multiprocessing. \n
default: 4
"""
return self._npool
@npool.setter
def npool(self, value):
self._npool = value
@property
[docs]
def n_min_images(self):
"""
Minimum number of images required for a valid lensing event.
Returns
-------
n_min_images : ``int``
Minimum number of images required. \n
default: 2
"""
return self._n_min_images
@n_min_images.setter
def n_min_images(self, value):
self._n_min_images = value
@property
[docs]
def n_max_images(self):
"""
Maximum number of images per event.
Returns
-------
n_max_images : ``int``
Maximum number of images to consider per event. \n
default: 4
"""
return self._n_max_images
@n_max_images.setter
def n_max_images(self, value):
self._n_max_images = value
@property
[docs]
def time_window(self):
"""
Time window for lensed events.
Returns
-------
time_window : ``float``
Time window for lensed events (units: s). \n
default: 365*24*3600*20 (20 years)
"""
return self._time_window
@time_window.setter
def time_window(self, value):
self._time_window = value
@property
[docs]
def include_effective_parameters(self):
"""
Flag to include effective parameters in output.
Returns
-------
include_effective_parameters : ``bool``
Whether to include effective parameters in the output of get_lensed_snrs. \n
default: False
"""
return self._include_effective_parameters
@include_effective_parameters.setter
def include_effective_parameters(self, value):
self._include_effective_parameters = value
@property
[docs]
def lens_model_list(self):
"""
List of lens models to use.
Returns
-------
lens_model_list : ``list``
List of lens model names. \n
default: ['EPL_NUMBA', 'SHEAR']
"""
return self._lens_model_list
@lens_model_list.setter
def lens_model_list(self, value):
self._lens_model_list = value
@property
[docs]
def cosmo(self):
"""
Astropy cosmology object for calculations.
Returns
-------
cosmo : ``astropy.cosmology``
Cosmology used for distance calculations. \n
default: LambdaCDM(H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0)
"""
return self._cosmo
@cosmo.setter
def cosmo(self, value):
self._cosmo = value
@property
[docs]
def spin_zero(self):
"""
Flag for zero spin assumption.
Returns
-------
spin_zero : ``bool``
Whether to assume zero spin for compact objects. \n
If True, spin parameters are set to zero (no spin sampling). \n
If False, spin parameters are sampled. \n
default: False
"""
return self._spin_zero
@spin_zero.setter
def spin_zero(self, value):
self._spin_zero = value
@property
[docs]
def spin_precession(self):
"""
Flag for spin precession.
Returns
-------
spin_precession : ``bool``
Whether to include spin precession effects. \n
If True (and spin_zero=False), sample precessing spin parameters. \n
If False (and spin_zero=False), sample aligned/anti-aligned spins. \n
default: False
"""
return self._spin_precession
@spin_precession.setter
def spin_precession(self, value):
self._spin_precession = value
@property
[docs]
def pdet_finder(self):
"""
Detection probability finder function.
Returns
-------
pdet_finder : ``callable``
Function that calculates detection probability for GW events. \n
The function signature should be: \n
``pdet_finder(gw_param_dict) -> dict`` with key 'pdet_net'.
"""
return self._pdet_finder
@pdet_finder.setter
def pdet_finder(self, value):
self._pdet_finder = value
@property
[docs]
def pdet_finder_output_keys(self):
"""
Output keys from the detection probability finder function.
Returns
-------
pdet_finder_output_keys : ``list``
List of keys returned by the pdet_finder function. \n
default: None
"""
return self._pdet_finder_output_keys
@pdet_finder_output_keys.setter
def pdet_finder_output_keys(self, value):
self._pdet_finder_output_keys = value
@property
[docs]
def available_image_properties_functions(self):
"""
Dictionary of available functions for computing image properties.
Returns
-------
available_image_properties_functions : ``dict``
Dictionary with function names and default parameters.
"""
self._available_image_properties_functions = dict(
image_properties_epl_shear_njit=dict(
max_img=self.n_max_images,
num_th=500,
maginf=-100.0,
max_tries=100,
alpha_scaling=1.0,
magnification_limit=0.01,
Nmeas=400,
Nmeas_extra=80,
),
image_properties_epl_shear=None,
)
return self._available_image_properties_functions