Source code for ler.image_properties.image_properties

# -*- coding: utf-8 -*-
"""
Module for computing image properties of strongly lensed gravitational wave events.

This module contains the ImageProperties class, which computes image positions,
magnifications, time delays, and other lensing-related quantities for gravitational
wave sources that are strongly lensed by intervening galaxies. The class uses
multiprocessing to efficiently solve lens equations for large samples.

Usage:
    Basic workflow example:

    >>> from ler.image_properties import ImageProperties
    >>> ip = ImageProperties()
    >>> lens_parameters = dict(zs=np.array([2.0]), zl=np.array([0.5]), ...)
    >>> result = ip.image_properties(lens_parameters)

Copyright (C) 2026 Phurailatpam Hemantakumar. Distributed under MIT License.
"""

import warnings

warnings.filterwarnings("ignore")
import logging
import multiprocessing as mp
from tqdm import tqdm
import numpy as np
from astropy.cosmology import LambdaCDM

logging.getLogger("numexpr.utils").setLevel(logging.ERROR)

# the following .py file will be called if they are not given in the class initialization
from .multiprocessing_routine_epl_shear import (
    solve_lens_equation,
    _init_worker_multiprocessing,
)
from ..lens_galaxy_population.lens_functions import phi_q2_ellipticity

[docs] Mpc_to_m = 3.085677581491367e22
[docs] class ImageProperties: """ Class to compute image properties of strongly lensed gravitational wave events. This class solves the lens equation to find image positions, magnifications, time delays, and image types (morse phase) for strongly lensed sources. It uses multiprocessing for efficient computation of large samples. Key Features: \n - Solves lens equations using multiprocessing for efficiency \n - Computes image positions, magnifications, and time delays \n - Classifies image types using morse phase \n - Calculates detection probabilities for lensed images \n Parameters ---------- npool : ``int`` Number of processes for multiprocessing. \n default: 4 n_min_images : ``int`` Minimum number of images required for a valid lensing event. \n default: 2 n_max_images : ``int`` Maximum number of images to consider per event. \n default: 4 time_window : ``float`` Time window for lensed events from min(geocent_time) (units: seconds). \n default: 365*24*3600*2 (2 years) include_effective_parameters : ``bool`` Whether to include effective parameters (effective_phase, effective_ra, effective_dec) in the output. \n default: True lens_model_list : ``list`` List of lens models to use. \n default: ['EPL_NUMBA', 'SHEAR'] cosmology : ``astropy.cosmology`` or ``None`` Cosmology for distance calculations. \n If None, uses default LambdaCDM. \n default: LambdaCDM(H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0) spin_zero : ``bool`` If True, spin parameters are set to zero (no spin sampling). \n default: False spin_precession : ``bool`` If True (and spin_zero=False), sample precessing spin parameters. \n If False (and spin_zero=False), sample aligned/anti-aligned spins. \n default: False multiprocessing_verbose : ``bool`` If True, shows a progress bar for multiprocessing tasks. \n default: True include_redundant_parameters : ``bool`` If True, removes redundant parameters (e.g., theta_E, n_images, mass_1, mass_2, luminosity_distance) from output to save memory. \n Examples -------- Basic usage: >>> from ler.image_properties import ImageProperties >>> ip = ImageProperties() >>> lens_parameters = dict( ... zs=np.array([2.0]), ... zl=np.array([0.5]), ... gamma1=np.array([0.0]), ... gamma2=np.array([0.0]), ... phi=np.array([0.0]), ... q=np.array([0.8]), ... gamma=np.array([2.0]), ... theta_E=np.array([1.0]) ... ) >>> result = ip.image_properties(lens_parameters) >>> print(result.keys()) Instance Methods ---------- ImageProperties has the following methods: \n +-----------------------------------------------------+------------------------------------------------+ | Method | Description | +=====================================================+================================================+ | :meth:`~image_properties_epl_shear` | Compute image properties for lensed events | +-----------------------------------------------------+------------------------------------------------+ | :meth:`~get_lensed_snrs` | Compute detection probability for lensed images| +-----------------------------------------------------+------------------------------------------------+ Instance Attributes ---------- ImageProperties has the following attributes: \n +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | Attribute | Type | Unit | Description | +=====================================================+===========================+==========+==================================================================+ | :attr:`~npool` | ``int`` | | Number of multiprocessing workers | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~multiprocessing_verbose` | ``bool`` | | If True, shows a progress bar for multiprocessing tasks | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~n_min_images` | ``int`` | | Minimum number of images required | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~n_max_images` | ``int`` | | Maximum number of images per event | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~time_window` | ``float`` | s | Time window for lensed events | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~include_effective_parameters` | ``bool`` | | To include effective parameters in output | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~include_redundant_parameters` | ``bool`` | | If True, removes redundant parameters from output to save memory | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~lens_model_list` | ``list`` | | List of lens models | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~cosmo` | ``astropy.cosmology`` | | Cosmology for calculations | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~spin_zero` | ``bool`` | | Flag for zero spin assumption | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~spin_precession` | ``bool`` | | Flag for spin precession | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~pdet_finder` | ``callable`` | | Probability of detection calculator | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ | :attr:`~pdet_finder_output_keys` | ``list`` | | Keys for probability of detection outputs | +-----------------------------------------------------+---------------------------+----------+------------------------------------------------------------------+ """ def __init__( self, npool=4, n_min_images=2, n_max_images=4, lens_model_list=["EPL_NUMBA", "SHEAR"], image_properties_function="image_properties_epl_shear", image_properties_function_params=None, cosmology=None, time_window=365 * 24 * 3600 * 2, # 2 years spin_zero=True, spin_precession=False, pdet_finder=None, include_effective_parameters=False, multiprocessing_verbose=True, include_redundant_parameters=False, ): self.npool = npool self.n_min_images = n_min_images self.n_max_images = n_max_images self.lens_model_list = lens_model_list # list of lens models
[docs] self.image_properties_function = getattr(self, image_properties_function)
self.spin_zero = spin_zero self.spin_precession = spin_precession
[docs] self.multiprocessing_verbose = multiprocessing_verbose
self.time_window = time_window self.cosmo = ( cosmology if cosmology else LambdaCDM( H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0 ) ) self.pdet_finder = pdet_finder self.pdet_finder_output_keys = None self.include_effective_parameters = include_effective_parameters
[docs] self.include_redundant_parameters = include_redundant_parameters
[docs] self.image_properties_function_params = self.available_image_properties_functions[image_properties_function]
if image_properties_function_params is not None: self.image_properties_function_params.update(image_properties_function_params) if image_properties_function == "image_properties_epl_shear_njit": from .epl_shear_njit import create_epl_shear_solver print("initializing epl shear njit solver") self.epl_solver = create_epl_shear_solver( max_img=self.n_max_images, num_th=self.image_properties_function_params["num_th"], maginf=self.image_properties_function_params["maginf"], max_tries=self.image_properties_function_params["max_tries"], alpha_scaling=self.image_properties_function_params["alpha_scaling"], magnification_limit=self.image_properties_function_params["magnification_limit"], Nmeas=self.image_properties_function_params["Nmeas"], Nmeas_extra=self.image_properties_function_params["Nmeas_extra"], )
[docs] def image_properties_epl_shear_njit(self, lens_parameters): """ Compute image properties for strongly lensed events. This use functions similar to lenstronomy but rewritten in numba njit for speed. Solves the lens equation using multiprocessing to find image positions, magnifications, time delays, and image types for each lensing event. Parameters ---------- lens_parameters : ``dict`` Dictionary containing lens and source parameters shown in the table: \n +------------------------------+-----------+-------------------------------------------------------+ | Parameter | Units | Description | +==============================+===========+=======================================================+ | zl | | redshift of the lens | +------------------------------+-----------+-------------------------------------------------------+ | zs | | redshift of the source | +------------------------------+-----------+-------------------------------------------------------+ | sigma | km s^-1 | velocity dispersion | +------------------------------+-----------+-------------------------------------------------------+ | q | | axis ratio | +------------------------------+-----------+-------------------------------------------------------+ | theta_E | radian | Einstein radius | +------------------------------+-----------+-------------------------------------------------------+ | phi | rad | axis rotation angle. counter-clockwise from the | | | | positive x-axis (RA-like axis) to the major axis of | | | | the projected mass distribution. | +------------------------------+-----------+-------------------------------------------------------+ | gamma | | density profile slope of EPL galaxy | +------------------------------+-----------+-------------------------------------------------------+ | gamma1 | | external shear component in the x-direction | | | | (RA-like axis) | +------------------------------+-----------+-------------------------------------------------------+ | gamma2 | | external shear component in the y-direction | | | | (Dec-like axis) | +------------------------------+-----------+-------------------------------------------------------+ Returns ------- lens_parameters : ``dict`` Updated dictionary with additional image properties with the following description: \n +------------------------------+-----------+-------------------------------------------------------+ | x0_image_positions | radian | x-coordinate (RA-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | x1_image_positions | radian | y-coordinate (Dec-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | magnifications | | magnifications | +------------------------------+-----------+-------------------------------------------------------+ | time_delays | | time delays | +------------------------------+-----------+-------------------------------------------------------+ | image_type | | image type | +------------------------------+-----------+-------------------------------------------------------+ | n_images | | number of images | +------------------------------+-----------+-------------------------------------------------------+ | x_source | radian | x-coordinate (RA-like axis) of the source | +------------------------------+-----------+-------------------------------------------------------+ | y_source | radian | y-coordinate (Dec-like axis) of the source | +------------------------------+-----------+-------------------------------------------------------+ """ zs = lens_parameters["zs"] zl = lens_parameters["zl"] gamma1, gamma2 = lens_parameters["gamma1"], lens_parameters["gamma2"] phi = lens_parameters["phi"] q = lens_parameters["q"] gamma = lens_parameters["gamma"] theta_E = lens_parameters["theta_E"] if not self.include_redundant_parameters: del lens_parameters["theta_E"] # time-delay distance Ds = self.angular_diameter_distance.function(zs) Dl = self.angular_diameter_distance.function(zl) Dls = self.angular_diameter_distance_z1z2.function(zl, zs) D_dt = (1.0 + zl) * (Dl * Ds / Dls) * Mpc_to_m # in meters # 0.beta_x_arr, 1.beta_y_arr, 2.x_img, 3.y_img, 4.mu_arr, 5.tau_arr, 6.nimg, 7.itype from numba import set_num_threads set_num_threads(self.npool) # match Numba threads to npool result = self.epl_solver(theta_E, D_dt, q, phi, gamma, gamma1, gamma2) # time-delays: convert to positive values # time-delays will be relative to the first arrived signal of an lensed event time_delays= result[5] time_delays = ( time_delays - np.array([np.sort(time_delays, axis=1)[:, 0]]).T ) # this is alright if time delays are already sorted # Return a dictionary with all of the lens information but also the BBH parameters from gw_param image_parameters = { "x0_image_positions": result[2], "x1_image_positions": result[3], "magnifications": result[4], "time_delays": time_delays, "image_type": result[7], } idx_sort = np.argsort(time_delays, axis=1) # sort each row # idx_sort has the shape (size, n_max_images) for key, value in image_parameters.items(): image_parameters[key] = np.take_along_axis(value, idx_sort, axis=1) lens_parameters.update(image_parameters) lens_parameters["n_images"] = result[6] lens_parameters["x_source"] = result[0] lens_parameters["y_source"] = result[1] return lens_parameters
[docs] def image_properties_epl_shear(self, lens_parameters): """ Compute image properties for strongly lensed events. This use functions from lenstronomy. Solves the lens equation using multiprocessing to find image positions, magnifications, time delays, and image types for each lensing event. Parameters ---------- lens_parameters : ``dict`` Dictionary containing lens and source parameters shown in the table: \n +------------------------------+-----------+-------------------------------------------------------+ | Parameter | Units | Description | +==============================+===========+=======================================================+ | zl | | redshift of the lens | +------------------------------+-----------+-------------------------------------------------------+ | zs | | redshift of the source | +------------------------------+-----------+-------------------------------------------------------+ | sigma | km s^-1 | velocity dispersion | +------------------------------+-----------+-------------------------------------------------------+ | q | | axis ratio | +------------------------------+-----------+-------------------------------------------------------+ | theta_E | radian | Einstein radius | +------------------------------+-----------+-------------------------------------------------------+ | phi | rad | axis rotation angle. counter-clockwise from the | | | | positive x-axis (RA-like axis) to the major axis of | | | | the projected mass distribution. | +------------------------------+-----------+-------------------------------------------------------+ | gamma | | density profile slope of EPL galaxy | +------------------------------+-----------+-------------------------------------------------------+ | gamma1 | | external shear component in the x-direction | | | | (RA-like axis) | +------------------------------+-----------+-------------------------------------------------------+ | gamma2 | | external shear component in the y-direction | | | | (Dec-like axis) | +------------------------------+-----------+-------------------------------------------------------+ Returns ------- lens_parameters : ``dict`` Updated dictionary with additional image properties with the following description: \n +------------------------------+-----------+-------------------------------------------------------+ | x0_image_positions | radian | x-coordinate (RA-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | x1_image_positions | radian | y-coordinate (Dec-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | magnifications | | magnifications | +------------------------------+-----------+-------------------------------------------------------+ | time_delays | | time delays | +------------------------------+-----------+-------------------------------------------------------+ | image_type | | image type | +------------------------------+-----------+-------------------------------------------------------+ | n_images | | number of images | +------------------------------+-----------+-------------------------------------------------------+ | x_source | radian | x-coordinate (RA-like axis) of the source | +------------------------------+-----------+-------------------------------------------------------+ | y_source | radian | y-coordinate (Dec-like axis) of the source | +------------------------------+-----------+-------------------------------------------------------+ """ zs = lens_parameters["zs"] size = len(zs) zl = lens_parameters["zl"] # external shear params to the 'PEMD' galaxy lens gamma1, gamma2 = lens_parameters["gamma1"], lens_parameters["gamma2"] # ellipticity of the galaxy lens phi = lens_parameters["phi"] q = lens_parameters["q"] e1, e2 = phi_q2_ellipticity(phi, q) gamma = lens_parameters["gamma"] einstein_radius = lens_parameters["theta_E"] if not self.include_redundant_parameters: del lens_parameters["theta_E"] # get image properties (with Multiprocessing) iterations = np.arange(size) input_arguments = [ ( e1_i, e2_i, gamma_i, gamma1_i, gamma2_i, zl_i, zs_i, einstein_radius_i, iterations_i, ) for ( e1_i, e2_i, gamma_i, gamma1_i, gamma2_i, zl_i, zs_i, einstein_radius_i, iterations_i, ) in zip(e1, e2, gamma, gamma1, gamma2, zl, zs, einstein_radius, iterations) ] # For output # Initialize the image positions and lens argument list here. x0_image_positions = np.ones((size, self.n_max_images)) * np.nan x1_image_positions = np.ones((size, self.n_max_images)) * np.nan magnifications = np.ones((size, self.n_max_images)) * np.nan time_delays = np.ones((size, self.n_max_images)) * np.nan determinants = np.ones((size, self.n_max_images)) * np.nan traces = np.ones((size, self.n_max_images)) * np.nan n_images = np.ones(size, dtype=int) x_source, y_source = np.ones(size) * np.nan, np.ones(size) * np.nan # Solve the lens equation print("solving lens equations...") # if self.n_min_images == 2: # solve_lens_equation_ = solve_lens_equation # elif self.n_min_images > 2: # print("n_min_images > 2 is not supported yet") # raise NotImplementedError # else: # raise ValueError("n_min_images should be greater than 1") self._multiprocessing_error() with mp.Pool( processes=self.npool, initializer=_init_worker_multiprocessing, initargs=( self.n_min_images, self.lens_model_list, self.cosmo, ), ) as pool: if self.multiprocessing_verbose: for result in tqdm( pool.imap_unordered(solve_lens_equation, input_arguments), total=len(input_arguments), ncols=100, ): ( x_source_i, y_source_i, x0_image_position_i, x1_image_position_i, magnifications_i, time_delays_i, n_image_i, determinant_i, trace_i, iter_i, ) = result n_image_i = min(n_image_i, self.n_max_images) n_images[iter_i] = n_image_i x0_image_position = np.ones(self.n_max_images) * np.nan x1_image_position = np.ones(self.n_max_images) * np.nan x0_image_position[:n_image_i] = x0_image_position_i[:n_image_i] x1_image_position[:n_image_i] = x1_image_position_i[:n_image_i] x0_image_positions[iter_i] = ( x0_image_position # shape = (size, n_max_images) ) x1_image_positions[iter_i] = ( x1_image_position # shape = (size, n_max_images) ) magnification = np.ones(self.n_max_images) * np.nan time_delay = np.ones(self.n_max_images) * np.nan determinant = np.ones(self.n_max_images) * np.nan trace = np.ones(self.n_max_images) * np.nan magnification[:n_image_i] = magnifications_i[:n_image_i] time_delay[:n_image_i] = time_delays_i[:n_image_i] determinant[:n_image_i] = determinant_i[:n_image_i] trace[:n_image_i] = trace_i[:n_image_i] # Add the magnifications, time delays, determinants, and traces to their respective arrays magnifications[iter_i] = magnification time_delays[iter_i] = time_delay determinants[iter_i] = determinant traces[iter_i] = trace x_source[iter_i] = x_source_i y_source[iter_i] = y_source_i else: for result in pool.map(solve_lens_equation, input_arguments): ( x_source_i, y_source_i, x0_image_position_i, x1_image_position_i, magnifications_i, time_delays_i, n_image_i, determinant_i, trace_i, iter_i, ) = result n_image_i = min(n_image_i, self.n_max_images) n_images[iter_i] = n_image_i x0_image_position = np.ones(self.n_max_images) * np.nan x1_image_position = np.ones(self.n_max_images) * np.nan x0_image_position[:n_image_i] = x0_image_position_i[:n_image_i] x1_image_position[:n_image_i] = x1_image_position_i[:n_image_i] x0_image_positions[iter_i] = ( x0_image_position # shape = (size, n_max_images) ) x1_image_positions[iter_i] = ( x1_image_position # shape = (size, n_max_images) ) magnification = np.ones(self.n_max_images) * np.nan time_delay = np.ones(self.n_max_images) * np.nan determinant = np.ones(self.n_max_images) * np.nan trace = np.ones(self.n_max_images) * np.nan magnification[:n_image_i] = magnifications_i[:n_image_i] time_delay[:n_image_i] = time_delays_i[:n_image_i] determinant[:n_image_i] = determinant_i[:n_image_i] trace[:n_image_i] = trace_i[:n_image_i] # Add the magnifications, time delays, determinants, and traces to their respective arrays magnifications[iter_i] = magnification time_delays[iter_i] = time_delay determinants[iter_i] = determinant traces[iter_i] = trace x_source[iter_i] = x_source_i y_source[iter_i] = y_source_i # time-delays: convert to positive values # time-delays will be relative to the first arrived signal of an lensed event time_delays = ( time_delays - np.array([np.sort(time_delays, axis=1)[:, 0]]).T ) # this is alright if time delays are already sorted # image type classification (morse phase) image_type = np.zeros((size, self.n_max_images)) for i in range(size): for j in range(self.n_max_images): if determinants[i, j] < 0: image_type[i, j] = 2 elif traces[i, j] > 0: image_type[i, j] = 1 elif traces[i, j] < 0: image_type[i, j] = 3 else: image_type[i, j] = np.nan # Return a dictionary with all of the lens information but also the BBH parameters from gw_param image_parameters = { "x0_image_positions": x0_image_positions, "x1_image_positions": x1_image_positions, "magnifications": magnifications, "time_delays": time_delays, "image_type": image_type, } # sorting wrt time delays idx_sort = np.argsort(time_delays, axis=1) # sort each row # idx_sort has the shape (size, n_max_images) for key, value in image_parameters.items(): # sort each row image_parameters[key] = np.take_along_axis(value, idx_sort, axis=1) lens_parameters.update(image_parameters) lens_parameters["n_images"] = n_images lens_parameters["x_source"] = x_source lens_parameters["y_source"] = y_source return lens_parameters
def _multiprocessing_error(self): """ Check multiprocessing guard and raise error if not in main process. Raises ------ RuntimeError If called from a subprocess instead of the main process, indicating the code is not properly wrapped in ``if __name__ == "__main__":``. """ # to access multi-cores instead of multithreading if mp.current_process().name != "MainProcess": print( "\n\n[ERROR] This multiprocessing code must be run under 'if __name__ == \"__main__\":'.\n" "Please wrap your script entry point in this guard.\n" "See: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming\n" ) raise RuntimeError( "\nMultiprocessing code must be run under 'if __name__ == \"__main__\":'.\n\n" )
[docs] def get_lensed_snrs( self, lensed_param, pdet_finder=None, include_effective_parameters=False ): """ Compute detection probability for each lensed image. Calculates the effective luminosity distance, geocent time, and phase for each image accounting for magnification and morse phase, then computes detection probabilities using the provided calculator. Parameters ---------- lensed_param : ``dict`` Dictionary containing lensed source and image parameters given below: \n +------------------------------+-----------+-------------------------------------------------------+ | Parameter | Units | Description | +==============================+===========+=======================================================+ | geocent_time | s | geocent time | +------------------------------+-----------+-------------------------------------------------------+ | ra | rad | right ascension | +------------------------------+-----------+-------------------------------------------------------+ | dec | rad | declination | +------------------------------+-----------+-------------------------------------------------------+ | phase | rad | phase of GW at reference freq | +------------------------------+-----------+-------------------------------------------------------+ | psi | rad | polarization angle | +------------------------------+-----------+-------------------------------------------------------+ | theta_jn | rad | inclination angle | +------------------------------+-----------+-------------------------------------------------------+ | a_1 | | spin of the primary compact binary | +------------------------------+-----------+-------------------------------------------------------+ | a_2 | | spin of the secondary compact binary | +------------------------------+-----------+-------------------------------------------------------+ | luminosity_distance | Mpc | luminosity distance of the source | +------------------------------+-----------+-------------------------------------------------------+ | mass_1 | Msun | mass of the primary compact binary (detector frame) | +------------------------------+-----------+-------------------------------------------------------+ | mass_2 | Msun | mass of the secondary compact binary (detector frame) | +------------------------------+-----------+-------------------------------------------------------+ | x0_image_positions | radian | x-coordinate (RA-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | x1_image_positions | radian | y-coordinate (Dec-like axis) of the images | +------------------------------+-----------+-------------------------------------------------------+ | magnifications | | magnifications | +------------------------------+-----------+-------------------------------------------------------+ | time_delays | | time delays | +------------------------------+-----------+-------------------------------------------------------+ | image_type | | image type | +------------------------------+-----------+-------------------------------------------------------+ pdet_finder : ``callable`` Function that computes detection probability given GW parameters. include_effective_parameters : ``bool`` If True, includes effective parameters in output lensed_param. \n Returns ------- result_dict : ``dict`` Dictionary containing: \n - 'pdet_net': network detection probability (shape: size x n_max_images) \n - Individual detector probabilities if pdet_finder outputs them \n lensed_param : ``dict`` Updated dictionary with effective parameters shown below: \n +----------------------------------+-----------+------------------------------------------------+ | Parameter | Units | Description | +==================================+===========+================================================+ | effective_luminosity_distance | Mpc | magnification-corrected distance | | | | luminosity_distance / sqrt(|magnifications_i|) | +----------------------------------+-----------+------------------------------------------------| | effective_geocent_time | s | time-delay-corrected GPS time | | | | geocent_time + time_delays_i | +----------------------------------+-----------+------------------------------------------------| | effective_phase | rad | morse-phase-corrected phase | | | | phi - morse_phase_i | +----------------------------------+-----------+------------------------------------------------+ | effective_ra | rad | RA of the image | | | | ra + (x0_image_positions_i - x_source)/cos(dec)| +----------------------------------+-----------+------------------------------------------------+ | effective_dec | rad | Dec of the image | | | | dec + (x1_image_positions_i - y_source) | +----------------------------------+-----------+------------------------------------------------+ """ include_effective_parameters = ( include_effective_parameters or self.include_effective_parameters ) if not self.include_redundant_parameters: # remove redundant parameters to save memory del ( lensed_param["n_images"], lensed_param["mass_1_source"], lensed_param["mass_2_source"], ) # needed to calculate effective luminosity distance and effective time delay magnifications = lensed_param["magnifications"] time_delays = lensed_param["time_delays"] image_type = lensed_param[ "image_type" ].copy() # copy to avoid modifying original x_source = lensed_param["x_source"] y_source = lensed_param["y_source"] x0_image_positions = lensed_param["x0_image_positions"] x1_image_positions = lensed_param["x1_image_positions"] size = len(magnifications) # image type to morse phase image_type[image_type == 1.0] = 0.0 image_type[image_type == 2.0] = np.pi / 2 image_type[image_type == 3.0] = np.pi # Get the binary parameters ( mass_1, mass_2, luminosity_distance, theta_jn, psi, ra, dec, phase, geocent_time, a_1, a_2, tilt_1, tilt_2, phi_12, phi_jl, ) = ( lensed_param["mass_1"], lensed_param["mass_2"], lensed_param["luminosity_distance"], lensed_param["theta_jn"], lensed_param["psi"], lensed_param["ra"], lensed_param["dec"], lensed_param["phase"], lensed_param["geocent_time"], np.zeros(size), np.zeros(size), np.zeros(size), np.zeros(size), np.zeros(size), np.zeros(size), ) if not self.spin_zero: a_1, a_2 = (lensed_param["a_1"], lensed_param["a_2"]) if self.spin_precession: tilt_1, tilt_2, phi_12, phi_jl = ( lensed_param["tilt_1"], lensed_param["tilt_2"], lensed_param["phi_12"], lensed_param["phi_jl"], ) # checking pdet_finder output keys result_dict = self._check_pdet_finder_output_keys( size, lensed_param, pdet_finder ) if not self.include_redundant_parameters: del lensed_param["luminosity_distance"] # Get the optimal signal to noise ratios for each image # iterate over the image type (column) geocent_time_min = np.min(geocent_time) geocent_time_max = geocent_time_min + self.time_window pdet_finder_output_keys = ( self.pdet_finder_output_keys.copy() ) # copy to avoid modifying original for i in range(self.n_max_images): # get the effective time for each image type effective_geocent_time = geocent_time + time_delays[:, i] # choose only the events that are within the time range and also not nan idx = (effective_geocent_time <= geocent_time_max) & ( effective_geocent_time >= geocent_time_min ) # get the effective luminosity distance for each image type effective_luminosity_distance = luminosity_distance / np.sqrt( np.abs(magnifications[:, i]) ) # get the effective phase for each image type # morse phase correction effective_phase = phase - image_type[:, i] # get the effective sky location for each image type # flat sky location assumption effective_dec = dec + (x1_image_positions[:, i] - y_source) effective_ra = ra + (x0_image_positions[:, i] - x_source) / np.cos(dec) # check for nan values idx = ( idx & ~np.isnan(effective_luminosity_distance) & ~np.isnan(effective_geocent_time) & ~np.isnan(effective_phase) & ~np.isnan(effective_ra) & ~np.isnan(effective_dec) ) # Each image has their own effective luminosity distance and effective geocent time if sum(idx) != 0: # Returns a dictionary pdet = pdet_finder( gw_param_dict=dict( mass_1=mass_1[idx], mass_2=mass_2[idx], luminosity_distance=effective_luminosity_distance[idx], theta_jn=theta_jn[idx], psi=psi[idx], phase=effective_phase[idx], geocent_time=effective_geocent_time[idx], ra=effective_ra[idx], dec=effective_dec[idx], a_1=a_1[idx], a_2=a_2[idx], tilt_1=tilt_1[idx], tilt_2=tilt_2[idx], phi_12=phi_12[idx], phi_jl=phi_jl[idx], ), ) for keys in pdet_finder_output_keys: result_dict[keys][idx, i] = pdet[keys] if include_effective_parameters: lensed_param = self.produce_effective_params(lensed_param) return result_dict, lensed_param
[docs] def recover_redundant_parameters(self, lensed_param): """ Recover redundant parameters in lensed_param, i.e. theta_E, n_images, mass_1, mass_2, luminosity_distance. """ zs = lensed_param["zs"] # find theta_E if "theta_E" not in lensed_param: lensed_param["theta_E"] = self.compute_einstein_radii( lensed_param["sigma"], lensed_param["zl"], zs ) else: print("theta_E is already in lensed_param, skipping computation of theta_E") if "n_images" not in lensed_param: x0_image_positions = lensed_param["x0_image_positions"] n_images = np.sum(~np.isnan(x0_image_positions), axis=1) lensed_param["n_images"] = n_images.astype(int) else: print( "n_images is already in lensed_param, skipping computation of n_images" ) if "mass_1_source" not in lensed_param or "mass_2_source" not in lensed_param: lensed_param["mass_1_source"] = lensed_param["mass_1"] / (1 + zs) lensed_param["mass_2_source"] = lensed_param["mass_2"] / (1 + zs) else: print( "mass_1_source and mass_2_source are already in lensed_param, skipping computation of source frame masses" ) if "luminosity_distance" not in lensed_param: lensed_param["luminosity_distance"] = self.luminosity_distance(zs) else: print( "luminosity_distance is already in lensed_param, skipping computation of luminosity_distance" ) return lensed_param
[docs] def produce_effective_params(self, lensed_param): """ Produce effective parameters for each lensed image. Calculates the effective luminosity distance, geocent time, phase, RA, and Dec for each image accounting for magnification and morse phase. Parameters ---------- lensed_param : ``dict`` Dictionary containing lensed source and image parameters. Returns ------- lensed_param : ``dict`` Updated dictionary with effective parameters shown below: \n +----------------------------------+-----------+------------------------------------------------+ | Parameter | Units | Description | +==================================+===========+================================================+ | effective_luminosity_distance | Mpc | magnification-corrected distance | | | | luminosity_distance / sqrt(|magnifications_i|) | +----------------------------------+-----------+------------------------------------------------| | effective_geocent_time | s | time-delay-corrected GPS time | | | | geocent_time + time_delays_i | +----------------------------------+-----------+------------------------------------------------| | effective_phase | rad | morse-phase-corrected phase | | | | phi - morse_phase_i | +----------------------------------+-----------+------------------------------------------------+ | effective_ra | rad | RA of the image | | | | ra + (x0_image_positions_i - x_source)/cos(dec)| +----------------------------------+-----------+------------------------------------------------+ | effective_dec | rad | Dec of the image | | | | dec + (x1_image_positions_i - y_source) | +----------------------------------+-----------+------------------------------------------------+ """ # needed to calculate effective luminosity distance and effective time delay magnifications = lensed_param["magnifications"] time_delays = lensed_param["time_delays"] image_type = lensed_param[ "image_type" ].copy() # copy to avoid modifying original x_source = lensed_param["x_source"] y_source = lensed_param["y_source"] x0_image_positions = lensed_param["x0_image_positions"] x1_image_positions = lensed_param["x1_image_positions"] ra = lensed_param["ra"] dec = lensed_param["dec"] # Update lensed_param with effective values only if they weren't already present lensed_param["effective_luminosity_distance"] = lensed_param[ "luminosity_distance" ][:, np.newaxis] / np.sqrt(np.abs(magnifications)) lensed_param["effective_geocent_time"] = ( lensed_param["geocent_time"][:, np.newaxis] + time_delays ) # image type to morse phase image_type[image_type == 1.0] = 0.0 image_type[image_type == 2.0] = np.pi / 2.0 image_type[image_type == 3.0] = np.pi lensed_param["effective_phase"] = ( lensed_param["phase"][:, np.newaxis] - image_type ) # flat sky location assumption lensed_param["effective_ra"] = ra[:, np.newaxis] + ( x0_image_positions - x_source[:, np.newaxis] ) / np.cos(dec[:, np.newaxis]) lensed_param["effective_dec"] = dec[:, np.newaxis] + ( x1_image_positions - y_source[:, np.newaxis] ) return lensed_param
def _check_pdet_finder_output_keys(self, size, lensed_param, pdet_finder): """ Helper function to check and set pdet_finder output keys. Parameters ---------- size : ``int`` Number of lensed events. lensed_param : ``dict`` Dictionary containing lensed source and image parameters. pdet_finder : ``callable`` Function that computes detection probability given GW parameters. Returns ------- result_dict : ``dict`` Initialized dictionary to hold pdet_finder outputs. """ # setting up result dictionary result_dict = dict() # checking pdet_finder self.pdet_finder = pdet_finder if pdet_finder else self.pdet_finder if self.pdet_finder is None: raise ValueError("pdet_finder function must be provided") if self.pdet_finder_output_keys is None: # setting up pdet dictionary # get the keys of the output dictionary of pdet_finder gw_param_dict = dict( mass_1=lensed_param["mass_1"][:1], mass_2=lensed_param["mass_2"][:1], luminosity_distance=lensed_param["luminosity_distance"][:1], theta_jn=lensed_param["theta_jn"][:1], psi=lensed_param["psi"][:1], phase=lensed_param["phase"][:1], geocent_time=lensed_param["geocent_time"][:1], ra=lensed_param["ra"][:1], dec=lensed_param["dec"][:1], ) if not self.spin_zero: gw_param_dict["a_1"] = lensed_param["a_1"][:1] gw_param_dict["a_2"] = lensed_param["a_2"][:1] if self.spin_precession: gw_param_dict["tilt_1"] = lensed_param["tilt_1"][:1] gw_param_dict["tilt_2"] = lensed_param["tilt_2"][:1] gw_param_dict["phi_12"] = lensed_param["phi_12"][:1] gw_param_dict["phi_jl"] = lensed_param["phi_jl"][:1] pdet = pdet_finder(gw_param_dict=gw_param_dict) if not isinstance(pdet, dict) or "pdet_net" not in pdet: raise ValueError( "pdet_finder must return a dictionary with key 'pdet_net'" ) else: self.pdet_finder_output_keys = list(pdet.keys()) for keys in pdet.keys(): result_dict[keys] = np.ones((size, self.n_max_images)) * np.nan else: for keys in self.pdet_finder_output_keys: result_dict[keys] = np.ones((size, self.n_max_images)) * np.nan return result_dict # ------------- # Properties # ------------- @property
[docs] def npool(self): """ Number of multiprocessing workers. Returns ------- npool : ``int`` Number of processes for multiprocessing. \n default: 4 """ return self._npool
@npool.setter def npool(self, value): self._npool = value @property
[docs] def n_min_images(self): """ Minimum number of images required for a valid lensing event. Returns ------- n_min_images : ``int`` Minimum number of images required. \n default: 2 """ return self._n_min_images
@n_min_images.setter def n_min_images(self, value): self._n_min_images = value @property
[docs] def n_max_images(self): """ Maximum number of images per event. Returns ------- n_max_images : ``int`` Maximum number of images to consider per event. \n default: 4 """ return self._n_max_images
@n_max_images.setter def n_max_images(self, value): self._n_max_images = value @property
[docs] def time_window(self): """ Time window for lensed events. Returns ------- time_window : ``float`` Time window for lensed events (units: s). \n default: 365*24*3600*20 (20 years) """ return self._time_window
@time_window.setter def time_window(self, value): self._time_window = value @property
[docs] def include_effective_parameters(self): """ Flag to include effective parameters in output. Returns ------- include_effective_parameters : ``bool`` Whether to include effective parameters in the output of get_lensed_snrs. \n default: False """ return self._include_effective_parameters
@include_effective_parameters.setter def include_effective_parameters(self, value): self._include_effective_parameters = value @property
[docs] def lens_model_list(self): """ List of lens models to use. Returns ------- lens_model_list : ``list`` List of lens model names. \n default: ['EPL_NUMBA', 'SHEAR'] """ return self._lens_model_list
@lens_model_list.setter def lens_model_list(self, value): self._lens_model_list = value @property
[docs] def cosmo(self): """ Astropy cosmology object for calculations. Returns ------- cosmo : ``astropy.cosmology`` Cosmology used for distance calculations. \n default: LambdaCDM(H0=70, Om0=0.3, Ode0=0.7, Tcmb0=0.0, Neff=3.04, m_nu=None, Ob0=0.0) """ return self._cosmo
@cosmo.setter def cosmo(self, value): self._cosmo = value @property
[docs] def spin_zero(self): """ Flag for zero spin assumption. Returns ------- spin_zero : ``bool`` Whether to assume zero spin for compact objects. \n If True, spin parameters are set to zero (no spin sampling). \n If False, spin parameters are sampled. \n default: False """ return self._spin_zero
@spin_zero.setter def spin_zero(self, value): self._spin_zero = value @property
[docs] def spin_precession(self): """ Flag for spin precession. Returns ------- spin_precession : ``bool`` Whether to include spin precession effects. \n If True (and spin_zero=False), sample precessing spin parameters. \n If False (and spin_zero=False), sample aligned/anti-aligned spins. \n default: False """ return self._spin_precession
@spin_precession.setter def spin_precession(self, value): self._spin_precession = value @property
[docs] def pdet_finder(self): """ Detection probability finder function. Returns ------- pdet_finder : ``callable`` Function that calculates detection probability for GW events. \n The function signature should be: \n ``pdet_finder(gw_param_dict) -> dict`` with key 'pdet_net'. """ return self._pdet_finder
@pdet_finder.setter def pdet_finder(self, value): self._pdet_finder = value @property
[docs] def pdet_finder_output_keys(self): """ Output keys from the detection probability finder function. Returns ------- pdet_finder_output_keys : ``list`` List of keys returned by the pdet_finder function. \n default: None """ return self._pdet_finder_output_keys
@pdet_finder_output_keys.setter def pdet_finder_output_keys(self, value): self._pdet_finder_output_keys = value @property
[docs] def available_image_properties_functions(self): """ Dictionary of available functions for computing image properties. Returns ------- available_image_properties_functions : ``dict`` Dictionary with function names and default parameters. """ self._available_image_properties_functions = dict( image_properties_epl_shear_njit=dict( max_img=self.n_max_images, num_th=500, maginf=-100.0, max_tries=100, alpha_scaling=1.0, magnification_limit=0.01, Nmeas=400, Nmeas_extra=80, ), image_properties_epl_shear=None, ) return self._available_image_properties_functions