Source code for ledsa.analysis.ExtinctionCoefficients

from abc import ABC, abstractmethod
from multiprocessing import Pool

import numpy as np
import pandas as pd

from ledsa.analysis.Experiment import Experiment, Layers, Camera
from ledsa.core.file_handling import read_hdf, read_hdf_avg, extend_hdf, create_analysis_infos_avg
from ledsa.data_extraction.data_integrity import check_intensity_normalization

from importlib.metadata import version


[docs] class ExtinctionCoefficients(ABC): """ Parent class for the calculation of the Extinction Coefficients. :ivar coefficients_per_image_and_layer: List of coefficients for each image and layer. :vartype coefficients_per_image_and_layer: list[np.ndarray] :ivar experiment: Object representing the experimental setup. :vartype experiment: Experiment :ivar reference_property: Reference property to be analysed. :vartype reference_property: str :ivar num_ref_imgs: Number of reference images. # TODO: create test for this :vartype num_ref_imgs: int :ivar ref_img_indices: Indices of reference images to use. If None, use num_ref_imgs. :vartype ref_img_indices: list[int] or None :ivar calculated_img_data: DataFrame containing calculated image data. :vartype calculated_img_data: pd.DataFrame :ivar distances_per_led_and_layer: Array of distances traversed between camera and LEDs in each layer. :vartype distances_per_led_and_layer: np.ndarray :ivar ref_intensities: Array of reference intensities for all LEDs. :vartype ref_intensities: np.ndarray :ivar cc_matrix: Color correction matrix. :vartype cc_matrix: np.ndarray or None :ivar average_images: Flag to determine if intensities are computed as an average from two consecutive images. :vartype average_images: bool :ivar solver: Indication whether the calculation is to be carried out numerically or analytically. :vartype type: str """ def __init__(self, experiment, reference_property='sum_col_val', num_ref_imgs=10, ref_img_indices=None, average_images=False): """ :param experiment: Object representing the experimental setup. :type experiment: Experiment :param reference_property: Reference property to be analysed. :type reference_property: str :param num_ref_imgs: Number of reference images. :type num_ref_imgs: int :param ref_img_indices: Indices of reference images to use. If None, use num_ref_imgs. :type ref_img_indices: list[int] or None :param average_images: Flag to determine if intensities are computed as an average from two consecutive images. :type average_images: bool """ self.coefficients_per_image_and_layer = [] self.experiment = experiment self.reference_property = reference_property self.num_ref_imgs = num_ref_imgs self.ref_img_indices = ref_img_indices self.calculated_img_data = pd.DataFrame() self.distances_per_led_and_layer = np.array([]) self.ref_intensities = np.array([]) self.cc_matrix = None self.average_images = average_images self.solver = None self.lambda_reg = None def __str__(self): out = str(self.experiment) + \ f'Reference_property: {self.reference_property}, Ref_img_indices: {self.ref_img_indices}, Solver: {self.solver}, Lambda_reg: {self.lambda_reg}, LEDSA {version("ledsa")}\n' return out
[docs] def calc_and_set_coefficients(self) -> None: """ Serial calculation of extinction coefficients for every image """ # Load and calculate all needed variables self.set_all_member_variables() for img_id, single_img_data in self.calculated_img_data.groupby(level=0): single_img_array = single_img_data[self.reference_property].to_numpy() rel_intensities = single_img_array / self.ref_intensities # Calculate the extinction coefficients depending on child class used sigmas = self.calc_coefficients_of_img(rel_intensities) self.coefficients_per_image_and_layer.append(sigmas)
[docs] def calc_and_set_coefficients_mp(self, cores=4) -> None: """ Uses multiprocessing to calculate and set extinction coefficients. :param cores: Number of cores to use. :type cores: int """ # Load and calculate all needed variables self.set_all_member_variables() img_property_array = multiindex_series_to_nparray(self.calculated_img_data[self.reference_property]) rel_intensities = img_property_array / self.ref_intensities # Calculate the extinction coefficients depending on child class used pool = Pool(processes=cores) sigmas = pool.map(self.calc_coefficients_of_img, rel_intensities) pool.close() self.coefficients_per_image_and_layer = sigmas
[docs] def set_all_member_variables(self, save_distances: bool = True) -> None: """ Calculate distance traveled per layer, for every led, load image data from binary file and calculate reference intensities for each LED :param save_distances: If True, write the distance matrix to a text file in the current working directory. Pass False when calling from a stacked context that manages its own distance file. :type save_distances: bool """ if len(self.distances_per_led_and_layer) == 0: self.distances_per_led_and_layer = self.calc_distance_array() if save_distances: file_name = f'led_array_{self.experiment.led_array}_distances_per_layer.txt' np.savetxt(file_name, self.distances_per_led_and_layer) if self.calculated_img_data.empty: self.load_img_data() if self.ref_intensities.shape[0] == 0: self.calc_and_set_ref_intensities()
[docs] def load_img_data(self) -> None: """ Load processed image data from binary file """ if self.average_images: img_data = read_hdf_avg(self.experiment.channel, path=self.experiment.path) create_analysis_infos_avg() else: img_data = read_hdf(self.experiment.channel, path=self.experiment.path) img_data_cropped = img_data[['led_array_id', self.reference_property]] self.calculated_img_data = img_data_cropped[img_data_cropped['led_array_id'] == self.experiment.led_array] if self.calculated_img_data.empty: exit(f"Apparently there are no intensity values for led array {self.experiment.led_array}!")
[docs] def save(self) -> None: """ Save the computed extinction coefficients to a file. """ path = self.experiment.path / 'analysis' / 'extinction_coefficients' / self.solver if not path.exists(): path.mkdir(parents=True) path = path / f'extinction_coefficients_{self.solver}_channel_{self.experiment.channel}_{self.reference_property}_led_array_{self.experiment.led_array}.csv' header = str(self) header += 'Layer_bottom_height[m],' header += ','.join(map(str, self.experiment.layers.borders[:-1])) + '\n' header += 'Layer_top_height[m],' header += ','.join(map(str, self.experiment.layers.borders[1:])) + '\n' header += 'Layer_center_height[m],' header += ','.join(map(lambda x: f"{x:.2f}", (np.array(self.experiment.layers.borders[:-1]) + np.array( self.experiment.layers.borders[1:])) / 2)) + '\n' header += 'Experiment_Time[s],' header += 'layer0' header += ''.join(f',layer{i + 1}' for i in range(self.experiment.layers.amount - 1)) experiment_times = _get_experiment_times_from_image_infos_file(self.average_images) coefficients_per_time_and_layer = np.column_stack( (experiment_times, self.coefficients_per_image_and_layer)) np.savetxt(path, coefficients_per_time_and_layer, delimiter=',', header=header)
[docs] def calc_distance_array(self) -> np.ndarray: """ Calculate the distances traversed between camera and LEDs in each layer. :return: Array of distances traversed between camera and LEDs in each layer. :rtype: np.ndarray """ distances = np.zeros((self.experiment.num_leds, self.experiment.layers.amount)) count = 0 # self.experiment.leds need to be reversed to build the distance array the right way for led in reversed(self.experiment.leds): d = self.experiment.calc_traversed_dist_per_layer(led) distances[count] = d count += 1 return distances
[docs] def calc_and_set_ref_intensities(self) -> None: """ Calculate and set the reference intensities for all LEDs based on the reference images. """ if self.ref_img_indices is not None: ref_img_data = self.calculated_img_data.query(f'img_id == {self.ref_img_indices}') else: ref_img_data = self.calculated_img_data.query(f'img_id <= {self.num_ref_imgs}') print( f"Images with indices {ref_img_data.index.get_level_values('img_id').unique().values} were used for calculating reference intensities.") ref_intensities = ref_img_data.groupby(level='led_id')[self.reference_property].mean() check_intensity_normalization(ref_img_data, ref_intensities, self.reference_property) self.ref_intensities = ref_intensities.to_numpy()
[docs] def apply_color_correction(self, cc_matrix, on='sum_col_val', nchannels=3) -> None: # TODO: remove hardcoding of nchannels """ Apply color correction on channel values based on color correction matrix. :param cc_matrix: Color correction matrix. :type cc_matrix: np.ndarray :param on: Reference property to apply the color correction on. :type on: str :param nchannels: Number of channels. :type nchannels: int """ self.cc_matrix = cc_matrix cc_matrix_inv = np.linalg.inv(self.cc_matrix) quanity = on fit_params_list = [] for channel in range(nchannels): fit_parameters = read_hdf(channel)[quanity] fit_params_list.append(fit_parameters) raw_val_array = pd.concat(fit_params_list, axis=1) cc_val_array = np.dot(cc_matrix_inv, raw_val_array.T).T cc_val_array = cc_val_array.astype(np.int16) for channel in range(nchannels): extend_hdf(channel, quanity + '_cc', cc_val_array[:, channel]) print(f"Color correction applied on {nchannels} Channels!")
[docs] @abstractmethod def calc_coefficients_of_img(self, rel_intensities: np.ndarray) -> np.ndarray: """ Calculate the extinction coefficients for a single image. Needs to be implemented by the child class. :param rel_intensities: Array of relative change in intensity of every LED. :return: Array of the computed extinction coefficients :rtype: np.ndarray """ pass
[docs] def multiindex_series_to_nparray(multi_series: pd.Series) -> np.ndarray: """ Convert a multi-index series to a NumPy array. :param multi_series: Series with multi-index to convert. :type multi_series: pd.Series :return: Converted array. :rtype: np.ndarray """ num_leds = pd.Series(multi_series.groupby(level=0).size()).iloc[0] num_imgs = pd.Series(multi_series.groupby(level=1).size()).iloc[0] array = np.zeros((num_imgs, num_leds)) for i in range(num_imgs): array[i] = multi_series.loc[i + 1] return array
def _get_experiment_times_from_image_infos_file(average_images): if average_images == True: image_infos_file = 'analysis/image_infos_analysis_avg.csv' else: image_infos_file = 'analysis/image_infos_analysis.csv' image_info_df = pd.read_csv(image_infos_file) return image_info_df['Experiment_Time[s]'].to_numpy()