# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
import subprocess
import warnings
import numpy as np
import pandas as pd
from typing import List, Dict, Union # for type hints in functions
[docs]def series_to_numpy(data: Union[np.ndarray, pd.Series]) -> np.ndarray:
"""
Helper function that converts a Pandas Series to a NumPy array if necessary.
Parameters
----------
data : np.ndarray or pd.Series
Input data to be converted. If already a NumPy array,
it is returned unchanged.
Returns
-------
np.ndarray
The input data as a NumPy array.
"""
if type(data) == pd.Series:
data = data.to_numpy()
elif type(data) == list:
data = np.array(data)
else:
data
return data
[docs]def ensure_nested_dict(nested_dict, keys):
"""
Ensures a nested dictionary structure exists the given sequence of keys.
Parameters
----------
nested_dict: dict
The dictionary to operate on. It will be modified in place
to include the full nested structure.
keys : Sequence
Sequence of keys representing the path of nested dictionaries to create.
Returns
-------
dict
The final nested dictionary at the end of the path.
"""
# Iterate through keys to build the nested structure
for key in keys:
if key not in nested_dict:
nested_dict[key] = dict()
nested_dict = nested_dict[key]
# Return nested dictionary
return nested_dict
[docs]def store_in_nested_dict(nested_dict, new_data, keys):
"""
Stores data in a nested dictionary structure, for the given keys.
Intermediate levels will be created if they do not already exist.
Parameters
----------
nested_dict : dict
The dictionary to operate on. It will be modified in place
new_data : Any
Data to store at the specified nested location.
keys : Sequence
Sequence of keys representing the nested path.
Returns
-------
None
The nested dictionary is changed in place.
"""
if not isinstance(nested_dict, dict):
raise TypeError("Expected 'dictionary' to be of type dict.")
if not isinstance(keys, (list, tuple)) or not keys:
raise ValueError("Expected 'keys' to be a non-empty list or tuple.")
storage_location = ensure_nested_dict(nested_dict, keys[:-1])
storage_location[keys[-1]] = new_data
[docs]def get_nested_value(nested_dict, keys):
"""
Retrieve a value from a nested dictionary, using a sequence of keys.
Parameters
----------
nested_dict : dict
The nested dictionary to traverse.
keys : Sequence
A sequence of keys (e.g., list or tuple) specifying the path
to the data.
Returns
-------
Any
The data at the specified location in the nested dictionary.
"""
current = nested_dict
for key in keys:
try:
current = current[key]
except KeyError:
print(f" * The key '{key}' does not exist.")
# Gracefully return None if any key is missing in the nested path
return None
return current
[docs]def linear_model(x, m, b):
# TODO: Move to numpy polyfit/polyval?
"""
Linear model function: y = mx + b.
"""
return m * x + b
[docs]def calculate_residuals(data_y, fit_y):
"""
Compute the residuals between observed data and fitted values.
Residuals are calculated as the difference between actual values (data_y)
and predicted values (fit_y). Useful for evaluating the quality of
regression or curve fitting results.
Parameters
----------
data_y : array-like
The observed data values.
fit_y : array-like
The predicted or fitted values.
Returns
-------
np.ndarray
The residuals, calculated as data_y - fit_y.
"""
# Element-wise subtraction of predicted values from actual data
residuals = data_y - fit_y
return residuals
[docs]def calculate_R_squared(residuals, data_y):
"""
Calculate the coefficient of determination (R-squared) for a set of data.
R-squared is defined as:
.. math::
R^2 = 1 - (SS_{res} / SS_{tot})
where SS_res is the sum of squares of residuals (the differences between the
observed and predicted values) and SS_tot is the total sum of squares
(the differences between the observed values and their mean).
This metric indicates the proportion of the variance in the dependent
variable that is explained by the model.
Parameters
----------
residuals : array-like
The residuals (errors) from the fitted model,
typically computed as (observed - predicted).
data_y : array-like
The observed data values.
Returns
-------
float
The R-squared value. Higher values indicate a better fit.
Note: R² can be negative if the model performs worse
than a constant mean.
"""
# Calculate the sum of squares of residuals.
ss_res = np.sum(residuals**2)
# Calculate the total sum of squares relative to the mean of the observed data.
ss_tot = np.sum((data_y - np.mean(data_y))**2)
# Compute R-squared: 1 - (sum of squares of residuals divided by total sum of squares)
r_squared = 1 - (ss_res / ss_tot)
return r_squared
[docs]def calculate_RMSE(residuals):
"""
Compute the Root Mean Squared Error (RMSE) from residuals.
RMSE is the square root of the average of the squared residuals.
It provides an estimate of the standard deviation of the prediction errors
and is commonly used to quantify the accuracy of a model.
Parameters
----------
residuals : array-like
The residuals (differences between observed and predicted values).
Returns
-------
float
The RMSE value, representing the standard deviation of residual errors.
"""
# Compute RMSE by taking the square root of the mean squared residuals
rmse = np.sqrt(np.mean(residuals**2))
return rmse
[docs]def gaussian(x, mu, sigma, a=1.0):
"""
Compute the Gaussian (normal) distribution function.
The Gaussian function is defined as shown below.
.. math::
f(x) = \\frac{a}{\\sigma \\sqrt{2\\pi}} \\exp\\left( -\\frac{1}{2} \\left( \\frac{x - \\mu}{\\sigma} \\right)^2 \\right)
Parameters
----------
x : float or ndarray
The input value(s) where the Gaussian function is evaluated.
mu : float
The mean (center) of the Gaussian distribution.
sigma : float
The standard deviation (spread) of the Gaussian distribution.
Must be positive.
a : float
A scaling factor of the Gaussian distribution, default: 1.0.
Returns
-------
float or ndarray
The computed value(s) of the Gaussian function at `x`.
"""
exponent = -0.5 * ((x - mu) / sigma) ** 2
normalisation = a / (sigma * np.sqrt(2 * np.pi))
f_x = normalisation * np.exp(exponent)
return f_x
[docs]def dynamic_local_change_simplification(time, temperature, basis_tolerance=0.2):
"""
Simplify a time-temperature dataset by dynamically assessing local changes
against a "basis" change.
This function allows to remove data points from a series but preserves the
shape. This is helpful when defining a RAMP for FDS. For example, when a
simulation is to be conducted where the temperature development of a heater
over time is to be used as input, e.g. TGA, or cone calorimeter.
It works as follows:
The first point is retained. Then, the change between the first and second
data point is established (basis change). A range is defined around the
basis change, using the `basis_tolerance`. Next, it is determined if the
change between the first and third point is inside that range. If this is
true, the point is excluded and the change between first and fourth point
is assessed. This process is repeated until a point is outside the range.
This point is retained and the process starts again. The last point in the
series is always retained.
Parameters
----------
time : ndarray
Time values.
temperature : ndarray
Temperature values.
basis_tolerance : float
Tolerance range for deviations from the basis change (e.g., 0.2 = 20%).
Returns
-------
ndarray
Indices of the retained points in the original data.
"""
# Always keep the first point
retained_indices = [0]
# Establish the basis change (change between the first two points)
start_idx = 0
basis_change = abs(temperature[1] - temperature[0])
for i in range(1, len(temperature)):
# Calculate the cumulative change since the current start point
cumulative_change = abs(temperature[i] - temperature[start_idx])
# Calculate the allowed range around the basis change
lower_bound = basis_change * (1 - basis_tolerance)
upper_bound = basis_change * (1 + basis_tolerance)
# If the cumulative change exceeds the allowed range
if cumulative_change < lower_bound or cumulative_change > upper_bound:
# Keep the current point
retained_indices.append(i)
# Reset the start point and basis change
start_idx = i
if i + 1 < len(temperature): # Check to avoid index errors
basis_change = abs(temperature[i + 1] - temperature[i])
else:
basis_change = cumulative_change # Final segment uses the last change
# Always keep the last point
retained_indices.append(len(temperature) - 1)
return np.array(retained_indices)
[docs]def get_git_commit_hash(path_to_git_repo):
"""
Retrieve the short and long commit hashes of the current HEAD in a local Git repository.
This function runs ``git rev-parse`` in the specified repository directory and
returns both the short and full (long) forms of the current HEAD commit hash.
Note that uncommitted changes in the working tree are not reflected in these hashes.
Parameters
----------
path_to_git_repo : str or os.PathLike
Path to the local clone of a Git repository.
Returns
-------
rev_short : str
Short form of the current HEAD commit hash (e.g., ``a1b2c3d``).
rev_long : str
Full (long) form of the current HEAD commit hash (e.g., ``a1b2c3d4e5f6...``).
Raises
------
RuntimeError
If the path is not a Git repository, Git is not installed, or HEAD
cannot be resolved.
"""
try:
rev_short = subprocess.check_output(
["git", "rev-parse", "--short", "HEAD"],
cwd=path_to_git_repo
).strip().decode()
rev_long = subprocess.check_output(
["git", "rev-parse", "HEAD"],
cwd=path_to_git_repo
).strip().decode()
except FileNotFoundError as e:
raise RuntimeError(
"Git does not seem to be installed or is not available on the PATH."
) from e
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Path '{path_to_git_repo}' is not a valid Git repository or HEAD is not defined."
) from e
return rev_short, rev_long
[docs]def get_window_points(phi, delta_phi):
"""
Compute the number of data points required to cover a given window width.
Useful for determining the window size for smoothing operations where the
desired window is specified in physical units (e.g. temperature in K or
time in s) rather than in number of points.
The result is always an odd number (required by most smoothing filters)
and at least 5.
Parameters
----------
phi : array-like
The independent variable series (e.g. temperature or time).
delta_phi : float
Desired window width in the same units as phi.
Returns
-------
int
Odd number of data points covering the requested window width.
"""
# Estimate the typical spacing between consecutive data points.
d_phi = np.median(np.diff(phi))
# Convert the desired window width to a number of points.
win_pts = int(round(delta_phi / d_phi))
win_pts = max(win_pts, 5)
# Smoothing filters (e.g. Savitzky-Golay) require an odd window size.
if win_pts % 2 == 0:
win_pts += 1
return win_pts
[docs]def get_peak_value(x_values, y_values):
"""
Find the peak (maximum) of a data series and return its coordinates and index.
Parameters
----------
x_values : array-like or pd.Series
The independent variable (e.g. time or temperature).
y_values : array-like or pd.Series
The dependent variable whose maximum is sought.
Returns
-------
x_peak : float
The x-value at the peak.
y_peak : float
The y-value at the peak.
peak_idx : int
The index of the peak in the input arrays.
"""
x_values = series_to_numpy(x_values)
y_values = series_to_numpy(y_values)
peak_idx = np.argmax(y_values)
x_peak = x_values[peak_idx]
y_peak = y_values[peak_idx]
return x_peak, y_peak, peak_idx
[docs]def cubic_max_exact(coeffs, x1, x2):
"""
Find the exact maximum of a cubic polynomial on the interval [x1, x2].
Evaluates the polynomial at its critical points (where the derivative is
zero) and at both endpoints, then returns the location and value of the
global maximum on that interval.
Parameters
----------
coeffs : array-like
Polynomial coefficients in descending order, as returned by
``numpy.polyfit`` with degree 3.
x1 : float
Left boundary of the search interval.
x2 : float
Right boundary of the search interval.
Returns
-------
x_max : float
The x-value at which the polynomial reaches its maximum.
y_max : float
The maximum value of the polynomial on [x1, x2].
"""
p = np.poly1d(coeffs)
dp = np.polyder(coeffs) # first derivative (quadratic)
# Critical points are where the first derivative is zero.
crit_x = np.roots(dp)
crit_x = crit_x[np.isreal(crit_x)].real
# Keep only critical points strictly inside the interval.
crit_x = crit_x[(crit_x > x1) & (crit_x < x2)]
# Evaluate at critical points and both endpoints, return the maximum.
xs = np.array([x1, x2, *crit_x])
ys = p(xs)
i = np.argmax(ys)
return xs[i], ys[i]
[docs]def interpolate_experiment_data(time, temp, signal_1, signal_2=None,
T_step=0.5, mode="temperature_grid",
nominal_heating_rate=None, non_monotonic="raise"):
"""
Interpolate experimental data to achieve a uniform temperature or time spacing.
Accepts up to two signal columns, for example for STA data
such as mass and heat flow.
Two processing modes are available:
1) ``"temperature_grid"``:
A temperature grid with spacing ``T_step`` is created and used
for interpolation. This assumes strictly increasing temperature.
2) ``"nominal_rate"``:
Time is treated as the independent variable and a time grid is
built from the nominal heating rate and the desired temperature
step size ``T_step``.
Parameters
----------
time : array-like or pd.Series
Time series from the experimental data.
temp : array-like or pd.Series
Temperature series from the experimental data.
signal_1 : array-like or pd.Series
First signal from the experimental data, e.g. mass (TGA).
signal_2 : array-like or pd.Series, optional
Second signal from the experimental data, e.g. heat flow (STA).
T_step : float
Desired temperature step size in Kelvin, e.g. 0.5 K.
mode : str
Interpolation mode: ``"nominal_rate"`` or ``"temperature_grid"``.
nominal_heating_rate : float, optional
Nominal heating rate in K/min. Required when ``mode="nominal_rate"``.
non_monotonic : str
How to handle non-monotonic temperature data in ``"temperature_grid"``
mode. Options are ``"raise"``, ``"warn"``, or ``"ignore"``.
Returns
-------
tuple
``(new_time, new_temp, new_signal_1)`` or
``(new_time, new_temp, new_signal_1, new_signal_2)`` if signal_2
is provided.
Raises
------
ValueError
If ``T_step`` is not positive, input arrays have inconsistent lengths,
``non_monotonic`` receives an unknown value, or the mode is unknown.
"""
time = series_to_numpy(time)
temp = series_to_numpy(temp)
signal_1 = series_to_numpy(signal_1)
if signal_2 is not None:
signal_2 = series_to_numpy(signal_2)
if T_step <= 0:
raise ValueError("T_step must be greater than zero.")
if not (len(time) == len(temp) == len(signal_1)):
raise ValueError("time, temp, and signal_1 must have the same length.")
if signal_2 is not None and len(signal_2) != len(time):
raise ValueError("signal_2 must have the same length as time and temp.")
if mode == "nominal_rate":
if not np.all(np.diff(time) > 0):
raise ValueError(
"Time data is not strictly monotonically increasing. "
"Time-based interpolation may be unreliable."
)
# Convert heating rate from K/min to K/s to match time in seconds.
beta = nominal_heating_rate / 60
t_step = T_step / beta
t_start = time[0]
t_end = time[-1]
new_time = np.arange(t_start, t_end, t_step)
new_temp = np.interp(new_time, time, temp)
new_signal_1 = np.interp(new_time, time, signal_1)
if signal_2 is not None:
new_signal_2 = np.interp(new_time, time, signal_2)
return new_time, new_temp, new_signal_1, new_signal_2
return new_time, new_temp, new_signal_1
elif mode == "temperature_grid":
if not np.all(np.diff(temp) > 0):
msg = (
"Temperature data is not strictly monotonically increasing. "
"Temperature-based interpolation may be unreliable."
)
if non_monotonic == "raise":
raise ValueError(msg)
elif non_monotonic == "warn":
warnings.warn(msg, stacklevel=2)
elif non_monotonic == "ignore":
pass
else:
raise ValueError(f"Unknown option for non_monotonic: {non_monotonic!r}")
T_start = temp[0]
T_end = temp[-1]
new_temp = np.arange(T_start, T_end, T_step)
new_time = np.interp(new_temp, temp, time)
new_signal_1 = np.interp(new_temp, temp, signal_1)
if signal_2 is not None:
new_signal_2 = np.interp(new_temp, temp, signal_2)
return new_time, new_temp, new_signal_1, new_signal_2
return new_time, new_temp, new_signal_1
else:
raise ValueError(f"Mode {mode!r} is unknown. Use 'nominal_rate' or 'temperature_grid'.")