Source code for fdsreader.bndf.obstruction

import math
import os
from typing import List, Dict, Tuple, Union, Sequence
from typing_extensions import Literal
import numpy as np

from fdsreader.utils import Extent, Quantity, Dimension
import fdsreader.utils.fortran_data as fdtype
from fdsreader import settings

# Unfortunately, this is necessary due to a cyclic reference. "Mesh" is only needed for static type hints anyway
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from fdsreader.fds_classes import Mesh


[docs] class Patch: """Container for the actual data which is stored as rectangular plane with specific orientation and extent. :ivar dimension: :class:`Dimension` object containing information about steps in each dimension. :ivar extent: :class:`Extent` object containing 3-dimensional extent information. :ivar orientation: The direction the patch is facing (x={-1;1}, y={-2;2}, z={-3;3}). :ivar cell_centered: Indicates whether centered positioning for data is used. :ivar _n_t: Total number of time steps for which output data has been written. """ def __init__(self, file_path: str, dimension: Dimension, extent: Extent, orientation: int, cell_centered: bool, patch_offset: int, initial_offset: int, n_t: int, mesh: 'Mesh'): self.file_path = file_path self.dimension = dimension self.extent = extent self.orientation = orientation self.cell_centered = cell_centered self._patch_offset = patch_offset self._initial_offset = initial_offset self._time_offset = -1 self._n_t = n_t self.mesh = mesh self._boundary_parent: 'Boundary' = None
[docs] def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ if count_duplicates: return self._n_t return np.unique(self._boundary_parent.times).size
@property def shape(self) -> Tuple: """Convenience function to calculate the shape of the array containing data for this patch. """ return self.dimension.shape(self.cell_centered) @property def size(self) -> int: """Convenience function to calculate the number of data points in the array for this patch. """ return self.dimension.size(self.cell_centered) def _post_init(self, time_offset: int): """Fully initialize the patch as soon as the number of timesteps is known. """ self._time_offset = time_offset
[docs] def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal['x', 'y', 'z'], np.ndarray]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ # orientation = ('x', 'y', 'z')[self.orientation - 1] if self.orientation != 0 else '' # coords = {'x': set(), 'y': set(), 'z': set()} coords: Dict[Literal['x', 'y', 'z'], np.ndarray] = {} for dim in ('x', 'y', 'z'): co = self.mesh.coordinates[dim].copy() # In case the slice is cell-centered, we will shift the coordinates by half a cell # and remove the last coordinate if self.cell_centered and not ignore_cell_centered: co = co[:-1] co += abs(co[1] - co[0]) / 2 coords[dim] = co[np.where(np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]))] if coords[dim].size == 0: coords[dim] = np.array([co[np.argmin(np.abs(co - self.extent[dim][0]))]]) return coords
@property def data(self): """Method to load the quantity data for a single patch. """ if not hasattr(self, "_data"): dtype_data = fdtype.new((('f', self.dimension.size(cell_centered=False)),)) if self._n_t == -1: self._n_t = (os.stat(self.file_path).st_size - self._initial_offset) // self._time_offset self._data = np.empty((self.n_t(count_duplicates=True),) + self.shape) with open(self.file_path, 'rb') as infile: for t in range(self.n_t(count_duplicates=True)): infile.seek(self._initial_offset + self._patch_offset + t * self._time_offset) data = np.fromfile(infile, dtype_data, 1)[0][1].reshape( self.dimension.shape(cell_centered=False), order='F') if self.cell_centered: self._data[t, :] = data[:-1, :-1] else: self._data[t, :] = data unique_times_indices = np.unique(self._boundary_parent.times, return_index=True)[1] self._data = self._data[unique_times_indices] return self._data
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ if hasattr(self, "_data"): del self._data
def __repr__(self, *args, **kwargs): return f"Patch(shape={self.shape}, orientation={self.orientation}, extent={self.extent})"
[docs] class Boundary: """Container for boundary data specific to one quantity. :ivar quantity: Quantity object containing information about the quantity calculated for this :class:`Obstruction` with the corresponding short_name and unit. :ivar times: Numpy array containing all times for which data has been recorded. :ivar cell_centered: Indicates whether centered positioning for data is used. :ivar lower_bounds: Dictionary with lower bounds for each timestep with meshes as keys. :ivar upper_bounds: Dictionary with upper bounds for each timestep with meshes as keys. :ivar n_t: Total number of time steps for which output data has been written. """ def __init__(self, quantity: Quantity, cell_centered: bool, times: Sequence[float], patches: List[Patch], lower_bounds: np.ndarray, upper_bounds: np.ndarray): self.quantity = quantity self.cell_centered = cell_centered self._patches = patches self.times = times self.lower_bounds = lower_bounds self.upper_bounds = upper_bounds
[docs] def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ return self._patches[0].n_t(count_duplicates=count_duplicates)
@property def orientations(self): """Return all orientations for which there is data available. """ return [p.orientation for p in self._patches]
[docs] def get_nearest_timestep(self, time: float) -> int: """Calculates the nearest timestep for which data has been output for this obstruction. """ idx = np.searchsorted(self.times, time, side="left") if time > 0 and (idx == len(self.times) or np.math.fabs( time - self.times[idx - 1]) < np.math.fabs(time - self.times[idx])): return idx - 1 else: return idx
@property def data(self) -> Dict[int, Patch]: """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y). """ return {p.orientation: p for p in self._patches}
[docs] def vmin(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time. :param orientation: Optionally filter by patches with a specific orientation. """ if orientation == 0: curr_min = np.min(self.lower_bounds) if curr_min == 0.0: return float(min(np.min(p.data) for p in self._patches)) return float(curr_min) else: return float(np.min(self.data[orientation].data))
[docs] def vmax(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time. :param orientation: Optionally filter by patches with a specific orientation. """ if orientation == 0: curr_max = np.max(self.upper_bounds) if curr_max == np.float32(-1e33): return float(max(np.max(p.data) for p in self._patches)) return float(curr_max) else: return float(np.max(self.data[orientation].data))
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ for p in self._patches: p.clear_cache()
def __repr__(self): return f"Boundary(Quantity={self.quantity}, Patches={len(self._patches)})"
[docs] class SubObstruction: """An :class:`Obstruction` consists of 1 or more SubObstructions which can be hidden at specific points in time. :ivar extent: :class:`Extent` object containing 3-dimensional extent information. :ivar bound_indices: Indices used to define obstruction bounds in terms of mesh locations. :ivar side_surfaces: Tuple of six :class:`Surface` s for each side of the cuboid. :ivar hide_times: List with points in time from when on the SubObstruction will be hidden. :ivar show_times: List with points in time from when on the SubObstruction will be shown. """ def __init__(self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int, int, int], extent: Extent, mesh: 'Mesh'): self.extent = extent self.side_surfaces = side_surfaces self.bound_indices = {'x': (bound_indices[0], bound_indices[1]), 'y': (bound_indices[2], bound_indices[3]), 'z': (bound_indices[4], bound_indices[5])} self.mesh = mesh self._boundary_data: Dict[int, Boundary] = dict() self.hide_times = list() self.show_times = list() def _add_patches(self, bid: int, cell_centered: bool, quantity: str, short_name: str, unit: str, patches: List[Patch], times: Sequence[float], lower_bounds: np.ndarray, upper_bounds: np.ndarray): if bid not in self._boundary_data: self._boundary_data[bid] = Boundary(Quantity(quantity, short_name, unit), cell_centered, times, patches, lower_bounds, upper_bounds) # Add reference to parent boundary class in patches for patch in patches: patch._boundary_parent = self._boundary_data[bid] if not settings.LAZY_LOAD: _ = self._boundary_data[bid].data @property def orientations(self): """Return all orientations for which there is data available. """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).orientations return []
[docs] def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: return {orientation: patch.get_coordinates(ignore_cell_centered) for orientation, patch in next(iter(self._boundary_data.values())).data.items()} return {}
[docs] def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. """ if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") if idx > 0 and (idx == coords.size or np.math.fabs(value - coords[idx - 1]) < np.math.fabs( value - coords[idx])): return idx - 1 else: return idx return np.nan
@property def has_boundary_data(self): return len(self._boundary_data) != 0
[docs] def get_data(self, quantity: Union[str, Quantity]): if type(quantity) == Quantity: quantity = quantity.name return next(b for b in self._boundary_data.values() if b.quantity.name.lower() == quantity.lower() or b.quantity.short_name.lower() == quantity.lower())
def __getitem__(self, item): if type(item) == int: return self._boundary_data[item] return self.get_data(item) def _hide(self, time: float): self.hide_times.append(time) self.hide_times.sort() def _show(self, time: float): self.show_times.append(time) self.show_times.sort()
[docs] def n_t(self, count_duplicates=True) -> int: """Returns the number of timesteps for which boundary data is available. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).n_t(count_duplicates=count_duplicates) return 0
@property def times(self): """Return all timesteps for which boundary data is available, if any. """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).times return np.array([])
[docs] def get_visible_times(self, times: Sequence[float]) -> np.ndarray: """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden. """ ret = list() hidden = False for time in times: if time in self.show_times: hidden = False if time in self.hide_times: hidden = True if not hidden: ret.append(time) return np.array(ret)
[docs] def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return self.get_data(quantity).vmin(orientation) return np.nan
[docs] def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return self.get_data(quantity).vmax(orientation) return np.nan
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ for bndf in self._boundary_data.values(): bndf.clear_cache()
def __repr__(self): return f"SubObstruction(Extent={self.extent})"
[docs] class Obstruction: """A box-shaped obstruction with specific surfaces (materials) on each side. :ivar id: ID of the obstruction. :ivar color_index: Type of coloring used to color obstruction. -1 - default color -2 - invisible -3 - use red, green, blue and alpha (rgba attribute) n>0 - use n’th color table entry :ivar block_type: Defines how the obstruction is drawn. -1 - use surface to obtain blocktype 0 - regular block 2 - outline :ivar texture_origin: Origin position of the texture provided by the surface. When the texture does have a pattern, for example windows or bricks, the texture_origin specifies where the pattern should begin. :ivar rgba: Optional color of the obstruction in form of a 4-element tuple (ranging from 0.0 to 1.0). """ def __init__(self, oid: str, color_index: int, block_type: int, texture_origin: Tuple[float, float, float], rgba: Union[Tuple[()], Tuple[float, float, float, float]] = ()): self.id = oid self.color_index = color_index self.block_type = block_type self.texture_origin = texture_origin if len(rgba) != 0: self.rgba = rgba self._subobstructions: Dict[str, SubObstruction] = dict() @property def bounding_box(self) -> Extent: """:class:`Extent` object representing the bounding box around the Obstruction. """ extents = [sub.extent for sub in self._subobstructions.values()] return Extent(min(extents, key=lambda e: e.x_start).x_start, max(extents, key=lambda e: e.x_end).x_end, min(extents, key=lambda e: e.y_start).y_start, max(extents, key=lambda e: e.y_end).y_end, min(extents, key=lambda e: e.z_start).z_start, max(extents, key=lambda e: e.z_end).z_end) @property def orientations(self): """Return all orientations for which there is data available. """ if self.has_boundary_data: orientations = set() for subobst in self._subobstructions.values(): orientations.update(subobst.orientations) return sorted(list(orientations)) return []
[docs] def n_t(self, count_duplicates=True) -> int: """Returns the number of timesteps for which boundary data is available. :param count_duplicates: If true, return the total number of data points, even if there is duplicate data for a timestep. Duplicate data might be output when restarting the simulation in between the simulation run. """ for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.n_t(count_duplicates=count_duplicates) return 0
@property def times(self): """Return all timesteps for which boundary data is available, if any. """ for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.times return np.array([])
[docs] def get_visible_times(self, times: Sequence[float]): """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden. """ for subobst in self._subobstructions.values(): return subobst.get_visible_times(times) return np.array([])
[docs] def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: all_coords = dict() for orientation_int in self.orientations: orientation = ('x', 'y', 'z')[abs(orientation_int) - 1] coords = {'x': set(), 'y': set(), 'z': set()} for dim in ('x', 'y', 'z'): if orientation == dim: bounding_box_index = 0 if orientation_int < 0 else 1 coords[dim] = np.array([self.bounding_box[dim][bounding_box_index]]) continue min_delta = math.inf for subobst in self._subobstructions.values(): subobst_coords = subobst.get_coordinates(ignore_cell_centered) if orientation_int not in subobst_coords: continue co = subobst_coords[orientation_int][dim] min_delta = min(min_delta, co[1] - co[0]) coords[dim].update(co) coords[dim] = np.array(sorted(list(coords[dim]))) to_keep = np.full(coords[dim].shape, True) for i in range(len(coords[dim]) - 1): if abs(coords[dim][i] - coords[dim][i + 1]) < min_delta / 2: to_keep[i + 1] = False coords[dim] = coords[dim][to_keep] if len(coords[dim]) == 0: bounding_box_index = 0 if orientation_int < 0 else 1 single_coordinate = self.bounding_box[dim][bounding_box_index] nearest_coordinate = np.inf for subobst in self._subobstructions.values(): mesh = subobst.mesh mesh_coords = mesh.coordinates[dim] idx = np.searchsorted(mesh_coords, single_coordinate, side="left") if idx > 0 and (idx == mesh_coords.size or np.math.fabs( single_coordinate - mesh_coords[idx - 1]) < np.math.fabs( single_coordinate - mesh_coords[idx])): idx = idx + 1 if mesh_coords[idx] - single_coordinate < nearest_coordinate - single_coordinate: nearest_coordinate = mesh_coords[idx] coords[dim] = np.array([nearest_coordinate]) all_coords[orientation_int] = coords return all_coords return dict()
[docs] def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. """ if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") if idx > 0 and (idx == coords.size or np.math.fabs(value - coords[idx - 1]) < np.math.fabs( value - coords[idx])): return idx - 1 else: return idx return np.nan
@property def quantities(self) -> List[Quantity]: """Get a list of all quantities for which boundary data exists. """ if self.has_boundary_data: qs = set() for subobst in self._subobstructions.values(): for b in subobst._boundary_data.values(): qs.add(b.quantity) return list(qs) return [] @property def meshes(self) -> List['Mesh']: """Returns a list of all meshes this slice cuts through. """ return [subobst.mesh for subobst in self._subobstructions.values()]
[docs] def filter_by_orientation(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> List[SubObstruction]: """Filter all SubObstructions by a specific orientation. All returned SubObstructions will contain boundary data in the specified orientation. """ if self.has_boundary_data: return [subobst for subobst in self._subobstructions.values() if orientation in subobst.orientations] return []
[docs] def get_boundary_data(self, quantity: Union[Quantity, str], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> Dict[str, Boundary]: """Gets the boundary data for a specific quantity of all SubObstructions. :param quantity: The quantity to filter by. :param orientation: Optionally filter by a specific orientation as well (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=z). A value of 0 indicates to no filter. """ if type(quantity) == Quantity: quantity = quantity.name ret = {subobst.mesh.id: subobst.get_data(quantity) for subobst in self._subobstructions.values() if subobst.has_boundary_data} if orientation == 0: return ret return {mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys()}
[docs] def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: """Calculates the nearest timestep for which data has been output for this obstruction. """ if self.has_boundary_data: times = self.get_visible_times(self.times) if visible_only else self.times idx = np.searchsorted(times, time, side="left") if time > 0 and (idx == len(times) or np.math.fabs( time - times[idx - 1]) < np.math.fabs(time - times[idx])): return idx - 1 else: return idx return np.nan
[docs] def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): """Gets the patch of the :class:`SubObstruction` that has the least distance to the given point. If there are multiple patches with the same distance, a random one will be selected. """ if self.has_boundary_data: d_min = np.finfo(float).max patches_min = list() for subobst in self._subobstructions.values(): for patch in subobst.get_data(self.quantities[0])._patches: dx = max(patch.extent.x_start - x, 0, x - patch.extent.x_end) if x is not None else 0 dy = max(patch.extent.y_start - y, 0, y - patch.extent.y_end) if y is not None else 0 dz = max(patch.extent.z_start - z, 0, z - patch.extent.z_end) if z is not None else 0 d = np.sqrt(dx * dx + dy * dy + dz * dz) if d <= d_min: d_min = d patches_min.append(patch) if x is not None: patches_min.sort(key=lambda patch: (patch.extent.x_end - patch.extent.x_start)) if y is not None: patches_min.sort(key=lambda patch: (patch.extent.y_end - patch.extent.y_start)) if z is not None: patches_min.sort(key=lambda patch: (patch.extent.z_end - patch.extent.z_start)) if len(patches_min) > 0: return patches_min[0] return None
@property def has_boundary_data(self): """Whether boundary data has been output in the simulation. """ return any(subobst.has_boundary_data for subobst in self._subobstructions.values())
[docs] def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dict[ int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: """Creates a global numpy ndarray from all subobstruction's boundary data for each orientation. :param quantity: The quantity's name or short name for which to load the global arrays. """ if not self.has_boundary_data: return dict() for subobst in self._subobstructions.values(): if subobst.has_boundary_data: cell_centered = next(iter(subobst._boundary_data.values())).cell_centered result = dict() for orientation_int in self.orientations: subobst_sets = [list(), list()] dim = ['x', 'y', 'z'][abs(orientation_int) - 1] random_subobst = next( subobst for subobst in self._subobstructions.values() if orientation_int in subobst.get_coordinates()) base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[orientation_int][dim][0] for subobst in self._subobstructions.values(): if not subobst.has_boundary_data: continue coords = subobst.get_coordinates(ignore_cell_centered=False) if orientation_int not in coords: continue if coords[orientation_int][dim][0] == base_coord: subobst_sets[0].append(subobst) else: subobst_sets[1].append(subobst) if len(subobst_sets[1]) == 0: subobst_sets.pop(1) first_result_grid = None for subobsts in subobst_sets: coord_min = {'x': math.inf, 'y': math.inf, 'z': math.inf} coord_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} for dim in ('x', 'y', 'z'): for subobst in subobsts: patch_extent = subobst.get_data(quantity).data[orientation_int].extent co = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int][dim] co = co[np.where(np.logical_and(co >= patch_extent[dim][0], co <= patch_extent[dim][1]))] coord_min[dim] = min(co[0], coord_min[dim]) coord_max[dim] = max(co[-1], coord_max[dim]) # The global grid will use the finest mesh as base and duplicate values of the coarser meshes # Therefore we first find the finest mesh and calculate the step size in each dimension step_sizes_min = {'x': coord_max['x'] - coord_min['x'], 'y': coord_max['y'] - coord_min['y'], 'z': coord_max['z'] - coord_min['z']} step_sizes_max = {'x': 0, 'y': 0, 'z': 0} steps = dict() global_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} for dim in ('x', 'y', 'z'): for subobst in subobsts: subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] if len(subobst_coords[dim]) <= 1: step_size = 0 else: step_size = subobst_coords[dim][1] - subobst_coords[dim][0] step_sizes_min[dim] = min(step_size, step_sizes_min[dim]) step_sizes_max[dim] = max(step_size, step_sizes_max[dim]) global_max[dim] = max(subobst_coords[dim][-1], global_max[dim]) for dim in ('x', 'y', 'z'): if step_sizes_min[dim] == 0: step_sizes_min[dim] = math.inf steps[dim] = 1 else: steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + ( 0 if cell_centered else 1) grid = np.full((self.n_t(count_duplicates=False), steps['x'], steps['y'], steps['z']), np.nan) start_coordinates = {'x': coord_min['x'], 'y': coord_min['y'], 'z': coord_min['z']} start_idx = dict() end_idx = dict() for subobst in subobsts: patch_data = np.expand_dims(subobst.get_data(quantity).data[orientation_int].data, axis=abs(orientation_int)) subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] for axis in (0, 1, 2): dim = ('x', 'y', 'z')[axis] if axis == abs(orientation_int) - 1: start_idx[dim] = 0 end_idx[dim] = 1 continue n_repeat = max( int(round((subobst_coords[dim][1] - subobst_coords[dim][0]) / step_sizes_min[dim])), 1) start_idx[dim] = int( round((subobst_coords[dim][0] - start_coordinates[dim]) / step_sizes_min[dim])) end_idx[dim] = int( round((subobst_coords[dim][-1] - start_coordinates[dim]) / step_sizes_min[dim])) # We ignore border points unless they are actually on the border of the simulation space as all # other border points actually appear twice, as the subobstructions overlap. This only # applies for face_centered data, as cell_centered data will not overlap. if not cell_centered: if axis != abs(orientation_int) - 1: reduced_shape = list(patch_data.shape) reduced_shape[axis + 1] -= 1 reduced_data_slices = tuple(slice(s) for s in reduced_shape) patch_data = patch_data[reduced_data_slices] # Temporarily save border points to add them back to the array again later if subobst_coords[dim][-1] == global_max[dim]: end_idx[dim] += 1 temp_data_slices = [slice(s) for s in patch_data.shape] temp_data_slices[axis + 1] = slice(patch_data.shape[axis + 1] - 1, None) temp_data = patch_data[tuple(temp_data_slices)] if n_repeat > 1: patch_data = np.repeat(patch_data, n_repeat, axis=axis + 1) # Add border points back again if needed if not cell_centered and subobst_coords[dim][-1] == global_max[dim]: patch_data = np.concatenate((patch_data, temp_data), axis=axis + 1) grid[:, start_idx['x']: end_idx['x'], start_idx['y']: end_idx['y'], start_idx['z']: end_idx['z']] = patch_data.reshape( (self.n_t(count_duplicates=False), end_idx['x'] - start_idx['x'], end_idx['y'] - start_idx['y'], end_idx['z'] - start_idx['z'])) # Remove empty dimensions, but make sure to note remove the time dimension if there is only a single timestep grid = np.squeeze(grid) if len(grid.shape) == 2: grid = grid[np.newaxis, :, :] if first_result_grid is not None: result[orientation_int] = (first_result_grid, grid) else: result[orientation_int] = grid return result
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ for s in self._subobstructions.values(): s.clear_cache()
[docs] def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return min(s.vmin(quantity, orientation) for s in self._subobstructions.values()) else: return np.nan
[docs] def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: return max(s.vmax(quantity, orientation) for s in self._subobstructions.values()) return np.nan
def __getitem__(self, key: Union[int, str, 'Mesh']): """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id. """ if type(key) == int: return tuple(self._subobstructions.values())[key] elif type(key) == str: return self._subobstructions[key] return self._subobstructions[key.id] def __eq__(self, other): return self.id == other.id def __repr__(self, *args, **kwargs): return f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, SubObstructions={len(self._subobstructions)}" + ( f", Quantities={[q.short_name for q in self.quantities]}" if self.has_boundary_data else "") + ")"