Source code for fdsreader.part.particle

from typing import List, Tuple, Dict, Sequence

import numpy as np

from fdsreader.fds_classes import Mesh
from fdsreader.utils import Quantity


[docs] class Particle: """Container to store particle data from particle simulations with FDS. :ivar class_name: Name of the particle class defined in the FDS input-file. :ivar quantities: List of all quantities for which data has been written out. :ivar color: Color assigned to the particle. :ivar n_particles: Number of existing particles for each timestep per mesh. :ivar lower_bounds: Dictionary with lower bounds for each timestep with quantities as keys. :ivar upper_bounds: Dictionary with upper bounds for each timestep with quantities as keys. """ def __init__(self, class_name: str, quantities: List[Quantity], color: Tuple[float, float, float]): self.class_name = class_name self.quantities = quantities self.color = color self.n_particles: Dict[str, List[int]] = dict() self._positions: List[np.ndarray] = list() self._tags: List[np.ndarray] = list() self._data: Dict[str, List[np.ndarray]] = {q.name: [] for q in self.quantities} self.times: Sequence[float] = list() self.lower_bounds = {q.name: [] for q in self.quantities} self.upper_bounds = {q.name: [] for q in self.quantities} self._init_callback = None @property def id(self): return self.class_name
[docs] def filter_by_tag(self, tag: int): """Filter all particles by a single one with the specified tag. """ data = self.data tags = self.tags positions = self.positions part = Particle(self.class_name, self.quantities, self.color) part._tags = tag part._data = {quantity: list() for quantity in data.keys()} part._positions = list() part.times = list() for t, tags in enumerate(tags): if tag in tags: idx = np.where(tags == tag)[0] for quantity in data.keys(): part._data[quantity].append(data[quantity][t][idx][0]) part._positions.append(positions[t][idx][0]) part.times.append(self.times[t]) part.lower_bounds = dict() part.upper_bounds = dict() for q in self.quantities: if len(part._positions) != 0: part.lower_bounds[q.name] = np.min(part._data[q.name]) part.upper_bounds[q.name] = np.max(part._data[q.name]) else: part.lower_bounds[q.name] = 0 part.upper_bounds[q.name] = 0 return part
@property def data(self) -> Dict[str, List[np.ndarray]]: """Dictionary with quantities as keys and a list with a numpy array for each timestep which contains data for each particle in that timestep. """ if len(self._positions) == 0 and len(self._tags) == 0: self._init_callback() return self._data @property def tags(self) -> List[np.ndarray]: """List with a numpy array for each timestep which contains a tag for each particle in that timestep. """ if len(self._positions) == 0 and len(self._tags) == 0: self._init_callback() return self._tags @property def positions(self) -> List[np.ndarray]: """List with a numpy array for each timestep which contains the position of each particle in that timestep. """ if len(self._positions) == 0 and len(self._tags) == 0: self._init_callback() return self._positions
[docs] def clear_cache(self): """Remove all data from the internal cache that has been loaded so far to free memory. """ if len(self._positions) != 0: del self._positions self._positions = list() del self._tags self._tags = list() del self._data self._data = {q.name: [] for q in self.quantities}
def __repr__(self): return f"Particle(name={self.class_name}, quantities={self.quantities})"