import os
from copy import deepcopy
import numpy as np
import logging
from typing import Dict, Collection, Union, List
from typing_extensions import Literal
from fdsreader.fds_classes import Mesh
from fdsreader.utils import Quantity, Extent
from fdsreader import settings
import fdsreader.utils.fortran_data as fdtype
_HANDLED_FUNCTIONS = {}
[docs]
def implements(np_function):
"""Decorator to register an __array_function__ implementation for GeomSlices.
"""
def decorator(func):
_HANDLED_FUNCTIONS[np_function] = func
return func
return decorator
[docs]
class SubGeomSlice:
"""Part of a geomslice that cuts through a single mesh.
:ivar mesh: The mesh the subgeomslice cuts through.
"""
def __init__(self, parent_slice, filename: str, geom_filename: str, extent: Extent, mesh: Mesh):
self._parent_slice = parent_slice
self.mesh = mesh
self.extent = extent
self.file_path = os.path.join(parent_slice._root_path, filename)
self.geom_file_path = os.path.join(parent_slice._root_path, geom_filename)
# self.vector_filenames = dict()
# self._vector_data = dict()
@property
def orientation(self) -> Literal[1, 2, 3]:
"""Orientation [1,2,3] of the geomslice in case it is 2D, 0 otherwise.
"""
return self._parent_slice.orientation
@property
def times(self):
return self._parent_slice.times
@property
def n_t(self) -> int:
"""Get the number of timesteps for which data was output.
"""
return self._parent_slice.n_t
def _load_geom_data(self):
with open(self.geom_file_path, 'rb') as infile:
dtype_meta = fdtype.new((('i', 3),))
infile.seek(2 * fdtype.INT.itemsize + dtype_meta.itemsize + fdtype.FLOAT.itemsize)
self.n_verts, self.n_faces, n_vols = fdtype.read(infile, dtype_meta, 1)[0][0]
if self.n_verts > 0 and self.n_faces > 0:
dtype_verts = fdtype.new((('f', 3 * self.n_verts),))
dtype_faces = fdtype.new((('i', 3 * self.n_faces),))
# dtype_locations = fdtype.new((('i', self.n_faces),))
# dtype_zero_floats = fdtype.new((('f', 3 * self.n_faces * 2),))
self._vertices = fdtype.read(infile, dtype_verts, 1)[0][0].reshape((self.n_verts, 3), order='F')
self._faces = fdtype.read(infile, dtype_faces, 1)[0][0].reshape((self.n_faces, 3), order='F')
else:
self._vertices = np.array([])
self._faces = np.array([])
def _load_data(self):
with open(self.file_path, 'rb') as infile:
infile.seek(2 * fdtype.INT.itemsize)
if self.n_verts > 0 and self.n_faces > 0:
dtype_data = fdtype.combine(fdtype.FLOAT, fdtype.new((('i', 4),)), fdtype.new((('f', self.n_faces),)))
else:
dtype_data = fdtype.combine(fdtype.FLOAT, fdtype.new((('i', 4),)))
load_times = self.n_t == -1
if load_times:
self._parent_slice.n_t = (os.stat(self.file_path).st_size - 2 * fdtype.INT.itemsize) // dtype_data.itemsize
self._parent_slice.times = np.empty(self.n_t)
self._data = np.empty((self.n_t, self.n_faces), dtype=np.float32)
if self.n_verts > 0 and self.n_faces > 0:
for t, data in enumerate(fdtype.read(infile, dtype_data, self.n_t)):
if load_times:
self.times[t] = data[0][0]
self._data[t, :] = data[2]
@property
def data(self) -> np.ndarray:
"""Method to lazy load the geomslice's data.
"""
if not hasattr(self, "_data"):
_ = self.vertices # Make sure geom data has been loaded already
self._load_data()
return self._data
@property
def vertices(self) -> np.ndarray:
"""Method to lazy load the geomslice's data.
"""
if not hasattr(self, "_vertices"):
self._load_geom_data()
return self._vertices
@property
def faces(self) -> np.ndarray:
"""Method to lazy load the geomslice's data.
"""
if not hasattr(self, "_faces"):
self._load_geom_data()
return self._faces
# @property
# def vector_data(self) -> Dict[str, np.ndarray]:
# """Method to lazy load the geomslice's vector data if it exists.
# """
# if not hasattr(self, "_vector_data"):
# raise AttributeError("There is no vector data available for this geomslice.")
# if len(self._vector_data) == 0:
# for direction in self.vector_filenames.keys():
# file_path = os.path.join(self._parent_slice._root_path,
# self.vector_filenames[direction])
# self._vector_data[direction] = np.empty((self.n_t,) + self.shape,
# dtype=np.float32)
# self._load_data(file_path, self._vector_data[direction])
# return self._vector_data
@property
def vmin(self):
return np.min(self.data)
@property
def vmax(self):
return np.max(self.data)
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory.
"""
if hasattr(self, "_data"):
del self._data
# del self._vector_data
# self._vector_data = dict()
def __repr__(self):
return f"SubGeomSlice(mesh={self.mesh.id})"
[docs]
class GeomSlice(np.lib.mixins.NDArrayOperatorsMixin):
"""Slice file data container including metadata. Consists of multiple subgeomslices, one for each
mesh the geomslice cuts through.
:ivar cell_centered: Indicates whether centered positioning for data is used.
:ivar quantity: Quantity object containing information about the quantity calculated for this
geomslice with the corresponding short_name and unit.
:ivar times: Numpy array containing all times for which data has been recorded.
:ivar n_t: Total number of time steps for which output data has been written.
:ivar orientation: Orientation [1,2,3] of the geomslice in case it is 2D, 0 otherwise.
:ivar extent: :class:`Extent` object containing 3-dimensional extent information.
"""
def __init__(self, root_path: str, geomslice_id: str, times: np.ndarray, multimesh_data: Collection[Dict]):
self._root_path = root_path
self.times = times
if times is not None:
self.n_t = times.size
else:
self.n_t = -1
self.id = geomslice_id
# List of all subgeomslices this geomslice consists of (one per mesh).
self._subgeomslices: Dict[str, SubGeomSlice] = dict()
vector_temp = dict()
for mesh_data in multimesh_data:
if "-VELOCITY" in mesh_data["quantity"]:
vector_temp[mesh_data["mesh"].id] = dict()
for mesh_data in multimesh_data:
if mesh_data["mesh"].id not in self._subgeomslices:
self.quantity = Quantity(mesh_data["quantity"], mesh_data["short_name"], mesh_data["unit"])
self._subgeomslices[mesh_data["mesh"].id] = SubGeomSlice(self, mesh_data["filename"], mesh_data["geomfilename"], mesh_data["extent"], mesh_data["mesh"])
# if "-VELOCITY" in mesh_data["quantity"]:
# vector_temp[mesh_data["mesh"]][mesh_data["quantity"]] = mesh_data["filename"]
# for mesh, vector_filenames in vector_temp.items():
# if "U-VELOCITY" in vector_filenames:
# self._subgeomslices[mesh].vector_filenames["u"] = vector_filenames["U-VELOCITY"]
# if "V-VELOCITY" in vector_filenames:
# self._subgeomslices[mesh].vector_filenames["v"] = vector_filenames["V-VELOCITY"]
# if "W-VELOCITY" in vector_filenames:
# self._subgeomslices[mesh].vector_filenames["w"] = vector_filenames["W-VELOCITY"]
# # Iterate over all subgeomslices and remove duplicated ones which will be created when the geomslice
# # cuts exactly through two mesh borders. Only required for non-cell-centered geomslices.
# if not self.cell_centered:
# extents_tmp = list()
# remove_tmp = list()
# for mesh, sslc in self._subgeomslices.items():
# if sslc.extent not in extents_tmp:
# extents_tmp.append(sslc.extent)
# else:
# remove_tmp.append(mesh)
# for mesh in remove_tmp:
# del self._subgeomslices[mesh]
#
vals = self._subgeomslices.values()
self.extent = Extent(min(vals, key=lambda e: e.extent.x_start).extent.x_start,
max(vals, key=lambda e: e.extent.x_end).extent.x_end,
min(vals, key=lambda e: e.extent.y_start).extent.y_start,
max(vals, key=lambda e: e.extent.y_end).extent.y_end,
min(vals, key=lambda e: e.extent.z_start).extent.z_start,
max(vals, key=lambda e: e.extent.z_end).extent.z_end)
if self.extent.x_start == self.extent.x_end:
self.orientation = 1
elif self.extent.y_start == self.extent.y_end:
self.orientation = 2
elif self.extent.z_start == self.extent.z_end:
self.orientation = 3
else:
self.orientation = 0
# If lazy loading has been disabled by the user, load the data instantaneously instead
if not settings.LAZY_LOAD:
for _, subgeomslice in self._subgeomslices.items():
_ = subgeomslice.data
[docs]
def get_subgeomslice(self, key: Union[int, str, Mesh]) -> SubGeomSlice:
"""Returns the :class:`SubGeomSlice` that cuts through the given mesh. When an int is
provided the nth SubGeomSlice will be returned.
"""
return self[key]
def __getitem__(self, key: Union[int, str, Mesh]) -> SubGeomSlice:
"""Returns the :class:`SubGeomSlice` that cuts through the given mesh. When an int is
provided the nth SubGeomSlice will be returned.
"""
if type(key) == int:
return tuple(self._subgeomslices.values())[key]
if type(key) == str:
return self._subgeomslices[key]
return self._subgeomslices[key.id]
def __len__(self):
return len(self._subgeomslices)
@property
def subgeomslices(self) -> List[SubGeomSlice]:
"""Get a list with all SubGeomSlices.
"""
return list(self._subgeomslices.values())
# @property
# def extent_dirs(self) -> Tuple[Literal['x', 'y', 'z'], Literal['x', 'y', 'z'], Literal['x', 'y', 'z']]:
# """The directions in which there is an extent. All three dimensions in case the geomslice is 3D.
# """
# ior = self.orientation
# if ior == 0:
# return 'x', 'y', 'z'
# elif ior == 1:
# return 'y', 'z'
# elif ior == 2:
# return 'x', 'z'
# else:
# return 'x', 'y'
[docs]
def get_nearest_timestep(self, time: float) -> int:
"""Calculates the nearest timestep for which data has been output for this geomslice.
"""
idx = np.searchsorted(self.times, time, side="left")
if time > 0 and (idx == len(self.times) or np.math.fabs(
time - self.times[idx - 1]) < np.math.fabs(time - self.times[idx])):
return idx - 1
else:
return idx
# def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], value: float) -> int:
# """Get the nearest mesh coordinate index in a specific dimension.
# """
# coords = self.coordinates[dimension]
# idx = np.searchsorted(coords, value, side="left")
# if idx > 0 and (idx == coords.size or np.math.fabs(value - coords[idx - 1]) < np.math.fabs(
# value - coords[idx])):
# return idx - 1
# else:
# return idx
@property
def meshes(self) -> List[Mesh]:
"""Returns a list of all meshes this geomslice cuts through.
"""
return [subgeomslc.mesh for subgeomslc in self._subgeomslices]
# @property
# def coordinates(self) -> Dict[Literal['x', 'y', 'z'], np.ndarray]:
# """Returns a dictionary containing a numpy ndarray with coordinates for each dimension.
# For cell-centered geomslices, the coordinates are adjusted to represent cell-centered coordinates.
# """
# coords = {'x': set(), 'y': set(), 'z': set()}
# for dim in ('x', 'y', 'z'):
# for mesh in self._subgeomslices.keys():
# co = mesh.coordinates[dim].copy()
# # In case the geomslice is cell-centered, we will shift the coordinates by half a cell
# # and remove the last coordinate
# if self.cell_centered:
# co = co[:-1]
# co -= (co[1] - co[0]) / 2
# coords[dim].update(co)
# coords[dim] = np.array(sorted(list(coords[dim])))
#
# return coords
@property
def vmin(self):
return np.min(self)
@property
def vmax(self):
return np.max(self)
[docs]
def clear_cache(self):
"""Remove all data from the internal cache that has been loaded so far to free memory.
"""
for subgeomslice in self._subgeomslices.values():
subgeomslice.clear_cache()
@property
def vertices(self):
n_verts = sum(subgeomslice.vertices.shape[0] for subgeomslice in self._subgeomslices.values())
vertices = np.empty((n_verts, 3))
counter = 0
for subgeomslice in self._subgeomslices.values():
size = subgeomslice.vertices.shape[0]
vertices[:, counter:counter+size] = subgeomslice.vertices
counter += size
return vertices
@property
def faces(self):
n_faces = sum(x.faces.shape[0] for x in self._subgeomslices.values())
faces = np.empty((n_faces, 3))
counter = 0
for subgeomslice in self._subgeomslices.values():
size = subgeomslice.faces.shape[0]
faces[:, counter:counter + size] = subgeomslice.faces
counter += size
return faces
@property
def data(self):
n = sum(x.data.shape[1] for x in self._subgeomslices.values())
data = np.empty((next(iter(self._subgeomslices.values())).n_t, n))
counter = 0
for subgeomslice in self._subgeomslices.values():
size = subgeomslice.data.shape[1]
data[:, counter:counter + size] = subgeomslice.data
counter += size
return data
@implements(np.min)
def _min(self):
return min(subgeomsclice.vmin for subgeomsclice in self._subgeomslices.values())
@implements(np.max)
def _max(self):
return max(subgeomsclice.vmax for subgeomsclice in self._subgeomslices.values())
[docs]
@implements(np.mean)
def mean(self):
"""Calculates the mean over the whole geomslice.
:returns: The calculated mean value.
"""
return np.mean([np.mean(subgeomsclice.data) for subgeomsclice in self._subgeomslices.values()])
[docs]
@implements(np.std)
def std(self):
"""Calculates the standard deviation over the whole geomslice.
:returns: The calculated standard deviation.
"""
mean = self.mean
sum = np.sum([np.sum(np.power(subgeomsclice.data - mean, 2)) for subgeomsclice in self._subgeomslices.values()])
N = np.sum([subgeomsclice.data.size for subgeomsclice in self._subgeomslices.values()])
return np.sqrt(sum / N)
def __array__(self):
"""Method that will be called by numpy when trying to convert the object to a numpy ndarray.
"""
raise UserWarning(
"Slices can not be converted to numpy arrays, but they support all typical numpy"
" operations such as np.multiply. If a 'global' array containing all subgeomslices is"
" required, use the 'to_global' method and use the returned numpy-array explicitly.")
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
"""Method that will be called by numpy when using a ufunction with a GeomSlice as input.
:returns: A new geomslice on which the ufunc has been applied.
"""
if method != "__call__":
logging.warning(
"The %s method has been used which is not explicitly implemented. Correctness of"
" results is not guaranteed. If you require this feature to be implemented please"
" submit an issue on Github where you explain your use case.", method)
input_list = list(inputs)
for i, inp in enumerate(inputs):
if isinstance(inp, self.__class__):
del input_list[i]
if len(input_list) == 0:
raise UserWarning(
f"The {method} operation is not implemented for multiple geomslices as input yet. If"
" you require this feature, please request this functionality by submitting an"
" issue on Github.")
new_slice = deepcopy(self)
for subgeomslice in new_slice._subgeomslices.values():
subgeomslice._data = ufunc(subgeomslice.data, input_list[0], **kwargs)
return new_slice
def __array_function__(self, func, types, args, kwargs):
"""Method that will be called by numpy when using an array function with a GeomSlice as input.
:returns: The output of the array function.
"""
if func not in _HANDLED_FUNCTIONS:
return NotImplemented
# Note: this allows subclasses that don't override __array_function__ to handle GeomSlices.
if not all(issubclass(t, self.__class__) for t in types):
return NotImplemented
return _HANDLED_FUNCTIONS[func](*args, **kwargs)
def __repr__(self):
# if self.type == '3D': # 3D-Slice
# return f"GeomSlice([3D] quantity={self.quantity}, cell_centered={self.cell_centered}, extent={self.extent})"
# else: # 2D-Slice
# return f"GeomSlice([2D] quantity={self.quantity}, cell_centered={self.cell_centered}, extent={self.extent}, " \
# f"extent_dirs={self.extent_dirs}, orientation={self.orientation})"
return f"GeomSlice(quantity={self.quantity})"
# __array_function__ implementations