Source code for fdsreader.geom.geometry

import threading
from typing import Tuple, Dict, Iterable

import numpy as np

from fdsreader.fds_classes import Surface
from fdsreader.utils import Quantity
import fdsreader.utils.fortran_data as fdtype


[docs] class GeomBoundary: """Boundary data of a specific quantity for all geoms in the simulation. :ivar quantity: Quantity object containing information about the quantity calculated for geoms. :ivar times: Numpy array containing all times for which data has been recorded. :ivar n_t: Total number of time steps for which output data has been written. :ivar lower_bounds: Dictionary with lower bounds for each timestep per mesh. :ivar upper_bounds: Dictionary with upper bounds for each timestep per mesh. """ def __init__(self, quantity: Quantity, times: np.ndarray, n_t: int): self.quantity = quantity self.times = times self.n_t = n_t self.lower_bounds: Dict[int, np.ndarray] = dict() self.upper_bounds: Dict[int, np.ndarray] = dict() self.file_paths_be: Dict[int, str] = dict() self.file_paths_gbf: Dict[int, str] = dict() def _add_data(self, mesh: int, file_path_be: str, file_path_gbf: str, lower_bounds: np.ndarray, upper_bounds: np.ndarray): self.file_paths_be[mesh] = file_path_be self.file_paths_gbf[mesh] = file_path_gbf self.lower_bounds[mesh] = lower_bounds self.upper_bounds[mesh] = upper_bounds def _load_data(self): self._vertices: Dict[int, np.ndarray] = dict() self._faces: Dict[int, np.ndarray] = dict() self._data: Dict[int, np.ndarray] = dict() for mesh in self.file_paths_be.keys(): file_path_be = self.file_paths_be[mesh] file_path_gbf = self.file_paths_gbf[mesh] # Load .gbf with open(file_path_gbf, 'rb') as infile: offset = fdtype.INT.itemsize * 2 + fdtype.new( (('i', 3),)).itemsize + fdtype.FLOAT.itemsize infile.seek(offset) dtype_meta = fdtype.new((('i', 3),)) n_vertices, n_faces, _ = np.fromfile(infile, dtype_meta, 1)[0][1] dtype_vertices = fdtype.new((('f', 3 * n_vertices),)) vertices = np.fromfile(infile, dtype_vertices, 1)[0][1].reshape( (n_vertices, 3)).astype(float) dtype_faces = fdtype.new((('i', 3 * n_faces),)) faces = fdtype.read(infile, dtype_faces, 1)[0][0].reshape((n_faces, 3)).astype( int) - 1 # Load .be dtype_faces = fdtype.new((('f', n_faces),)) dtype_meta = fdtype.new((('i', 4),)) data = np.empty((self.n_t, n_faces), dtype=np.float32) with open(file_path_be, 'rb') as infile: offset = fdtype.INT.itemsize * 2 infile.seek(offset) for t in range(self.n_t): # Skip time value infile.seek(fdtype.FLOAT.itemsize, 1) assert fdtype.read(infile, dtype_meta, 1)[0][0][3] == n_faces, "Something went wrong, please" \ " submit an issue on Github" \ " attaching your fds case!" if n_faces > 0: data[t, :] = np.fromfile(infile, dtype_faces, 1)[0][1] self._vertices[mesh] = vertices self._faces[mesh] = faces self._data[mesh] = data # def _load_data2(self): # self._vertices: Dict[int, np.ndarray] = dict() # self._faces: Dict[int, np.ndarray] = dict() # self._data: Dict[int, np.ndarray] = dict() # # for mesh in self.file_paths_be.keys(): # file_path_be = self.file_paths_be[mesh] # file_path_gbf = self.file_paths_gbf[mesh] # # Load .gbf # with open(file_path_gbf, 'rb') as infile: # offset = fdtype.INT.itemsize * 2 + fdtype.new( # (('i', 3),)).itemsize + fdtype.FLOAT.itemsize # infile.seek(offset) # # dtype_meta = fdtype.new((('i', 3),)) # n_vertices, n_faces, _ = np.fromfile(infile, dtype_meta, 1)[0][1] # # dtype_vertices = fdtype.new((('f', 3 * n_vertices),)) # vertices = np.fromfile(infile, dtype_vertices, 1)[0][1].reshape( # (n_vertices, 3)).astype(float) # # dtype_faces = fdtype.new((('i', 3 * n_faces),)) # faces = fdtype.read(infile, dtype_faces, 1)[0][0].reshape((n_faces, 3)).astype( # int) - 1 # # # Load .be # class BEReader(threading.Thread): # def __init__(self, path, thread_offset, n_t, data, t_start, n_faces): # threading.Thread.__init__(self) # self.path = path # self.n_faces = n_faces # self.offset = thread_offset # self.n_t = n_t # self.data = data # self.t_start = t_start # # def run(self): # dtype_faces = fdtype.new((('f', self.n_faces),)) # dtype_meta = fdtype.new((('i', 4),)) # with open(self.path, 'rb') as infile: # for t in range(self.n_t): # # Skip time and n_faces values # infile.seek(fdtype.FLOAT.itemsize + dtype_meta.itemsize, 1) # # if self.n_faces > 0: # self.data[self.t_start + t, :] = np.fromfile(infile, dtype_faces, 1)[0][1] # # dtype_faces = fdtype.new((('f', n_faces),)) # dtype_meta = fdtype.new((('i', 4),)) # data = np.empty((self.n_t, n_faces), dtype=np.float32) # # offset = fdtype.INT.itemsize * 2 # stride = dtype_faces.itemsize + dtype_meta.itemsize + fdtype.FLOAT.itemsize # # times = list(range(0, self.n_t, self.n_t // 8)) # times.append(self.n_t) # threads = list() # for i in range(len(times)-1): # threads.append(BEReader(file_path_be, offset+stride*times[i], times[i+1] - times[i], data, times[i], n_faces)) # # for thread in threads: # thread.start() # # for thread in threads: # thread.join() # # self._vertices[mesh] = vertices # self._faces[mesh] = faces # self._data[mesh] = data @property def vertices(self) -> Iterable: """Returns a global array of the vertices from all meshes. """ if not hasattr(self, "_vertices"): self._load_data() size = sum(v.shape[0] for v in self._vertices.values()) ret = np.empty((size, 3), dtype=float) counter = 0 for v in self._vertices.values(): size = v.shape[0] ret[counter:counter + size, :] = v counter += size return ret @property def faces(self) -> np.ndarray: """Returns a global array of the faces from all meshes. """ if not hasattr(self, "_faces"): self._load_data() size = sum(f.shape[0] for f in self._faces.values()) ret = np.empty((size, 3), dtype=int) counter = 0 verts_counter = 0 for m, f in self._faces.items(): size = f.shape[0] ret[counter:counter + size, :] = f + verts_counter counter += size verts_counter += self._vertices[m].shape[0] return ret @property def data(self) -> np.ndarray: """Returns a global array of the loaded data for the quantity with data from all meshes. """ if not hasattr(self, "_data"): self._load_data() size = sum(d[0].shape[0] for d in self._data.values()) ret = np.empty((self.n_t, size), dtype=np.float32) for t in range(self.n_t): counter = 0 for d in self._data.values(): size = d[t].shape[0] ret[t, counter:counter + size] = d[t] counter += size return ret # @property # def data2(self) -> np.ndarray: # """Returns a global array of the loaded data for the quantity with data from all meshes. # """ # # if not hasattr(self, "_data"): # # self._load_data2() # self._load_data2() # # size = sum(d[0].shape[0] for d in self._data.values()) # ret = np.empty((self.n_t, size), dtype=np.float32) # for t in range(self.n_t): # counter = 0 # for d in self._data.values(): # size = d[t].shape[0] # ret[t, counter:counter + size] = d[t] # counter += size # # return ret @property def vmin(self) -> float: """Minimum value of all faces at any time. """ curr_min = min(np.min(b) for b in self.lower_bounds.values()) if curr_min == 0.0: return min(min(np.min(p.data) for p in ps) for ps in self._faces.values()) return curr_min @property def vmax(self) -> float: """Maximum value of all faces at any time. """ curr_max = max(np.max(b) for b in self.upper_bounds.values()) if curr_max == np.float32(-1e33): return max(max(np.max(p.data) for p in ps) for ps in self._faces.values()) return curr_max def __repr__(self, *args, **kwargs): return f"GeomBoundary(quantity={self.quantity})"
[docs] class Geometry: """Obstruction defined as a complex geometry. :ivar file_path: Path to the .ge file that defines the geom. :ivar texture_map: Path to the texture map of the geometry. :ivar texture_origin: Origin position of the texture provided by the surface. :ivar is_terrain: Indicates whether the geometry is a regular complex geom or terrain geometry. :ivar rgb: Color of the geometry in form of a 3-element tuple. :ivar surface: Surface object used for the geometry. """ def __init__(self, file_path: str, texture_mapping: str, texture_origin: Tuple[float, float, float], is_terrain: bool, rgb: Tuple[int, int, int], surface: Surface = None): self.file_path = file_path self.texture_map = texture_mapping self.texture_origin = texture_origin self.is_terrain = is_terrain self.rgb = rgb self.surface = surface def __repr__(self, *args, **kwargs): return f"Geometry(file_path={self.file_path})"