mirror of
https://github.com/pim-n/pg-rad
synced 2026-03-23 21:58:12 +01:00
Compare commits
28 Commits
347ff4c102
...
92789c718f
| Author | SHA1 | Date | |
|---|---|---|---|
| 92789c718f | |||
| 176baa543a | |||
| dba1240e9f | |||
| 9a169da520 | |||
| 5c9841190f | |||
| 561fb1dca1 | |||
| 39572da682 | |||
| a74ea765d7 | |||
| 58de830a39 | |||
| ae0a038948 | |||
| 9944c06466 | |||
| 5615914c7e | |||
| e926338b69 | |||
| bab41c128b | |||
| c18f924348 | |||
| f1bed93ca7 | |||
| 3a92f79a43 | |||
| 80f7b71c38 | |||
| 61dc05073a | |||
| cca514a2ba | |||
| 265d3b0111 | |||
| 8f652875dc | |||
| 5fc806bd39 | |||
| d53f7c5e2f | |||
| fdc11b4076 | |||
| c2b05c63a8 | |||
| 5684525d0f | |||
| 3fd2eafb2a |
1
src/pg_rad/configs/__init__.py
Normal file
1
src/pg_rad/configs/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
__all__ = []
|
||||||
18
src/pg_rad/configs/defaults.py
Normal file
18
src/pg_rad/configs/defaults.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
# --- Physics defaults ---
|
||||||
|
DEFAULT_AIR_DENSITY = 1.243 # kg/m^3, Skåne
|
||||||
|
|
||||||
|
# --- Simulation defaults ---
|
||||||
|
DEFAULT_SEED = None
|
||||||
|
DEFAULT_ACQUISITION_TIME = 1.0
|
||||||
|
|
||||||
|
# --- Source defaults ---
|
||||||
|
DEFAULT_PATH_HEIGHT = 0.0
|
||||||
|
DEFAULT_SOURCE_HEIGHT = 0.0
|
||||||
|
|
||||||
|
# --- Segmented road defaults ---
|
||||||
|
DEFAULT_MIN_TURN_ANGLE = 30.
|
||||||
|
DEFAULT_MAX_TURN_ANGLE = 90.
|
||||||
|
|
||||||
|
DEFAULT_FRICTION_COEFF = 0.7 # dry asphalt
|
||||||
|
DEFAULT_GRAVITATIONAL_ACC = 9.81 # m/s^2
|
||||||
|
DEFAULT_ALPHA = 100.
|
||||||
@ -2,7 +2,6 @@ import logging
|
|||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from pg_rad.exceptions import DataLoadError, InvalidCSVError
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -12,18 +11,12 @@ def load_data(filename: str) -> pd.DataFrame:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
df = pd.read_csv(filename, delimiter=',')
|
df = pd.read_csv(filename, delimiter=',')
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
logger.error(f"File not found: {filename}")
|
logger.critical(e)
|
||||||
raise DataLoadError(f"File does not exist: {filename}") from e
|
raise
|
||||||
|
|
||||||
except pd.errors.ParserError as e:
|
except pd.errors.ParserError as e:
|
||||||
logger.error(f"Invalid CSV format: {filename}")
|
logger.critical(e)
|
||||||
raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
|
raise
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(f"Unexpected error while loading {filename}")
|
|
||||||
raise DataLoadError("Unexpected error while loading data") from e
|
|
||||||
|
|
||||||
logger.debug(f"File loaded: {filename}")
|
logger.debug(f"File loaded: {filename}")
|
||||||
return df
|
return df
|
||||||
|
|||||||
@ -12,3 +12,22 @@ class InvalidCSVError(DataLoadError):
|
|||||||
|
|
||||||
class OutOfBoundsError(Exception):
|
class OutOfBoundsError(Exception):
|
||||||
"""Raised when an object is attempted to be placed out of bounds."""
|
"""Raised when an object is attempted to be placed out of bounds."""
|
||||||
|
|
||||||
|
|
||||||
|
class MissingConfigKeyError(KeyError):
|
||||||
|
"""Raised when a (nested) config key is missing in the config."""
|
||||||
|
def __init__(self, key, subkey=None):
|
||||||
|
if subkey:
|
||||||
|
self.message = f"Missing key in {key}: {', '.join(list(subkey))}"
|
||||||
|
else:
|
||||||
|
self.message = f"Missing key: {key}"
|
||||||
|
|
||||||
|
super().__init__(self.message)
|
||||||
|
|
||||||
|
|
||||||
|
class DimensionError(ValueError):
|
||||||
|
"""Raised if dimensions or coordinates do not match the system."""
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidIsotopeError(ValueError):
|
||||||
|
"""Raised if attempting to load an isotope that is not valid."""
|
||||||
|
|||||||
316
src/pg_rad/inputparser/parser.py
Normal file
316
src/pg_rad/inputparser/parser.py
Normal file
@ -0,0 +1,316 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, List, Union
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import MissingConfigKeyError, DimensionError
|
||||||
|
from pg_rad.configs import defaults
|
||||||
|
|
||||||
|
from .specs import (
|
||||||
|
MetadataSpec,
|
||||||
|
RuntimeSpec,
|
||||||
|
SimulationOptionsSpec,
|
||||||
|
SegmentSpec,
|
||||||
|
PathSpec,
|
||||||
|
ProceduralPathSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
SourceSpec,
|
||||||
|
AbsolutePointSourceSpec,
|
||||||
|
RelativePointSourceSpec,
|
||||||
|
SimulationSpec,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigParser:
|
||||||
|
_ALLOWED_ROOT_KEYS = {
|
||||||
|
"name",
|
||||||
|
"speed",
|
||||||
|
"acquisition_time",
|
||||||
|
"path",
|
||||||
|
"sources",
|
||||||
|
"options",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, config_source: str):
|
||||||
|
self.config = self._load_yaml(config_source)
|
||||||
|
|
||||||
|
def parse(self) -> SimulationSpec:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="global",
|
||||||
|
provided=set(self.config.keys()),
|
||||||
|
allowed=self._ALLOWED_ROOT_KEYS,
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata = self._parse_metadata()
|
||||||
|
runtime = self._parse_runtime()
|
||||||
|
options = self._parse_options()
|
||||||
|
path = self._parse_path()
|
||||||
|
sources = self._parse_point_sources()
|
||||||
|
|
||||||
|
return SimulationSpec(
|
||||||
|
metadata=metadata,
|
||||||
|
runtime=runtime,
|
||||||
|
options=options,
|
||||||
|
path=path,
|
||||||
|
point_sources=sources,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load_yaml(self, config_source: str) -> Dict[str, Any]:
|
||||||
|
if os.path.exists(config_source):
|
||||||
|
with open(config_source) as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
else:
|
||||||
|
data = yaml.safe_load(config_source)
|
||||||
|
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
raise ValueError(
|
||||||
|
"Provided path or string is not a valid YAML representation."
|
||||||
|
)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _parse_metadata(self) -> MetadataSpec:
|
||||||
|
try:
|
||||||
|
return MetadataSpec(
|
||||||
|
name=self.config["name"]
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise MissingConfigKeyError("global", {"name"}) from e
|
||||||
|
|
||||||
|
def _parse_runtime(self) -> RuntimeSpec:
|
||||||
|
required = {"speed", "acquisition_time"}
|
||||||
|
missing = required - self.config.keys()
|
||||||
|
if missing:
|
||||||
|
raise MissingConfigKeyError("global", missing)
|
||||||
|
|
||||||
|
return RuntimeSpec(
|
||||||
|
speed=float(self.config["speed"]),
|
||||||
|
acquisition_time=float(self.config["acquisition_time"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_options(self) -> SimulationOptionsSpec:
|
||||||
|
options = self.config.get("options", {})
|
||||||
|
|
||||||
|
allowed = {"air_density", "seed"}
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="options",
|
||||||
|
provided=set(options.keys()),
|
||||||
|
allowed=allowed,
|
||||||
|
)
|
||||||
|
|
||||||
|
air_density = options.get("air_density", defaults.DEFAULT_AIR_DENSITY)
|
||||||
|
seed = options.get("seed")
|
||||||
|
|
||||||
|
if not isinstance(air_density, float) or air_density <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
"options.air_density must be a positive float in kg/m^3."
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
seed is not None or
|
||||||
|
(isinstance(seed, int) and seed <= 0)
|
||||||
|
):
|
||||||
|
raise ValueError("Seed must be a positive integer value.")
|
||||||
|
|
||||||
|
return SimulationOptionsSpec(
|
||||||
|
air_density=air_density,
|
||||||
|
seed=seed,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_path(self) -> PathSpec:
|
||||||
|
allowed_csv = {"file", "east_col_name", "north_col_name", "z"}
|
||||||
|
allowed_proc = {"segments", "length", "z"}
|
||||||
|
|
||||||
|
path = self.config.get("path")
|
||||||
|
if path is None:
|
||||||
|
raise MissingConfigKeyError("global", {"path"})
|
||||||
|
|
||||||
|
if not isinstance(path, dict):
|
||||||
|
raise ValueError("Path must be a dictionary.")
|
||||||
|
|
||||||
|
if "file" in path:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="path (csv)",
|
||||||
|
provided=set(path.keys()),
|
||||||
|
allowed=allowed_csv,
|
||||||
|
)
|
||||||
|
|
||||||
|
return CSVPathSpec(
|
||||||
|
file=path["file"],
|
||||||
|
east_col_name=path["east_col_name"],
|
||||||
|
north_col_name=path["north_col_name"],
|
||||||
|
z=path.get("z", 0),
|
||||||
|
)
|
||||||
|
|
||||||
|
if "segments" in path:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="path (procedural)",
|
||||||
|
provided=set(path.keys()),
|
||||||
|
allowed=allowed_proc,
|
||||||
|
)
|
||||||
|
return self._parse_procedural_path(path)
|
||||||
|
|
||||||
|
raise ValueError("Invalid path configuration.")
|
||||||
|
|
||||||
|
def _parse_procedural_path(
|
||||||
|
self,
|
||||||
|
path: Dict[str, Any]
|
||||||
|
) -> ProceduralPathSpec:
|
||||||
|
raw_segments = path.get("segments")
|
||||||
|
|
||||||
|
if not isinstance(raw_segments, List):
|
||||||
|
raise ValueError("path.segments must be a list of segments.")
|
||||||
|
|
||||||
|
raw_length = path.get("length")
|
||||||
|
|
||||||
|
if raw_length is None:
|
||||||
|
raise MissingConfigKeyError("path", {"length"})
|
||||||
|
|
||||||
|
if isinstance(raw_length, int | float):
|
||||||
|
raw_length = [float(raw_length)]
|
||||||
|
|
||||||
|
segments = self._process_segment_angles(raw_segments)
|
||||||
|
lengths = self._process_segment_lengths(raw_length, len(segments))
|
||||||
|
resolved_segments = self._combine_segments_lengths(segments, lengths)
|
||||||
|
|
||||||
|
return ProceduralPathSpec(
|
||||||
|
segments=resolved_segments,
|
||||||
|
z=path.get("z", defaults.DEFAULT_PATH_HEIGHT),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_segment_angles(
|
||||||
|
self,
|
||||||
|
raw_segments: List[Union[str, dict]]
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
|
||||||
|
normalized = []
|
||||||
|
|
||||||
|
for segment in raw_segments:
|
||||||
|
|
||||||
|
if isinstance(segment, str):
|
||||||
|
normalized.append({"type": segment, "angle": None})
|
||||||
|
|
||||||
|
elif isinstance(segment, dict):
|
||||||
|
if len(segment) != 1:
|
||||||
|
raise ValueError("Invalid segment definition.")
|
||||||
|
seg_type, angle = list(segment.items())[0]
|
||||||
|
normalized.append({"type": seg_type, "angle": angle})
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid segment entry format.")
|
||||||
|
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
def _process_segment_lengths(
|
||||||
|
self,
|
||||||
|
raw_length_list: List[Union[int, float]],
|
||||||
|
num_segments: int
|
||||||
|
) -> List[float]:
|
||||||
|
num_lengths = len(raw_length_list)
|
||||||
|
|
||||||
|
if num_lengths == num_segments or num_lengths == 1:
|
||||||
|
return raw_length_list
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Path length must either be a single number or a list with "
|
||||||
|
"number of elements equal to the number of segments."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _combine_segments_lengths(
|
||||||
|
self,
|
||||||
|
segments: List[Dict[str, Any]],
|
||||||
|
lengths: List[float],
|
||||||
|
) -> List[SegmentSpec]:
|
||||||
|
|
||||||
|
resolved = []
|
||||||
|
|
||||||
|
for seg, length in zip(segments, lengths):
|
||||||
|
angle = seg["angle"]
|
||||||
|
|
||||||
|
if angle is not None and not self._is_turn(seg["type"]):
|
||||||
|
raise ValueError(
|
||||||
|
f"A {seg['type']} segment does not support an angle."
|
||||||
|
)
|
||||||
|
|
||||||
|
resolved.append(
|
||||||
|
SegmentSpec(
|
||||||
|
type=seg["type"],
|
||||||
|
length=length,
|
||||||
|
angle=angle,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_turn(segment_type: str) -> bool:
|
||||||
|
return segment_type in {"turn_left", "turn_right"}
|
||||||
|
|
||||||
|
def _parse_point_sources(self) -> List[SourceSpec]:
|
||||||
|
source_dict = self.config.get("sources", {})
|
||||||
|
specs: List[SourceSpec] = []
|
||||||
|
|
||||||
|
for name, params in source_dict.items():
|
||||||
|
|
||||||
|
required = {"activity_MBq", "isotope", "position"}
|
||||||
|
missing = required - params.keys()
|
||||||
|
if missing:
|
||||||
|
raise MissingConfigKeyError(name, missing)
|
||||||
|
|
||||||
|
activity = params.get("activity_MBq")
|
||||||
|
isotope = params.get("isotope")
|
||||||
|
|
||||||
|
if not isinstance(activity, int | float) or activity <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
f"sources.{name}.activity_MBq must be positive value "
|
||||||
|
"in MegaBequerels."
|
||||||
|
)
|
||||||
|
|
||||||
|
position = params.get("position")
|
||||||
|
|
||||||
|
if isinstance(position, list):
|
||||||
|
if len(position) != 3:
|
||||||
|
raise DimensionError(
|
||||||
|
"Absolute position must be [x, y, z]."
|
||||||
|
)
|
||||||
|
|
||||||
|
specs.append(
|
||||||
|
AbsolutePointSourceSpec(
|
||||||
|
name=name,
|
||||||
|
activity_MBq=float(activity),
|
||||||
|
isotope=isotope,
|
||||||
|
x=float(position[0]),
|
||||||
|
y=float(position[1]),
|
||||||
|
z=float(position[2]),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
elif isinstance(position, dict):
|
||||||
|
specs.append(
|
||||||
|
RelativePointSourceSpec(
|
||||||
|
name=name,
|
||||||
|
activity_MBq=float(activity),
|
||||||
|
isotope=isotope,
|
||||||
|
along_path=float(position["along_path"]),
|
||||||
|
dist_from_path=float(position["dist_from_path"]),
|
||||||
|
side=position["side"],
|
||||||
|
z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid position format for source '{name}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
return specs
|
||||||
|
|
||||||
|
def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
|
||||||
|
unknown = provided - allowed
|
||||||
|
if unknown:
|
||||||
|
logger.warning(
|
||||||
|
f"Unknown keys in '{section}' section: {unknown}"
|
||||||
|
)
|
||||||
76
src/pg_rad/inputparser/specs.py
Normal file
76
src/pg_rad/inputparser/specs.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
from abc import ABC
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MetadataSpec:
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RuntimeSpec:
|
||||||
|
speed: float
|
||||||
|
acquisition_time: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationOptionsSpec:
|
||||||
|
air_density: float = 1.243
|
||||||
|
seed: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SegmentSpec:
|
||||||
|
type: str
|
||||||
|
length: float
|
||||||
|
angle: float | None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PathSpec(ABC):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ProceduralPathSpec(PathSpec):
|
||||||
|
segments: list[SegmentSpec]
|
||||||
|
z: int | float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CSVPathSpec(PathSpec):
|
||||||
|
file: str
|
||||||
|
east_col_name: str
|
||||||
|
north_col_name: str
|
||||||
|
z: int | float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SourceSpec(ABC):
|
||||||
|
activity_MBq: float
|
||||||
|
isotope: str
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AbsolutePointSourceSpec(SourceSpec):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RelativePointSourceSpec(SourceSpec):
|
||||||
|
along_path: float
|
||||||
|
dist_from_path: float
|
||||||
|
side: str
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationSpec:
|
||||||
|
metadata: MetadataSpec
|
||||||
|
runtime: RuntimeSpec
|
||||||
|
options: SimulationOptionsSpec
|
||||||
|
path: PathSpec
|
||||||
|
point_sources: list[SourceSpec]
|
||||||
@ -2,9 +2,8 @@
|
|||||||
__ignore__ = ["logger"]
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
from pg_rad.isotopes import isotope
|
from pg_rad.isotopes import isotope
|
||||||
from pg_rad.isotopes import presets
|
|
||||||
|
|
||||||
from pg_rad.isotopes.isotope import (Isotope,)
|
from pg_rad.isotopes.isotope import (CS137, Isotope, get_isotope,
|
||||||
from pg_rad.isotopes.presets import (CS137,)
|
preset_isotopes,)
|
||||||
|
|
||||||
__all__ = ['CS137', 'Isotope', 'isotope', 'presets']
|
__all__ = ['CS137', 'Isotope', 'get_isotope', 'isotope', 'preset_isotopes']
|
||||||
|
|||||||
@ -1,4 +1,7 @@
|
|||||||
from pg_rad.physics import get_mass_attenuation_coeff
|
from typing import Dict, Type
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import InvalidIsotopeError
|
||||||
|
from pg_rad.physics.attenuation import get_mass_attenuation_coeff
|
||||||
|
|
||||||
|
|
||||||
class Isotope:
|
class Isotope:
|
||||||
@ -25,3 +28,24 @@ class Isotope:
|
|||||||
self.E = E
|
self.E = E
|
||||||
self.b = b
|
self.b = b
|
||||||
self.mu_mass_air = get_mass_attenuation_coeff(E / 1000)
|
self.mu_mass_air = get_mass_attenuation_coeff(E / 1000)
|
||||||
|
|
||||||
|
|
||||||
|
class CS137(Isotope):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(
|
||||||
|
name="Cs-137",
|
||||||
|
E=661.66,
|
||||||
|
b=0.851
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
preset_isotopes: Dict[str, Type[Isotope]] = {
|
||||||
|
"CS137": CS137
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_isotope(isotope_str: str) -> Isotope:
|
||||||
|
"""Lazy factory function to create isotope objects."""
|
||||||
|
if isotope_str not in preset_isotopes:
|
||||||
|
raise InvalidIsotopeError(f"Unknown isotope: {isotope_str}")
|
||||||
|
return preset_isotopes[isotope_str]()
|
||||||
|
|||||||
@ -1,10 +0,0 @@
|
|||||||
from .isotope import Isotope
|
|
||||||
|
|
||||||
|
|
||||||
class CS137(Isotope):
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__(
|
|
||||||
name="Cs-137",
|
|
||||||
E=661.66,
|
|
||||||
b=0.851
|
|
||||||
)
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
# do not expose internal logger when running mkinit
|
|
||||||
__ignore__ = ["logger"]
|
|
||||||
|
|
||||||
from pg_rad.landscape import director
|
|
||||||
from pg_rad.landscape import landscape
|
|
||||||
|
|
||||||
from pg_rad.landscape.director import (LandscapeDirector,)
|
|
||||||
from pg_rad.landscape.landscape import (Landscape, LandscapeBuilder,)
|
|
||||||
|
|
||||||
__all__ = ['Landscape', 'LandscapeBuilder', 'LandscapeDirector', 'director',
|
|
||||||
'landscape']
|
|
||||||
|
|||||||
172
src/pg_rad/landscape/builder.py
Normal file
172
src/pg_rad/landscape/builder.py
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Self
|
||||||
|
|
||||||
|
from .landscape import Landscape
|
||||||
|
from pg_rad.dataloader.dataloader import load_data
|
||||||
|
from pg_rad.objects.sources import PointSource
|
||||||
|
from pg_rad.exceptions.exceptions import OutOfBoundsError
|
||||||
|
from pg_rad.utils.projection import rel_to_abs_source_position
|
||||||
|
from pg_rad.inputparser.specs import (
|
||||||
|
SimulationSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
AbsolutePointSourceSpec,
|
||||||
|
RelativePointSourceSpec
|
||||||
|
)
|
||||||
|
|
||||||
|
from pg_rad.path.path import Path, path_from_RT90
|
||||||
|
|
||||||
|
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class LandscapeBuilder:
|
||||||
|
def __init__(self, name: str = "Unnamed landscape"):
|
||||||
|
self.name = name
|
||||||
|
self._path = None
|
||||||
|
self._point_sources = []
|
||||||
|
self._size = None
|
||||||
|
self._air_density = 1.243
|
||||||
|
|
||||||
|
logger.debug(f"LandscapeBuilder initialized: {self.name}")
|
||||||
|
|
||||||
|
def set_air_density(self, air_density: float | None) -> Self:
|
||||||
|
"""Set the air density of the world."""
|
||||||
|
if air_density and isinstance(air_density, float):
|
||||||
|
self._air_density = air_density
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
|
||||||
|
"""Set the size of the landscape in meters (x,y,z)."""
|
||||||
|
if self._path and any(p > s for p, s in zip(self._path.size, size)):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"Cannot set landscape size smaller than the path."
|
||||||
|
)
|
||||||
|
|
||||||
|
self._size = size
|
||||||
|
logger.debug("Size of the landscape has been updated.")
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_path(self):
|
||||||
|
return self._path
|
||||||
|
|
||||||
|
def set_path_from_segments(
|
||||||
|
self,
|
||||||
|
sim_spec: SimulationSpec
|
||||||
|
):
|
||||||
|
|
||||||
|
segments = sim_spec.path.segments
|
||||||
|
types = [s.type for s in segments]
|
||||||
|
lengths = [s.length for s in segments]
|
||||||
|
angles = [s.angle for s in segments]
|
||||||
|
|
||||||
|
sg = SegmentedRoadGenerator(
|
||||||
|
length=lengths,
|
||||||
|
ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
|
||||||
|
velocity=sim_spec.runtime.speed,
|
||||||
|
seed=sim_spec.options.seed
|
||||||
|
)
|
||||||
|
|
||||||
|
x, y = sg.generate(
|
||||||
|
segments=types,
|
||||||
|
lengths=lengths,
|
||||||
|
angles=angles
|
||||||
|
)
|
||||||
|
|
||||||
|
self._path = Path(list(zip(x, y)))
|
||||||
|
self._fit_landscape_to_path()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_path_from_experimental_data(
|
||||||
|
self,
|
||||||
|
spec: CSVPathSpec
|
||||||
|
) -> Self:
|
||||||
|
df = load_data(spec.file)
|
||||||
|
self._path = path_from_RT90(
|
||||||
|
df=df,
|
||||||
|
east_col=spec.east_col_name,
|
||||||
|
north_col=spec.north_col_name,
|
||||||
|
z=spec.z
|
||||||
|
)
|
||||||
|
|
||||||
|
self._fit_landscape_to_path()
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_point_sources(
|
||||||
|
self,
|
||||||
|
*sources: AbsolutePointSourceSpec | RelativePointSourceSpec
|
||||||
|
):
|
||||||
|
"""Add one or more point sources to the world.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
*sources (AbsolutePointSourceSpec | RelativePointSourceSpec):
|
||||||
|
One or more sources, passed as SourceSpecs tuple
|
||||||
|
Raises:
|
||||||
|
OutOfBoundsError: If any source is outside the boundaries of the
|
||||||
|
landscape.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for s in sources:
|
||||||
|
if isinstance(s, AbsolutePointSourceSpec):
|
||||||
|
pos = (s.x, s.y, s.z)
|
||||||
|
elif isinstance(s, RelativePointSourceSpec):
|
||||||
|
path = self.get_path()
|
||||||
|
pos = rel_to_abs_source_position(
|
||||||
|
x_list=path.x_list,
|
||||||
|
y_list=path.y_list,
|
||||||
|
path_z=path.z,
|
||||||
|
along_path=s.along_path,
|
||||||
|
side=s.side,
|
||||||
|
dist_from_path=s.dist_from_path)
|
||||||
|
if any(
|
||||||
|
p < 0 or p >= s for p, s in zip(pos, self._size)
|
||||||
|
):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"One or more sources attempted to "
|
||||||
|
"be placed outside the landscape."
|
||||||
|
)
|
||||||
|
|
||||||
|
self._point_sources.append(PointSource(
|
||||||
|
activity_MBq=s.activity_MBq,
|
||||||
|
isotope=s.isotope,
|
||||||
|
position=pos,
|
||||||
|
name=s.name
|
||||||
|
))
|
||||||
|
|
||||||
|
def _fit_landscape_to_path(self) -> None:
|
||||||
|
"""The size of the landscape will be updated if
|
||||||
|
1) _size is not set, or
|
||||||
|
2) _size is too small to contain the path."""
|
||||||
|
|
||||||
|
needs_resize = (
|
||||||
|
not self._size
|
||||||
|
or any(p > s for p, s in zip(self._path.size, self._size))
|
||||||
|
)
|
||||||
|
|
||||||
|
if needs_resize:
|
||||||
|
if not self._size:
|
||||||
|
logger.debug("Because no Landscape size was set, "
|
||||||
|
"it will now set to path dimensions.")
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Path exceeds current landscape size. "
|
||||||
|
"Landscape size will be expanded to accommodate path."
|
||||||
|
)
|
||||||
|
|
||||||
|
max_size = max(self._path.size)
|
||||||
|
self.set_landscape_size((max_size, max_size))
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
landscape = Landscape(
|
||||||
|
name=self.name,
|
||||||
|
path=self._path,
|
||||||
|
point_sources=self._point_sources,
|
||||||
|
size=self._size,
|
||||||
|
air_density=self._air_density
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Landscape built successfully: {landscape.name}")
|
||||||
|
return landscape
|
||||||
@ -2,22 +2,48 @@ from importlib.resources import files
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from pg_rad.configs.filepaths import TEST_EXP_DATA
|
from pg_rad.configs.filepaths import TEST_EXP_DATA
|
||||||
from pg_rad.isotopes import CS137
|
from .builder import LandscapeBuilder
|
||||||
from pg_rad.landscape.landscape import LandscapeBuilder
|
from pg_rad.inputparser.specs import (
|
||||||
from pg_rad.objects import PointSource
|
SimulationSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
ProceduralPathSpec
|
||||||
|
)
|
||||||
|
from pg_rad.objects.sources import PointSource
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LandscapeDirector:
|
class LandscapeDirector:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.logger = logging.getLogger(__name__)
|
logger.debug("LandscapeDirector initialized.")
|
||||||
self.logger.debug("LandscapeDirector initialized.")
|
|
||||||
|
|
||||||
def build_test_landscape(self):
|
@staticmethod
|
||||||
|
def build_test_landscape():
|
||||||
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
|
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
|
||||||
source = PointSource(activity=100E9, isotope=CS137(), pos=(0, 0, 0))
|
source = PointSource(
|
||||||
|
activity_MBq=100E9,
|
||||||
|
isotope="CS137",
|
||||||
|
position=(0, 0, 0)
|
||||||
|
)
|
||||||
lb = LandscapeBuilder("Test landscape")
|
lb = LandscapeBuilder("Test landscape")
|
||||||
lb.set_air_density(1.243)
|
lb.set_air_density(1.243)
|
||||||
lb.set_path_from_experimental_data(fp, z=0)
|
lb.set_path_from_experimental_data(fp, z=0)
|
||||||
lb.set_point_sources(source)
|
lb.set_point_sources(source)
|
||||||
landscape = lb.build()
|
landscape = lb.build()
|
||||||
return landscape
|
return landscape
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_from_config(config: SimulationSpec):
|
||||||
|
lb = LandscapeBuilder(config.metadata.name)
|
||||||
|
lb.set_air_density(config.options.air_density)
|
||||||
|
|
||||||
|
if isinstance(config.path, CSVPathSpec):
|
||||||
|
lb.set_path_from_experimental_data(spec=config.path)
|
||||||
|
elif isinstance(config.path, ProceduralPathSpec):
|
||||||
|
lb.set_path_from_segments(
|
||||||
|
sim_spec=config,
|
||||||
|
)
|
||||||
|
lb.set_point_sources(*config.point_sources)
|
||||||
|
landscape = lb.build()
|
||||||
|
return landscape
|
||||||
|
|||||||
@ -1,11 +1,8 @@
|
|||||||
import logging
|
import logging
|
||||||
from typing import Self
|
|
||||||
|
|
||||||
from pg_rad.dataloader import load_data
|
from pg_rad.path.path import Path
|
||||||
from pg_rad.exceptions import OutOfBoundsError
|
from pg_rad.objects.sources import PointSource
|
||||||
from pg_rad.objects import PointSource
|
|
||||||
from pg_rad.path import Path, path_from_RT90
|
|
||||||
from pg_rad.physics.fluence import phi_single_source
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -18,22 +15,18 @@ class Landscape:
|
|||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
path: Path,
|
path: Path,
|
||||||
point_sources: list[PointSource] = [],
|
air_density: float,
|
||||||
size: tuple[int, int, int] = [500, 500, 50],
|
point_sources: list[PointSource],
|
||||||
air_density: float = 1.243
|
size: tuple[int, int, int]
|
||||||
):
|
):
|
||||||
"""Initialize a landscape.
|
"""Initialize a landscape.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
path (Path): A Path object.
|
name (str): Name of the landscape.
|
||||||
point_sources (list[PointSource], optional): List of point sources.
|
path (Path): The path of the detector.
|
||||||
air_density (float, optional): Air density in kg/m^3.
|
air_density (float): Air density in kg/m^3.
|
||||||
Defaults to 1.243.
|
point_sources (list[PointSource]): List of point sources.
|
||||||
size (tuple[int, int, int], optional): (x,y,z) dimensions of world
|
size (tuple[int, int, int]): Size of the world.
|
||||||
in meters. Defaults to [500, 500, 50].
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
TypeError: _description_
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.name = name
|
self.name = name
|
||||||
@ -43,119 +36,3 @@ class Landscape:
|
|||||||
self.air_density = air_density
|
self.air_density = air_density
|
||||||
|
|
||||||
logger.debug(f"Landscape created: {self.name}")
|
logger.debug(f"Landscape created: {self.name}")
|
||||||
|
|
||||||
def calculate_fluence_at(self, pos: tuple):
|
|
||||||
total_phi = 0.
|
|
||||||
for source in self.point_sources:
|
|
||||||
r = source.distance_to(pos)
|
|
||||||
phi_source = phi_single_source(
|
|
||||||
r=r,
|
|
||||||
activity=source.activity,
|
|
||||||
branching_ratio=source.isotope.b,
|
|
||||||
mu_mass_air=source.isotope.mu_mass_air,
|
|
||||||
air_density=self.air_density
|
|
||||||
)
|
|
||||||
total_phi += phi_source
|
|
||||||
return total_phi
|
|
||||||
|
|
||||||
def calculate_fluence_along_path(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class LandscapeBuilder:
|
|
||||||
def __init__(self, name: str = "Unnamed landscape"):
|
|
||||||
self.name = name
|
|
||||||
self._path = None
|
|
||||||
self._point_sources = []
|
|
||||||
self._size = None
|
|
||||||
self._air_density = None
|
|
||||||
|
|
||||||
logger.debug(f"LandscapeBuilder initialized: {self.name}")
|
|
||||||
|
|
||||||
def set_air_density(self, air_density) -> Self:
|
|
||||||
"""Set the air density of the world."""
|
|
||||||
self._air_density = air_density
|
|
||||||
return self
|
|
||||||
|
|
||||||
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
|
|
||||||
"""Set the size of the landscape in meters (x,y,z)."""
|
|
||||||
if self._path and any(p > s for p, s in zip(self._path.size, size)):
|
|
||||||
raise OutOfBoundsError(
|
|
||||||
"Cannot set landscape size smaller than the path."
|
|
||||||
)
|
|
||||||
|
|
||||||
self._size = size
|
|
||||||
logger.debug("Size of the landscape has been updated.")
|
|
||||||
|
|
||||||
return self
|
|
||||||
|
|
||||||
def set_path_from_experimental_data(
|
|
||||||
self,
|
|
||||||
filename: str,
|
|
||||||
z: int,
|
|
||||||
east_col: str = "East",
|
|
||||||
north_col: str = "North"
|
|
||||||
) -> Self:
|
|
||||||
df = load_data(filename)
|
|
||||||
self._path = path_from_RT90(
|
|
||||||
df=df,
|
|
||||||
east_col=east_col,
|
|
||||||
north_col=north_col,
|
|
||||||
z=z
|
|
||||||
)
|
|
||||||
|
|
||||||
# The size of the landscape will be updated if
|
|
||||||
# 1) _size is not set, or
|
|
||||||
# 2) _size is too small to contain the path.
|
|
||||||
needs_resize = (
|
|
||||||
not self._size
|
|
||||||
or any(p > s for p, s in zip(self._path.size, self._size))
|
|
||||||
)
|
|
||||||
|
|
||||||
if needs_resize:
|
|
||||||
if not self._size:
|
|
||||||
logger.debug("Because no Landscape size was set, "
|
|
||||||
"it will now set to path dimensions.")
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
"Path exceeds current landscape size. "
|
|
||||||
"Landscape size will be expanded to accommodate path."
|
|
||||||
)
|
|
||||||
|
|
||||||
self.set_landscape_size(self._path.size)
|
|
||||||
|
|
||||||
return self
|
|
||||||
|
|
||||||
def set_point_sources(self, *sources):
|
|
||||||
"""Add one or more point sources to the world.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
*sources (pg_rad.sources.PointSource): One or more sources,
|
|
||||||
passed as Source1, Source2, ...
|
|
||||||
Raises:
|
|
||||||
OutOfBoundsError: If any source is outside the boundaries of the
|
|
||||||
landscape.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if any(
|
|
||||||
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
|
|
||||||
for source in sources
|
|
||||||
):
|
|
||||||
raise OutOfBoundsError(
|
|
||||||
"One or more sources attempted to "
|
|
||||||
"be placed outside the landscape."
|
|
||||||
)
|
|
||||||
|
|
||||||
self._point_sources = sources
|
|
||||||
|
|
||||||
def build(self):
|
|
||||||
landscape = Landscape(
|
|
||||||
name=self.name,
|
|
||||||
path=self._path,
|
|
||||||
point_sources=self._point_sources,
|
|
||||||
size=self._size,
|
|
||||||
air_density=self._air_density
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"Landscape built successfully: {landscape.name}")
|
|
||||||
return landscape
|
|
||||||
|
|||||||
@ -1,7 +1,21 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
from pg_rad.logger import setup_logger
|
from pandas.errors import ParserError
|
||||||
from pg_rad.landscape import LandscapeDirector
|
from yaml import YAMLError
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import (
|
||||||
|
MissingConfigKeyError,
|
||||||
|
OutOfBoundsError,
|
||||||
|
DimensionError,
|
||||||
|
InvalidIsotopeError
|
||||||
|
)
|
||||||
|
from pg_rad.logger.logger import setup_logger
|
||||||
|
from pg_rad.inputparser.parser import ConfigParser
|
||||||
|
from pg_rad.landscape.director import LandscapeDirector
|
||||||
|
from pg_rad.plotting.result_plotter import ResultPlotter
|
||||||
|
from pg_rad.simulator.engine import SimulationEngine
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -9,24 +23,102 @@ def main():
|
|||||||
prog="pg-rad",
|
prog="pg-rad",
|
||||||
description="Primary Gamma RADiation landscape tool"
|
description="Primary Gamma RADiation landscape tool"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--config",
|
||||||
|
help="Build from a config file."
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--test",
|
"--test",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="Load and run the test landscape"
|
help="Load and run the test landscape."
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--loglevel",
|
"--loglevel",
|
||||||
default="INFO",
|
default="INFO",
|
||||||
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--saveplot",
|
||||||
|
action="store_true",
|
||||||
|
help="Save the plot or not."
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
setup_logger(args.loglevel)
|
setup_logger(args.loglevel)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
if args.test:
|
if args.test:
|
||||||
landscape = LandscapeDirector().build_test_landscape()
|
test_yaml = """
|
||||||
print(landscape.name)
|
name: Test landscape
|
||||||
|
speed: 8.33
|
||||||
|
acquisition_time: 1
|
||||||
|
|
||||||
|
path:
|
||||||
|
length: 1000
|
||||||
|
segments:
|
||||||
|
- straight
|
||||||
|
|
||||||
|
sources:
|
||||||
|
test_source:
|
||||||
|
activity_MBq: 1000
|
||||||
|
position: [500, 100, 0]
|
||||||
|
isotope: CS137
|
||||||
|
"""
|
||||||
|
|
||||||
|
cp = ConfigParser(test_yaml).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
|
|
||||||
|
output = SimulationEngine(
|
||||||
|
landscape=landscape,
|
||||||
|
runtime_spec=cp.runtime,
|
||||||
|
sim_spec=cp.options
|
||||||
|
).simulate()
|
||||||
|
|
||||||
|
plotter = ResultPlotter(landscape, output)
|
||||||
|
plotter.plot()
|
||||||
|
|
||||||
|
elif args.config:
|
||||||
|
try:
|
||||||
|
cp = ConfigParser(args.config).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
|
|
||||||
|
output = SimulationEngine(
|
||||||
|
landscape=landscape,
|
||||||
|
runtime_spec=cp.runtime,
|
||||||
|
sim_spec=cp.options
|
||||||
|
).simulate()
|
||||||
|
|
||||||
|
plotter = ResultPlotter(landscape, output)
|
||||||
|
plotter.plot()
|
||||||
|
except (
|
||||||
|
MissingConfigKeyError,
|
||||||
|
KeyError,
|
||||||
|
YAMLError,
|
||||||
|
):
|
||||||
|
logger.critical(
|
||||||
|
"The provided config file is invalid. "
|
||||||
|
"Check the log above. You can consult the documentation for "
|
||||||
|
"an explanation of how to define a config file."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
except (
|
||||||
|
OutOfBoundsError,
|
||||||
|
DimensionError,
|
||||||
|
InvalidIsotopeError,
|
||||||
|
ValueError
|
||||||
|
) as e:
|
||||||
|
logger.critical(e)
|
||||||
|
logger.critical(
|
||||||
|
"One or more items in config are not specified correctly. "
|
||||||
|
"Please consult this log and fix the problem."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
except (
|
||||||
|
FileNotFoundError,
|
||||||
|
ParserError
|
||||||
|
):
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@ -2,6 +2,8 @@ from typing import Self
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import DimensionError
|
||||||
|
|
||||||
|
|
||||||
class BaseObject:
|
class BaseObject:
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -21,7 +23,7 @@ class BaseObject:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
if len(pos) != 3:
|
if len(pos) != 3:
|
||||||
raise ValueError("Position must be tuple of length 3 (x,y,z).")
|
raise DimensionError("Position must be tuple of length 3 (x,y,z).")
|
||||||
self.pos = pos
|
self.pos = pos
|
||||||
self.name = name
|
self.name = name
|
||||||
self.color = color
|
self.color = color
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from .objects import BaseObject
|
from .objects import BaseObject
|
||||||
from pg_rad.isotopes import Isotope
|
from pg_rad.isotopes.isotope import Isotope, get_isotope
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -11,16 +11,16 @@ class PointSource(BaseObject):
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
activity: int,
|
activity_MBq: int,
|
||||||
isotope: Isotope,
|
isotope: str,
|
||||||
pos: tuple[float, float, float] = (0, 0, 0),
|
position: tuple[float, float, float] = (0, 0, 0),
|
||||||
name: str | None = None,
|
name: str | None = None,
|
||||||
color: str = 'red'
|
color: str = 'red'
|
||||||
):
|
):
|
||||||
"""A point source.
|
"""A point source.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
activity (int): Activity A in MBq.
|
activity_MBq (int): Activity A in MBq.
|
||||||
isotope (Isotope): The isotope.
|
isotope (Isotope): The isotope.
|
||||||
pos (tuple[float, float, float], optional):
|
pos (tuple[float, float, float], optional):
|
||||||
Position of the PointSource.
|
Position of the PointSource.
|
||||||
@ -37,16 +37,17 @@ class PointSource(BaseObject):
|
|||||||
if name is None:
|
if name is None:
|
||||||
name = f"Source {self.id}"
|
name = f"Source {self.id}"
|
||||||
|
|
||||||
super().__init__(pos, name, color)
|
super().__init__(position, name, color)
|
||||||
|
|
||||||
self.activity = activity
|
self.activity = activity_MBq
|
||||||
self.isotope = isotope
|
self.isotope: Isotope = get_isotope(isotope)
|
||||||
|
|
||||||
logger.debug(f"Source created: {self.name}")
|
logger.debug(f"Source created: {self.name}")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
x, y, z = self.position
|
||||||
repr_str = (f"PointSource(name={self.name}, "
|
repr_str = (f"PointSource(name={self.name}, "
|
||||||
+ f"pos={(self.x, self.y, self.z)}, "
|
+ f"pos={(x, y, z)}, "
|
||||||
+ f"A={self.activity} MBq), "
|
+ f"A={self.activity} MBq), "
|
||||||
+ f"isotope={self.isotope.name}.")
|
+ f"isotope={self.isotope.name}.")
|
||||||
|
|
||||||
|
|||||||
@ -4,7 +4,9 @@ from pg_rad.physics import attenuation
|
|||||||
from pg_rad.physics import fluence
|
from pg_rad.physics import fluence
|
||||||
|
|
||||||
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
|
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
|
||||||
from pg_rad.physics.fluence import (phi_single_source,)
|
from pg_rad.physics.fluence import (calculate_fluence_along_path,
|
||||||
|
calculate_fluence_at, phi,)
|
||||||
|
|
||||||
__all__ = ['attenuation', 'fluence', 'get_mass_attenuation_coeff',
|
__all__ = ['attenuation', 'calculate_fluence_along_path',
|
||||||
'phi_single_source']
|
'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff',
|
||||||
|
'phi']
|
||||||
|
|||||||
@ -1,7 +1,12 @@
|
|||||||
|
from typing import Tuple, TYPE_CHECKING
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
def phi_single_source(
|
|
||||||
|
def phi(
|
||||||
r: float,
|
r: float,
|
||||||
activity: float | int,
|
activity: float | int,
|
||||||
branching_ratio: float,
|
branching_ratio: float,
|
||||||
@ -27,11 +32,68 @@ def phi_single_source(
|
|||||||
mu_mass_air *= 0.1
|
mu_mass_air *= 0.1
|
||||||
mu_air = mu_mass_air * air_density
|
mu_air = mu_mass_air * air_density
|
||||||
|
|
||||||
phi = (
|
phi_r = (
|
||||||
activity
|
activity
|
||||||
* branching_ratio
|
* branching_ratio
|
||||||
* np.exp(-mu_air * r)
|
* np.exp(-mu_air * r)
|
||||||
/ (4 * np.pi * r**2)
|
/ (4 * np.pi * r**2)
|
||||||
)
|
)
|
||||||
|
|
||||||
return phi
|
return phi_r
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
|
||||||
|
"""Compute fluence at an arbitrary position in the landscape.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
landscape (Landscape): The landscape to compute.
|
||||||
|
pos (np.ndarray): (N, 3) array of positions.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
total_phi (np.ndarray): (N,) array of fluences.
|
||||||
|
"""
|
||||||
|
pos = np.atleast_2d(pos)
|
||||||
|
total_phi = np.zeros(pos.shape[0])
|
||||||
|
|
||||||
|
for source in landscape.point_sources:
|
||||||
|
r = np.linalg.norm(pos - np.array(source.pos), axis=1)
|
||||||
|
r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm
|
||||||
|
|
||||||
|
phi_source = phi(
|
||||||
|
r=r,
|
||||||
|
activity=source.activity * scaling,
|
||||||
|
branching_ratio=source.isotope.b,
|
||||||
|
mu_mass_air=source.isotope.mu_mass_air,
|
||||||
|
air_density=landscape.air_density
|
||||||
|
)
|
||||||
|
|
||||||
|
total_phi += phi_source
|
||||||
|
|
||||||
|
return total_phi
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_fluence_along_path(
|
||||||
|
landscape: "Landscape",
|
||||||
|
points_per_segment: int = 10
|
||||||
|
) -> Tuple[np.ndarray, np.ndarray]:
|
||||||
|
path = landscape.path
|
||||||
|
num_segments = len(path.segments)
|
||||||
|
|
||||||
|
xnew = np.linspace(
|
||||||
|
path.x_list[0],
|
||||||
|
path.x_list[-1],
|
||||||
|
num=num_segments*points_per_segment)
|
||||||
|
|
||||||
|
ynew = np.interp(xnew, path.x_list, path.y_list)
|
||||||
|
|
||||||
|
z = np.full(xnew.shape, path.z)
|
||||||
|
full_positions = np.c_[xnew, ynew, z]
|
||||||
|
phi_result = calculate_fluence_at(landscape, full_positions)
|
||||||
|
|
||||||
|
dist_travelled = np.linspace(
|
||||||
|
full_positions[0, 0],
|
||||||
|
path.length,
|
||||||
|
len(phi_result)
|
||||||
|
)
|
||||||
|
|
||||||
|
return dist_travelled, phi_result
|
||||||
|
|||||||
@ -1,25 +1,38 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from matplotlib import pyplot as plt
|
from matplotlib import pyplot as plt
|
||||||
|
from matplotlib.axes import Axes
|
||||||
from matplotlib.patches import Circle
|
from matplotlib.patches import Circle
|
||||||
|
|
||||||
from pg_rad.landscape import Landscape
|
from numpy import median
|
||||||
|
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LandscapeSlicePlotter:
|
class LandscapeSlicePlotter:
|
||||||
def plot(self, landscape: Landscape, z: int = 0):
|
def plot(
|
||||||
|
self,
|
||||||
|
landscape: Landscape,
|
||||||
|
z: int = 0,
|
||||||
|
show: bool = True,
|
||||||
|
save: bool = False,
|
||||||
|
ax: Axes | None = None
|
||||||
|
):
|
||||||
"""Plot a top-down slice of the landscape at a height z.
|
"""Plot a top-down slice of the landscape at a height z.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
landscape (Landscape): the landscape to plot
|
landscape (Landscape): the landscape to plot
|
||||||
z (int, optional): Height at which to plot slice. Defaults to 0.
|
z (int, optional): Height at which to plot slice. Defaults to 0.
|
||||||
|
show (bool, optional): Show the plot. Defaults to True.
|
||||||
|
save (bool, optional): Save the plot. Defaults to False.
|
||||||
""" """
|
""" """
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.z = z
|
self.z = z
|
||||||
|
|
||||||
|
if not ax:
|
||||||
fig, ax = plt.subplots()
|
fig, ax = plt.subplots()
|
||||||
|
|
||||||
self._draw_base(ax, landscape)
|
self._draw_base(ax, landscape)
|
||||||
@ -27,19 +40,45 @@ class LandscapeSlicePlotter:
|
|||||||
self._draw_point_sources(ax, landscape)
|
self._draw_point_sources(ax, landscape)
|
||||||
|
|
||||||
ax.set_aspect("equal")
|
ax.set_aspect("equal")
|
||||||
|
|
||||||
|
if save and not ax:
|
||||||
|
landscape_name = landscape.name.lower().replace(' ', '_')
|
||||||
|
filename = f"{landscape_name}_z{self.z}.png"
|
||||||
|
plt.savefig(filename)
|
||||||
|
logger.info("Plot saved to file: "+filename)
|
||||||
|
|
||||||
|
if show and not ax:
|
||||||
plt.show()
|
plt.show()
|
||||||
|
|
||||||
def _draw_base(self, ax, landscape):
|
return ax
|
||||||
|
|
||||||
|
def _draw_base(self, ax, landscape: Landscape):
|
||||||
width, height = landscape.size[:2]
|
width, height = landscape.size[:2]
|
||||||
ax.set_xlim(right=width)
|
|
||||||
ax.set_ylim(top=height)
|
ax.set_xlim(right=max(width, .5*height))
|
||||||
|
|
||||||
|
# if the road is very flat, we center it vertically (looks better)
|
||||||
|
if median(landscape.path.y_list) == 0:
|
||||||
|
h = max(height, .5*width)
|
||||||
|
ax.set_ylim(bottom=-h//2,
|
||||||
|
top=h//2)
|
||||||
|
else:
|
||||||
|
ax.set_ylim(top=max(height, .5*width))
|
||||||
|
|
||||||
ax.set_xlabel("X [m]")
|
ax.set_xlabel("X [m]")
|
||||||
ax.set_ylabel("Y [m]")
|
ax.set_ylabel("Y [m]")
|
||||||
ax.set_title(f"Landscape (top-down, z = {self.z})")
|
ax.set_title(f"Landscape (top-down, z = {self.z})")
|
||||||
|
|
||||||
def _draw_path(self, ax, landscape):
|
def _draw_path(self, ax, landscape):
|
||||||
if landscape.path.z < self.z:
|
if landscape.path.z <= self.z:
|
||||||
ax.plot(landscape.path.x_list, landscape.path.y_list, 'bo-')
|
ax.plot(
|
||||||
|
landscape.path.x_list,
|
||||||
|
landscape.path.y_list,
|
||||||
|
linestyle='-',
|
||||||
|
marker='|',
|
||||||
|
markersize=3,
|
||||||
|
linewidth=1
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Path is above the slice height z."
|
"Path is above the slice height z."
|
||||||
@ -48,18 +87,19 @@ class LandscapeSlicePlotter:
|
|||||||
|
|
||||||
def _draw_point_sources(self, ax, landscape):
|
def _draw_point_sources(self, ax, landscape):
|
||||||
for s in landscape.point_sources:
|
for s in landscape.point_sources:
|
||||||
if s.z <= self.z:
|
x, y, z = s.pos
|
||||||
|
if z <= self.z:
|
||||||
dot = Circle(
|
dot = Circle(
|
||||||
(s.x, s.y),
|
(x, y),
|
||||||
radius=5,
|
radius=5,
|
||||||
color=s.color,
|
color=s.color,
|
||||||
zorder=5
|
zorder=5
|
||||||
)
|
)
|
||||||
|
|
||||||
ax.text(
|
ax.text(
|
||||||
s.x + 0.06,
|
x + 0.06,
|
||||||
s.y + 0.06,
|
y + 0.06,
|
||||||
s.name,
|
s.name+", z="+str(z),
|
||||||
color=s.color,
|
color=s.color,
|
||||||
fontsize=10,
|
fontsize=10,
|
||||||
ha="left",
|
ha="left",
|
||||||
|
|||||||
100
src/pg_rad/plotting/result_plotter.py
Normal file
100
src/pg_rad/plotting/result_plotter.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
from matplotlib import pyplot as plt
|
||||||
|
from matplotlib.gridspec import GridSpec
|
||||||
|
|
||||||
|
from .landscape_plotter import LandscapeSlicePlotter
|
||||||
|
from pg_rad.simulator.outputs import SimulationOutput
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
|
|
||||||
|
class ResultPlotter:
|
||||||
|
def __init__(self, landscape: Landscape, output: SimulationOutput):
|
||||||
|
self.landscape = landscape
|
||||||
|
self.count_rate_res = output.count_rate
|
||||||
|
self.source_res = output.sources
|
||||||
|
|
||||||
|
def plot(self, landscape_z: float = 0):
|
||||||
|
fig = plt.figure(figsize=(12, 10), constrained_layout=True)
|
||||||
|
fig.suptitle(self.landscape.name)
|
||||||
|
gs = GridSpec(
|
||||||
|
3,
|
||||||
|
2,
|
||||||
|
width_ratios=[0.5, 0.5],
|
||||||
|
height_ratios=[0.7, 0.15, 0.15],
|
||||||
|
hspace=0.2)
|
||||||
|
|
||||||
|
ax1 = fig.add_subplot(gs[0, 0])
|
||||||
|
self._draw_count_rate(ax1)
|
||||||
|
|
||||||
|
ax2 = fig.add_subplot(gs[0, 1])
|
||||||
|
self._plot_landscape(ax2, landscape_z)
|
||||||
|
|
||||||
|
ax3 = fig.add_subplot(gs[1, :])
|
||||||
|
self._draw_table(ax3)
|
||||||
|
|
||||||
|
ax4 = fig.add_subplot(gs[2, :])
|
||||||
|
self._draw_source_table(ax4)
|
||||||
|
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
def _plot_landscape(self, ax, z):
|
||||||
|
lp = LandscapeSlicePlotter()
|
||||||
|
ax = lp.plot(landscape=self.landscape, z=z, ax=ax, show=False)
|
||||||
|
return ax
|
||||||
|
|
||||||
|
def _draw_count_rate(self, ax):
|
||||||
|
x = self.count_rate_res.arc_length
|
||||||
|
y = self.count_rate_res.count_rate
|
||||||
|
ax.plot(x, y, label='Count rate', color='r')
|
||||||
|
ax.set_title('Count rate')
|
||||||
|
ax.set_xlabel('Arc length s [m]')
|
||||||
|
ax.set_ylabel('Counts')
|
||||||
|
ax.legend()
|
||||||
|
|
||||||
|
def _draw_table(self, ax):
|
||||||
|
ax.set_axis_off()
|
||||||
|
ax.set_title('Simulation parameters')
|
||||||
|
cols = ('Parameter', 'Value')
|
||||||
|
data = [
|
||||||
|
["Air density (kg/m^3)", round(self.landscape.air_density, 3)],
|
||||||
|
["Total path length (m)", round(self.landscape.path.length, 3)]
|
||||||
|
]
|
||||||
|
|
||||||
|
ax.table(
|
||||||
|
cellText=data,
|
||||||
|
colLabels=cols,
|
||||||
|
loc='center'
|
||||||
|
)
|
||||||
|
|
||||||
|
return ax
|
||||||
|
|
||||||
|
def _draw_source_table(self, ax):
|
||||||
|
ax.set_axis_off()
|
||||||
|
ax.set_title('Point sources')
|
||||||
|
|
||||||
|
cols = (
|
||||||
|
'Name',
|
||||||
|
'Isotope',
|
||||||
|
'Activity (MBq)',
|
||||||
|
'Position (m)',
|
||||||
|
'Dist. to path (m)'
|
||||||
|
)
|
||||||
|
|
||||||
|
# this formats position to tuple
|
||||||
|
data = [
|
||||||
|
[
|
||||||
|
s.name,
|
||||||
|
s.isotope,
|
||||||
|
s.activity,
|
||||||
|
"("+", ".join(f"{val:.2f}" for val in s.position)+")",
|
||||||
|
round(s.dist_from_path, 2)
|
||||||
|
]
|
||||||
|
for s in self.source_res
|
||||||
|
]
|
||||||
|
ax.table(
|
||||||
|
cellText=data,
|
||||||
|
colLabels=cols,
|
||||||
|
loc='center'
|
||||||
|
)
|
||||||
|
|
||||||
|
return ax
|
||||||
0
src/pg_rad/simulator/__init__.py
Normal file
0
src/pg_rad/simulator/__init__.py
Normal file
63
src/pg_rad/simulator/engine.py
Normal file
63
src/pg_rad/simulator/engine.py
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
from typing import List
|
||||||
|
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
from pg_rad.simulator.outputs import (
|
||||||
|
CountRateOutput,
|
||||||
|
SimulationOutput,
|
||||||
|
SourceOutput
|
||||||
|
)
|
||||||
|
from pg_rad.physics.fluence import calculate_fluence_along_path
|
||||||
|
from pg_rad.utils.projection import minimal_distance_to_path
|
||||||
|
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
|
||||||
|
|
||||||
|
|
||||||
|
class SimulationEngine:
|
||||||
|
"""Takes a fully built landscape and produces results."""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
landscape: Landscape,
|
||||||
|
runtime_spec=RuntimeSpec,
|
||||||
|
sim_spec=SimulationOptionsSpec
|
||||||
|
):
|
||||||
|
|
||||||
|
self.landscape = landscape
|
||||||
|
self.runtime_spec = runtime_spec
|
||||||
|
self.sim_spec = sim_spec
|
||||||
|
|
||||||
|
def simulate(self) -> SimulationOutput:
|
||||||
|
"""Compute everything and return structured output."""
|
||||||
|
|
||||||
|
count_rate_results = self._calculate_count_rate_along_path()
|
||||||
|
source_results = self._calculate_point_source_distance_to_path()
|
||||||
|
|
||||||
|
return SimulationOutput(
|
||||||
|
name=self.landscape.name,
|
||||||
|
count_rate=count_rate_results,
|
||||||
|
sources=source_results
|
||||||
|
)
|
||||||
|
|
||||||
|
def _calculate_count_rate_along_path(self) -> CountRateOutput:
|
||||||
|
arc_length, phi = calculate_fluence_along_path(self.landscape)
|
||||||
|
return CountRateOutput(arc_length, phi)
|
||||||
|
|
||||||
|
def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:
|
||||||
|
|
||||||
|
path = self.landscape.path
|
||||||
|
source_output = []
|
||||||
|
for s in self.landscape.point_sources:
|
||||||
|
dist_to_path = minimal_distance_to_path(
|
||||||
|
path.x_list,
|
||||||
|
path.y_list,
|
||||||
|
path.z,
|
||||||
|
s.pos)
|
||||||
|
|
||||||
|
source_output.append(
|
||||||
|
SourceOutput(
|
||||||
|
s.name,
|
||||||
|
s.isotope.name,
|
||||||
|
s.activity,
|
||||||
|
s.pos,
|
||||||
|
dist_to_path)
|
||||||
|
)
|
||||||
|
|
||||||
|
return source_output
|
||||||
25
src/pg_rad/simulator/outputs.py
Normal file
25
src/pg_rad/simulator/outputs.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CountRateOutput:
|
||||||
|
arc_length: List[float]
|
||||||
|
count_rate: List[float]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SourceOutput:
|
||||||
|
name: str
|
||||||
|
isotope: str
|
||||||
|
activity: float
|
||||||
|
position: Tuple[float, float, float]
|
||||||
|
dist_from_path: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationOutput:
|
||||||
|
name: str
|
||||||
|
count_rate: CountRateOutput
|
||||||
|
sources: List[SourceOutput]
|
||||||
0
src/pg_rad/utils/__init__.py
Normal file
0
src/pg_rad/utils/__init__.py
Normal file
168
src/pg_rad/utils/projection.py
Normal file
168
src/pg_rad/utils/projection.py
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import OutOfBoundsError
|
||||||
|
|
||||||
|
|
||||||
|
def rel_to_abs_source_position(
|
||||||
|
x_list: List,
|
||||||
|
y_list: List,
|
||||||
|
path_z: int | float,
|
||||||
|
along_path: int | float,
|
||||||
|
side: str,
|
||||||
|
dist_from_path: int | float,
|
||||||
|
z_rel: int | float = 0
|
||||||
|
):
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
arc_length = np.cumsum(segment_lengths)
|
||||||
|
arc_length = np.insert(arc_length, 0, 0)
|
||||||
|
|
||||||
|
# find index of the segment where the source should be placed
|
||||||
|
s_i = np.searchsorted(arc_length, along_path, side='right') - 1
|
||||||
|
if not 0 <= s_i < len(segments):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"along_path must be less than the length of the path!"
|
||||||
|
)
|
||||||
|
|
||||||
|
# fraction of segment length where to place the point source
|
||||||
|
segment_fraction = (
|
||||||
|
(along_path - arc_length[s_i])
|
||||||
|
/ segment_lengths[s_i]
|
||||||
|
)
|
||||||
|
|
||||||
|
# tangential vector at point where point source to be placed
|
||||||
|
point_on_path = path[s_i] + segment_fraction * segments[s_i]
|
||||||
|
tangent_vec = segments[s_i] / segment_lengths[s_i]
|
||||||
|
|
||||||
|
if side not in {"left", "right"}:
|
||||||
|
raise ValueError("side must be 'left' or 'right'")
|
||||||
|
|
||||||
|
orientation = 1 if side == "left" else -1
|
||||||
|
perp_vec = orientation * np.array([-tangent_vec[1], tangent_vec[0]])
|
||||||
|
|
||||||
|
x_abs, y_abs = point_on_path + dist_from_path * perp_vec
|
||||||
|
z_abs = path_z + z_rel
|
||||||
|
|
||||||
|
return x_abs, y_abs, z_abs
|
||||||
|
|
||||||
|
|
||||||
|
def minimal_distance_to_path(
|
||||||
|
x_list: List[float],
|
||||||
|
y_list: List[float],
|
||||||
|
path_z: float,
|
||||||
|
pos: Tuple[float, float, float]
|
||||||
|
) -> float:
|
||||||
|
"""
|
||||||
|
Compute minimal Euclidean distance from pos (x,y,z)
|
||||||
|
to interpolated polyline path at height path_z.
|
||||||
|
"""
|
||||||
|
x, y, z = pos
|
||||||
|
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
point_xy = np.array([x, y])
|
||||||
|
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
min_dist = np.inf
|
||||||
|
|
||||||
|
for p0, seg, seg_len in zip(path[:-1], segments, segment_lengths):
|
||||||
|
if seg_len == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
t = np.dot(point_xy - p0, seg) / (seg_len ** 2)
|
||||||
|
t = np.clip(t, 0.0, 1.0)
|
||||||
|
|
||||||
|
projection_xy = p0 + t * seg
|
||||||
|
|
||||||
|
dx, dy = point_xy - projection_xy
|
||||||
|
dz = z - path_z
|
||||||
|
|
||||||
|
dist = np.sqrt(dx**2 + dy**2 + dz**2)
|
||||||
|
|
||||||
|
if dist < min_dist:
|
||||||
|
min_dist = dist
|
||||||
|
|
||||||
|
return min_dist
|
||||||
|
|
||||||
|
|
||||||
|
def abs_to_rel_source_position(
|
||||||
|
x_list: List,
|
||||||
|
y_list: List,
|
||||||
|
path_z: int | float,
|
||||||
|
pos: Tuple[float, float, float],
|
||||||
|
) -> Tuple[float, float, str, float]:
|
||||||
|
"""
|
||||||
|
Convert absolute (x,y,z) position into:
|
||||||
|
along_path,
|
||||||
|
dist_from_path,
|
||||||
|
side ("left" or "right"),
|
||||||
|
z_rel
|
||||||
|
"""
|
||||||
|
|
||||||
|
x, y, z = pos
|
||||||
|
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
point = np.array([x, y])
|
||||||
|
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
arc_length = np.cumsum(segment_lengths)
|
||||||
|
arc_length = np.insert(arc_length, 0, 0)
|
||||||
|
|
||||||
|
min_dist = np.inf
|
||||||
|
best_projection = None
|
||||||
|
best_s_i = None
|
||||||
|
best_fraction = None
|
||||||
|
best_signed_dist = None
|
||||||
|
|
||||||
|
for (i, (p0, seg, seg_len)) in enumerate(
|
||||||
|
zip(path[:-1], segments, segment_lengths)
|
||||||
|
):
|
||||||
|
if seg_len == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# project point onto infinite line
|
||||||
|
t = np.dot(point - p0, seg) / (seg_len ** 2)
|
||||||
|
|
||||||
|
# clamp to segment
|
||||||
|
t_clamped = np.clip(t, 0.0, 1.0)
|
||||||
|
|
||||||
|
projection = p0 + t_clamped * seg
|
||||||
|
diff_vec = point - projection
|
||||||
|
dist = np.linalg.norm(diff_vec)
|
||||||
|
|
||||||
|
if dist < min_dist:
|
||||||
|
min_dist = dist
|
||||||
|
best_projection = projection
|
||||||
|
best_s_i = i
|
||||||
|
best_fraction = t_clamped
|
||||||
|
|
||||||
|
# tangent and perpendicular
|
||||||
|
tangent_vec = seg / seg_len
|
||||||
|
perp_vec = np.array([-tangent_vec[1], tangent_vec[0]])
|
||||||
|
|
||||||
|
signed_dist = np.dot(diff_vec, perp_vec)
|
||||||
|
best_signed_dist = signed_dist
|
||||||
|
|
||||||
|
if best_projection is None:
|
||||||
|
raise ValueError("Could not project point onto path.")
|
||||||
|
|
||||||
|
# Compute along_path
|
||||||
|
along_path = (
|
||||||
|
arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Side and distance
|
||||||
|
side = "left" if best_signed_dist > 0 else "right"
|
||||||
|
dist_from_path = abs(best_signed_dist)
|
||||||
|
|
||||||
|
# z relative
|
||||||
|
z_rel = z - path_z
|
||||||
|
|
||||||
|
return along_path, dist_from_path, side, z_rel
|
||||||
0
src/road_gen/__init__.py
Normal file
0
src/road_gen/__init__.py
Normal file
0
src/road_gen/generators/__init__.py
Normal file
0
src/road_gen/generators/__init__.py
Normal file
55
src/road_gen/generators/base_road_generator.py
Normal file
55
src/road_gen/generators/base_road_generator.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import secrets
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class BaseRoadGenerator:
|
||||||
|
"""A base generator object for generating a road of a specified length."""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
length: int | float,
|
||||||
|
ds: int | float,
|
||||||
|
velocity: int | float,
|
||||||
|
mu: float = 0.7,
|
||||||
|
g: float = 9.81,
|
||||||
|
seed: int | None = None
|
||||||
|
):
|
||||||
|
"""Initialize a BaseGenerator with a given or random seed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length (int | float): The total length of the road in meters.
|
||||||
|
ds (int | float): The step size in meters.
|
||||||
|
velocity (int | float): Velocity in meters per second.
|
||||||
|
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
|
||||||
|
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
|
||||||
|
seed (int | None, optional): Set a seed for the generator.
|
||||||
|
Defaults to a random seed.
|
||||||
|
"""
|
||||||
|
if seed is None:
|
||||||
|
seed = secrets.randbits(32)
|
||||||
|
|
||||||
|
if not isinstance(seed, int):
|
||||||
|
raise TypeError("seed must be an integer or None.")
|
||||||
|
|
||||||
|
if not isinstance(length, int | float):
|
||||||
|
raise TypeError("Length must be an integer or float in meters.")
|
||||||
|
|
||||||
|
if not isinstance(ds, int | float):
|
||||||
|
raise TypeError("Step size must be integer or float in meters.")
|
||||||
|
|
||||||
|
if not isinstance(velocity, int | float):
|
||||||
|
raise TypeError(
|
||||||
|
"Velocity must be integer or float in meters per second."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.length = length
|
||||||
|
self.ds = ds
|
||||||
|
|
||||||
|
self.velocity = velocity
|
||||||
|
self.min_radius = (velocity ** 2) / (g * mu)
|
||||||
|
|
||||||
|
self.seed = seed
|
||||||
|
self._rng = np.random.default_rng(seed)
|
||||||
|
|
||||||
|
def generate(self):
|
||||||
|
pass
|
||||||
143
src/road_gen/generators/segmented_road_generator.py
Normal file
143
src/road_gen/generators/segmented_road_generator.py
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .base_road_generator import BaseRoadGenerator
|
||||||
|
from road_gen.prefabs import prefabs
|
||||||
|
from road_gen.integrator.integrator import integrate_road
|
||||||
|
|
||||||
|
from pg_rad.configs import defaults
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SegmentedRoadGenerator(BaseRoadGenerator):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
length: int | float | list[int | float],
|
||||||
|
ds: int | float,
|
||||||
|
velocity: int | float,
|
||||||
|
mu: float = 0.7,
|
||||||
|
g: float = 9.81,
|
||||||
|
seed: int | None = None
|
||||||
|
):
|
||||||
|
"""Initialize a SegmentedRoadGenerator with given or random seed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length (int | float): The total length of the road in meters.
|
||||||
|
ds (int | float): The step size in meters.
|
||||||
|
velocity (int | float): Velocity in meters per second.
|
||||||
|
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
|
||||||
|
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
|
||||||
|
seed (int | None, optional): Set a seed for the generator.
|
||||||
|
Defaults to random seed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if isinstance(length, list):
|
||||||
|
length = sum(
|
||||||
|
[seg_len for seg_len in length if seg_len is not None]
|
||||||
|
)
|
||||||
|
super().__init__(length, ds, velocity, mu, g, seed)
|
||||||
|
|
||||||
|
def generate(
|
||||||
|
self,
|
||||||
|
segments: list[str],
|
||||||
|
lengths: list[int | float] | None = None,
|
||||||
|
angles: list[int | float] | None = None,
|
||||||
|
alpha: float = defaults.DEFAULT_ALPHA,
|
||||||
|
min_turn_angle: float = defaults.DEFAULT_MIN_TURN_ANGLE,
|
||||||
|
max_turn_angle: float = defaults.DEFAULT_MAX_TURN_ANGLE
|
||||||
|
) -> Tuple[np.ndarray, np.ndarray]:
|
||||||
|
"""Generate a curvature profile from a list of segments.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
segments (list[str]): List of segments.
|
||||||
|
alpha (float, optional): Dirichlet concentration parameter.
|
||||||
|
A higher value leads to more uniform apportionment of the
|
||||||
|
length amongst the segments, while a lower value allows more
|
||||||
|
random apportionment. Defaults to 1.0.
|
||||||
|
min_turn_angle (float, optional): Minimum turn angle in degrees for
|
||||||
|
random sampling of turn radius. Does nothing if `angle_list` is
|
||||||
|
provided or no `turn_*` segement is specified in the `segments`
|
||||||
|
list.
|
||||||
|
min_turn_angle (float, optional): Maximum turn angle in degrees for
|
||||||
|
random sampling of turn radius. Does nothing if `angle_list` is
|
||||||
|
provided or no `turn_*` segement is specified in the `segments`
|
||||||
|
list.
|
||||||
|
Raises:
|
||||||
|
ValueError: Raised when a turn
|
||||||
|
is too tight given its segment length and the velocity.
|
||||||
|
To fix this, you can try to reduce the amount of segments or
|
||||||
|
increase length. Increasing alpha
|
||||||
|
(Dirichlet concentration parameter) can also help because this
|
||||||
|
reduces the odds of very small lengths being assigned to
|
||||||
|
turn segments.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple[np.ndarray, np.ndarray]: x and y coordinates of the
|
||||||
|
waypoints describing the random road.
|
||||||
|
"""
|
||||||
|
existing_prefabs = prefabs.PREFABS.keys()
|
||||||
|
if not all(segment in existing_prefabs for segment in segments):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid segment type provided. Available choices"
|
||||||
|
f"{existing_prefabs}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.segments = segments
|
||||||
|
self.alpha = alpha
|
||||||
|
num_points = np.ceil(self.length / self.ds).astype(int)
|
||||||
|
|
||||||
|
# divide num_points into len(segments) randomly sized parts.
|
||||||
|
if isinstance(self.length, list):
|
||||||
|
parts = self.length
|
||||||
|
else:
|
||||||
|
parts = self._rng.dirichlet(
|
||||||
|
np.full(len(segments), alpha),
|
||||||
|
size=1)[0]
|
||||||
|
parts = parts * num_points
|
||||||
|
parts = np.round(parts).astype(int)
|
||||||
|
|
||||||
|
# correct round off so the sum of parts is still total length L.
|
||||||
|
if sum(parts) != num_points:
|
||||||
|
parts[0] += num_points - sum(parts)
|
||||||
|
|
||||||
|
curvature = np.zeros(num_points)
|
||||||
|
current_index = 0
|
||||||
|
|
||||||
|
for seg_name, seg_length in zip(segments, parts):
|
||||||
|
seg_function = prefabs.PREFABS[seg_name]
|
||||||
|
|
||||||
|
if seg_name == 'straight':
|
||||||
|
curvature_s = seg_function(seg_length)
|
||||||
|
else:
|
||||||
|
R_min_angle = seg_length / np.deg2rad(max_turn_angle)
|
||||||
|
R_max_angle = seg_length / np.deg2rad(min_turn_angle)
|
||||||
|
|
||||||
|
# physics limit
|
||||||
|
R_min = max(self.min_radius, R_min_angle)
|
||||||
|
|
||||||
|
if R_min > R_max_angle:
|
||||||
|
raise ValueError(
|
||||||
|
f"{seg_name} with length {seg_length} does not have "
|
||||||
|
"a possible radius. The minimum for the provided "
|
||||||
|
"velocity and friction coefficient is "
|
||||||
|
f"{self.min_radius}, but the possible range is "
|
||||||
|
f"({R_min}, {R_max_angle})"
|
||||||
|
)
|
||||||
|
|
||||||
|
rand_radius = self._rng.uniform(R_min, R_max_angle)
|
||||||
|
|
||||||
|
if seg_name.startswith("u_turn"):
|
||||||
|
curvature_s = seg_function(rand_radius)
|
||||||
|
else:
|
||||||
|
curvature_s = seg_function(seg_length, rand_radius)
|
||||||
|
|
||||||
|
curvature[current_index:(current_index + seg_length)] = curvature_s
|
||||||
|
current_index += seg_length
|
||||||
|
|
||||||
|
x, y = integrate_road(curvature)
|
||||||
|
|
||||||
|
return x * self.ds, y * self.ds
|
||||||
0
src/road_gen/integrator/__init__.py
Normal file
0
src/road_gen/integrator/__init__.py
Normal file
21
src/road_gen/integrator/integrator.py
Normal file
21
src/road_gen/integrator/integrator.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def integrate_road(curvature: np.ndarray, ds: float = 1.0):
|
||||||
|
"""Integrate along the curvature field to obtain X and Y coordinates.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
curvature (np.ndarray): The curvature field.
|
||||||
|
ds (float, optional): _description_. Defaults to 1.0.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
x, y (np.ndarray): X and Y waypoints.
|
||||||
|
"""
|
||||||
|
|
||||||
|
theta = np.zeros(len(curvature))
|
||||||
|
theta[1:] = np.cumsum(curvature[:-1] * ds)
|
||||||
|
|
||||||
|
x = np.cumsum(np.cos(theta) * ds)
|
||||||
|
y = np.cumsum(np.sin(theta) * ds)
|
||||||
|
|
||||||
|
return x, y
|
||||||
0
src/road_gen/prefabs/__init__.py
Normal file
0
src/road_gen/prefabs/__init__.py
Normal file
20
src/road_gen/prefabs/prefabs.py
Normal file
20
src/road_gen/prefabs/prefabs.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def straight(length: int) -> np.ndarray:
|
||||||
|
return np.zeros(length)
|
||||||
|
|
||||||
|
|
||||||
|
def turn_left(length: int, radius: float) -> np.ndarray:
|
||||||
|
return np.full(length, 1.0 / radius)
|
||||||
|
|
||||||
|
|
||||||
|
def turn_right(length: int, radius: float) -> np.ndarray:
|
||||||
|
return -turn_left(length, radius)
|
||||||
|
|
||||||
|
|
||||||
|
PREFABS = {
|
||||||
|
'straight': straight,
|
||||||
|
'turn_left': turn_left,
|
||||||
|
'turn_right': turn_right,
|
||||||
|
}
|
||||||
@ -1,34 +1,53 @@
|
|||||||
from math import dist, exp, pi
|
import numpy as np
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from pg_rad.landscape import LandscapeDirector
|
from pg_rad.inputparser.parser import ConfigParser
|
||||||
|
from pg_rad.landscape.director import LandscapeDirector
|
||||||
|
from pg_rad.physics import calculate_fluence_at
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def phi_ref():
|
def phi_ref(test_landscape):
|
||||||
A = 100 # MBq
|
source = test_landscape.point_sources[0]
|
||||||
b = 0.851
|
|
||||||
mu_mass_air = 0.0778 # cm^2/g
|
|
||||||
air_density = 1.243 # kg/m^3
|
|
||||||
r = dist((0, 0, 0), (10, 10, 0)) # m
|
|
||||||
|
|
||||||
A *= 1E9 # Convert to Bq
|
r = np.linalg.norm(np.array([10, 10, 0]) - np.array(source.pos))
|
||||||
mu_mass_air *= 0.1 # Convert to m^2/kg
|
|
||||||
|
|
||||||
mu_air = mu_mass_air * air_density # [m^2/kg] x [kg/m^3] = [m^-1]
|
A = source.activity * 1E6
|
||||||
|
b = source.isotope.b
|
||||||
|
mu_air = source.isotope.mu_mass_air * test_landscape.air_density
|
||||||
|
mu_air *= 0.1
|
||||||
|
|
||||||
# [s^-1] x exp([m^-1] x [m]) / [m^-2] = [s^-1 m^-2]
|
return A * b * np.exp(-mu_air * r) / (4 * np.pi * r**2)
|
||||||
phi = A * b * exp(-mu_air * r) / (4 * pi * r**2)
|
|
||||||
return phi
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def test_landscape():
|
def test_landscape():
|
||||||
landscape = LandscapeDirector().build_test_landscape()
|
|
||||||
|
test_yaml = """
|
||||||
|
name: Test landscape
|
||||||
|
speed: 8.33
|
||||||
|
acquisition_time: 1
|
||||||
|
|
||||||
|
path:
|
||||||
|
length: 1000
|
||||||
|
segments:
|
||||||
|
- straight
|
||||||
|
|
||||||
|
sources:
|
||||||
|
test_source:
|
||||||
|
activity_MBq: 100
|
||||||
|
position: [0, 0, 0]
|
||||||
|
isotope: CS137
|
||||||
|
"""
|
||||||
|
|
||||||
|
cp = ConfigParser(test_yaml).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
return landscape
|
return landscape
|
||||||
|
|
||||||
|
|
||||||
def test_single_source_fluence(phi_ref, test_landscape):
|
def test_single_source_fluence(phi_ref, test_landscape):
|
||||||
phi = test_landscape.calculate_fluence_at((10, 10, 0))
|
phi = calculate_fluence_at(
|
||||||
assert pytest.approx(phi, rel=1E-3) == phi_ref
|
test_landscape,
|
||||||
|
np.array([10, 10, 0]),
|
||||||
|
)
|
||||||
|
assert pytest.approx(phi[0], rel=1E-3) == phi_ref
|
||||||
|
|||||||
@ -2,17 +2,16 @@ import numpy as np
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from pg_rad.objects import PointSource
|
from pg_rad.objects import PointSource
|
||||||
from pg_rad.isotopes import CS137
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def test_sources():
|
def test_sources():
|
||||||
iso = CS137()
|
iso = "CS137"
|
||||||
pos_a = np.random.rand(3)
|
pos_a = np.random.rand(3)
|
||||||
pos_b = np.random.rand(3)
|
pos_b = np.random.rand(3)
|
||||||
|
|
||||||
a = PointSource(pos=pos_a, activity=None, isotope=iso)
|
a = PointSource(position=pos_a, activity_MBq=None, isotope=iso)
|
||||||
b = PointSource(pos=pos_b, activity=None, isotope=iso)
|
b = PointSource(position=pos_b, activity_MBq=None, isotope=iso)
|
||||||
|
|
||||||
return pos_a, pos_b, a, b
|
return pos_a, pos_b, a, b
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user