improve landscape architecture. builder is separate file. cleaned up hardcoded defaults

This commit is contained in:
Pim Nelissen
2026-02-25 14:26:49 +01:00
parent a74ea765d7
commit 39572da682
4 changed files with 200 additions and 192 deletions

View File

@ -1,13 +0,0 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.landscape import director
from pg_rad.landscape import landscape
from pg_rad.landscape import config_parser
from pg_rad.landscape.director import (LandscapeDirector,)
from pg_rad.landscape.landscape import (Landscape, LandscapeBuilder,)
from pg_rad.landscape.config_parser import ConfigParser
__all__ = ['Landscape', 'LandscapeBuilder', 'LandscapeDirector', 'director',
'landscape', 'config_parser', 'ConfigParser']

View File

@ -0,0 +1,172 @@
import logging
from typing import Self
from .landscape import Landscape
from pg_rad.dataloader.dataloader import load_data
from pg_rad.objects.sources import PointSource
from pg_rad.exceptions.exceptions import OutOfBoundsError
from pg_rad.utils.projection import rel_to_abs_source_position
from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec
)
from pg_rad.path.path import Path, path_from_RT90
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
logger = logging.getLogger(__name__)
class LandscapeBuilder:
def __init__(self, name: str = "Unnamed landscape"):
self.name = name
self._path = None
self._point_sources = []
self._size = None
self._air_density = 1.243
logger.debug(f"LandscapeBuilder initialized: {self.name}")
def set_air_density(self, air_density: float | None) -> Self:
"""Set the air density of the world."""
if air_density and isinstance(air_density, float):
self._air_density = air_density
return self
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
"""Set the size of the landscape in meters (x,y,z)."""
if self._path and any(p > s for p, s in zip(self._path.size, size)):
raise OutOfBoundsError(
"Cannot set landscape size smaller than the path."
)
self._size = size
logger.debug("Size of the landscape has been updated.")
return self
def get_path(self):
return self._path
def set_path_from_segments(
self,
sim_spec: SimulationSpec
):
segments = sim_spec.path.segments
types = [s.type for s in segments]
lengths = [s.length for s in segments]
angles = [s.angle for s in segments]
sg = SegmentedRoadGenerator(
length=lengths,
ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
velocity=sim_spec.runtime.speed,
seed=sim_spec.options.seed
)
x, y = sg.generate(
segments=types,
lengths=lengths,
angles=angles
)
self._path = Path(list(zip(x, y)))
self._fit_landscape_to_path()
return self
def set_path_from_experimental_data(
self,
spec: CSVPathSpec
) -> Self:
df = load_data(spec.file)
self._path = path_from_RT90(
df=df,
east_col=spec.east_col_name,
north_col=spec.north_col_name,
z=spec.z
)
self._fit_landscape_to_path()
return self
def set_point_sources(
self,
*sources: AbsolutePointSourceSpec | RelativePointSourceSpec
):
"""Add one or more point sources to the world.
Args:
*sources (AbsolutePointSourceSpec | RelativePointSourceSpec):
One or more sources, passed as SourceSpecs tuple
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
for s in sources:
if isinstance(s, AbsolutePointSourceSpec):
pos = (s.x, s.y, s.z)
elif isinstance(s, RelativePointSourceSpec):
path = self.get_path()
pos = rel_to_abs_source_position(
x_list=path.x_list,
y_list=path.y_list,
path_z=path.z,
along_path=s.along_path,
side=s.side,
dist_from_path=s.dist_from_path)
if any(
p < 0 or p >= s for p, s in zip(pos, self._size)
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources.append(PointSource(
activity_MBq=s.activity_MBq,
isotope=s.isotope,
position=pos,
name=s.name
))
def _fit_landscape_to_path(self) -> None:
"""The size of the landscape will be updated if
1) _size is not set, or
2) _size is too small to contain the path."""
needs_resize = (
not self._size
or any(p > s for p, s in zip(self._path.size, self._size))
)
if needs_resize:
if not self._size:
logger.debug("Because no Landscape size was set, "
"it will now set to path dimensions.")
else:
logger.warning(
"Path exceeds current landscape size. "
"Landscape size will be expanded to accommodate path."
)
max_size = max(self._path.size)
self.set_landscape_size((max_size, max_size))
def build(self):
landscape = Landscape(
name=self.name,
path=self._path,
point_sources=self._point_sources,
size=self._size,
air_density=self._air_density
)
logger.info(f"Landscape built successfully: {landscape.name}")
return landscape

View File

@ -2,10 +2,13 @@ from importlib.resources import files
import logging import logging
from pg_rad.configs.filepaths import TEST_EXP_DATA from pg_rad.configs.filepaths import TEST_EXP_DATA
from .landscape import LandscapeBuilder from .builder import LandscapeBuilder
from .config_parser import ConfigParser from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
ProceduralPathSpec
)
from pg_rad.objects.sources import PointSource from pg_rad.objects.sources import PointSource
from pg_rad.utils.positional import rel_to_abs_source_position
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -18,7 +21,11 @@ class LandscapeDirector:
@staticmethod @staticmethod
def build_test_landscape(): def build_test_landscape():
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA) fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
source = PointSource(activity=100E9, isotope="CS137", pos=(0, 0, 0)) source = PointSource(
activity_MBq=100E9,
isotope="CS137",
position=(0, 0, 0)
)
lb = LandscapeBuilder("Test landscape") lb = LandscapeBuilder("Test landscape")
lb.set_air_density(1.243) lb.set_air_density(1.243)
lb.set_path_from_experimental_data(fp, z=0) lb.set_path_from_experimental_data(fp, z=0)
@ -27,36 +34,16 @@ class LandscapeDirector:
return landscape return landscape
@staticmethod @staticmethod
def build_from_config(path_to_config): def build_from_config(config: SimulationSpec):
conf = ConfigParser(path_to_config) lb = LandscapeBuilder(config.metadata.name)
conf.parse() lb.set_air_density(config.options.air_density)
lb = LandscapeBuilder(conf.name) if isinstance(config.path, CSVPathSpec):
lb.set_path_from_experimental_data(spec=config.path)
if conf.path_type == 'csv': elif isinstance(config.path, ProceduralPathSpec):
lb.set_path_from_experimental_data(**conf.path)
elif conf.path_type == 'procedural':
lb.set_path_from_segments( lb.set_path_from_segments(
speed=conf.speed, sim_spec=config,
acquisition_time=conf.acquisition_time,
**conf.path
) )
lb.set_point_sources(*config.point_sources)
sources = []
for s_name, s_params in conf.sources.items():
if isinstance(s_params['position'], dict):
path = lb.get_path()
x_abs, y_abs, z_abs = rel_to_abs_source_position(
x_list=path.x_list,
y_list=path.y_list,
path_z=path.z,
**s_params['position']
)
s_params['position'] = [x_abs, y_abs, z_abs]
sources.append(PointSource(name=s_name, **s_params))
lb.set_point_sources(*sources)
landscape = lb.build() landscape = lb.build()
return landscape return landscape

View File

@ -1,12 +1,7 @@
import logging import logging
from typing import List, Self
from pg_rad.dataloader.dataloader import load_data from pg_rad.path.path import Path
from pg_rad.exceptions.exceptions import OutOfBoundsError
from pg_rad.objects.sources import PointSource from pg_rad.objects.sources import PointSource
from pg_rad.path.path import Path, path_from_RT90
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -20,22 +15,18 @@ class Landscape:
self, self,
name: str, name: str,
path: Path, path: Path,
point_sources: list[PointSource] = [], air_density: float,
size: tuple[int, int, int] = [500, 500, 50], point_sources: list[PointSource],
air_density: float = 1.243 size: tuple[int, int, int]
): ):
"""Initialize a landscape. """Initialize a landscape.
Args: Args:
path (Path): A Path object. name (str): Name of the landscape.
point_sources (list[PointSource], optional): List of point sources. path (Path): The path of the detector.
air_density (float, optional): Air density in kg/m^3. air_density (float): Air density in kg/m^3.
Defaults to 1.243. point_sources (list[PointSource]): List of point sources.
size (tuple[int, int, int], optional): (x,y,z) dimensions of world size (tuple[int, int, int]): Size of the world.
in meters. Defaults to [500, 500, 50].
Raises:
TypeError: _description_
""" """
self.name = name self.name = name
@ -45,132 +36,3 @@ class Landscape:
self.air_density = air_density self.air_density = air_density
logger.debug(f"Landscape created: {self.name}") logger.debug(f"Landscape created: {self.name}")
class LandscapeBuilder:
def __init__(self, name: str = "Unnamed landscape"):
self.name = name
self._path = None
self._point_sources = []
self._size = None
self._air_density = None
logger.debug(f"LandscapeBuilder initialized: {self.name}")
def set_air_density(self, air_density) -> Self:
"""Set the air density of the world."""
self._air_density = air_density
return self
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
"""Set the size of the landscape in meters (x,y,z)."""
if self._path and any(p > s for p, s in zip(self._path.size, size)):
raise OutOfBoundsError(
"Cannot set landscape size smaller than the path."
)
self._size = size
logger.debug("Size of the landscape has been updated.")
return self
def get_path(self):
return self._path
def set_path_from_segments(
self,
length: int | float,
speed: int | float,
acquisition_time: int,
segments: List,
angles: List,
alpha: int | float = None
):
sg = SegmentedRoadGenerator(
length=length,
ds=speed*acquisition_time,
velocity=speed
)
x, y = sg.generate(
segments=segments
)
self._path = Path(list(zip(x, y)))
self._fit_landscape_to_path()
return self
def set_path_from_experimental_data(
self,
file: str,
z: int,
east_col_name: str = "East",
north_col_name: str = "North"
) -> Self:
df = load_data(file)
self._path = path_from_RT90(
df=df,
east_col=east_col_name,
north_col=north_col_name,
z=z
)
self._fit_landscape_to_path()
return self
def set_point_sources(self, *sources):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
if any(
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
for source in sources
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources = sources
def _fit_landscape_to_path(self) -> None:
"""The size of the landscape will be updated if
1) _size is not set, or
2) _size is too small to contain the path."""
needs_resize = (
not self._size
or any(p > s for p, s in zip(self._path.size, self._size))
)
if needs_resize:
if not self._size:
logger.debug("Because no Landscape size was set, "
"it will now set to path dimensions.")
else:
logger.warning(
"Path exceeds current landscape size. "
"Landscape size will be expanded to accommodate path."
)
max_size = max(self._path.size)
self.set_landscape_size((max_size, max_size))
def build(self):
landscape = Landscape(
name=self.name,
path=self._path,
point_sources=self._point_sources,
size=self._size,
air_density=self._air_density
)
logger.info(f"Landscape built successfully: {landscape.name}")
return landscape