Source code for ledsa.data_extraction.DataExtractor

#!/usr/bin/env python

import os

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
from tqdm import tqdm

import ledsa.core.file_handling
import ledsa.core.image_handling
import ledsa.core.image_reading
import ledsa.data_extraction.step_1_functions
import ledsa.data_extraction.step_2_functions
import ledsa.data_extraction.step_3_functions
from ledsa.core.ConfigData import ConfigData
from ledsa.data_extraction import init_functions as led


[docs] class DataExtractor: """ A class responsible for extracting data from experiment images. :ivar config: Configuration data object. :vartype config: ConfigData :ivar channels: Channels to be processed. :vartype channels: Tuple :ivar fit_leds: Whether to fit LEDs or not. :vartype fit_leds: bool :ivar fit_leds: Whether to fit LEDs or not. :vartype threshold: float, optional :ivar threshold: The threshold value used for LED detection. :vartype search_areas: numpy.ndarray, optional :ivar line_indices: 2D list with dimension (# of LED arrays) x (# of LEDs per array) or None. :vartype line_indices: list[list[int]], optional """ def __init__(self, channels=(0), load_config_file=True, build_experiment_infos=True, fit_leds=True): """ :param channels: Channels to be processed. Defaults to (0). :type channels: tuple, optional :param load_config_file: Whether to load existing configuration file. Defaults to True. :type load_config_file: bool, optional :param build_experiment_infos: Whether to create 'image_infos.csv'. Defaults to True. :type build_experiment_infos: bool, optional :param fit_leds: Whether to fit LEDs or not. Defaults to True. :type fit_leds: bool, optional """ self.config = ConfigData(load_config_file=load_config_file) self.channels = list(channels) self.fit_leds = fit_leds self.threshold = None # 2D numpy array with dimension (# of LEDs) x (LED_id, x, y) self.search_areas = None # 2D list with dimension (# of LED arrays) x (# of LEDs per array) self.line_indices = None led.create_needed_directories(self.channels) led.request_config_parameters(self.config) led.generate_image_infos_csv(self.config, build_experiment_infos=build_experiment_infos) # """ # ------------------------------------ # Step 1 - find LED search areas # ------------------------------------ # """
[docs] def load_search_areas(self) -> None: """ Load LED search areas from the 'led_search_areas.csv' file. #TODO be consistent with search areas and ROIS """ file_path = os.path.join('analysis', 'led_search_areas.csv') self.search_areas = ledsa.core.file_handling.read_table(file_path, delim=',', dtype='int')
[docs] def write_search_areas(self, reorder_leds=False) -> None: """ Writes LED search areas to a CSV file. :param reorder_leds: A flag indicating whether the LED IDs have been reordered. Affects the header of the output file. :type reorder_leds: bool """ out_file_path = os.path.join('analysis', 'led_search_areas.csv') header = 'LED id (reordered), pixel position x, pixel position y' if reorder_leds else ('LED id, pixel ' 'position x, ' 'pixel position y') np.savetxt(out_file_path, self.search_areas, delimiter=',', header=header, fmt='%d')
[docs] def find_search_areas(self) -> None: """ Identify all LEDs in the reference image and define the areas where LEDs will be searched in the experiment images. """ config = self.config['find_search_areas'] in_file_path = os.path.join(config['img_directory'], config['img_name_string'].format(int(config['ref_img_id']))) channel = config['channel'] search_area_radius = int(config['search_area_radius']) max_num_leds = int(config['max_num_leds']) pixel_value_percentile = float(config['pixel_value_percentile']) if channel == 'all': # TODO this currently only works for RAW files but should work for JPG files as well data = ledsa.core.image_reading.read_img_array_from_img(in_file_path, channel=0) # TODO: Channel to be removed here! else: channel = int(channel) data = ledsa.core.image_reading.read_channel_data_from_img(in_file_path, channel=channel) self.search_areas, self.threshold = ledsa.data_extraction.step_1_functions.find_search_areas(data, search_area_radius=search_area_radius, max_n_leds=max_num_leds, pixel_value_percentile=pixel_value_percentile) self.write_search_areas() self.plot_search_areas() ledsa.core.file_handling.remove_flag('reorder_leds')
[docs] def plot_search_areas(self, reorder_leds=False) -> None: """ Plot the identified LED search areas with their ID labels. :param reorder_leds: A flag indicating whether the LED IDs have been reordered. Affects the name of the output file. :type reorder_leds: bool """ try: os.remove(os.path.join('plots', 'led_search_areas.plot_reordered.pdf')) except OSError: pass config = self.config['find_search_areas'] if self.search_areas is None: self.load_search_areas() in_file_path = os.path.join(config['img_directory'], config['img_name_string'].format(int(config['ref_img_id']))) # TODO this currently only works for RAW files but should work for JPG files as well data = ledsa.core.image_reading.read_img_array_from_img(in_file_path, channel=0) search_area_radius = int(config['search_area_radius']) plt.figure(dpi=1200) ax = plt.gca() ledsa.data_extraction.step_1_functions.add_search_areas_to_plot(self.search_areas, search_area_radius, ax) plt.imshow(data, norm=LogNorm(vmin=self.threshold, vmax=data.max()), cmap='Grays') plt.xlim(self.search_areas[:, 2].min() - 5 * search_area_radius, self.search_areas[:, 2].max() + 5 * search_area_radius) plt.ylim(self.search_areas[:, 1].max() + 5 * search_area_radius, self.search_areas[:, 1].min() - 5 * search_area_radius) plt.colorbar() plot_filename = 'led_search_areas.plot_reordered.pdf' if reorder_leds else 'led_search_areas.plot.pdf' out_file_path = os.path.join('plots', plot_filename) plt.savefig(out_file_path) plt.close()
# """ # ------------------------------------ # Step 2 - match LEDs to arrays # ------------------------------------ # """
[docs] def match_leds_to_led_arrays(self) -> None: """ Analyze which LEDs belong to which LED array and save this mapping. """ if ledsa.core.file_handling.check_flag('reorder_leds'): exit("LED IDs have been reordered. Please run step S1 again before trying to match LEDs to LED lines!") else: if self.search_areas is None: self.load_search_areas() self.line_indices = ledsa.data_extraction.step_2_functions.match_leds_to_led_arrays(self.search_areas, self.config) self.search_areas = ledsa.data_extraction.step_2_functions.reorder_search_areas(self.search_areas, self.line_indices) self.write_search_areas(reorder_leds=True) self.line_indices = ledsa.data_extraction.step_2_functions.reorder_led_indices(self.line_indices) self.plot_search_areas(reorder_leds=True) print("LED IDs reordered successfully!") ledsa.core.file_handling.set_flag('reorder_leds') ledsa.data_extraction.step_2_functions.generate_led_array_indices_files(self.line_indices) self.plot_led_arrays() if self.config['analyse_positions']['merge_led_array_indices'] != 'None': self.line_indices = ledsa.data_extraction.step_2_functions.merge_indices_of_led_arrays(self.line_indices, self.config) self.plot_led_arrays(merge_led_arrays=True)
[docs] def load_led_array_indices(self) -> None: """ Load LED array indices from the 'led_array_indices_{...}.csv' files. """ if self.config['analyse_positions']['merge_led_array_indices'] != 'None': num_arrays = len(self.config.get2dnparray('analyse_positions', 'merge_led_array_indices', 'var')) file_extension = '_merge' print("ARRAY MERGE IS ACTIVE!") else: num_arrays = int(self.config['analyse_positions']['num_arrays']) file_extension = '' self.line_indices = [] for i in range(num_arrays): file_path = os.path.join('analysis', f'led_array_indices_{i:03}{file_extension}.csv') self.line_indices.append(ledsa.core.file_handling.read_table(file_path, dtype='int'))
[docs] def plot_led_arrays(self, merge_led_arrays=False) -> None: """ Plots the arrangement of LEDs as identified in the LED arrays and saves the plot as a PDF file. :param merge_led_arrays: A flag indicating whether LED arrays have been merged. Affects the naming of the output file. :type merge_led_arrays: bool """ for i in range(len(self.line_indices)): plt.scatter(self.search_areas[self.line_indices[i], 2], -self.search_areas[self.line_indices[i], 1], s=0.1, label='LED Array {}'.format(i)) plt.legend() plt.xticks([]) plt.yticks([]) plot_filename = 'led_arrays_merged.pdf' if merge_led_arrays else 'led_arrays.pdf' out_file_path = os.path.join('plots', plot_filename) plt.savefig(out_file_path) plt.close()
# """ # ------------------------------------ # Step 3 - LED smoke analysis # ------------------------------------ # """
[docs] def process_image_data(self) -> None: """ Process all the image data to detect changes in light intensity in the search areas across the images. Removes 'images_to_process.csv' file afterward. """ config = self.config['analyse_photo'] if self.search_areas is None: self.load_search_areas() if self.line_indices is None: self.load_led_array_indices() img_filenames = ledsa.core.file_handling.read_table('images_to_process.csv', dtype=str) num_cores = int(config['num_cores']) if num_cores > 1: from multiprocessing import Pool print('images are getting processed, this may take a while') with Pool(num_cores) as p: for _ in tqdm(p.imap(self.process_img_file, img_filenames), total=len(img_filenames), desc="Processing images", unit="image"): pass else: for img_filename in tqdm(img_filenames, desc="Processing images", unit="image"): self.process_img_file(img_filename) os.remove('images_to_process.csv')
[docs] def process_img_file(self, img_filename: str) -> None: """ Process a single image file to extract relevant data. This is a workaround for pool.map. :param img_filename: The name of the image file to be processed. :type img_filename: str """ img_id = ledsa.core.image_handling.get_img_id(img_filename) for channel in self.channels: img_data = ledsa.data_extraction.step_3_functions.generate_analysis_data(img_filename, channel, self.search_areas, self.line_indices, self.config, self.fit_leds) ledsa.data_extraction.step_3_functions.create_fit_result_file(img_data, img_id, channel)
[docs] def setup_step3(self) -> None: """ Setup the third step of the data extraction process by creating 'image_infos_analysis.csv' and 'images_to_process.csv' files. """ led.generate_image_infos_csv(self.config, build_analysis_infos=True) ledsa.data_extraction.step_3_functions.create_imgs_to_process_file()
[docs] def setup_restart(self) -> None: """ Setup a restart in case the data extraction process was interrupted earlier. """ # if len(self.channels) > 1: #TODO: deactivated for testing # print('Restart of a run currently only supports one channel. \nExiting...') # exit(1) ledsa.data_extraction.step_3_functions.find_and_save_not_analysed_imgs(self.channels[0])