from .utils import interpolator_json_path, cubic_spline_interpolator, pdf_cubic_spline_interpolator, cubic_spline_interpolator2d_array, inverse_transform_sampler, pdf_cubic_spline_interpolator2d_array, save_json, load_json, inverse_transform_sampler2d
import numpy as np
from scipy.integrate import quad, cumulative_trapezoid
from scipy.interpolate import CubicSpline
from scipy.stats import gaussian_kde
from numba import njit
[docs]
class FunctionConditioning():
def __init__(self,
function=None, # can also be an array of function values
x_array=None,
conditioned_y_array=None, # if this is not none, 2D interpolation will be used
y_array=None,
non_zero_function=False,
gaussian_kde=False,
gaussian_kde_kwargs={},
identifier_dict={},
directory='./interpolator_json',
sub_directory='default',
name='default',
create_new=False,
create_function=False,
create_function_inverse=False,
create_pdf=False,
create_rvs=False,
multiprocessing_function=False,
callback=None,
):
create = self.create_decision_function(create_function, create_function_inverse, create_pdf, create_rvs)
[docs]
self.info = identifier_dict.copy()
[docs]
self.callback = callback
if create:
# create_interpolator input list
input_list = [function, x_array, conditioned_y_array, create_function_inverse, create_pdf, create_rvs, multiprocessing_function]
input_list_kde = [x_array, y_array, gaussian_kde_kwargs]
# check first whether the directory, subdirectory and pickle exist
path_inv_cdf, it_exist = interpolator_json_path(
identifier_dict=identifier_dict,
directory=directory,
sub_directory=sub_directory,
interpolator_name=name,
)
# if the interpolator exists, load it
if create_new:
it_exist = False
if it_exist:
print(f"{name} interpolator will be loaded from {path_inv_cdf}")
# load the interpolator, where the interpolator is a dictionary
# interpolator = {
# 'x_array': x_array,
# 'z_array': z_array,
# 'conditioned_y_array': conditioned_y_array,
# 'function_spline': function_spline,
# 'function_inverse_spline': function_inverse_spline,
# 'pdf_norm_const': pdf_norm_const,
# 'cdf_values': cdf_values,
# }
interpolator = load_json(path_inv_cdf)
else:
print(f"{name} interpolator will be generated at {path_inv_cdf}")
if not gaussian_kde:
interpolator = self.create_interpolator(*input_list)
# save the interpolator
save_json(path_inv_cdf, interpolator)
else:
# gaussian KDE
interpolator = self.create_gaussian_kde(*input_list_kde)
#------------
# check scipy gaussian kde can be saved as json
#------------
# save_json(path_inv_cdf, interpolator)
if not gaussian_kde:
# Convert loaded JSON lists to NumPy arrays for Numba compatibility
x_array = np.array(interpolator['x_array'])
z_array = np.array(interpolator['z_array'])
conditioned_y_array = np.array(interpolator['conditioned_y_array']) if interpolator['conditioned_y_array'] is not None else None
y_array = None
function_spline = np.array(interpolator['function_spline'])
function_inverse_spline = np.array(interpolator['function_inverse_spline']) if interpolator['function_inverse_spline'] is not None else None
pdf_norm_const = np.array(interpolator['pdf_norm_const']) if interpolator['pdf_norm_const'] is not None else None
cdf_values = np.array(interpolator['cdf_values']) if interpolator['cdf_values'] is not None else None
if conditioned_y_array is None:
# function is 1D
# njit(lambda x: cubic_spline_interpolator(x, function_spline, x_array))
function_any = lambda x: cubic_spline_interpolator(x, function_spline, x_array)
def function_non_zero(x):
result = cubic_spline_interpolator(x, function_spline, x_array)
idx = result<0.0
result[idx] = 0.0
return result
function_final = function_any if non_zero_function else function_non_zero
self.function = njit(function_final) if create_function else None
# inverse function is 1D
function_any = lambda x: cubic_spline_interpolator(x, function_inverse_spline, z_array)
def function_non_zero(x):
result = cubic_spline_interpolator(x, function_inverse_spline, z_array)
idx = result<0.0
result[idx] = 0.0
return result
function_final = function_any if non_zero_function else function_non_zero
self.function_inverse = njit(function_final) if create_function_inverse else None
# pdf is 1D
self.pdf = njit(lambda x: pdf_cubic_spline_interpolator(x, pdf_norm_const, function_spline, x_array)) if create_pdf else None
# sampler is 1D
self.rvs = njit(lambda size: inverse_transform_sampler(size, cdf_values, x_array)) if create_rvs else None
else:
# function is 2D
function_any = lambda x, y: cubic_spline_interpolator2d_array(x, y, function_spline, x_array, conditioned_y_array)
def function_non_zero(x, y):
result = cubic_spline_interpolator2d_array(x, y, function_spline, x_array, conditioned_y_array)
idx = result<0.0
result[idx] = 0.0
return result
function_final = function_any if non_zero_function else function_non_zero
self.function = njit(function_final) if create_function else None
# inverse function is 2D
function_any = lambda x, y: cubic_spline_interpolator2d_array(x, y, function_inverse_spline, z_array, conditioned_y_array)
def function_non_zero(x, y):
result = cubic_spline_interpolator2d_array(x, y, function_inverse_spline, z_array, conditioned_y_array)
idx = result<0.0
result[idx] = 0.0
return result
function_final = function_any if non_zero_function else function_non_zero
self.function_inverse = njit(function_final) if create_function_inverse else None
self.pdf = njit(lambda x, y: pdf_cubic_spline_interpolator2d_array(x, y, pdf_norm_const, function_spline, x_array, conditioned_y_array)) if create_pdf else None
self.rvs = njit(lambda size, y: inverse_transform_sampler2d(size, y, cdf_values, x_array, conditioned_y_array)) if create_rvs else None
self.x_array = x_array
self.z_array = z_array
self.conditioned_y_array = conditioned_y_array
self.function_spline = function_spline
self.function_inverse_spline = function_inverse_spline
self.pdf_norm_const = pdf_norm_const
self.cdf_values = cdf_values
else:
x_array = interpolator['x_array']
y_array = interpolator['y_array']
kde_object = interpolator['kde_object']
self.pdf = lambda x: kde_object.pdf(x) if create_pdf else None
if y_array is None:
self.rvs = lambda size: kde_object.resample(size)[0] if create_rvs else None
else:
self.rvs = lambda size: kde_object.resample(size) if create_rvs else None
self.x_array = x_array
self.y_array = y_array
self.kde_object = kde_object
else:
self.conditioned_y_array = None
self.x_array = None
self.y_array = None
self.kde_object = None
[docs]
def __call__(self, *args):
args = [np.array(arg).reshape(-1) if isinstance(arg, float) else arg for arg in args]
return getattr(self, self.callback)(*args)
[docs]
def create_decision_function(self, create_function, create_function_inverse, create_pdf, create_rvs):
decision_bool = True
if not isinstance(create_function, bool) and callable(create_function):
self.function = create_function
decision_bool = False
if not isinstance(create_function_inverse, bool) and callable(create_function_inverse):
self.function_inverse = create_function_inverse
decision_bool = False
if not isinstance(create_pdf, bool) and callable(create_pdf):
self.pdf = create_pdf
decision_bool = False
if not isinstance(create_rvs, bool) and callable(create_rvs):
self.rvs = create_rvs
decision_bool = False
return decision_bool
[docs]
def create_gaussian_kde(self, x_array, y_array, gaussian_kde_kwargs):
# 1d KDE
if y_array is None:
kde = gaussian_kde(x_array, **gaussian_kde_kwargs)
else:
data = np.vstack([x_array, y_array])
kde = gaussian_kde(data, **gaussian_kde_kwargs)
return {
'x_array': x_array,
'y_array': y_array,
'kde_object': kde,
}
[docs]
def create_interpolator(self, function, x_array, conditioned_y_array, create_function_inverse, create_pdf, create_rvs, multiprocessing_function):
# function can be numpy array or callable
# x_array, z_array are 2D arrays if conditioned_y_array is not None
x_array, z_array, conditioned_y_array = self.create_z_array(x_array, function, conditioned_y_array, create_pdf, create_rvs, multiprocessing_function)
del function
function_spline = self.function_spline_generator(x_array, z_array, conditioned_y_array)
if create_function_inverse:
if conditioned_y_array is None:
idx_sort = np.argsort(z_array)
else:
idx_sort = np.argsort(z_array, axis=1)
x_array_ = x_array[idx_sort]
z_array_ = z_array[idx_sort]
# check z_array is strictly increasing
# if (not np.all(np.diff(z_array) > 0)) or (not np.all(np.diff(z_array) < 0)):
# raise ValueError("z_array must be strictly increasing")
function_inverse_spline = self.function_spline_generator(z_array_, x_array_, conditioned_y_array)
else:
function_inverse_spline = None
if create_pdf or create_rvs:
# cannot have -ve pdf
pdf_norm_const = self.pdf_norm_const_generator(x_array, function_spline, conditioned_y_array)
if create_rvs:
cdf_values = self.cdf_values_generator(x_array, z_array, conditioned_y_array)
else:
cdf_values = None
else:
pdf_norm_const = None
cdf_values = None
return {
'x_array': x_array,
'z_array': z_array,
'conditioned_y_array': conditioned_y_array,
'function_spline': function_spline,
'function_inverse_spline': function_inverse_spline,
'pdf_norm_const': pdf_norm_const,
'cdf_values': cdf_values,
}
[docs]
def create_z_array(self, x_array, function, conditioned_y_array, create_pdf, create_rvs, multiprocessing_function):
if callable(function):
# 1D
if conditioned_y_array is None:
z_array = function(x_array)
# remove nan values
idx = np.argwhere(np.isnan(z_array))
x_array = np.delete(x_array, idx)
z_array = np.delete(z_array, idx)
# 2D
else:
# check if x_array is 2D, if not, make it 2D of shape (len(conditioned_y_array), len(x_array))
if x_array.ndim == 1:
x_array = np.array([x_array]*len(conditioned_y_array))
idx = np.argsort(conditioned_y_array)
conditioned_y_array = conditioned_y_array[idx]
# x_array is 2D here, each row corresponds to a different y value
x_array = x_array[idx]
# sort each row of x_array
x_array = np.sort(x_array, axis=1)
if multiprocessing_function:
z_array = function(x_array, conditioned_y_array)
else:
z_list = []
for i, y in enumerate(conditioned_y_array):
try:
z_list.append(function(x_array[i], y*np.ones_like(x_array[i])))
except:
# print(x_array[i], y)
z_list.append(function(x_array[i], y))
z_array = np.array(z_list)
else:
if conditioned_y_array is None:
z_array = function
# remove nan values
idx = np.argwhere(np.isnan(z_array))
x_array = np.delete(x_array, idx)
z_array = np.delete(z_array, idx)
else:
if x_array.ndim == 1:
x_array = np.array([x_array]*len(conditioned_y_array))
if function.ndim == 1:
raise ValueError('function must be 2D array if conditioned_y_array is not None')
# row sort
idx = np.argsort(conditioned_y_array)
conditioned_y_array = conditioned_y_array[idx]
# x_array is 2D here, each row corresponds to a different y value
x_array = x_array[idx]
z_array = function[idx]
z_list = []
x_list = []
for i in range(len(conditioned_y_array)):
# column sort
idx = np.argsort(x_array[i])
x_list.append(x_array[i][idx])
z_list.append(z_array[i][idx])
x_array = np.array(x_list)
z_array = np.array(z_list)
# cannot have -ve pdf
if create_pdf or create_rvs:
z_array[z_array < 0.0] = 0.0
return x_array, z_array, conditioned_y_array
[docs]
def cdf_values_generator(self, x_array, z_array, conditioned_y_array):
# 1D case
if conditioned_y_array is None:
# z_array[z_array<0.] = 0. # already done
cdf_values = cumulative_trapezoid(z_array, x_array, initial=0)
cdf_values = cdf_values/cdf_values[-1]
# 2D case
else:
cdf_values = []
for i, y in enumerate(conditioned_y_array):
z_array_ = z_array[i]
z_array_[z_array_<0.] = 0.
cdfs_ = cumulative_trapezoid(z_array_, x_array[i], initial=0)
cdf_values.append(cdfs_/cdfs_[-1])
# cdf_values.append(cdfs_)
return np.array(cdf_values)
[docs]
def pdf_norm_const_generator(self, x_array, function_spline, conditioned_y_array):
# 1D case
if conditioned_y_array is None:
# pdf_unorm = lambda x: cubic_spline_interpolator(np.array([x]), function_spline, x_array)
def pdf_unorm(x):
result = cubic_spline_interpolator(np.array([x]), function_spline, x_array)
idx = result<0.
result[idx] = 0.0
return result
norm = quad(pdf_unorm, min(x_array), max(x_array))[0]
return norm
# 2D case
else:
norm = []
for i, y in enumerate(conditioned_y_array):
# pdf_unorm = lambda x: cubic_spline_interpolator(np.array([x]), function_spline[i], x_array[i])
def pdf_unorm(x):
result = cubic_spline_interpolator(np.array([x]), function_spline[i], x_array[i])
idx = result<0.
result[idx] = 0.0
return result
norm.append(quad(pdf_unorm, min(x_array[i]), max(x_array[i]))[0])
return np.array(norm)
[docs]
def function_spline_generator(self, x_array, z_array, conditioned_y_array):
# 1D case
if conditioned_y_array is None:
function_spline = CubicSpline(x_array, z_array).c
# 2D case
else:
function_spline = []
for i, y in enumerate(conditioned_y_array):
function_spline.append(CubicSpline(x_array[i], z_array[i]).c)
return np.array(function_spline)