import glob
import os
import numpy as np
import pandas as pd
from ledsa.core.image_reading import read_channel_data_from_img
from ledsa.core.ConfigData import ConfigData
from ledsa.analysis.ConfigDataAnalysis import ConfigDataAnalysis
from ledsa.analysis.ConfigDataStacked import ConfigDataStacked
# ──────────────────────────────────────────────────────────────────────────────
# Shared base
# ──────────────────────────────────────────────────────────────────────────────
class _BaseSimData:
"""
Shared base for :class:`SimData` and :class:`StackedSimData`.
Holds layer geometry, the combined extinction-coefficient DataFrame, and all
query methods. Subclasses are responsible for populating the attributes in
their ``__init__`` and for implementing :meth:`_get_extco_df_from_path`.
"""
def __init__(self):
self.all_extco_df = None
self.solver = None
self.camera_channels = [0]
self.num_ref_images = None
self.n_layers = None
self.lower_domain_bound = None
self.upper_domain_bound = None
self.layer_bottom_heights = None
self.layer_top_heights = None
self.layer_center_heights = None
# ------------------------------------------------------------------
# Layer-geometry helpers
# ------------------------------------------------------------------
def _set_layer_geometry(self, domain_bounds, n_layers):
"""Compute and store layer border and centre arrays."""
self.n_layers = n_layers
self.lower_domain_bound = domain_bounds[0]
self.upper_domain_bound = domain_bounds[1]
borders = [
(i / n_layers) * (domain_bounds[1] - domain_bounds[0]) + domain_bounds[0]
for i in range(n_layers + 1)
]
self.layer_bottom_heights = np.round(np.array(borders[:-1]), 2)
self.layer_top_heights = np.round(np.array(borders[1:]), 2)
self.layer_center_heights = np.round(
(self.layer_bottom_heights + self.layer_top_heights) / 2, 2
)
def layer_from_height(self, height):
"""Return the layer index that contains *height* [m]."""
return int(
(height - self.lower_domain_bound)
/ (self.upper_domain_bound - self.lower_domain_bound)
* self.n_layers
)
# ------------------------------------------------------------------
# Smoothing
# ------------------------------------------------------------------
def _apply_moving_average(self, df, window, smooth='ma'):
"""
Apply a centred rolling window to *df*.
If *window* is 1, the raw data is returned unchanged. Otherwise a
centred rolling window is applied using either mean (``'ma'``) or
median (``'median'``).
:param df: Input DataFrame (time as index).
:type df: pd.DataFrame
:param window: Window width. A value of 1 returns *df* unchanged.
:type window: int
:param smooth: Smoothing method: ``'ma'`` for moving average or
``'median'`` for rolling median. Defaults to ``'ma'``.
:type smooth: str
:return: Smoothed DataFrame of the same shape.
:rtype: pd.DataFrame
"""
if window == 1:
return df
roller = df.rolling(window=window, closed='right')
smoothed = roller.median() if smooth == 'median' else roller.mean()
return smoothed.shift(-int(window / 2) + 1)
# ------------------------------------------------------------------
# Time helpers
# ------------------------------------------------------------------
def get_closest_time(self, time):
"""
Return the index value in :attr:`all_extco_df` nearest to *time*.
:param time: Target time [s].
:type time: int or float
:return: Closest matching time index value.
"""
return self.all_extco_df.index[abs(self.all_extco_df.index - time).argmin()]
def set_timeshift(self, timedelta):
"""
Shift the time axis of :attr:`all_extco_df` by *timedelta* seconds.
:param timedelta: Time offset [s].
:type timedelta: float
"""
if self.all_extco_df is not None:
self.all_extco_df.index += timedelta
# ------------------------------------------------------------------
# Extinction-coefficient queries
# ------------------------------------------------------------------
def get_extco_at_time(self, channel, time, yaxis='layer', height_reference='center', window=1, smooth='ma'):
"""
Extinction coefficients for all LED arrays at a single time step.
:param channel: Colour channel index.
:type channel: int
:param time: Experiment time [s]. Use :meth:`get_closest_time` to snap
to the nearest available time.
:param yaxis: Index type of the returned DataFrame.
``'layer'`` returns integer layer indices; ``'height'`` returns
physical heights [m].
:type yaxis: str
:param height_reference: Which layer border to use as the height
reference: ``'center'``, ``'top'``, or ``'bottom'``.
:type height_reference: str
:param window: Rolling-window size for temporal smoothing. 1 = no
smoothing.
:type window: int
:param smooth: Smoothing method: ``'ma'`` (mean) or ``'median'``.
:type smooth: str
:return: DataFrame indexed by layer / height; one column per LED array.
:rtype: pd.DataFrame
"""
ch_df = self.all_extco_df.xs(channel, level=0, axis=1)
ma_df = self._apply_moving_average(ch_df, window, smooth)
ma_df = ma_df.loc[time, :].reset_index().pivot(columns='LED Array', index='Layer')
ma_df.columns = ma_df.columns.droplevel()
ma_df.index = range(ma_df.shape[0])
return self._set_extco_yaxis(ma_df, yaxis, height_reference)
def get_extco_at_led_array(self, channel, led_array, yaxis='layer', height_reference='center', window=1, smooth='ma'):
"""
Time series of extinction coefficients for one LED array.
*led_array* is an ``int`` for :class:`SimData` (LED array ID) and a
``str`` for :class:`StackedSimData` (virtual array label, e.g.
``'array_A'``).
:param channel: Colour channel index.
:type channel: int
:param led_array: LED array identifier (int for SimData, str for StackedSimData).
:param yaxis: Column type of the returned DataFrame:
``'layer'`` or ``'height'``.
:type yaxis: str
:param height_reference: ``'center'``, ``'top'``, or ``'bottom'``.
:type height_reference: str
:param window: Rolling-window size for temporal smoothing.
:type window: int
:param smooth: Smoothing method: ``'ma'`` or ``'median'``.
:type smooth: str
:return: DataFrame indexed by time; one column per layer / height.
:rtype: pd.DataFrame
"""
ma_df = self._apply_moving_average(
self.all_extco_df.xs(channel, level=0, axis=1).xs(led_array, level=0, axis=1),
window, smooth,
)
if yaxis == 'layer':
ma_df.columns.names = ['Layer']
elif yaxis == 'height':
ma_df.columns = self._height_axis(height_reference)
return ma_df
def get_extco_at_layer(self, channel, layer, window=1, smooth='ma'):
"""
Time series of extinction coefficients for a single layer.
:param channel: Colour channel index.
:type channel: int
:param layer: Layer index (0 = bottom-most layer).
:type layer: int
:param window: Rolling-window size for temporal smoothing.
:type window: int
:param smooth: Smoothing method: ``'ma'`` or ``'median'``.
:type smooth: str
:return: DataFrame indexed by time; one column per LED array.
:rtype: pd.DataFrame
"""
ch_df = self.all_extco_df.xs(channel, level=0, axis=1).xs(layer, level=1, axis=1)
return self._apply_moving_average(ch_df, window, smooth)
def get_extco_at_height(self, channel, height, window=1, smooth='ma'):
"""
Time series of extinction coefficients at a physical height.
Converts *height* to a layer index via :meth:`layer_from_height` and
delegates to :meth:`get_extco_at_layer`.
:param channel: Colour channel index.
:type channel: int
:param height: Target height [m].
:type height: float
:param window: Rolling-window size for temporal smoothing.
:type window: int
:param smooth: Smoothing method: ``'ma'`` or ``'median'``.
:type smooth: str
:return: DataFrame indexed by time; one column per LED array.
:rtype: pd.DataFrame
"""
return self.get_extco_at_layer(channel, self.layer_from_height(height), window, smooth)
# ------------------------------------------------------------------
# Internal helpers shared by query methods above
# ------------------------------------------------------------------
def _height_axis(self, height_reference):
if height_reference == 'center':
return self.layer_center_heights
elif height_reference == 'top':
return self.layer_top_heights
elif height_reference == 'bottom':
return self.layer_bottom_heights
raise ValueError("height_reference must be 'center', 'top', or 'bottom'")
def _set_extco_yaxis(self, df, yaxis, height_reference):
if yaxis == 'layer':
df.index.names = ['Layer']
elif yaxis == 'height':
df.index = self._height_axis(height_reference)
df.index.names = ['Height / m']
return df
# ──────────────────────────────────────────────────────────────────────────────
# Single-camera simulation
# ──────────────────────────────────────────────────────────────────────────────
[docs]
class SimData(_BaseSimData):
def __init__(self, path_simulation, read_all=True, remove_duplicates=False, load_config_params=True):
"""
Initializes SimData with simulation and image analysis settings.
:param path_simulation: Directory containing simulation data.
:type path_simulation: str
:param read_all: If True, reads all data on initialization, defaults to True.
:type read_all: bool, optional
:param remove_duplicates: If True, removes duplicate entries from analysis, defaults to False.
:type remove_duplicates: bool, optional
:param load_config_params: If True, loads parameters from config files, defaults to True.
:type load_config_params: bool, optional
"""
super().__init__()
self.led_params_timedelta = 0
self.ch0_ledparams_df = None
self.ch1_ledparams_df = None
self.ch2_ledparams_df = None
self.ch0_extcos = None
self.ch1_extcos = None
self.ch2_extcos = None
self.average_images = False
self.search_area_radius = None
self.path_images = None
if load_config_params:
os.chdir(path_simulation)
self.config = ConfigData()
self.config_analysis = ConfigDataAnalysis()
self.search_area_radius = self.config['find_search_areas']['search_area_radius']
self.path_images = self.config['find_search_areas']['img_directory']
self.solver = self.config_analysis['DEFAULT']['solver']
self.average_images = self.config_analysis.getboolean('DEFAULT', 'average_images')
self.camera_channels = self.config_analysis.get_list_of_values('DEFAULT', 'camera_channels', dtype=int)
self.num_ref_images = int(self.config_analysis['DEFAULT']['num_ref_images'])
self._set_layer_geometry(
self.config_analysis.get_list_of_values('model_parameters', 'domain_bounds', dtype=float),
int(self.config_analysis['model_parameters']['num_layers']),
)
self.path_simulation = path_simulation
self.led_info_path = os.path.join(self.path_simulation, 'analysis', 'led_search_areas_with_coordinates.csv')
image_infos_file = (
'analysis/image_infos_analysis_avg.csv'
if self.average_images
else 'analysis/image_infos_analysis.csv'
)
self.image_info_df = pd.read_csv(os.path.join(self.path_simulation, image_infos_file))
if read_all:
self.read_all()
if remove_duplicates:
self.remove_duplicate_heights()
# set_timeshift override: also updates LED-parameter time axis
[docs]
def set_timeshift(self, timedelta):
"""
Sets the time shift for the experiment's start time.
:param timedelta: Time shift in seconds.
:type timedelta: int
"""
super().set_timeshift(timedelta)
self.led_params_timedelta = timedelta
# ------------------------------------------------------------------
# Data loading
# ------------------------------------------------------------------
def _get_ledparams_df_from_path(self, channel):
"""
Reads experimental parameters from a binary HDF5 table based on color channel.
:param channel: The color channel index to read the parameters for.
:type channel: int
:return: DataFrame with LED parameters.
:rtype: pd.DataFrame
"""
if self.average_images:
file = os.path.join(self.path_simulation, 'analysis', f'channel{channel}', 'all_parameters_avg.h5')
else:
file = os.path.join(self.path_simulation, 'analysis', f'channel{channel}', 'all_parameters.h5')
table = pd.read_hdf(file, key='channel_values')
time = pd.Series(
self.image_info_df['Experiment_Time[s]'].astype(int).values,
index=self.image_info_df['#ID'],
name='Experiment_Time[s]',
)
table = table.merge(time, left_on='img_id', right_index=True)
table.set_index(['Experiment_Time[s]'], inplace=True)
self.led_heights = table['height']
return table
def _get_extco_df_from_path(self):
"""
Read all extinction coefficients from the simulation dir and put them in all_extco_df.
"""
extco_list = []
files_list = glob.glob(
os.path.join(
self.path_simulation, 'analysis', 'extinction_coefficients',
self.solver, 'extinction_coefficients*.csv',
)
)
for file in files_list:
file_df = pd.read_csv(file, skiprows=7)
channel = int(file.split('channel_')[1].split('_')[0])
led_array = int(file.split('array_')[1].split('.')[0])
if '# Experiment_Time[s]' not in file_df.columns:
time = self.image_info_df['Experiment_Time[s]'].astype(int)
file_df = file_df.merge(time, left_index=True, right_index=True)
else:
file_df.rename(columns={'# Experiment_Time[s]': 'Experiment_Time[s]'}, inplace=True)
file_df.set_index('Experiment_Time[s]', inplace=True)
n_layers = len(file_df.columns)
file_df.columns = pd.MultiIndex.from_product(
[[channel], [led_array], list(range(n_layers))],
names=["Channel", "LED Array", "Layer"],
)
extco_list.append(file_df)
self.all_extco_df = pd.concat(extco_list, axis=1)
self.all_extco_df.sort_index(ascending=True, axis=1, inplace=True)
self.all_extco_df = self.all_extco_df[~self.all_extco_df.index.duplicated(keep='first')]
[docs]
def read_led_params(self):
"""Read led parameters for all color channels from the simulation path."""
self.ch0_ledparams_df = self._get_ledparams_df_from_path(0)
self.ch1_ledparams_df = self._get_ledparams_df_from_path(1)
self.ch2_ledparams_df = self._get_ledparams_df_from_path(2)
[docs]
def read_all(self):
"""Read led parameters and extinction coefficients for all channels."""
self.read_led_params()
self._get_extco_df_from_path()
[docs]
def remove_duplicate_heights(self):
"""Remove duplicate height entries for each LED parameter DataFrame."""
self.ch0_ledparams_df = self.ch0_ledparams_df.groupby(['Experiment_Time[s]', 'height']).last()
self.ch1_ledparams_df = self.ch1_ledparams_df.groupby(['Experiment_Time[s]', 'height']).last()
self.ch2_ledparams_df = self.ch2_ledparams_df.groupby(['Experiment_Time[s]', 'height']).last()
# ------------------------------------------------------------------
# LED parameter queries
# ------------------------------------------------------------------
[docs]
def get_ledparams_at_led_array(self, channel, led_array_id, param='sum_col_val', yaxis='led_id', window=1, n_ref=None):
"""
Retrieves a DataFrame containing normalized LED parameters for a specific LED array.
:param channel: The channel index from which to extract LED parameters.
:type channel: int
:param led_array_id: The ID for which to extract LED parameters.
:type led_array_id: int
:param param: The specific LED parameter to extract and analyze. Defaults to 'sum_col_val'.
:type param: str, optional
:param yaxis: Column labeling: ``'led_id'`` or ``'height'``. Defaults to ``'led_id'``.
:type yaxis: str, optional
:param window: Smoothing window size. 1 = no smoothing.
:type window: int, optional
:param n_ref: Number of initial entries for normalisation. ``False`` disables
normalisation. Defaults to ``None`` (uses :attr:`num_ref_images`).
:type n_ref: int or bool, optional
:return: DataFrame; index = time, columns = LED IDs or heights.
:rtype: pd.DataFrame
"""
led_params = {0: self.ch0_ledparams_df, 1: self.ch1_ledparams_df, 2: self.ch2_ledparams_df}[channel]
index = 'height' if yaxis == 'height' else 'led_id'
led_params = led_params.reset_index().set_index(['Experiment_Time[s]', index])
led_params.index = led_params.index.set_levels(
led_params.index.levels[0] + self.led_params_timedelta, level=0
)
ii = led_params[led_params['led_array_id'] == led_array_id][[param]]
if n_ref is False:
rel_i = ii
else:
n_ref = self.num_ref_images if n_ref is None else n_ref
i0 = ii.groupby([index]).agg(lambda g: g.iloc[:n_ref].mean())
rel_i = ii / i0
rel_i = rel_i.reset_index().pivot(columns=index, index='Experiment_Time[s]')
rel_i.columns = rel_i.columns.droplevel()
return self._apply_moving_average(rel_i, window)
[docs]
def get_ledparams_at_led_id(self, channel, led_id, param='sum_col_val', window=1, n_ref=None):
"""
Retrieves a DataFrame containing normalized LED parameters for a specific LED ID.
:param channel: The channel index from which to extract LED parameters.
:type channel: int
:param led_id: The identifier of the LED for which to extract parameters.
:type led_id: int
:param param: The specific LED parameter to extract and analyze. Defaults to 'sum_col_val'.
:type param: str, optional
:param window: Smoothing window size. 1 = no smoothing.
:type window: int, optional
:param n_ref: Number of initial entries for normalisation. ``False`` disables
normalisation. Defaults to ``None`` (uses :attr:`num_ref_images`).
:type n_ref: int or bool, optional
:return: DataFrame indexed by time with the specified LED parameter.
:rtype: pd.DataFrame
"""
led_params = {0: self.ch0_ledparams_df, 1: self.ch1_ledparams_df, 2: self.ch2_ledparams_df}[channel]
led_params = led_params.reset_index().set_index(['Experiment_Time[s]'])
ii = led_params[led_params['led_id'] == led_id][[param]]
if n_ref is False:
rel_i = ii
else:
n_ref = self.num_ref_images if n_ref is None else n_ref
i0 = ii.iloc[:n_ref].mean()
rel_i = ii / i0
return self._apply_moving_average(rel_i, window)
# ------------------------------------------------------------------
# Image helpers
# ------------------------------------------------------------------
[docs]
def get_image_name_from_time(self, time):
"""
Retrieves the image name corresponding to a specific time.
:param time: The time index.
:type time: int
:return: The name of the image.
:rtype: str
"""
return self.image_info_df.loc[self.image_info_df['Experiment_Time[s]'] == time]['Name'].values[0]
[docs]
def get_pixel_cordinates_of_LED(self, led_id):
"""
Returns the pixel coordinates of a specified LED.
:param led_id: The identifier for the LED.
:type led_id: int
:return: A list containing the x and y pixel coordinates of the LED.
:rtype: list
"""
led_info_df = pd.read_csv(self.led_info_path)
return led_info_df.loc[led_info_df.index == led_id][
[' pixel position x', ' pixel position y']
].values[0]
[docs]
def get_pixel_values_of_led(self, led_id, channel, time, radius=None):
"""
Retrieves a cropped numpy array of pixel values around a specified LED.
:param led_id: The identifier for the LED of interest.
:type led_id: int
:param channel: The image channel from which to extract pixel values.
:type channel: int
:param time: The time at which the image was taken.
:type time: int
:param radius: Pixel radius around the LED. If not specified, uses
``search_area_radius`` from the config file.
:type radius: int, optional
:return: A numpy array of pixel values around the LED.
:rtype: numpy.ndarray
:raises ValueError: If ``path_images`` is not available.
"""
if not self.path_images:
raise ValueError("path_images is required for accessing image files")
radius = radius if radius is not None else self.search_area_radius
if radius:
pixel_positions = self.get_pixel_cordinates_of_LED(led_id)
imagename = self.get_image_name_from_time(time)
imagefile = os.path.join(self.path_images, imagename)
channel_array = read_channel_data_from_img(imagefile, channel)
x, y = pixel_positions[0], pixel_positions[1]
return np.flip(channel_array[x - radius:x + radius, y - radius:y + radius], axis=0)
return None
# ──────────────────────────────────────────────────────────────────────────────
# Multi-camera stacked analysis
# ──────────────────────────────────────────────────────────────────────────────
[docs]
class StackedSimData(_BaseSimData):
"""
Postprocessing interface for stacked multi-camera extinction coefficient results.
Reads ``config_stacked.ini`` from *config_path*, discovers the stacked CSV
output files produced by
:class:`~ledsa.analysis.StackedExtinctionCoefficients`, and exposes the same
:class:`_BaseSimData` query API for those results.
LED parameter data is not available through this class because stacked results
aggregate rays from multiple cameras. Use individual per-camera
:class:`SimData` objects for raw LED intensities.
:ivar config_stacked: Parsed stacked-analysis configuration.
:vartype config_stacked: ConfigDataStacked
"""
def __init__(self, config_path='.', read_all=True):
"""
:param config_path: Directory containing ``config_stacked.ini``.
Defaults to the current working directory.
:type config_path: str
:param read_all: If ``True``, immediately load all stacked extinction
coefficient CSV files. Defaults to ``True``.
:type read_all: bool
"""
super().__init__()
prev_dir = os.getcwd()
try:
os.chdir(config_path)
self.config_stacked = ConfigDataStacked()
finally:
os.chdir(prev_dir)
self.config_path = os.path.abspath(config_path)
self.output_path = os.path.join(self.config_path, str(self.config_stacked.output_path))
self.solver = self.config_stacked.solver
self.camera_channels = self.config_stacked.camera_channels
self.num_ref_images = self.config_stacked.num_ref_images
self._set_layer_geometry(self.config_stacked.domain_bounds, self.config_stacked.num_layers)
if read_all:
self._get_extco_df_from_path()
def _get_extco_df_from_path(self):
"""
Discover and load all stacked extinction coefficient CSV files.
Files are expected in::
<output_path>/analysis/extinction_coefficients/<solver>/
The virtual LED-array ID is stored as an integer in the
``all_extco_df`` MultiIndex (level ``"LED Array"``).
"""
extco_list = []
extco_dir = os.path.join(
self.output_path, 'analysis', 'extinction_coefficients', self.solver,
)
files = glob.glob(os.path.join(extco_dir, 'extinction_coefficients*.csv'))
if not files:
print(
f'No stacked extinction coefficient files found in:\n {extco_dir}\n'
'Run the stacked analysis first with: python -m ledsa -as'
)
return
for filepath in files:
fname = os.path.basename(filepath)
channel = int(fname.split('channel_')[1].split('_')[0])
led_array_id = int(fname.split('_led_array_')[1].removesuffix('.csv'))
file_df = pd.read_csv(filepath, skiprows=4)
file_df.rename(columns={'# Experiment_Time[s]': 'Experiment_Time[s]'}, inplace=True)
file_df['Experiment_Time[s]'] = file_df['Experiment_Time[s]'].astype(float)
file_df.set_index('Experiment_Time[s]', inplace=True)
n_layers = len(file_df.columns)
file_df.columns = pd.MultiIndex.from_product(
[[channel], [led_array_id], list(range(n_layers))],
names=["Channel", "LED Array", "Layer"],
)
extco_list.append(file_df)
if not extco_list:
return
self.all_extco_df = pd.concat(extco_list, axis=1)
self.all_extco_df.sort_index(ascending=True, axis=1, inplace=True)
self.all_extco_df = self.all_extco_df[~self.all_extco_df.index.duplicated(keep='first')]