Source code for ledsa.demo.demo_setup

import os
import requests
import shutil
import zipfile
from typing import Tuple
from tqdm import tqdm

from ledsa.analysis.ConfigDataAnalysis import ConfigDataAnalysis
from ledsa.core.ConfigData import ConfigData


[docs] def setup_demo(destination_path: str, image_data_url: str) -> None: """ Set up the demo environment. :param destination_path: Path where the demo should be set up. :type destination_path: str :param image_data_url: URL to the image data. :type image_data_url: str """ input_str = "Setting up the demo will download about 5 GB of data from the internet. Would you like to proceed? (yes/no): " exit_str = "Demo setup aborted by the user." proceed_demo_setup = _proceed_prompt(input_str, exit_str) if proceed_demo_setup: if os.path.isfile(os.path.join(destination_path, 'simulation', '.simulation_setup_successful')): input_str = "Looks like the simulation was already set up. Do you want to overwrite the existing data? (yes/no): " overwrite_demo = _proceed_prompt(input_str, exit_str) if overwrite_demo: # _cleanup_demo_directories(destination_path) pass else: print("No changes were made to the demo setup.") exit(0) image_dest_path, simulation_dest_path = _setup_directories(destination_path) _download_and_extract_images(image_data_url, image_dest_path, simulation_dest_path) _create_config_files(os.path.join(destination_path, 'simulation')) print("Demo setup successfully") else: exit(0)
def _create_config_files(path): owd = os.getcwd() os.chdir(path) config = ConfigData( load_config_file=False, img_directory="../image_data/", img_name_string="V001_Cam01_{}.CR2", num_img_overflow=9999, first_img_experiment_id=1, last_img_experiment_id=275, num_cores=4, date="27.11.2018", start_time="15:36:07", time_img_id=None, time_ref_img_time=None, time_diff_to_image_time=-1, ref_img_id=1, pixel_value_percentile=99.875, channel='all', max_num_leds=1000, search_area_radius=10, num_arrays=7, first_img_analysis_id=1, last_img_analysis_id=275, num_skip_imgs=0, num_skip_leds=0, merge_led_array_indices=None ) config.set('analyse_positions', ' ignore_led_indices', '781 675 746') config.set('analyse_positions', ' led_array_edge_indices', '\n 457 246\n 12 347\n 578 671\n 31 535\n 825 838\n 198 698\n 965 782\n ') config.set('analyse_positions', ' led_array_edge_coordinates','\n 6.86 3.13 1.14 5.98 2.83 3.32\n ' '5.96 2.83 0.99 5.98 2.83 3.35\n ' '5.96 2.83 1.17 5.05 2.62 3.32\n ' '5.02 2.62 1.0 5.05 2.62 3.34\n ' '4.09 2.38 1.19 5.05 2.62 3.32\n ' '4.09 2.38 1.0 4.12 2.38 3.35\n ' '3.17 2.25 1.17 4.12 2.38 3.32\n ') config.save() config_analysis = ConfigDataAnalysis( load_config_file=False, num_layers=20, num_ref_images=10, num_cores=4, reference_property='sum_col_val', average_images=False, solver='linear', weighting_preference=-6e-3, weighting_curvature=1e-6, num_iterations=200 ) config_analysis.set('experiment_geometry', ' camera_position', '7.29 6.46 2.3') config_analysis.set('DEFAULT', ' camera_channels', '0 1 2') config_analysis.set('model_parameters', ' domain_bounds', '0.99 3.35') config_analysis.set('model_parameters', ' led_array_indices', '0 1 2 3 4 5 6') config_analysis.save() os.chdir(owd) def _proceed_prompt(promt_str: str, exit_str: str) -> bool: """ Prompts the user with a given message and waits for a 'yes' or 'no' response. :param promt_str: The prompt string to display to the user. :type promt_str: str :param exit_str: The message to display when the user responds with 'no'. :type exit_str: str :return: Returns True if user responds with 'yes', and False if 'no'. :rtype: bool """ while True: proceed = input(promt_str).lower() if proceed == "yes": return True elif proceed == "no": print(exit_str) return False else: print("Invalid choice. Please enter 'yes' or 'no'.") continue def _cleanup_demo_directories(path: str) -> None: """ Removes the 'simulation' and 'image_data' directories from the given path. :param path: The base path where the directories are located. :type path: str """ shutil.rmtree(os.path.join(path, 'simulation')) shutil.rmtree(os.path.join(path, 'image_data')) def _setup_directories(destination_path: str) -> Tuple[str, str]: """ Set up required directories. :param destination_path: Path where the directories should be set up. :type destination_path: str :return: Paths to the created directories (image_data, simulation). :rtype: tuple(str) """ image_dest_path = os.path.join(destination_path, "image_data") simulation_dest_path = os.path.join(destination_path, "simulation") print("Setting up demo directories...") if not os.path.exists(image_dest_path): os.makedirs(image_dest_path) if not os.path.exists(simulation_dest_path): os.makedirs(simulation_dest_path) print("Done") return image_dest_path, simulation_dest_path def _download_and_extract_images(image_data_url: str, image_data_dest_path: str, simulation_path: str) -> None: """ Download and extract data from URLs to the target directories. :param image_data_url: URL to download the ZIP file from. :type image_data_url: str :param image_data_dest_path: Path to the destination image_data directory. :type image_data_dest_path: str :param simulation_path: Path to the simulation directory. :type simulation_path: str """ # Helper function to download a file from a given URL with progress bar def download_file_from_url(url, destination): response = requests.get(url, stream=True) response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) block_size = 8192 # 8 Kibibytes progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True) with open(destination, 'wb') as fd: for chunk in response.iter_content(chunk_size=block_size): progress_bar.update(len(chunk)) fd.write(chunk) progress_bar.close() # Download ZIP file zip_name = os.path.basename(image_data_url) local_zip_path = os.path.join(image_data_dest_path, zip_name) download_file_from_url(image_data_url, local_zip_path) # Extract contents of the ZIP's directory directly into image_data_path with progress bar with zipfile.ZipFile(local_zip_path, 'r') as zip_ref: members = zip_ref.namelist() for member in tqdm(members, desc="Extracting", unit="files"): if member != members[0]: # Assuming the first member is the root directory zip_ref.extract(member, image_data_dest_path) os.rename(os.path.join(image_data_dest_path, member), os.path.join(image_data_dest_path, os.path.basename(member))) # Cleanup the extracted ZIP file and any unwanted directory os.remove(local_zip_path) shutil.rmtree(os.path.join(image_data_dest_path, members[0]), ignore_errors=True) with open(os.path.join(simulation_path, '.simulation_setup_successful'), 'w'): pass print("All Image files have been downloaded successfully!") def _edit_config_files(simulation_path: str, num_cores=1) -> None: """ Edit the configuration files based on the provided parameters. :param simulation_path: Path to the simulation directory containing config files. :type simulation_path: str :param num_cores: Number of cores to be set in the config. :type num_cores: int, optional """ config_file = os.path.join(simulation_path, 'config.ini') config_analysis_file = os.path.join(simulation_path, 'config_analysis.ini') _replace_params_in_file(config_file, 'num_cores', num_cores) _replace_params_in_file(config_analysis_file, 'num_cores', num_cores) def _replace_params_in_file(file_path: str, target_word: str, target_value: str) -> None: """ For lines containing target_word, replace content after '=' with replacement_word. :param file_path: Name/path of the file. :type file_path: str :param target_word: Word to search for in each line. :type target_word: str :param target_value: Word that will replace content after '='. :type target_value: str """ # Read the file lines into memory with open(file_path, 'r') as file: lines = file.readlines() # Process the lines new_lines = [] for line in lines: if target_word in line and '=' in line: prefix = line.split('=')[0] + '=' new_line = prefix + ' ' + str(target_value) + '\n' new_lines.append(new_line) else: new_lines.append(line) # Write the modified lines back to the file with open(file_path, 'w') as file: file.writelines(new_lines)