Source code for fdsreader.pl3d.pl3d

import logging
import os
from copy import deepcopy
from typing import Dict, Sequence, Tuple, Literal, Union, List
import numpy as np
import math
import bisect

from fdsreader.fds_classes import Mesh
from fdsreader.utils import Quantity
from fdsreader import settings
import fdsreader.utils.fortran_data as fdtype

_HANDLED_FUNCTIONS = {np.mean: (lambda pl: pl.mean)}


[docs] def implements(np_function): """Decorator to register an __array_function__ implementation for Plot3Ds. """ def decorator(func): _HANDLED_FUNCTIONS[np_function] = func return func return decorator
[docs] class SubPlot3D: """Subplot of a pl3d output for a single mesh. :ivar mesh: The mesh containing the data. """ # Offset of the binary file to the end of the file header. _offset = fdtype.new((('i', 3),)).itemsize + fdtype.new((('i', 4),)).itemsize def __init__(self, mesh: Mesh, quantity_idx: int): self.file_paths: List[str] = list() # Path to the binary data file for each time step self.mesh = mesh self._quantity_idx = quantity_idx def _add_timestep(self, time_idx: int, file_path: str): self.file_paths.insert(time_idx, file_path) @property def data(self) -> np.ndarray: """Method to lazy load the 3D data for each quantity of a single mesh. :returns: 4D numpy array with (t,x,y,z) as dimensions. """ if not hasattr(self, "_data"): self._data = np.empty(shape=(len(self.file_paths),) + self.mesh.dimension.shape()) dtype_data = fdtype.new((('f', self.mesh.dimension.size() * 5),)) for t, file_path in enumerate(self.file_paths): with open(file_path, 'rb') as infile: infile.seek(self._offset) self._data[t, :, :, :] = fdtype.read(infile, dtype_data, 1)[0][0].reshape( self.mesh.dimension.shape() + (5,), order='F')[:, :, :, self._quantity_idx] return self._data
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ if hasattr(self, "_data"): del self._data
[docs] class Plot3D(np.lib.mixins.NDArrayOperatorsMixin): """Plot3d file data container including metadata. Consists of multiple subplots, one for each mesh. :ivar times: All times for which data has been recorded. :ivar quantities: List with quantity objects containing information about recorded quantities calculated for this Plot3D with the corresponding short_name and unit. """ def __init__(self, root_path: str): self._root_path = root_path self.times: List[float] = list() # List of all subplots this Plot3D consists of (one per mesh). self._subplots: Dict[str, SubPlot3D] = dict() def _add_subplot(self, filename: str, time: float, quantity: Quantity, quantity_idx: int, mesh: Mesh): self.quantity: Quantity = quantity if mesh.id not in self._subplots.keys(): self._subplots[mesh.id] = SubPlot3D(mesh, quantity_idx) if not any(np.isclose(time, t) for t in self.times): time_idx = bisect.bisect(self.times, time) self.times.insert(time_idx, time) else: time_idx = next(idx for idx, t in enumerate(self.times) if np.isclose(time, t)) self._subplots[mesh.id]._add_timestep(time_idx, os.path.join(self._root_path, filename)) # If lazy loading has been disabled by the user, load the data instantaneously instead if not settings.LAZY_LOAD: _ = self._subplots[mesh.id].data def __getitem__(self, mesh: Mesh): """Returns the :class:`SubPlot` that contains data for the given mesh. """ return self._subplots[mesh.id]
[docs] @implements(np.mean) def mean(self) -> float: """Calculates the mean value of the whole Plot3D. :returns: The calculated mean value. """ mean_sum = 0 for subplot in self._subplots.values(): mean_sum += np.mean(subplot.data) return mean_sum / len(self._subplots)
[docs] @implements(np.std) def std(self) -> float: """Calculates the standard deviation for each quantity individually of the whole Plot3D. :returns: The calculated standard deviation. """ mean = self.mean sum = np.sum([np.sum(np.power(subplot.data - mean, 2)) for subplot in self._subplots.values()]) N = np.sum([subplot.data.size for subplot in self._subplots.values()]) return np.sqrt(sum / N)
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ for subplot in self._subplots.values(): subplot.clear_cache()
[docs] def to_global(self, masked: bool = False, fill: float = 0, return_coordinates: bool = False) -> \ Union[np.ndarray, Tuple[np.ndarray, Dict[Literal['x', 'y', 'z'], np.ndarray]]]: """Creates a global numpy ndarray from all subplots. :param masked: Whether to apply the obstruction mask to the data or not. :param fill: The fill value to use for masked entries. Only used when masked=True. :param return_coordinates: If true, return the matching coordinate for each value on the generated grid. """ if len(self._subplots) == 0: if return_coordinates: return np.array([]), {d: np.array([]) for d in ('x', 'y', 'z')} else: return np.array([]) coord_min = {'x': math.inf, 'y': math.inf, 'z': math.inf} coord_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} for dim in ('x', 'y', 'z'): for subplot in self._subplots.values(): co = subplot.mesh.coordinates[dim] coord_min[dim] = min(co[0], coord_min[dim]) coord_max[dim] = max(co[-1], coord_max[dim]) # The global grid will use the finest mesh as base and duplicate values of the coarser # meshes. Therefore, we first find the finest mesh and calculate the step size in each # dimension. step_sizes_min = {'x': coord_max['x'] - coord_min['x'], 'y': coord_max['y'] - coord_min['y'], 'z': coord_max['z'] - coord_min['z']} step_sizes_max = {'x': 0, 'y': 0, 'z': 0} steps = dict() global_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} for dim in ('x', 'y', 'z'): for subplot in self._subplots.values(): step_size = subplot.mesh.coordinates[dim][1] - subplot.mesh.coordinates[dim][0] step_sizes_min[dim] = min(step_size, step_sizes_min[dim]) step_sizes_max[dim] = max(step_size, step_sizes_max[dim]) global_max[dim] = max(subplot.mesh.coordinates[dim][-1], global_max[dim]) for dim in ('x', 'y', 'z'): if step_sizes_min[dim] == 0: step_sizes_min[dim] = math.inf steps[dim] = 1 else: steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + 1 # + step_sizes_max[dim] / step_sizes_min[dim] grid = np.full((self.n_t, steps['x'], steps['y'], steps['z']), np.nan) start_idx = dict() end_idx = dict() for subplot in self._subplots.values(): subplot_data = subplot.data.copy() if masked: mask = subplot.mesh.get_obstruction_mask(self.times) start_idx = {dim: int(round( (subplot.mesh.coordinates[dim][0] - coord_min[dim]) / step_sizes_min[dim])) for dim in ('x', 'y', 'z')} end_idx = {dim: int(round( (subplot.mesh.coordinates[dim][-1] - coord_min[dim]) / step_sizes_min[dim])) for dim in ('x', 'y', 'z')} temp_data = dict() temp_mask = dict() for axis in range(3): dim = ('x', 'y', 'z')[axis] # Temporarily save border points to add them back to the array again later if np.isclose(subplot.mesh.coordinates[dim][-1], global_max[dim]): temp_data_slices = [slice(s) for s in subplot_data.shape] end_idx[dim] += 1 temp_data_slices[axis + 1] = slice(subplot_data.shape[axis + 1] - 1, None) temp_data[dim] = subplot_data[tuple(temp_data_slices)] if masked: temp_mask[dim] = mask[tuple(temp_data_slices)] # We ignore border points unless they are actually on the border of the simulation space as all # other border points actually appear twice, as the subslices overlap. This only # applies for face_centered slices, as cell_centered slices will not overlap. reduced_shape_slices = (slice(subplot.data.shape[0]),) + tuple(slice(1, None) for s in subplot.data.shape[1:]) subplot_data = subplot_data[reduced_shape_slices] if masked: mask = mask[reduced_shape_slices] n_repeat = max(int(round( (subplot.mesh.coordinates[dim][1] - subplot.mesh.coordinates[dim][0]) / step_sizes_min[dim])), 1) if n_repeat > 1: subplot_data = np.repeat(subplot_data, n_repeat, axis=axis + 1) if masked: mask = np.repeat(mask, n_repeat, axis=axis + 1) for axis in range(3): dim = ('x', 'y', 'z')[axis] # Add border points back again if needed if np.isclose(subplot.mesh.coordinates[dim][-1], global_max[dim]): temp_data_slices = [slice(s) for s in subplot_data.shape] temp_data_slices[axis + 1] = slice(None) subplot_data = np.concatenate((subplot_data, temp_data[dim][tuple(temp_data_slices)]), axis=axis + 1) if masked: mask = np.concatenate((mask, temp_mask[dim][tuple(temp_data_slices)]), axis=axis + 1) # If the slice should be masked, we set all cells at which an obstruction is in the # simulation space to the fill value set by the user if masked: subplot_data = np.where(mask, subplot_data, fill) grid[:, start_idx['x']: end_idx['x'], start_idx['y']: end_idx['y'], start_idx['z']: end_idx['z']] = subplot_data.reshape( (self.n_t, end_idx['x'] - start_idx['x'], end_idx['y'] - start_idx['y'], end_idx['z'] - start_idx['z'])) if return_coordinates: coordinates = dict() for dim_index, dim in enumerate(('x', 'y', 'z')): coordinates[dim] = np.linspace(coord_min[dim], coord_max[dim], grid.shape[dim_index + 1]) if return_coordinates: return grid, coordinates else: return grid
@property def n_t(self) -> int: """Get the number of timesteps for which data was output. """ return len(self.times) @property def subplots(self): """Returns a list with one SubPlot3D object per mesh. """ return list(self._subplots.values()) def __array__(self): """Method that will be called by numpy when trying to convert the object to a numpy ndarray. """ raise UserWarning( "Plot3Ds can not be converted to numpy arrays, but they support all typical numpy" " operations such as np.multiply. If a 'global' array containing all subplots is" " required, please use the 'to_global' method and use the returned numpy-array explicitly.") def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): """Method that will be called by numpy when using a ufunction with a Plot3D as input. :returns: A new pl3d on which the ufunc has been applied. """ if method != "__call__": logging.warning( "The %s method has been used which is not explicitly implemented. Correctness of" " results is not guaranteed. If you require this feature to be implemented please" " submit an issue on Github where you explain your use case.", method) input_list = list(inputs) for i, inp in enumerate(inputs): if isinstance(inp, self.__class__): del input_list[i] new_pl3d = deepcopy(self) for subplot in self._subplots.values(): subplot._data = ufunc(subplot.data, input_list[0], **kwargs) return new_pl3d def __array_function__(self, func, types, args, kwargs): """Method that will be called by numpy when using an array function with a Slice as input. :returns: The output of the array function. """ if func not in _HANDLED_FUNCTIONS: return NotImplemented # Note: this allows subclasses that don't override __array_function__ to handle Plot3Ds. if not all(issubclass(t, self.__class__) for t in types): return NotImplemented return _HANDLED_FUNCTIONS[func](*args, **kwargs)
# __array_function__ implementations # ...