From 39572da6826a1e3c0e6596750cd48e31b6483604 Mon Sep 17 00:00:00 2001 From: Pim Nelissen Date: Wed, 25 Feb 2026 14:26:49 +0100 Subject: [PATCH] improve landscape architecture. builder is separate file. cleaned up hardcoded defaults --- src/pg_rad/landscape/__init__.py | 13 --- src/pg_rad/landscape/builder.py | 172 ++++++++++++++++++++++++++++++ src/pg_rad/landscape/director.py | 51 ++++----- src/pg_rad/landscape/landscape.py | 156 ++------------------------- 4 files changed, 200 insertions(+), 192 deletions(-) create mode 100644 src/pg_rad/landscape/builder.py diff --git a/src/pg_rad/landscape/__init__.py b/src/pg_rad/landscape/__init__.py index 9e24f3e..e69de29 100644 --- a/src/pg_rad/landscape/__init__.py +++ b/src/pg_rad/landscape/__init__.py @@ -1,13 +0,0 @@ -# do not expose internal logger when running mkinit -__ignore__ = ["logger"] - -from pg_rad.landscape import director -from pg_rad.landscape import landscape -from pg_rad.landscape import config_parser - -from pg_rad.landscape.director import (LandscapeDirector,) -from pg_rad.landscape.landscape import (Landscape, LandscapeBuilder,) -from pg_rad.landscape.config_parser import ConfigParser - -__all__ = ['Landscape', 'LandscapeBuilder', 'LandscapeDirector', 'director', - 'landscape', 'config_parser', 'ConfigParser'] diff --git a/src/pg_rad/landscape/builder.py b/src/pg_rad/landscape/builder.py new file mode 100644 index 0000000..6912abc --- /dev/null +++ b/src/pg_rad/landscape/builder.py @@ -0,0 +1,172 @@ +import logging +from typing import Self + +from .landscape import Landscape +from pg_rad.dataloader.dataloader import load_data +from pg_rad.objects.sources import PointSource +from pg_rad.exceptions.exceptions import OutOfBoundsError +from pg_rad.utils.projection import rel_to_abs_source_position +from pg_rad.inputparser.specs import ( + SimulationSpec, + CSVPathSpec, + AbsolutePointSourceSpec, + RelativePointSourceSpec +) + +from pg_rad.path.path import Path, path_from_RT90 + +from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator + + +logger = logging.getLogger(__name__) + + +class LandscapeBuilder: + def __init__(self, name: str = "Unnamed landscape"): + self.name = name + self._path = None + self._point_sources = [] + self._size = None + self._air_density = 1.243 + + logger.debug(f"LandscapeBuilder initialized: {self.name}") + + def set_air_density(self, air_density: float | None) -> Self: + """Set the air density of the world.""" + if air_density and isinstance(air_density, float): + self._air_density = air_density + return self + + def set_landscape_size(self, size: tuple[int, int, int]) -> Self: + """Set the size of the landscape in meters (x,y,z).""" + if self._path and any(p > s for p, s in zip(self._path.size, size)): + raise OutOfBoundsError( + "Cannot set landscape size smaller than the path." + ) + + self._size = size + logger.debug("Size of the landscape has been updated.") + + return self + + def get_path(self): + return self._path + + def set_path_from_segments( + self, + sim_spec: SimulationSpec + ): + + segments = sim_spec.path.segments + types = [s.type for s in segments] + lengths = [s.length for s in segments] + angles = [s.angle for s in segments] + + sg = SegmentedRoadGenerator( + length=lengths, + ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time, + velocity=sim_spec.runtime.speed, + seed=sim_spec.options.seed + ) + + x, y = sg.generate( + segments=types, + lengths=lengths, + angles=angles + ) + + self._path = Path(list(zip(x, y))) + self._fit_landscape_to_path() + return self + + def set_path_from_experimental_data( + self, + spec: CSVPathSpec + ) -> Self: + df = load_data(spec.file) + self._path = path_from_RT90( + df=df, + east_col=spec.east_col_name, + north_col=spec.north_col_name, + z=spec.z + ) + + self._fit_landscape_to_path() + + return self + + def set_point_sources( + self, + *sources: AbsolutePointSourceSpec | RelativePointSourceSpec + ): + """Add one or more point sources to the world. + + Args: + *sources (AbsolutePointSourceSpec | RelativePointSourceSpec): + One or more sources, passed as SourceSpecs tuple + Raises: + OutOfBoundsError: If any source is outside the boundaries of the + landscape. + """ + + for s in sources: + if isinstance(s, AbsolutePointSourceSpec): + pos = (s.x, s.y, s.z) + elif isinstance(s, RelativePointSourceSpec): + path = self.get_path() + pos = rel_to_abs_source_position( + x_list=path.x_list, + y_list=path.y_list, + path_z=path.z, + along_path=s.along_path, + side=s.side, + dist_from_path=s.dist_from_path) + if any( + p < 0 or p >= s for p, s in zip(pos, self._size) + ): + raise OutOfBoundsError( + "One or more sources attempted to " + "be placed outside the landscape." + ) + + self._point_sources.append(PointSource( + activity_MBq=s.activity_MBq, + isotope=s.isotope, + position=pos, + name=s.name + )) + + def _fit_landscape_to_path(self) -> None: + """The size of the landscape will be updated if + 1) _size is not set, or + 2) _size is too small to contain the path.""" + + needs_resize = ( + not self._size + or any(p > s for p, s in zip(self._path.size, self._size)) + ) + + if needs_resize: + if not self._size: + logger.debug("Because no Landscape size was set, " + "it will now set to path dimensions.") + else: + logger.warning( + "Path exceeds current landscape size. " + "Landscape size will be expanded to accommodate path." + ) + + max_size = max(self._path.size) + self.set_landscape_size((max_size, max_size)) + + def build(self): + landscape = Landscape( + name=self.name, + path=self._path, + point_sources=self._point_sources, + size=self._size, + air_density=self._air_density + ) + + logger.info(f"Landscape built successfully: {landscape.name}") + return landscape diff --git a/src/pg_rad/landscape/director.py b/src/pg_rad/landscape/director.py index 54b354b..cd163b4 100644 --- a/src/pg_rad/landscape/director.py +++ b/src/pg_rad/landscape/director.py @@ -2,10 +2,13 @@ from importlib.resources import files import logging from pg_rad.configs.filepaths import TEST_EXP_DATA -from .landscape import LandscapeBuilder -from .config_parser import ConfigParser +from .builder import LandscapeBuilder +from pg_rad.inputparser.specs import ( + SimulationSpec, + CSVPathSpec, + ProceduralPathSpec +) from pg_rad.objects.sources import PointSource -from pg_rad.utils.positional import rel_to_abs_source_position logger = logging.getLogger(__name__) @@ -18,7 +21,11 @@ class LandscapeDirector: @staticmethod def build_test_landscape(): fp = files('pg_rad.data').joinpath(TEST_EXP_DATA) - source = PointSource(activity=100E9, isotope="CS137", pos=(0, 0, 0)) + source = PointSource( + activity_MBq=100E9, + isotope="CS137", + position=(0, 0, 0) + ) lb = LandscapeBuilder("Test landscape") lb.set_air_density(1.243) lb.set_path_from_experimental_data(fp, z=0) @@ -27,36 +34,16 @@ class LandscapeDirector: return landscape @staticmethod - def build_from_config(path_to_config): - conf = ConfigParser(path_to_config) - conf.parse() + def build_from_config(config: SimulationSpec): + lb = LandscapeBuilder(config.metadata.name) + lb.set_air_density(config.options.air_density) - lb = LandscapeBuilder(conf.name) - - if conf.path_type == 'csv': - lb.set_path_from_experimental_data(**conf.path) - elif conf.path_type == 'procedural': + if isinstance(config.path, CSVPathSpec): + lb.set_path_from_experimental_data(spec=config.path) + elif isinstance(config.path, ProceduralPathSpec): lb.set_path_from_segments( - speed=conf.speed, - acquisition_time=conf.acquisition_time, - **conf.path + sim_spec=config, ) - - sources = [] - for s_name, s_params in conf.sources.items(): - if isinstance(s_params['position'], dict): - path = lb.get_path() - x_abs, y_abs, z_abs = rel_to_abs_source_position( - x_list=path.x_list, - y_list=path.y_list, - path_z=path.z, - **s_params['position'] - ) - - s_params['position'] = [x_abs, y_abs, z_abs] - - sources.append(PointSource(name=s_name, **s_params)) - - lb.set_point_sources(*sources) + lb.set_point_sources(*config.point_sources) landscape = lb.build() return landscape diff --git a/src/pg_rad/landscape/landscape.py b/src/pg_rad/landscape/landscape.py index 0f7f0fa..057c761 100644 --- a/src/pg_rad/landscape/landscape.py +++ b/src/pg_rad/landscape/landscape.py @@ -1,12 +1,7 @@ import logging -from typing import List, Self -from pg_rad.dataloader.dataloader import load_data -from pg_rad.exceptions.exceptions import OutOfBoundsError +from pg_rad.path.path import Path from pg_rad.objects.sources import PointSource -from pg_rad.path.path import Path, path_from_RT90 - -from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator logger = logging.getLogger(__name__) @@ -20,22 +15,18 @@ class Landscape: self, name: str, path: Path, - point_sources: list[PointSource] = [], - size: tuple[int, int, int] = [500, 500, 50], - air_density: float = 1.243 + air_density: float, + point_sources: list[PointSource], + size: tuple[int, int, int] ): """Initialize a landscape. Args: - path (Path): A Path object. - point_sources (list[PointSource], optional): List of point sources. - air_density (float, optional): Air density in kg/m^3. - Defaults to 1.243. - size (tuple[int, int, int], optional): (x,y,z) dimensions of world - in meters. Defaults to [500, 500, 50]. - - Raises: - TypeError: _description_ + name (str): Name of the landscape. + path (Path): The path of the detector. + air_density (float): Air density in kg/m^3. + point_sources (list[PointSource]): List of point sources. + size (tuple[int, int, int]): Size of the world. """ self.name = name @@ -45,132 +36,3 @@ class Landscape: self.air_density = air_density logger.debug(f"Landscape created: {self.name}") - - -class LandscapeBuilder: - def __init__(self, name: str = "Unnamed landscape"): - self.name = name - self._path = None - self._point_sources = [] - self._size = None - self._air_density = None - - logger.debug(f"LandscapeBuilder initialized: {self.name}") - - def set_air_density(self, air_density) -> Self: - """Set the air density of the world.""" - self._air_density = air_density - return self - - def set_landscape_size(self, size: tuple[int, int, int]) -> Self: - """Set the size of the landscape in meters (x,y,z).""" - if self._path and any(p > s for p, s in zip(self._path.size, size)): - raise OutOfBoundsError( - "Cannot set landscape size smaller than the path." - ) - - self._size = size - logger.debug("Size of the landscape has been updated.") - - return self - - def get_path(self): - return self._path - - def set_path_from_segments( - self, - length: int | float, - speed: int | float, - acquisition_time: int, - segments: List, - angles: List, - alpha: int | float = None - ): - sg = SegmentedRoadGenerator( - length=length, - ds=speed*acquisition_time, - velocity=speed - ) - - x, y = sg.generate( - segments=segments - ) - - self._path = Path(list(zip(x, y))) - self._fit_landscape_to_path() - return self - - def set_path_from_experimental_data( - self, - file: str, - z: int, - east_col_name: str = "East", - north_col_name: str = "North" - ) -> Self: - df = load_data(file) - self._path = path_from_RT90( - df=df, - east_col=east_col_name, - north_col=north_col_name, - z=z - ) - - self._fit_landscape_to_path() - - return self - - def set_point_sources(self, *sources): - """Add one or more point sources to the world. - - Args: - *sources (pg_rad.sources.PointSource): One or more sources, - passed as Source1, Source2, ... - Raises: - OutOfBoundsError: If any source is outside the boundaries of the - landscape. - """ - if any( - any(p < 0 or p >= s for p, s in zip(source.pos, self._size)) - for source in sources - ): - raise OutOfBoundsError( - "One or more sources attempted to " - "be placed outside the landscape." - ) - - self._point_sources = sources - - def _fit_landscape_to_path(self) -> None: - """The size of the landscape will be updated if - 1) _size is not set, or - 2) _size is too small to contain the path.""" - - needs_resize = ( - not self._size - or any(p > s for p, s in zip(self._path.size, self._size)) - ) - - if needs_resize: - if not self._size: - logger.debug("Because no Landscape size was set, " - "it will now set to path dimensions.") - else: - logger.warning( - "Path exceeds current landscape size. " - "Landscape size will be expanded to accommodate path." - ) - - max_size = max(self._path.size) - self.set_landscape_size((max_size, max_size)) - - def build(self): - landscape = Landscape( - name=self.name, - path=self._path, - point_sources=self._point_sources, - size=self._size, - air_density=self._air_density - ) - - logger.info(f"Landscape built successfully: {landscape.name}") - return landscape