# -*- coding: utf-8 -*-
"""
This module contains helper routines for other modules in the ler package.
"""
import os
import h5py
import numpy as np
import json
from scipy.interpolate import interp1d
from scipy.interpolate import CubicSpline
from scipy.integrate import quad
from numba import njit
from importlib import resources
from numba.core.registry import CPUDispatcher
def is_njitted(func):
"""Check whether a function has been njit-compiled by numba (i.e., is a CPUDispatcher instance)."""
return isinstance(func, CPUDispatcher)
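# Example usage (illustrative sketch):
# >>> from numba import njit
# >>> @njit
# ... def double(x):
# ...     return 2 * x
# >>> is_njitted(double)
# True
# >>> is_njitted(lambda x: 2 * x)
# False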
class NumpyEncoder(json.JSONEncoder):
"""
Class for storing a numpy.ndarray or any nested-list composition as a JSON file. This is required for dealing with np.nan and np.inf.
Parameters
----------
json.JSONEncoder : `class`
class for encoding JSON file
Returns
----------
json.JSONEncoder.default : `function`
function for encoding JSON file
Example
----------
>>> import numpy as np
>>> import json
>>> from ler import helperroutines as hr
>>> # create a dictionary
>>> param = {'a': np.array([1,2,3]), 'b': np.array([4,5,6])}
>>> # save the dictionary as json file
>>> with open('param.json', 'w') as f:
...     json.dump(param, f, cls=hr.NumpyEncoder)
>>> # load the dictionary from json file
>>> with open('param.json', 'r') as f:
...     param = json.load(f)
>>> # print the dictionary
>>> print(param)
{'a': [1, 2, 3], 'b': [4, 5, 6]}
"""
def default(self, obj):
"""function for encoding JSON file"""
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
def remove_file(file_name):
"""Remove a file."""
print(f"removing {file_name} if it exists")
if os.path.exists(file_name):
os.remove(file_name)
def load_pickle(file_name):
"""Load a pickle file.
Parameters
----------
file_name : `str`
pickle file name for storing the parameters.
Returns
----------
param : `dict`
Note
----
Uses dill instead of pickle to support serialization of local functions,
lambdas, and closures.
"""
import dill
with open(file_name, "rb") as handle:
param = dill.load(handle)
return param
def save_pickle(file_name, param):
"""Save a dictionary as a pickle file.
Parameters
----------
file_name : `str`
pickle file name for storing the parameters.
param : `dict`
dictionary to be saved as a pickle file.
Note
----
Uses dill instead of pickle to support serialization of local functions,
lambdas, and closures.
"""
import dill
with open(file_name, "wb") as handle:
dill.dump(param, handle, protocol=dill.HIGHEST_PROTOCOL)
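# Example usage (an illustrative round trip; 'param.pkl' is a placeholder name):
# >>> param = {"zs": np.array([0.1, 0.5, 1.0]), "pdf": lambda z: z**2}
# >>> save_pickle("param.pkl", param)
# >>> loaded = load_pickle("param.pkl")  # dill restores the lambda as well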
# hdf5
def load_hdf5(file_name):
"""Load a hdf5 file.
Parameters
----------
file_name : `str`
hdf5 file name for storing the parameters.
Returns
----------
param : `h5py.File`
open hdf5 file object (not a dict); read datasets out explicitly and close it after use.
"""
return h5py.File(file_name, "r")
def save_hdf5(file_name, param):
"""Save a dictionary as a hdf5 file.
Parameters
----------
file_name : `str`
hdf5 file name for storing the parameters.
param : `dict`
dictionary to be saved as a hdf5 file.
"""
with h5py.File(file_name, "w") as f:
for key, value in param.items():
f.create_dataset(key, data=value)
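# Example usage (illustrative; note that load_hdf5 returns an open h5py.File,
# so datasets must be read out explicitly and the file closed afterwards):
# >>> save_hdf5("param.h5", {"zs": np.array([0.1, 0.5, 1.0])})
# >>> f = load_hdf5("param.h5")
# >>> zs = f["zs"][()]
# >>> f.close()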
def load_json(file_name):
"""Load a json file.
Parameters
----------
file_name : `str`
json file name for storing the parameters.
Returns
----------
param : `dict`
"""
with open(file_name, "r", encoding="utf-8") as f:
param = json.load(f)
return param
def save_json(file_name, param):
"""Save a dictionary as a json file.
Parameters
----------
file_name : `str`
json file name for storing the parameters.
param : `dict`
dictionary to be saved as a json file.
Example
----------
>>> import numpy as np
>>> from ler.utils import save_json, load_json
>>> param = [{"name": "lens_mass", "type": "cubic_spline", "x": np.array([0, 1, 2, 3]), "y": [0, 1, 0, 1]}]
>>> save_json("param.json", param)
>>> param = load_json("param.json")
"""
try:
with open(file_name, "w", encoding="utf-8") as write_file:
json.dump(param, write_file)
except TypeError:
# fall back to NumpyEncoder when param contains numpy types
with open(file_name, "w", encoding="utf-8") as write_file:
json.dump(param, write_file, indent=4, cls=NumpyEncoder)
def append_json(file_name, new_dictionary, old_dictionary=None, replace=False):
"""
Append (values with corresponding keys) and update a json file with a dictionary. There are four options:
1. If old_dictionary is provided, the values of the new dictionary will be appended to the old dictionary and saved in the 'file_name' json file.
2. If replace is True, replace the json file (with the 'file_name') content with the new_dictionary.
3. If the file does not exist, create a new one with the new_dictionary.
4. If none of the above, append the new dictionary to the content of the json file.
Parameters
----------
file_name : `str`
json file name for storing the parameters.
new_dictionary : `dict`
dictionary to be appended to the json file.
old_dictionary : `dict`, optional
If provided, the values of the new dictionary will be appended to the old dictionary and saved in the 'file_name' json file.
Default is None.
replace : `bool`, optional
If True, replace the json file with the dictionary. Default is False.
"""
# decide where the base data comes from
if old_dictionary:
data = old_dictionary
elif replace:
data = new_dictionary
elif not os.path.exists(file_name):
# the file does not exist; create a new one
replace = True
data = new_dictionary
else:
with open(file_name, "r", encoding="utf-8") as f:
data = json.load(f)
if not replace:
data = add_dictionaries_together(data, new_dictionary)
# save the dictionary
with open(file_name, "w", encoding="utf-8") as write_file:
json.dump(data, write_file, indent=4, cls=NumpyEncoder)
return data
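# Example usage (an illustrative sketch of the append behaviour):
# >>> append_json("param.json", {"zs": [0.1, 0.5]}, replace=True)
# >>> data = append_json("param.json", {"zs": [1.0]})
# >>> data["zs"]
# [0.1, 0.5, 1.0]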
def concatenate_dict_values(dict1, dict2):
"""Adds the values of two dictionaries together.
Parameters
----------
dict1 : `dict`
dictionary to be added.
dict2 : `dict`
dictionary to be added.
Returns
----------
dict1 : `dict`
dictionary with added values.
"""
data_key = dict1.keys()
for key, value in dict2.items():
if key in data_key:
dict1[key] = np.concatenate((dict1[key], value))
return dict1
def get_param_from_json(json_file):
"""
Function to get the parameters from json file.
Parameters
----------
json_file : `str`
json file name for storing the parameters.
Returns
----------
param : `dict`
"""
with open(json_file, "r", encoding="utf-8") as f:
param = json.load(f)
for key, value in param.items():
param[key] = np.array(value)
return param
def load_txt_from_module(package, directory, filename):
"""
Function to load a specific dataset from a .txt file within the package
Parameters
----------
package : str
name of the package
directory : str
name of the directory within the package
filename : str
name of the .txt file
Returns
----------
data : `numpy.ndarray`
array loaded from the .txt file
"""
with resources.path(package + "." + directory, filename) as txt_path:
return np.loadtxt(txt_path)
def rejection_sample(pdf, xmin, xmax, size=100, chunk_size=10000):
"""
Helper function for rejection sampling from a pdf with maximum and minimum arguments.
Parameters
----------
pdf : `function`
pdf function.
xmin : `float`
minimum value of the pdf.
xmax : `float`
maximum value of the pdf.
size : `int`, optional
number of samples. Default is 100.
chunk_size : `int`, optional
chunk size for sampling. Default is 10000.
Returns
----------
x_sample : `numpy.ndarray`
samples from the pdf.
"""
x = np.linspace(xmin, xmax, chunk_size)
y = pdf(x)
# Maximum value of the pdf
ymax = np.max(y)
# Rejection sample in chunks
x_sample = []
while len(x_sample) < size:
x_try = np.random.uniform(xmin, xmax, size=chunk_size)
pdf_x_try = pdf(x_try)  # calculate the pdf at the random x values
# update the maximum of the pdf first, so the uniform envelope always covers it
ymax = max(ymax, np.max(pdf_x_try))
# uniform draws used to accept or reject each sample
y_try = np.random.uniform(0, ymax, size=chunk_size)
# accept samples where the uniform draw falls below the pdf (keeps the list 1D)
x_sample += list(x_try[y_try < pdf_x_try])
# Transform the samples to a 1D numpy array
x_sample = np.array(x_sample).flatten()
# Return the correct number of samples
return x_sample[:size]
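# Example usage (a minimal sketch: sampling from a truncated standard normal;
# the pdf does not need to be normalized for rejection sampling):
# >>> samples = rejection_sample(lambda x: np.exp(-0.5 * x**2), xmin=-5.0, xmax=5.0, size=1000)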
def rejection_sample2d(pdf, xmin, xmax, ymin, ymax, size=100, chunk_size=10000):
"""
Helper function for rejection sampling from a 2D pdf with maximum and minimum arguments.
Parameters
----------
pdf : `function`
2D pdf function.
xmin : `float`
minimum value of the pdf in the x-axis.
xmax : `float`
maximum value of the pdf in the x-axis.
ymin : `float`
minimum value of the pdf in the y-axis.
ymax : `float`
maximum value of the pdf in the y-axis.
size : `int`, optional
number of samples. Default is 100.
chunk_size : `int`, optional
chunk size for sampling. Default is 10000.
Returns
----------
x_sample : `numpy.ndarray`
samples from the pdf in the x-axis.
"""
x = np.random.uniform(xmin, xmax, chunk_size)
y = np.random.uniform(ymin, ymax, chunk_size)
z = pdf(x, y)
# Maximum value of the pdf
zmax = np.max(z)
# Rejection sample in chunks
x_sample = []
y_sample = []
while len(x_sample) < size:
x_try = np.random.uniform(xmin, xmax, size=chunk_size)
y_try = np.random.uniform(ymin, ymax, size=chunk_size)
pdf_xy_try = pdf(x_try, y_try)
# update the maximum of the pdf first, so the uniform envelope always covers it
zmax = max(zmax, np.max(pdf_xy_try))
# uniform draws used to accept or reject each sample
z_try = np.random.uniform(0, zmax, size=chunk_size)
accept = z_try < pdf_xy_try
x_sample += list(x_try[accept])
y_sample += list(y_try[accept])
# Transform the samples to a 1D numpy array
x_sample = np.array(x_sample).flatten()
y_sample = np.array(y_sample).flatten()
# Return the correct number of samples
return x_sample[:size], y_sample[:size]
def add_dictionaries_together(dictionary1, dictionary2):
"""
Adds two dictionaries with the same keys together.
Parameters
----------
dictionary1 : `dict`
dictionary to be added.
dictionary2 : `dict`
dictionary to be added.
Returns
----------
dictionary : `dict`
dictionary with added values.
"""
dictionary = {}
# Check if either dictionary empty, in which case only return the dictionary with values
if len(dictionary1) == 0:
return dictionary2
elif len(dictionary2) == 0:
return dictionary1
# Check if the keys are the same
if dictionary1.keys() != dictionary2.keys():
raise ValueError("The dictionaries have different keys.")
for key in dictionary1.keys():
value1 = dictionary1[key]
value2 = dictionary2[key]
# check if the value is empty
bool0 = len(value1) == 0 or len(value2) == 0
# check if the value is an ndarray or a list
bool1 = isinstance(value1, np.ndarray) and isinstance(value2, np.ndarray)
bool2 = isinstance(value1, list) and isinstance(value2, list)
# mixed ndarray/list in either order
bool4 = (isinstance(value1, np.ndarray) and isinstance(value2, list)) or (
isinstance(value1, list) and isinstance(value2, np.ndarray)
)
bool5 = isinstance(value1, dict) and isinstance(value2, dict)
if bool0:
if len(value1) == 0 and len(value2) == 0:
dictionary[key] = np.array([])
elif len(value1) != 0 and len(value2) == 0:
dictionary[key] = np.array(value1)
elif len(value1) == 0 and len(value2) != 0:
dictionary[key] = np.array(value2)
elif bool1:
dictionary[key] = np.concatenate((value1, value2))
elif bool2:
dictionary[key] = value1 + value2
elif bool4:
dictionary[key] = np.concatenate((np.array(value1), np.array(value2)))
elif bool5:
dictionary[key] = add_dictionaries_together(
dictionary1[key], dictionary2[key]
)
else:
raise ValueError(
"The dictionary contains an item which is neither an ndarray nor a dictionary."
)
return dictionary
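# Example usage (illustrative; both dictionaries must have the same keys):
# >>> d1 = {"mass": np.array([1.4, 1.5])}
# >>> d2 = {"mass": np.array([2.0])}
# >>> add_dictionaries_together(d1, d2)["mass"]
# array([1.4, 1.5, 2. ])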
def trim_dictionary(dictionary, size):
"""
Trims every array in an event dictionary (including nested dictionaries) down to the first `size` entries.
Parameters
----------
dictionary : `dict`
dictionary to be trimmed.
size : `int`
size to trim the dictionary to.
Returns
----------
dictionary : `dict`
trimmed dictionary.
"""
for key in dictionary.keys():
# Check if the item is an ndarray
if isinstance(dictionary[key], np.ndarray):
dictionary[key] = dictionary[key][:size] # Trim the array
# Check if the item is a nested dictionary
elif isinstance(dictionary[key], dict):
dictionary[key] = trim_dictionary(
dictionary[key], size
) # Trim the nested dictionary
else:
raise ValueError(
"The dictionary contains an item which is neither an ndarray nor a dictionary."
)
return dictionary
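# Example usage (illustrative):
# >>> d = {"mass": np.array([1.4, 1.5, 2.0])}
# >>> trim_dictionary(d, 2)["mass"]
# array([1.4, 1.5])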
def create_func_pdf_invcdf(x, y, category="function"):
"""
Function to create an interpolated function, inverse function, or inverse cdf from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values. These have to be sorted in ascending order.
y : `numpy.ndarray`
y values. Corresponding to the x values.
category : `str`, optional
category of the function. Default is "function". Other options are "function_inverse", "pdf", "inv_cdf" and "all".
Returns
----------
pdf : `pdf function`
interpolated pdf function.
inv_pdf : `function inverse`
interpolated inverse pdf function.
inv_cdf : `function`
interpolated inverse cdf.
"""
idx = np.argwhere(np.isnan(y))
x = np.delete(x, idx)
y = np.delete(y, idx)
# create pdf with interpolation
pdf_unorm = interp1d(x, y, kind="cubic", fill_value="extrapolate")
if category == "function":
return pdf_unorm
if category == "function_inverse":
# create inverse function
return interp1d(y, x, kind="cubic", fill_value="extrapolate")
min_, max_ = min(x), max(x)
norm = quad(pdf_unorm, min_, max_)[0]
y = y / norm
if category == "pdf" or category is None:
# normalize the pdf
pdf = interp1d(x, y, kind="cubic", fill_value="extrapolate")
return pdf
# cdf
cdf_values = cumulative_trapezoid(y, x, initial=0)
idx = np.argwhere(cdf_values > 0)[0][0]
cdf_values = cdf_values[idx:]
x = x[idx:]
inv_cdf = interp1d(cdf_values, x, kind="cubic", fill_value="extrapolate")
if category == "inv_cdf":
return inv_cdf
if category == "all":
return [pdf, inv_cdf]
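# Example usage (a minimal sketch: building an inverse cdf for a half-normal
# shape and drawing samples by inverse-transform sampling):
# >>> x = np.linspace(0.0, 5.0, 100)
# >>> y = np.exp(-0.5 * x**2)
# >>> inv_cdf = create_func_pdf_invcdf(x, y, category="inv_cdf")
# >>> samples = inv_cdf(np.random.uniform(0, 1, size=1000))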
def create_conditioned_pdf_invcdf(x, conditioned_y, pdf_func, category):
"""
pdf_func is the function to calculate the pdf of x given y;
x is an array and the output of pdf_func is an array;
y is the condition. We consider the parameter plane of x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
conditioned_y : `numpy.ndarray`
conditioned y values.
pdf_func : `function`
function to calculate the pdf of x given y.
category : `str`, optional
category of the function. Default is "function". Other options are "function_inverse", "pdf" and "inv_cdf".
"""
list_ = []
for y in conditioned_y:
phi = pdf_func(x, y)
# append pdf for each y along the x-axis
list_.append(create_func_pdf_invcdf(x, phi, category=category))
return list_
def create_func(x, y):
"""
Function to create a spline interpolated function from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
y : `numpy.ndarray`
y values.
Returns
----------
c : `numpy.ndarray`
spline coefficients.
x : `numpy.ndarray`
x values with NaN entries removed.
"""
idx = np.argwhere(np.isnan(y))
x = np.delete(x, idx)
y = np.delete(y, idx)
return CubicSpline(x, y).c, x
def create_func_inv(x, y):
"""
Function to create a spline interpolated inverse function from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
y : `numpy.ndarray`
y values.
Returns
----------
c : `numpy.ndarray`
spline coefficients.
y : `numpy.ndarray`
y values with NaN entries removed.
"""
idx = np.argwhere(np.isnan(y))
x = np.delete(x, idx)
y = np.delete(y, idx)
return CubicSpline(y, x).c, y
def create_pdf(x, y):
"""
Function to create a spline interpolated normalized pdf from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
y : `numpy.ndarray`
y values.
Returns
----------
c : `numpy.ndarray`
spline coefficients of the normalized pdf.
x : `numpy.ndarray`
x values with NaN entries removed.
"""
idx = np.argwhere(np.isnan(y))
x = np.delete(x, idx)
y = np.delete(y, idx)
pdf_unorm = interp1d(x, y, kind="cubic", fill_value="extrapolate")
min_, max_ = min(x), max(x)
norm = quad(pdf_unorm, min_, max_)[0]
y = y / norm
return CubicSpline(x, y).c, x
def create_inv_cdf_array(x, y):
"""
Function to create a spline interpolated inverse cdf from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
y : `numpy.ndarray`
y values.
Returns
----------
array : `numpy.ndarray`
2D array stacking the cdf values and the corresponding x values.
"""
idx = np.argwhere(np.isnan(y))
x = np.delete(x, idx)
y = np.delete(y, idx)
cdf_values = cumulative_trapezoid(y, x, initial=0)
cdf_values = cdf_values / cdf_values[-1]
return np.array([cdf_values, x])
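# Example usage (illustrative: inverse-transform sampling from the returned array):
# >>> x = np.linspace(0.0, 5.0, 100)
# >>> cdf_values, x_grid = create_inv_cdf_array(x, np.exp(-0.5 * x**2))
# >>> samples = np.interp(np.random.uniform(0, 1, size=1000), cdf_values, x_grid)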
def create_conditioned_pdf(x, conditioned_y, pdf_func):
"""
Function to create a conditioned pdf from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
conditioned_y : `numpy.ndarray`
conditioned y values.
pdf_func : `function`
function to calculate the pdf of x given y.
Returns
----------
list_ : `numpy.ndarray`
array of (coefficients, x) pairs, one per conditioned y value.
"""
list_ = []
for y in conditioned_y:
phi = pdf_func(x, y)
list_.append(create_pdf(x, phi))
return np.array(list_)
def create_conditioned_inv_cdf_array(x, conditioned_y, pdf_func):
"""
Function to create a conditioned inv_cdf from the input x and y.
Parameters
----------
x : `numpy.ndarray`
x values.
conditioned_y : `numpy.ndarray`
conditioned y values.
pdf_func : `function`
function to calculate the pdf of x given y.
Returns
----------
list_ : `numpy.ndarray`
array of inverse-cdf arrays, one per conditioned y value.
"""
list_ = []
size = len(x)
for y in conditioned_y:
phi = pdf_func(x, y * np.ones(size))
list_.append(create_inv_cdf_array(x, phi))
return np.array(list_)
def interpolator_from_json(
identifier_dict,
directory,
sub_directory,
name,
x,
pdf_func=None,
y=None,
conditioned_y=None,
dimension=1,
category="pdf",
create_new=False,
):
"""
Function to decide which interpolator to use.
Parameters
----------
identifier_dict : `dict`
dictionary of identifiers.
directory : `str`
directory to store the interpolator.
sub_directory : `str`
sub-directory to store the interpolator.
name : `str`
name of the interpolator.
x : `numpy.ndarray`
x values.
pdf_func : `function`
function to calculate the pdf of x given y.
y : `numpy.ndarray`
y values.
conditioned_y : `numpy.ndarray`
conditioned y values.
dimension : `int`
dimension of the interpolator. Default is 1.
category : `str`
category of the function. Default is "pdf".
create_new : `bool`
if True, create a new interpolator. Default is False.
Returns
----------
interpolator : `function`
interpolator function.
"""
# check first whether the directory, subdirectory and json exist
path_inv_cdf, it_exist = interpolator_json_path(
identifier_dict=identifier_dict,
directory=directory,
sub_directory=sub_directory,
interpolator_name=name,
)
if create_new:
it_exist = False
if it_exist:
print(f"{name} interpolator will be loaded from {path_inv_cdf}")
# load the interpolator
with open(path_inv_cdf, "rb") as handle:
interpolator = json.load(handle)
return interpolator
else:
print(f"{name} interpolator will be generated at {path_inv_cdf}")
# create the interpolator
if dimension == 1:
if y is None:
y = pdf_func(x)
if category == "function":
interpolator = create_func(x, y)
elif category == "function_inverse":
interpolator = create_func_inv(x, y)
elif category == "pdf":
interpolator = create_pdf(x, y)
elif category == "inv_cdf":
interpolator = create_inv_cdf_array(x, y)
else:
raise ValueError(
"The category given should be function, function_inverse, pdf or inv_cdf."
)
elif dimension == 2:
if category == "pdf":
interpolator = create_conditioned_pdf(x, conditioned_y, pdf_func)
elif category == "inv_cdf":
interpolator = create_conditioned_inv_cdf_array(
x, conditioned_y, pdf_func
)
else:
raise ValueError("The dimension is not supported.")
# save the interpolator
save_json(path_inv_cdf, interpolator)
return interpolator
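# Example usage (a hedged sketch; the identifier values, directory names and the
# toy pdf below are placeholders, not ler defaults):
# >>> it = interpolator_from_json(
# ...     identifier_dict={"z_min": 0.0, "z_max": 10.0},
# ...     directory="./interpolator_json",
# ...     sub_directory="merger_rate",
# ...     name="inv_cdf_zs",
# ...     x=np.linspace(0.001, 10.0, 500),
# ...     pdf_func=lambda z: z**2 * np.exp(-z),
# ...     dimension=1,
# ...     category="inv_cdf",
# ... )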
def interpolator_json_path(
identifier_dict,
directory,
sub_directory,
interpolator_name,
):
"""
Function to create the interpolator json file path.
Parameters
----------
identifier_dict : `dict`
dictionary of identifiers.
directory : `str`
directory to store the interpolator.
sub_directory : `str`
sub-directory to store the interpolator.
interpolator_name : `str`
name of the interpolator.
Returns
----------
path_inv_cdf : `str`
path of the interpolator json file.
it_exist : `bool`
if True, the interpolator exists.
"""
# convert values of identifier_dict to string
identifier_dict = {k: str(v) for k, v in identifier_dict.items()}
# make sure the directory and sub-directory exist
full_dir = directory + "/" + sub_directory
os.makedirs(full_dir, exist_ok=True)
# check if identifier_dict_list.json exists
path1 = full_dir + "/init_dict.json"
if not os.path.exists(path1):
dict_list = []
save_json(path1, dict_list)
# check if the input dict is the same as one of the dict inside the json file
identifier_dict_stored = load_json(path1)
path2 = full_dir
len_ = len(identifier_dict_stored)
if identifier_dict in identifier_dict_stored:
idx = identifier_dict_stored.index(identifier_dict)
# load the interpolator
path_inv_cdf = path2 + "/" + interpolator_name + "_" + str(idx) + ".json"
# guard against the interpolator file having been deleted by mistake
it_exist = os.path.exists(path_inv_cdf)
else:
it_exist = False
path_inv_cdf = path2 + "/" + interpolator_name + "_" + str(len_) + ".json"
identifier_dict_stored.append(identifier_dict)
save_json(path1, identifier_dict_stored)
return path_inv_cdf, it_exist
def batch_handler(
size,
batch_size,
sampling_routine,
output_jsonfile,
save_batch=True,
resume=False,
param_name="parameters",
):
"""
Function to run the sampling in batches.
Parameters
----------
size : `int`
number of samples.
batch_size : `int`
batch size.
sampling_routine : `function`
sampling function. It should have 'size' as input and return a dictionary.
output_jsonfile : `str`
json file name for storing the parameters.
save_batch : `bool`, optional
if True, save sampled parameters in each iteration. Default is True.
resume : `bool`, optional
if True, resume sampling from the last batch. Default is False.
param_name : `str`, optional
name of the parameter. Default is 'parameters'.
Returns
----------
dict_buffer : `dict`
dictionary of parameters.
"""
# sampling in batches
if resume and os.path.exists(output_jsonfile):
# get sample from json file
dict_buffer = get_param_from_json(output_jsonfile)
else:
remove_file(output_jsonfile)
dict_buffer = None
# if size is multiple of batch_size
if size % batch_size == 0:
num_batches = size // batch_size
# if size is not multiple of batch_size
else:
num_batches = size // batch_size + 1
# note frac_batches+(num_batches-1)*batch_size = size
if size > batch_size:
frac_batches = size - (num_batches - 1) * batch_size
# if size is less than batch_size
else:
frac_batches = size
track_batches = 0 # to track the number of batches
if not resume:
# create new first batch with the frac_batches
track_batches, dict_buffer = create_batch_params(
sampling_routine,
frac_batches,
dict_buffer,
save_batch,
output_jsonfile,
track_batches=track_batches,
)
else:
# check where to resume from
# identify the last batch and assign current batch number
# try-except is added to avoid errors when the file does not exist, is empty or corrupted, or lacks the required key.
try:
print(f"resuming from {output_jsonfile}")
len_ = len(list(dict_buffer.values())[0])
track_batches = (len_ - frac_batches) // batch_size + 1
except Exception:
# create new first batch with the frac_batches
track_batches, dict_buffer = create_batch_params(
sampling_routine,
frac_batches,
dict_buffer,
save_batch,
output_jsonfile,
track_batches=track_batches,
)
# loop over the remaining batches
min_, max_ = track_batches, num_batches
# print(f"min_ = {min_}, max_ = {max_}")
save_param = False
if min_ == max_:
print(f"{param_name} already sampled.")
save_param = True
elif min_ > max_:
len_ = len(list(dict_buffer.values())[0])
print(
f"existing {param_name} size ({len_}) is larger than the required size ({size}); it will be trimmed."
)
dict_buffer = trim_dictionary(dict_buffer, size)
save_param = True
else:
for i in range(min_, max_):
_, dict_buffer = create_batch_params(
sampling_routine,
batch_size,
dict_buffer,
save_batch,
output_jsonfile,
track_batches=i,
resume=True,
)
if save_batch:
# if save_batch=True, then dict_buffer is only the last batch
# batch saving is already done in create_batch_params function
dict_buffer = get_param_from_json(output_jsonfile)
else:  # don't save in batches
# ensure the accumulated parameters are still written out at the end
save_param = True
if save_param:
# store all params in json file
print(f"saving all {param_name} in {output_jsonfile} ")
append_json(output_jsonfile, dict_buffer, replace=True)
return dict_buffer
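# Example usage (a hedged sketch; the sampling routine below is a stand-in, but it
# must accept size, save_batch, output_jsonfile and resume, since create_batch_params
# forwards all four):
# >>> def sampler(size, save_batch=False, output_jsonfile=None, resume=False):
# ...     return {"zs": np.random.uniform(0, 1, size)}
# >>> params = batch_handler(size=250, batch_size=100, sampling_routine=sampler,
# ...                        output_jsonfile="params.json", save_batch=False)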
def create_batch_params(
sampling_routine,
frac_batches,
dict_buffer,
save_batch,
output_jsonfile,
track_batches,
resume=False,
):
"""
Helper function for batch_handler. It creates batch parameters and stores them in a dictionary.
Parameters
----------
sampling_routine : `function`
sampling function. It should have 'size' as input and return a dictionary.
frac_batches : `int`
batch size.
dict_buffer : `dict`
dictionary of parameters.
save_batch : `bool`
if True, save sampled parameters in each iteration.
output_jsonfile : `str`
json file name for storing the parameters.
track_batches : `int`
track the number of batches.
resume : `bool`, optional
if True, resume sampling from the last batch. Default is False.
Returns
----------
track_batches : `int`
updated number of batches.
dict_buffer : `dict`
dictionary of accumulated parameters.
"""
track_batches = track_batches + 1
print(f"Batch no. {track_batches}")
param = sampling_routine(
size=frac_batches,
save_batch=save_batch,
output_jsonfile=output_jsonfile,
resume=resume,
)
# add the batch and hold it in the buffer dictionary
if not save_batch:
# in the new batch (new sampling run), dict_buffer will be None
if dict_buffer is None:
dict_buffer = param
else:
for key, value in param.items():
try:
dict_buffer[key] = np.concatenate((dict_buffer[key], value))
except Exception as e:
raise ValueError(
f"For key {key}, concatenate failed for dictionary 1 {dict_buffer[key]} and dictionary 2 {value}. Error: {e}"
) from e
else:
# store all params in json file
print(f"saving batch {track_batches} in {output_jsonfile} ")
dict_buffer = append_json(
file_name=output_jsonfile,
new_dictionary=param,
old_dictionary=dict_buffer,
replace=not (resume),
)
return track_batches, dict_buffer
def monte_carlo_integration(function, uniform_prior, size=10000):
"""
Function to perform Monte Carlo integration.
Parameters
----------
function : `function`
function to be integrated.
uniform_prior : `function`
function that draws uniform samples; it must accept a 'size' keyword.
size : `int`, optional
number of Monte Carlo samples. Default is 10000.
Returns
----------
integral : `float`
integral value.
"""
# sample from the prior
x = uniform_prior(size=size)
# calculate the function
y = np.array([function(x[i]) for i in range(size)])
# calculate the integral
integral = np.mean(y) * (np.max(x) - np.min(x))
return integral
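# Example usage (illustrative: integrating sin(x) over [0, pi], exact value 2):
# >>> prior = lambda size: np.random.uniform(0, np.pi, size=size)
# >>> monte_carlo_integration(np.sin, prior, size=100000)  # ~2.0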
@njit
def cubic_spline_interpolator(xnew, coefficients, x):
"""
Function to interpolate using cubic spline.
Parameters
----------
xnew : `numpy.ndarray`
new x values.
coefficients : `numpy.ndarray`
coefficients of the cubic spline.
x : `numpy.ndarray`
x values.
Returns
----------
result : `numpy.ndarray`
interpolated values.
"""
# handle extrapolation by clamping to the first/last interval
i = np.searchsorted(x, xnew) - 1
idx1 = xnew <= x[0]
idx2 = xnew > x[-1]
i[idx1] = 0
i[idx2] = len(x) - 2
# relative position within the interval
dx = xnew - x[i]
# evaluate the cubic polynomial; scipy's CubicSpline.c stores the
# highest-order coefficient first, so a multiplies dx**3
a, b, c, d = coefficients[:, i]
result = d + c * dx + b * dx**2 + a * dx**3
return result
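# Example usage (illustrative: the coefficient layout matches scipy's
# CubicSpline.c, which is exactly what create_func returns):
# >>> x = np.linspace(0, 2 * np.pi, 50)
# >>> coeff, x_grid = create_func(x, np.sin(x))
# >>> cubic_spline_interpolator(np.array([1.0]), coeff, x_grid)  # ~ sin(1.0)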
@njit
def pdf_cubic_spline_interpolator(xnew, norm_const, coefficients, x):
"""
Function to interpolate pdf using cubic spline.
Parameters
----------
xnew : `numpy.ndarray`
new x values.
norm_const : `float`
normalization constant for the pdf.
coefficients : `numpy.ndarray`
coefficients of the cubic spline.
x : `numpy.ndarray`
x values.
Returns
----------
result : `numpy.ndarray`
interpolated values.
"""
# handle extrapolation by clamping to the first/last interval
i = np.searchsorted(x, xnew) - 1
idx1 = xnew <= x[0]
idx2 = xnew > x[-1]
i[idx1] = 0
i[idx2] = len(x) - 2
# relative position within the interval
dx = xnew - x[i]
# evaluate the cubic polynomial; scipy's CubicSpline.c stores the
# highest-order coefficient first, so a multiplies dx**3
a, b, c, d = coefficients[:, i]
result = d + c * dx + b * dx**2 + a * dx**3
return result / norm_const
@njit
def pdf_cubic_spline_interpolator2d_array(
xnew_array, ynew_array, norm_array, coefficients, x, y
):
"""
Function to calculate the interpolated value of the normalized pdf given the mass ratio (ynew) and total mass (xnew). This is based on 2D bicubic spline interpolation.
Parameters
----------
xnew_array : `numpy.ndarray`
Total mass of the binary.
ynew_array : `numpy.ndarray`
Mass ratio of the binary.
norm_array : `numpy.ndarray`
Normalization constants, one per conditioned y value.
coefficients : `numpy.ndarray`
Array of coefficients for the cubic spline interpolation.
x : `numpy.ndarray`
Array of total mass values for the coefficients.
y : `numpy.ndarray`
Array of mass ratio values for the coefficients.
Returns
-------
result_array : `numpy.ndarray`
Interpolated and normalized pdf values, clipped at zero.
"""
n_samples = len(xnew_array)
result_array = np.empty(n_samples, dtype=np.float64)
xnew_single = np.empty(1, dtype=np.float64)
for i in range(n_samples):
xnew = xnew_array[i]
ynew = ynew_array[i]
xnew_single[0] = xnew
len_y = len(y)
# find the index nearest to the ynew in y
y_idx = np.searchsorted(y, ynew) - 1 if ynew > y[0] else 0
if (ynew > y[0]) and (ynew < y[1]):
if ynew > y[y_idx] + (y[y_idx + 1] - y[y_idx]) / 2:
y_idx = y_idx + 1
result = pdf_cubic_spline_interpolator(
xnew_single, norm_array[y_idx], coefficients[y_idx], x[y_idx]
)[0]
elif y_idx == 0: # lower end point
result = pdf_cubic_spline_interpolator(
xnew_single, norm_array[0], coefficients[0], x[0]
)[0]
# print("a")
elif y_idx + 1 == len_y: # upper end point
result = pdf_cubic_spline_interpolator(
xnew_single, norm_array[-1], coefficients[-1], x[-1]
)[0]
# print("b")
elif y_idx + 2 == len_y: # upper end point
result = pdf_cubic_spline_interpolator(
xnew_single, norm_array[-1], coefficients[-1], x[-1]
)[0]
# print("b")
else:
y_idx1 = y_idx - 1
y_idx2 = y_idx
y_idx3 = y_idx + 1
y_idx4 = y_idx + 2
coeff_low, coeff_high = 4, 8
# print("c")
y1, y2, y3, y4 = y[y_idx1], y[y_idx2], y[y_idx3], y[y_idx4]
z1 = pdf_cubic_spline_interpolator(
xnew_single, norm_array[y_idx1], coefficients[y_idx1], x[y_idx1]
)[0]
z2 = pdf_cubic_spline_interpolator(
xnew_single, norm_array[y_idx2], coefficients[y_idx2], x[y_idx2]
)[0]
z3 = pdf_cubic_spline_interpolator(
xnew_single, norm_array[y_idx3], coefficients[y_idx3], x[y_idx3]
)[0]
z4 = pdf_cubic_spline_interpolator(
xnew_single, norm_array[y_idx4], coefficients[y_idx4], x[y_idx4]
)[0]
coeff = coefficients_generator_ler(y1, y2, y3, y4, z1, z2, z3, z4)
matrixD = coeff[coeff_low:coeff_high]
matrixB = np.array([ynew**3, ynew**2, ynew, 1.0])
result = np.dot(matrixB, matrixD)
result_array[i] = result
# Clip negative values to zero
for i in range(n_samples):
if result_array[i] < 0.0:
result_array[i] = 0.0
return result_array
@njit
def cubic_spline_interpolator2d_array(xnew_array, ynew_array, coefficients, x, y):
"""
Function to calculate the interpolated value of snr_partialscaled given the mass ratio (ynew) and total mass (xnew). This is based on 2D bicubic spline interpolation.
Parameters
----------
xnew_array : `numpy.ndarray`
Total mass of the binary.
ynew_array : `numpy.ndarray`
Mass ratio of the binary.
coefficients : `numpy.ndarray`
Array of coefficients for the cubic spline interpolation.
x : `numpy.ndarray`
Array of total mass values for the coefficients.
y : `numpy.ndarray`
Array of mass ratio values for the coefficients.
Returns
-------
result_array : `numpy.ndarray`
Interpolated values of snr_partialscaled.
"""
n_samples = len(xnew_array)
result_array = np.empty(n_samples, dtype=np.float64)
xnew_single = np.empty(1, dtype=np.float64)
for i in range(n_samples):
xnew = xnew_array[i]
ynew = ynew_array[i]
xnew_single[0] = xnew
len_y = len(y)
# find the index nearest to the ynew in y
y_idx = np.searchsorted(y, ynew) - 1 if ynew > y[0] else 0
if (ynew > y[0]) and (ynew < y[1]):
if ynew > y[y_idx] + (y[y_idx + 1] - y[y_idx]) / 2:
y_idx = y_idx + 1
result_array[i] = cubic_spline_interpolator(
xnew_single, coefficients[y_idx], x[y_idx]
)[0]
# print(f"a) idx = {y_idx}")
elif y_idx == 0: # lower end point
result_array[i] = cubic_spline_interpolator(
xnew_single, coefficients[0], x[0]
)[0]
# print(f"a) idx = {y_idx}")
elif y_idx + 1 == len_y: # upper end point
result_array[i] = cubic_spline_interpolator(
xnew_single, coefficients[-1], x[-1]
)[0]
# print(f"b) idx = {y_idx}")
elif y_idx + 2 == len_y: # upper end point
result_array[i] = cubic_spline_interpolator(
xnew_single, coefficients[-1], x[-1]
)[0]
# print(f"c) idx = {y_idx}")
else:
y_idx1 = y_idx - 1
y_idx2 = y_idx
y_idx3 = y_idx + 1
y_idx4 = y_idx + 2
coeff_low, coeff_high = 4, 8
# print(f"d) idx1, idx2, idx3, idx4 = {y_idx1}, {y_idx2}, {y_idx3}, {y_idx4}")
y1, y2, y3, y4 = y[y_idx1], y[y_idx2], y[y_idx3], y[y_idx4]
z1 = cubic_spline_interpolator(
xnew_single, coefficients[y_idx1], x[y_idx1]
)[0]
z2 = cubic_spline_interpolator(
xnew_single, coefficients[y_idx2], x[y_idx2]
)[0]
z3 = cubic_spline_interpolator(
xnew_single, coefficients[y_idx3], x[y_idx3]
)[0]
z4 = cubic_spline_interpolator(
xnew_single, coefficients[y_idx4], x[y_idx4]
)[0]
coeff = coefficients_generator_ler(y1, y2, y3, y4, z1, z2, z3, z4)
matrixD = coeff[coeff_low:coeff_high]
matrixB = np.array([ynew**3, ynew**2, ynew, 1.0])
result_array[i] = np.dot(matrixB, matrixD)
return result_array
@njit
def coefficients_generator_ler(y1, y2, y3, y4, z1, z2, z3, z4):
"""
Function to generate the coefficients for the cubic spline interpolation of fn(y)=z.
Parameters
----------
y1, y2, y3, y4, z1, z2, z3, z4: `float`
Values of y and z for the cubic spline interpolation.
Returns
-------
coefficients: `numpy.ndarray`
Coefficients for the cubic spline interpolation.
"""
matrixA = np.array(
[
[y1**3, y1**2, y1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[y2**3, y2**2, y2, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, y2**3, y2**2, y2, 1, 0, 0, 0, 0],
[0, 0, 0, 0, y3**3, y3**2, y3, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, y3**3, y3**2, y3, 1],
[0, 0, 0, 0, 0, 0, 0, 0, y4**3, y4**2, y4, 1],
[3 * y2**2, 2 * y2, 1, 0, -3 * y2**2, -2 * y2, -1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 3 * y3**2, 2 * y3, 1, 0, -3 * y3**2, -2 * y3, -1, 0],
[6 * y2, 2, 0, 0, -6 * y2, -2, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 6 * y3, 2, 0, 0, -6 * y3, -2, 0, 0],
[6 * y1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 6 * y4, 2, 0, 0],
]
)
matrixC = np.array([z1, z2, z2, z3, z3, z4, 0, 0, 0, 0, 0, 0])
return np.dot(np.linalg.inv(matrixA), matrixC)
@njit
def normal_pdf(x, mean=0.0, std=0.05):
"""
Calculate the value of a normal probability density function.
Parameters
----------
x : `float` or `numpy.ndarray`
The value(s) at which to evaluate the PDF.
mean : `float`, optional
The mean of the normal distribution. Default is 0.
std : `float`, optional
The standard deviation of the normal distribution. Default is 0.05.
Returns
-------
pdf : `float` or `numpy.ndarray`
The probability density function value(s) at x.
"""
return (1 / (std * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((x - mean) / std) ** 2)
@njit
def normal_pdf_2d(x, y, mean_x=0.0, std_x=0.05, mean_y=0.0, std_y=0.05):
"""
Calculate the value of a 2D normal probability density function.
Parameters
----------
x : `float`
The x-coordinate for which the PDF is evaluated.
y : `float`
The y-coordinate for which the PDF is evaluated.
mean_x : `float`, optional
The mean of the normal distribution along the x-axis. Default is 0.
std_x : `float`, optional
The standard deviation of the normal distribution along the x-axis. Default is 0.05.
mean_y : `float`, optional
The mean of the normal distribution along the y-axis. Default is 0.
std_y : `float`, optional
The standard deviation of the normal distribution along the y-axis. Default is 0.05.
Returns
-------
`float`
The probability density function value at the given x and y coordinates.
"""
factor_x = (1 / (std_x * np.sqrt(2 * np.pi))) * np.exp(
-0.5 * ((x - mean_x) / std_x) ** 2
)
factor_y = (1 / (std_y * np.sqrt(2 * np.pi))) * np.exp(
-0.5 * ((y - mean_y) / std_y) ** 2
)
return factor_x * factor_y
@njit
def cumulative_trapezoid(y, x=None, dx=1.0, initial=0.0):
"""
Compute the cumulative integral of a function using the trapezoidal rule.
"""
if x is None:
x = np.arange(len(y)) * dx
# Calculate the cumulative integral using trapezoidal rule
cumsum = np.zeros_like(y)
cumsum[0] = initial
for i in range(1, len(y)):
cumsum[i] = cumsum[i - 1] + (y[i - 1] + y[i]) * (x[i] - x[i - 1]) / 2.0
return cumsum
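# Example usage (illustrative; mirrors scipy.integrate.cumulative_trapezoid with initial=0):
# >>> x = np.linspace(0.0, 1.0, 101)
# >>> cumulative_trapezoid(x, x)[-1]  # integral of t dt over [0, 1] -> ~0.5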
@njit
def sample_from_powerlaw_distribution(size, alphans, mminns, mmaxns):
"""
Inverse transform sampling for a power-law mass distribution:
p(m) ∝ m^{-alphans}, m in [mminns, mmaxns]
Parameters
----------
size : int
Number of samples to generate.
alphans : float
Power-law index (alpha).
mminns : float
Minimum neutron star mass (lower bound).
mmaxns : float
Maximum neutron star mass (upper bound).
Returns
-------
m : ndarray
Array of sampled neutron star masses.
"""
u = np.random.uniform(0, 1, size)
if alphans == 1.0:
# Special case α=1
m = mminns * (mmaxns / mminns) ** u
elif alphans == 0.0:
# Special case α=0 (uniform distribution)
m = mminns + (mmaxns - mminns) * u
else:
pow1 = 1.0 - alphans
mmin_pow = mminns**pow1
mmax_pow = mmaxns**pow1
m = (u * (mmax_pow - mmin_pow) + mmin_pow) ** (1.0 / pow1)
return m
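# Example usage (illustrative: a Salpeter-like power law p(m) ∝ m^-2.35 on [1, 3]):
# >>> masses = sample_from_powerlaw_distribution(10000, alphans=2.35, mminns=1.0, mmaxns=3.0)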