Improve error handling. Add alignment feature for point sources

This commit is contained in:
Pim Nelissen
2026-03-03 21:01:51 +01:00
parent 7612f74bcb
commit 1c8cc41e3c
5 changed files with 113 additions and 26 deletions

View File

@ -31,3 +31,7 @@ class DimensionError(ValueError):
class InvalidIsotopeError(ValueError): class InvalidIsotopeError(ValueError):
"""Raised if attempting to load an isotope that is not valid.""" """Raised if attempting to load an isotope that is not valid."""
class InvalidConfigValueError(ValueError):
"""Raised if a config key has an incorrect type or value."""

View File

@ -4,7 +4,11 @@ from typing import Any, Dict, List, Union
import yaml import yaml
from pg_rad.exceptions.exceptions import MissingConfigKeyError, DimensionError from pg_rad.exceptions.exceptions import (
MissingConfigKeyError,
DimensionError,
InvalidConfigValueError
)
from pg_rad.configs import defaults from pg_rad.configs import defaults
from .specs import ( from .specs import (
@ -14,7 +18,7 @@ from .specs import (
PathSpec, PathSpec,
ProceduralPathSpec, ProceduralPathSpec,
CSVPathSpec, CSVPathSpec,
SourceSpec, PointSourceSpec,
AbsolutePointSourceSpec, AbsolutePointSourceSpec,
RelativePointSourceSpec, RelativePointSourceSpec,
DetectorSpec, DetectorSpec,
@ -98,19 +102,23 @@ class ConfigParser:
def _parse_options(self) -> SimulationOptionsSpec: def _parse_options(self) -> SimulationOptionsSpec:
options = self.config.get("options", {}) options = self.config.get("options", {})
allowed = {"air_density", "seed"} allowed = {"air_density_kg_per_m3", "seed"}
self._warn_unknown_keys( self._warn_unknown_keys(
section="options", section="options",
provided=set(options.keys()), provided=set(options.keys()),
allowed=allowed, allowed=allowed,
) )
air_density = options.get("air_density", defaults.DEFAULT_AIR_DENSITY) air_density = options.get(
"air_density_kg_per_m3",
defaults.DEFAULT_AIR_DENSITY
)
seed = options.get("seed") seed = options.get("seed")
if not isinstance(air_density, float) or air_density <= 0: if not isinstance(air_density, float) or air_density <= 0:
raise ValueError( raise ValueError(
"options.air_density must be a positive float in kg/m^3." "options.air_density_kg_per_m3 must be a positive float "
"in kg/m^3."
) )
if ( if (
seed is not None or seed is not None or
@ -230,9 +238,9 @@ class ConfigParser:
def _is_turn(segment_type: str) -> bool: def _is_turn(segment_type: str) -> bool:
return segment_type in {"turn_left", "turn_right"} return segment_type in {"turn_left", "turn_right"}
def _parse_point_sources(self) -> List[SourceSpec]: def _parse_point_sources(self) -> List[PointSourceSpec]:
source_dict = self.config.get("sources", {}) source_dict = self.config.get("sources", {})
specs: List[SourceSpec] = [] specs: List[PointSourceSpec] = []
for name, params in source_dict.items(): for name, params in source_dict.items():
@ -245,7 +253,7 @@ class ConfigParser:
isotope = params.get("isotope") isotope = params.get("isotope")
if not isinstance(activity, int | float) or activity <= 0: if not isinstance(activity, int | float) or activity <= 0:
raise ValueError( raise InvalidConfigValueError(
f"sources.{name}.activity_MBq must be positive value " f"sources.{name}.activity_MBq must be positive value "
"in MegaBequerels." "in MegaBequerels."
) )
@ -270,6 +278,16 @@ class ConfigParser:
) )
elif isinstance(position, dict): elif isinstance(position, dict):
alignment = position.get("acquisition_alignment")
if alignment not in {'best', 'worst', None}:
raise InvalidConfigValueError(
f"sources.{name}.acquisition_alignment must be "
"'best' or 'worst', with 'best' aligning source "
f"{name} in the middle of the two nearest acquisition "
"points, and 'worst' aligning exactly perpendicular "
"to the nearest acquisition point."
)
specs.append( specs.append(
RelativePointSourceSpec( RelativePointSourceSpec(
name=name, name=name,
@ -278,12 +296,13 @@ class ConfigParser:
along_path=float(position["along_path"]), along_path=float(position["along_path"]),
dist_from_path=float(position["dist_from_path"]), dist_from_path=float(position["dist_from_path"]),
side=position["side"], side=position["side"],
z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT) z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT),
alignment=alignment
) )
) )
else: else:
raise ValueError( raise InvalidConfigValueError(
f"Invalid position format for source '{name}'." f"Invalid position format for source '{name}'."
) )
@ -295,7 +314,7 @@ class ConfigParser:
missing = required - det_dict.keys() missing = required - det_dict.keys()
if missing: if missing:
raise MissingConfigKeyError(missing) raise MissingConfigKeyError("detector", missing)
name = det_dict.get("name") name = det_dict.get("name")
is_isotropic = det_dict.get("is_isotropic") is_isotropic = det_dict.get("is_isotropic")
@ -303,19 +322,23 @@ class ConfigParser:
default_detectors = defaults.DETECTOR_EFFICIENCIES default_detectors = defaults.DETECTOR_EFFICIENCIES
if eff is None and name in default_detectors.keys(): if name in default_detectors.keys() and not eff:
eff = default_detectors[name] eff = default_detectors[name]
elif eff is not None: elif eff:
pass pass
else: else:
raise ValueError( raise ValueError(
f"The detector {name} is not in the library, and no " f"The detector {name} not found in library. Either "
"efficiency was defined. Either specify detector efficiency " f"specify {name}.efficiency or "
"or choose one from the following list: " "choose a detector from the following list: "
f"{default_detectors.keys()}" f"{default_detectors.keys()}."
) )
return DetectorSpec(name=name, eff=eff, is_isotropic=is_isotropic) return DetectorSpec(
name=name,
efficiency=eff,
is_isotropic=is_isotropic
)
def _warn_unknown_keys(self, section: str, provided: set, allowed: set): def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
unknown = provided - allowed unknown = provided - allowed

View File

@ -1,5 +1,6 @@
from abc import ABC from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal
@dataclass @dataclass
@ -42,31 +43,32 @@ class CSVPathSpec(PathSpec):
@dataclass @dataclass
class SourceSpec(ABC): class PointSourceSpec(ABC):
activity_MBq: float activity_MBq: float
isotope: str isotope: str
name: str name: str
@dataclass @dataclass
class AbsolutePointSourceSpec(SourceSpec): class AbsolutePointSourceSpec(PointSourceSpec):
x: float x: float
y: float y: float
z: float z: float
@dataclass @dataclass
class RelativePointSourceSpec(SourceSpec): class RelativePointSourceSpec(PointSourceSpec):
along_path: float along_path: float
dist_from_path: float dist_from_path: float
side: str side: str
z: float z: float
alignment: Literal["best", "worst"] | None
@dataclass @dataclass
class DetectorSpec: class DetectorSpec:
name: str name: str
eff: float | None efficiency: float
is_isotropic: bool is_isotropic: bool
@ -76,5 +78,5 @@ class SimulationSpec:
runtime: RuntimeSpec runtime: RuntimeSpec
options: SimulationOptionsSpec options: SimulationOptionsSpec
path: PathSpec path: PathSpec
point_sources: list[SourceSpec] point_sources: list[PointSourceSpec]
detector: DetectorSpec detector: DetectorSpec

View File

@ -1,5 +1,7 @@
import logging import logging
from typing import Self from typing import Literal, Self
import numpy as np
from .landscape import Landscape from .landscape import Landscape
from pg_rad.dataloader.dataloader import load_data from pg_rad.dataloader.dataloader import load_data
@ -113,11 +115,25 @@ class LandscapeBuilder:
pos = (s.x, s.y, s.z) pos = (s.x, s.y, s.z)
elif isinstance(s, RelativePointSourceSpec): elif isinstance(s, RelativePointSourceSpec):
path = self.get_path() path = self.get_path()
if s.alignment:
along_path = self._align_relative_source(
s.along_path,
path,
s.alignment
)
logger.info(
f"Because source {s.name} was set to align with path "
f"({s.alignment} alignment), it was moved to be at "
f"{along_path} m along the path from {s.along_path} m."
)
else:
along_path = s.along_path
pos = rel_to_abs_source_position( pos = rel_to_abs_source_position(
x_list=path.x_list, x_list=path.x_list,
y_list=path.y_list, y_list=path.y_list,
path_z=path.z, path_z=path.z,
along_path=s.along_path, along_path=along_path,
side=s.side, side=s.side,
dist_from_path=s.dist_from_path) dist_from_path=s.dist_from_path)
if any( if any(
@ -158,6 +174,48 @@ class LandscapeBuilder:
max_size = max(self._path.size) max_size = max(self._path.size)
self.set_landscape_size((max_size, max_size)) self.set_landscape_size((max_size, max_size))
def _align_relative_source(
self,
along_path: float,
path: "Path",
mode: Literal["best", "worst"],
) -> tuple[float, float, float]:
"""Given the arc length at which the point source is placed,
align the source relative to the waypoints of the path. Here,
'best' means the point source is moved such that it is
perpendicular to the midpoint between two acuisition points.
'worst' means the point source is moved such that it is
perpendicular to the nearest acquisition point.
The distance to the path is not affected by this algorithm.
For more details on alignment, see
Fig. 4 and page 24 in Bukartas (2021).
Args:
along_path (float): Current arc length position of the source.
path (Path): The path to align to.
mode (Literal["best", "worst"]): Alignment mode.
Returns:
along_new (float): The updated arc length position.
"""
ds = np.hypot(
path.x_list[1] - path.x_list[0],
path.y_list[1] - path.y_list[0],
)
if mode == "worst":
along_new = round(along_path / ds) * ds
elif mode == "best":
along_new = (round(along_path / ds - 0.5) + 0.5) * ds
else:
raise ValueError(f"Unknown alignment mode: {mode}")
return along_new
def build(self): def build(self):
landscape = Landscape( landscape = Landscape(
name=self.name, name=self.name,

View File

@ -22,7 +22,7 @@ class LandscapeDirector:
def build_test_landscape(): def build_test_landscape():
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA) fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
source = PointSource( source = PointSource(
activity_MBq=100E9, activity_MBq=100E6,
isotope="CS137", isotope="CS137",
position=(0, 0, 0) position=(0, 0, 0)
) )