Merge pull request #38 from pim-n/feature-plotter

Config parser, plotting, simulation engine
This commit is contained in:
Pim Nelissen
2026-03-02 11:42:07 +01:00
committed by GitHub
35 changed files with 1548 additions and 236 deletions

View File

@ -0,0 +1 @@
__all__ = []

View File

@ -0,0 +1,18 @@
# --- Physics defaults ---
DEFAULT_AIR_DENSITY = 1.243 # kg/m^3, Skåne
# --- Simulation defaults ---
DEFAULT_SEED = None
DEFAULT_ACQUISITION_TIME = 1.0
# --- Source defaults ---
DEFAULT_PATH_HEIGHT = 0.0
DEFAULT_SOURCE_HEIGHT = 0.0
# --- Segmented road defaults ---
DEFAULT_MIN_TURN_ANGLE = 30.
DEFAULT_MAX_TURN_ANGLE = 90.
DEFAULT_FRICTION_COEFF = 0.7 # dry asphalt
DEFAULT_GRAVITATIONAL_ACC = 9.81 # m/s^2
DEFAULT_ALPHA = 100.

View File

@ -2,7 +2,6 @@ import logging
import pandas as pd
from pg_rad.exceptions import DataLoadError, InvalidCSVError
logger = logging.getLogger(__name__)
@ -12,18 +11,12 @@ def load_data(filename: str) -> pd.DataFrame:
try:
df = pd.read_csv(filename, delimiter=',')
except FileNotFoundError as e:
logger.error(f"File not found: {filename}")
raise DataLoadError(f"File does not exist: {filename}") from e
logger.critical(e)
raise
except pd.errors.ParserError as e:
logger.error(f"Invalid CSV format: {filename}")
raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
except Exception as e:
logger.exception(f"Unexpected error while loading {filename}")
raise DataLoadError("Unexpected error while loading data") from e
logger.critical(e)
raise
logger.debug(f"File loaded: {filename}")
return df

View File

@ -12,3 +12,22 @@ class InvalidCSVError(DataLoadError):
class OutOfBoundsError(Exception):
"""Raised when an object is attempted to be placed out of bounds."""
class MissingConfigKeyError(KeyError):
"""Raised when a (nested) config key is missing in the config."""
def __init__(self, key, subkey=None):
if subkey:
self.message = f"Missing key in {key}: {', '.join(list(subkey))}"
else:
self.message = f"Missing key: {key}"
super().__init__(self.message)
class DimensionError(ValueError):
"""Raised if dimensions or coordinates do not match the system."""
class InvalidIsotopeError(ValueError):
"""Raised if attempting to load an isotope that is not valid."""

View File

@ -0,0 +1,316 @@
import logging
import os
from typing import Any, Dict, List, Union
import yaml
from pg_rad.exceptions.exceptions import MissingConfigKeyError, DimensionError
from pg_rad.configs import defaults
from .specs import (
MetadataSpec,
RuntimeSpec,
SimulationOptionsSpec,
SegmentSpec,
PathSpec,
ProceduralPathSpec,
CSVPathSpec,
SourceSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec,
SimulationSpec,
)
logger = logging.getLogger(__name__)
class ConfigParser:
_ALLOWED_ROOT_KEYS = {
"name",
"speed",
"acquisition_time",
"path",
"sources",
"options",
}
def __init__(self, config_source: str):
self.config = self._load_yaml(config_source)
def parse(self) -> SimulationSpec:
self._warn_unknown_keys(
section="global",
provided=set(self.config.keys()),
allowed=self._ALLOWED_ROOT_KEYS,
)
metadata = self._parse_metadata()
runtime = self._parse_runtime()
options = self._parse_options()
path = self._parse_path()
sources = self._parse_point_sources()
return SimulationSpec(
metadata=metadata,
runtime=runtime,
options=options,
path=path,
point_sources=sources,
)
def _load_yaml(self, config_source: str) -> Dict[str, Any]:
if os.path.exists(config_source):
with open(config_source) as f:
data = yaml.safe_load(f)
else:
data = yaml.safe_load(config_source)
if not isinstance(data, dict):
raise ValueError(
"Provided path or string is not a valid YAML representation."
)
return data
def _parse_metadata(self) -> MetadataSpec:
try:
return MetadataSpec(
name=self.config["name"]
)
except KeyError as e:
raise MissingConfigKeyError("global", {"name"}) from e
def _parse_runtime(self) -> RuntimeSpec:
required = {"speed", "acquisition_time"}
missing = required - self.config.keys()
if missing:
raise MissingConfigKeyError("global", missing)
return RuntimeSpec(
speed=float(self.config["speed"]),
acquisition_time=float(self.config["acquisition_time"]),
)
def _parse_options(self) -> SimulationOptionsSpec:
options = self.config.get("options", {})
allowed = {"air_density", "seed"}
self._warn_unknown_keys(
section="options",
provided=set(options.keys()),
allowed=allowed,
)
air_density = options.get("air_density", defaults.DEFAULT_AIR_DENSITY)
seed = options.get("seed")
if not isinstance(air_density, float) or air_density <= 0:
raise ValueError(
"options.air_density must be a positive float in kg/m^3."
)
if (
seed is not None or
(isinstance(seed, int) and seed <= 0)
):
raise ValueError("Seed must be a positive integer value.")
return SimulationOptionsSpec(
air_density=air_density,
seed=seed,
)
def _parse_path(self) -> PathSpec:
allowed_csv = {"file", "east_col_name", "north_col_name", "z"}
allowed_proc = {"segments", "length", "z"}
path = self.config.get("path")
if path is None:
raise MissingConfigKeyError("global", {"path"})
if not isinstance(path, dict):
raise ValueError("Path must be a dictionary.")
if "file" in path:
self._warn_unknown_keys(
section="path (csv)",
provided=set(path.keys()),
allowed=allowed_csv,
)
return CSVPathSpec(
file=path["file"],
east_col_name=path["east_col_name"],
north_col_name=path["north_col_name"],
z=path.get("z", 0),
)
if "segments" in path:
self._warn_unknown_keys(
section="path (procedural)",
provided=set(path.keys()),
allowed=allowed_proc,
)
return self._parse_procedural_path(path)
raise ValueError("Invalid path configuration.")
def _parse_procedural_path(
self,
path: Dict[str, Any]
) -> ProceduralPathSpec:
raw_segments = path.get("segments")
if not isinstance(raw_segments, List):
raise ValueError("path.segments must be a list of segments.")
raw_length = path.get("length")
if raw_length is None:
raise MissingConfigKeyError("path", {"length"})
if isinstance(raw_length, int | float):
raw_length = [float(raw_length)]
segments = self._process_segment_angles(raw_segments)
lengths = self._process_segment_lengths(raw_length, len(segments))
resolved_segments = self._combine_segments_lengths(segments, lengths)
return ProceduralPathSpec(
segments=resolved_segments,
z=path.get("z", defaults.DEFAULT_PATH_HEIGHT),
)
def _process_segment_angles(
self,
raw_segments: List[Union[str, dict]]
) -> List[Dict[str, Any]]:
normalized = []
for segment in raw_segments:
if isinstance(segment, str):
normalized.append({"type": segment, "angle": None})
elif isinstance(segment, dict):
if len(segment) != 1:
raise ValueError("Invalid segment definition.")
seg_type, angle = list(segment.items())[0]
normalized.append({"type": seg_type, "angle": angle})
else:
raise ValueError("Invalid segment entry format.")
return normalized
def _process_segment_lengths(
self,
raw_length_list: List[Union[int, float]],
num_segments: int
) -> List[float]:
num_lengths = len(raw_length_list)
if num_lengths == num_segments or num_lengths == 1:
return raw_length_list
else:
raise ValueError(
"Path length must either be a single number or a list with "
"number of elements equal to the number of segments."
)
def _combine_segments_lengths(
self,
segments: List[Dict[str, Any]],
lengths: List[float],
) -> List[SegmentSpec]:
resolved = []
for seg, length in zip(segments, lengths):
angle = seg["angle"]
if angle is not None and not self._is_turn(seg["type"]):
raise ValueError(
f"A {seg['type']} segment does not support an angle."
)
resolved.append(
SegmentSpec(
type=seg["type"],
length=length,
angle=angle,
)
)
return resolved
@staticmethod
def _is_turn(segment_type: str) -> bool:
return segment_type in {"turn_left", "turn_right"}
def _parse_point_sources(self) -> List[SourceSpec]:
source_dict = self.config.get("sources", {})
specs: List[SourceSpec] = []
for name, params in source_dict.items():
required = {"activity_MBq", "isotope", "position"}
missing = required - params.keys()
if missing:
raise MissingConfigKeyError(name, missing)
activity = params.get("activity_MBq")
isotope = params.get("isotope")
if not isinstance(activity, int | float) or activity <= 0:
raise ValueError(
f"sources.{name}.activity_MBq must be positive value "
"in MegaBequerels."
)
position = params.get("position")
if isinstance(position, list):
if len(position) != 3:
raise DimensionError(
"Absolute position must be [x, y, z]."
)
specs.append(
AbsolutePointSourceSpec(
name=name,
activity_MBq=float(activity),
isotope=isotope,
x=float(position[0]),
y=float(position[1]),
z=float(position[2]),
)
)
elif isinstance(position, dict):
specs.append(
RelativePointSourceSpec(
name=name,
activity_MBq=float(activity),
isotope=isotope,
along_path=float(position["along_path"]),
dist_from_path=float(position["dist_from_path"]),
side=position["side"],
z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT)
)
)
else:
raise ValueError(
f"Invalid position format for source '{name}'."
)
return specs
def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
unknown = provided - allowed
if unknown:
logger.warning(
f"Unknown keys in '{section}' section: {unknown}"
)

View File

@ -0,0 +1,76 @@
from abc import ABC
from dataclasses import dataclass
@dataclass
class MetadataSpec:
name: str
@dataclass
class RuntimeSpec:
speed: float
acquisition_time: float
@dataclass
class SimulationOptionsSpec:
air_density: float = 1.243
seed: int | None = None
@dataclass
class SegmentSpec:
type: str
length: float
angle: float | None
@dataclass
class PathSpec(ABC):
pass
@dataclass
class ProceduralPathSpec(PathSpec):
segments: list[SegmentSpec]
z: int | float
@dataclass
class CSVPathSpec(PathSpec):
file: str
east_col_name: str
north_col_name: str
z: int | float
@dataclass
class SourceSpec(ABC):
activity_MBq: float
isotope: str
name: str
@dataclass
class AbsolutePointSourceSpec(SourceSpec):
x: float
y: float
z: float
@dataclass
class RelativePointSourceSpec(SourceSpec):
along_path: float
dist_from_path: float
side: str
z: float
@dataclass
class SimulationSpec:
metadata: MetadataSpec
runtime: RuntimeSpec
options: SimulationOptionsSpec
path: PathSpec
point_sources: list[SourceSpec]

View File

@ -2,9 +2,8 @@
__ignore__ = ["logger"]
from pg_rad.isotopes import isotope
from pg_rad.isotopes import presets
from pg_rad.isotopes.isotope import (Isotope,)
from pg_rad.isotopes.presets import (CS137,)
from pg_rad.isotopes.isotope import (CS137, Isotope, get_isotope,
preset_isotopes,)
__all__ = ['CS137', 'Isotope', 'isotope', 'presets']
__all__ = ['CS137', 'Isotope', 'get_isotope', 'isotope', 'preset_isotopes']

View File

@ -1,4 +1,7 @@
from pg_rad.physics import get_mass_attenuation_coeff
from typing import Dict, Type
from pg_rad.exceptions.exceptions import InvalidIsotopeError
from pg_rad.physics.attenuation import get_mass_attenuation_coeff
class Isotope:
@ -25,3 +28,24 @@ class Isotope:
self.E = E
self.b = b
self.mu_mass_air = get_mass_attenuation_coeff(E / 1000)
class CS137(Isotope):
def __init__(self):
super().__init__(
name="Cs-137",
E=661.66,
b=0.851
)
preset_isotopes: Dict[str, Type[Isotope]] = {
"CS137": CS137
}
def get_isotope(isotope_str: str) -> Isotope:
"""Lazy factory function to create isotope objects."""
if isotope_str not in preset_isotopes:
raise InvalidIsotopeError(f"Unknown isotope: {isotope_str}")
return preset_isotopes[isotope_str]()

View File

@ -1,10 +0,0 @@
from .isotope import Isotope
class CS137(Isotope):
def __init__(self):
super().__init__(
name="Cs-137",
E=661.66,
b=0.851
)

View File

@ -1,11 +0,0 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.landscape import director
from pg_rad.landscape import landscape
from pg_rad.landscape.director import (LandscapeDirector,)
from pg_rad.landscape.landscape import (Landscape, LandscapeBuilder,)
__all__ = ['Landscape', 'LandscapeBuilder', 'LandscapeDirector', 'director',
'landscape']

View File

@ -0,0 +1,172 @@
import logging
from typing import Self
from .landscape import Landscape
from pg_rad.dataloader.dataloader import load_data
from pg_rad.objects.sources import PointSource
from pg_rad.exceptions.exceptions import OutOfBoundsError
from pg_rad.utils.projection import rel_to_abs_source_position
from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec
)
from pg_rad.path.path import Path, path_from_RT90
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
logger = logging.getLogger(__name__)
class LandscapeBuilder:
def __init__(self, name: str = "Unnamed landscape"):
self.name = name
self._path = None
self._point_sources = []
self._size = None
self._air_density = 1.243
logger.debug(f"LandscapeBuilder initialized: {self.name}")
def set_air_density(self, air_density: float | None) -> Self:
"""Set the air density of the world."""
if air_density and isinstance(air_density, float):
self._air_density = air_density
return self
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
"""Set the size of the landscape in meters (x,y,z)."""
if self._path and any(p > s for p, s in zip(self._path.size, size)):
raise OutOfBoundsError(
"Cannot set landscape size smaller than the path."
)
self._size = size
logger.debug("Size of the landscape has been updated.")
return self
def get_path(self):
return self._path
def set_path_from_segments(
self,
sim_spec: SimulationSpec
):
segments = sim_spec.path.segments
types = [s.type for s in segments]
lengths = [s.length for s in segments]
angles = [s.angle for s in segments]
sg = SegmentedRoadGenerator(
length=lengths,
ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
velocity=sim_spec.runtime.speed,
seed=sim_spec.options.seed
)
x, y = sg.generate(
segments=types,
lengths=lengths,
angles=angles
)
self._path = Path(list(zip(x, y)))
self._fit_landscape_to_path()
return self
def set_path_from_experimental_data(
self,
spec: CSVPathSpec
) -> Self:
df = load_data(spec.file)
self._path = path_from_RT90(
df=df,
east_col=spec.east_col_name,
north_col=spec.north_col_name,
z=spec.z
)
self._fit_landscape_to_path()
return self
def set_point_sources(
self,
*sources: AbsolutePointSourceSpec | RelativePointSourceSpec
):
"""Add one or more point sources to the world.
Args:
*sources (AbsolutePointSourceSpec | RelativePointSourceSpec):
One or more sources, passed as SourceSpecs tuple
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
for s in sources:
if isinstance(s, AbsolutePointSourceSpec):
pos = (s.x, s.y, s.z)
elif isinstance(s, RelativePointSourceSpec):
path = self.get_path()
pos = rel_to_abs_source_position(
x_list=path.x_list,
y_list=path.y_list,
path_z=path.z,
along_path=s.along_path,
side=s.side,
dist_from_path=s.dist_from_path)
if any(
p < 0 or p >= s for p, s in zip(pos, self._size)
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources.append(PointSource(
activity_MBq=s.activity_MBq,
isotope=s.isotope,
position=pos,
name=s.name
))
def _fit_landscape_to_path(self) -> None:
"""The size of the landscape will be updated if
1) _size is not set, or
2) _size is too small to contain the path."""
needs_resize = (
not self._size
or any(p > s for p, s in zip(self._path.size, self._size))
)
if needs_resize:
if not self._size:
logger.debug("Because no Landscape size was set, "
"it will now set to path dimensions.")
else:
logger.warning(
"Path exceeds current landscape size. "
"Landscape size will be expanded to accommodate path."
)
max_size = max(self._path.size)
self.set_landscape_size((max_size, max_size))
def build(self):
landscape = Landscape(
name=self.name,
path=self._path,
point_sources=self._point_sources,
size=self._size,
air_density=self._air_density
)
logger.info(f"Landscape built successfully: {landscape.name}")
return landscape

View File

@ -2,22 +2,48 @@ from importlib.resources import files
import logging
from pg_rad.configs.filepaths import TEST_EXP_DATA
from pg_rad.isotopes import CS137
from pg_rad.landscape.landscape import LandscapeBuilder
from pg_rad.objects import PointSource
from .builder import LandscapeBuilder
from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
ProceduralPathSpec
)
from pg_rad.objects.sources import PointSource
logger = logging.getLogger(__name__)
class LandscapeDirector:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.logger.debug("LandscapeDirector initialized.")
logger.debug("LandscapeDirector initialized.")
def build_test_landscape(self):
@staticmethod
def build_test_landscape():
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
source = PointSource(activity=100E9, isotope=CS137(), pos=(0, 0, 0))
source = PointSource(
activity_MBq=100E9,
isotope="CS137",
position=(0, 0, 0)
)
lb = LandscapeBuilder("Test landscape")
lb.set_air_density(1.243)
lb.set_path_from_experimental_data(fp, z=0)
lb.set_point_sources(source)
landscape = lb.build()
return landscape
@staticmethod
def build_from_config(config: SimulationSpec):
lb = LandscapeBuilder(config.metadata.name)
lb.set_air_density(config.options.air_density)
if isinstance(config.path, CSVPathSpec):
lb.set_path_from_experimental_data(spec=config.path)
elif isinstance(config.path, ProceduralPathSpec):
lb.set_path_from_segments(
sim_spec=config,
)
lb.set_point_sources(*config.point_sources)
landscape = lb.build()
return landscape

View File

@ -1,11 +1,8 @@
import logging
from typing import Self
from pg_rad.dataloader import load_data
from pg_rad.exceptions import OutOfBoundsError
from pg_rad.objects import PointSource
from pg_rad.path import Path, path_from_RT90
from pg_rad.physics.fluence import phi_single_source
from pg_rad.path.path import Path
from pg_rad.objects.sources import PointSource
logger = logging.getLogger(__name__)
@ -18,22 +15,18 @@ class Landscape:
self,
name: str,
path: Path,
point_sources: list[PointSource] = [],
size: tuple[int, int, int] = [500, 500, 50],
air_density: float = 1.243
air_density: float,
point_sources: list[PointSource],
size: tuple[int, int, int]
):
"""Initialize a landscape.
Args:
path (Path): A Path object.
point_sources (list[PointSource], optional): List of point sources.
air_density (float, optional): Air density in kg/m^3.
Defaults to 1.243.
size (tuple[int, int, int], optional): (x,y,z) dimensions of world
in meters. Defaults to [500, 500, 50].
Raises:
TypeError: _description_
name (str): Name of the landscape.
path (Path): The path of the detector.
air_density (float): Air density in kg/m^3.
point_sources (list[PointSource]): List of point sources.
size (tuple[int, int, int]): Size of the world.
"""
self.name = name
@ -43,119 +36,3 @@ class Landscape:
self.air_density = air_density
logger.debug(f"Landscape created: {self.name}")
def calculate_fluence_at(self, pos: tuple):
total_phi = 0.
for source in self.point_sources:
r = source.distance_to(pos)
phi_source = phi_single_source(
r=r,
activity=source.activity,
branching_ratio=source.isotope.b,
mu_mass_air=source.isotope.mu_mass_air,
air_density=self.air_density
)
total_phi += phi_source
return total_phi
def calculate_fluence_along_path(self):
pass
class LandscapeBuilder:
def __init__(self, name: str = "Unnamed landscape"):
self.name = name
self._path = None
self._point_sources = []
self._size = None
self._air_density = None
logger.debug(f"LandscapeBuilder initialized: {self.name}")
def set_air_density(self, air_density) -> Self:
"""Set the air density of the world."""
self._air_density = air_density
return self
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
"""Set the size of the landscape in meters (x,y,z)."""
if self._path and any(p > s for p, s in zip(self._path.size, size)):
raise OutOfBoundsError(
"Cannot set landscape size smaller than the path."
)
self._size = size
logger.debug("Size of the landscape has been updated.")
return self
def set_path_from_experimental_data(
self,
filename: str,
z: int,
east_col: str = "East",
north_col: str = "North"
) -> Self:
df = load_data(filename)
self._path = path_from_RT90(
df=df,
east_col=east_col,
north_col=north_col,
z=z
)
# The size of the landscape will be updated if
# 1) _size is not set, or
# 2) _size is too small to contain the path.
needs_resize = (
not self._size
or any(p > s for p, s in zip(self._path.size, self._size))
)
if needs_resize:
if not self._size:
logger.debug("Because no Landscape size was set, "
"it will now set to path dimensions.")
else:
logger.warning(
"Path exceeds current landscape size. "
"Landscape size will be expanded to accommodate path."
)
self.set_landscape_size(self._path.size)
return self
def set_point_sources(self, *sources):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
if any(
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
for source in sources
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources = sources
def build(self):
landscape = Landscape(
name=self.name,
path=self._path,
point_sources=self._point_sources,
size=self._size,
air_density=self._air_density
)
logger.info(f"Landscape built successfully: {landscape.name}")
return landscape

View File

@ -1,7 +1,21 @@
import argparse
import logging
import sys
from pg_rad.logger import setup_logger
from pg_rad.landscape import LandscapeDirector
from pandas.errors import ParserError
from yaml import YAMLError
from pg_rad.exceptions.exceptions import (
MissingConfigKeyError,
OutOfBoundsError,
DimensionError,
InvalidIsotopeError
)
from pg_rad.logger.logger import setup_logger
from pg_rad.inputparser.parser import ConfigParser
from pg_rad.landscape.director import LandscapeDirector
from pg_rad.plotting.result_plotter import ResultPlotter
from pg_rad.simulator.engine import SimulationEngine
def main():
@ -9,24 +23,102 @@ def main():
prog="pg-rad",
description="Primary Gamma RADiation landscape tool"
)
parser.add_argument(
"--config",
help="Build from a config file."
)
parser.add_argument(
"--test",
action="store_true",
help="Load and run the test landscape"
help="Load and run the test landscape."
)
parser.add_argument(
"--loglevel",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
parser.add_argument(
"--saveplot",
action="store_true",
help="Save the plot or not."
)
args = parser.parse_args()
setup_logger(args.loglevel)
logger = logging.getLogger(__name__)
if args.test:
landscape = LandscapeDirector().build_test_landscape()
print(landscape.name)
test_yaml = """
name: Test landscape
speed: 8.33
acquisition_time: 1
path:
length: 1000
segments:
- straight
sources:
test_source:
activity_MBq: 1000
position: [500, 100, 0]
isotope: CS137
"""
cp = ConfigParser(test_yaml).parse()
landscape = LandscapeDirector.build_from_config(cp)
output = SimulationEngine(
landscape=landscape,
runtime_spec=cp.runtime,
sim_spec=cp.options
).simulate()
plotter = ResultPlotter(landscape, output)
plotter.plot()
elif args.config:
try:
cp = ConfigParser(args.config).parse()
landscape = LandscapeDirector.build_from_config(cp)
output = SimulationEngine(
landscape=landscape,
runtime_spec=cp.runtime,
sim_spec=cp.options
).simulate()
plotter = ResultPlotter(landscape, output)
plotter.plot()
except (
MissingConfigKeyError,
KeyError,
YAMLError,
):
logger.critical(
"The provided config file is invalid. "
"Check the log above. You can consult the documentation for "
"an explanation of how to define a config file."
)
sys.exit(1)
except (
OutOfBoundsError,
DimensionError,
InvalidIsotopeError,
ValueError
) as e:
logger.critical(e)
logger.critical(
"One or more items in config are not specified correctly. "
"Please consult this log and fix the problem."
)
sys.exit(1)
except (
FileNotFoundError,
ParserError
):
sys.exit(1)
if __name__ == "__main__":

View File

@ -2,6 +2,8 @@ from typing import Self
import numpy as np
from pg_rad.exceptions.exceptions import DimensionError
class BaseObject:
def __init__(
@ -21,7 +23,7 @@ class BaseObject:
"""
if len(pos) != 3:
raise ValueError("Position must be tuple of length 3 (x,y,z).")
raise DimensionError("Position must be tuple of length 3 (x,y,z).")
self.pos = pos
self.name = name
self.color = color

View File

@ -1,7 +1,7 @@
import logging
from .objects import BaseObject
from pg_rad.isotopes import Isotope
from pg_rad.isotopes.isotope import Isotope, get_isotope
logger = logging.getLogger(__name__)
@ -11,16 +11,16 @@ class PointSource(BaseObject):
def __init__(
self,
activity: int,
isotope: Isotope,
pos: tuple[float, float, float] = (0, 0, 0),
activity_MBq: int,
isotope: str,
position: tuple[float, float, float] = (0, 0, 0),
name: str | None = None,
color: str = 'red'
):
"""A point source.
Args:
activity (int): Activity A in MBq.
activity_MBq (int): Activity A in MBq.
isotope (Isotope): The isotope.
pos (tuple[float, float, float], optional):
Position of the PointSource.
@ -37,16 +37,17 @@ class PointSource(BaseObject):
if name is None:
name = f"Source {self.id}"
super().__init__(pos, name, color)
super().__init__(position, name, color)
self.activity = activity
self.isotope = isotope
self.activity = activity_MBq
self.isotope: Isotope = get_isotope(isotope)
logger.debug(f"Source created: {self.name}")
def __repr__(self):
x, y, z = self.position
repr_str = (f"PointSource(name={self.name}, "
+ f"pos={(self.x, self.y, self.z)}, "
+ f"pos={(x, y, z)}, "
+ f"A={self.activity} MBq), "
+ f"isotope={self.isotope.name}.")

View File

@ -4,7 +4,9 @@ from pg_rad.physics import attenuation
from pg_rad.physics import fluence
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
from pg_rad.physics.fluence import (phi_single_source,)
from pg_rad.physics.fluence import (calculate_fluence_along_path,
calculate_fluence_at, phi,)
__all__ = ['attenuation', 'fluence', 'get_mass_attenuation_coeff',
'phi_single_source']
__all__ = ['attenuation', 'calculate_fluence_along_path',
'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff',
'phi']

View File

@ -1,7 +1,12 @@
from typing import Tuple, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from pg_rad.landscape.landscape import Landscape
def phi_single_source(
def phi(
r: float,
activity: float | int,
branching_ratio: float,
@ -27,11 +32,68 @@ def phi_single_source(
mu_mass_air *= 0.1
mu_air = mu_mass_air * air_density
phi = (
phi_r = (
activity
* branching_ratio
* np.exp(-mu_air * r)
/ (4 * np.pi * r**2)
)
return phi
return phi_r
def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
"""Compute fluence at an arbitrary position in the landscape.
Args:
landscape (Landscape): The landscape to compute.
pos (np.ndarray): (N, 3) array of positions.
Returns:
total_phi (np.ndarray): (N,) array of fluences.
"""
pos = np.atleast_2d(pos)
total_phi = np.zeros(pos.shape[0])
for source in landscape.point_sources:
r = np.linalg.norm(pos - np.array(source.pos), axis=1)
r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm
phi_source = phi(
r=r,
activity=source.activity * scaling,
branching_ratio=source.isotope.b,
mu_mass_air=source.isotope.mu_mass_air,
air_density=landscape.air_density
)
total_phi += phi_source
return total_phi
def calculate_fluence_along_path(
landscape: "Landscape",
points_per_segment: int = 10
) -> Tuple[np.ndarray, np.ndarray]:
path = landscape.path
num_segments = len(path.segments)
xnew = np.linspace(
path.x_list[0],
path.x_list[-1],
num=num_segments*points_per_segment)
ynew = np.interp(xnew, path.x_list, path.y_list)
z = np.full(xnew.shape, path.z)
full_positions = np.c_[xnew, ynew, z]
phi_result = calculate_fluence_at(landscape, full_positions)
dist_travelled = np.linspace(
full_positions[0, 0],
path.length,
len(phi_result)
)
return dist_travelled, phi_result

View File

@ -1,45 +1,84 @@
import logging
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.patches import Circle
from pg_rad.landscape import Landscape
from numpy import median
from pg_rad.landscape.landscape import Landscape
logger = logging.getLogger(__name__)
class LandscapeSlicePlotter:
def plot(self, landscape: Landscape, z: int = 0):
def plot(
self,
landscape: Landscape,
z: int = 0,
show: bool = True,
save: bool = False,
ax: Axes | None = None
):
"""Plot a top-down slice of the landscape at a height z.
Args:
landscape (Landscape): the landscape to plot
z (int, optional): Height at which to plot slice. Defaults to 0.
show (bool, optional): Show the plot. Defaults to True.
save (bool, optional): Save the plot. Defaults to False.
""" """
"""
self.z = z
fig, ax = plt.subplots()
if not ax:
fig, ax = plt.subplots()
self._draw_base(ax, landscape)
self._draw_path(ax, landscape)
self._draw_point_sources(ax, landscape)
ax.set_aspect("equal")
plt.show()
def _draw_base(self, ax, landscape):
if save and not ax:
landscape_name = landscape.name.lower().replace(' ', '_')
filename = f"{landscape_name}_z{self.z}.png"
plt.savefig(filename)
logger.info("Plot saved to file: "+filename)
if show and not ax:
plt.show()
return ax
def _draw_base(self, ax, landscape: Landscape):
width, height = landscape.size[:2]
ax.set_xlim(right=width)
ax.set_ylim(top=height)
ax.set_xlim(right=max(width, .5*height))
# if the road is very flat, we center it vertically (looks better)
if median(landscape.path.y_list) == 0:
h = max(height, .5*width)
ax.set_ylim(bottom=-h//2,
top=h//2)
else:
ax.set_ylim(top=max(height, .5*width))
ax.set_xlabel("X [m]")
ax.set_ylabel("Y [m]")
ax.set_title(f"Landscape (top-down, z = {self.z})")
def _draw_path(self, ax, landscape):
if landscape.path.z < self.z:
ax.plot(landscape.path.x_list, landscape.path.y_list, 'bo-')
if landscape.path.z <= self.z:
ax.plot(
landscape.path.x_list,
landscape.path.y_list,
linestyle='-',
marker='|',
markersize=3,
linewidth=1
)
else:
logger.warning(
"Path is above the slice height z."
@ -48,18 +87,19 @@ class LandscapeSlicePlotter:
def _draw_point_sources(self, ax, landscape):
for s in landscape.point_sources:
if s.z <= self.z:
x, y, z = s.pos
if z <= self.z:
dot = Circle(
(s.x, s.y),
(x, y),
radius=5,
color=s.color,
zorder=5
)
ax.text(
s.x + 0.06,
s.y + 0.06,
s.name,
x + 0.06,
y + 0.06,
s.name+", z="+str(z),
color=s.color,
fontsize=10,
ha="left",

View File

@ -0,0 +1,100 @@
from matplotlib import pyplot as plt
from matplotlib.gridspec import GridSpec
from .landscape_plotter import LandscapeSlicePlotter
from pg_rad.simulator.outputs import SimulationOutput
from pg_rad.landscape.landscape import Landscape
class ResultPlotter:
def __init__(self, landscape: Landscape, output: SimulationOutput):
self.landscape = landscape
self.count_rate_res = output.count_rate
self.source_res = output.sources
def plot(self, landscape_z: float = 0):
fig = plt.figure(figsize=(12, 10), constrained_layout=True)
fig.suptitle(self.landscape.name)
gs = GridSpec(
3,
2,
width_ratios=[0.5, 0.5],
height_ratios=[0.7, 0.15, 0.15],
hspace=0.2)
ax1 = fig.add_subplot(gs[0, 0])
self._draw_count_rate(ax1)
ax2 = fig.add_subplot(gs[0, 1])
self._plot_landscape(ax2, landscape_z)
ax3 = fig.add_subplot(gs[1, :])
self._draw_table(ax3)
ax4 = fig.add_subplot(gs[2, :])
self._draw_source_table(ax4)
plt.tight_layout()
plt.show()
def _plot_landscape(self, ax, z):
lp = LandscapeSlicePlotter()
ax = lp.plot(landscape=self.landscape, z=z, ax=ax, show=False)
return ax
def _draw_count_rate(self, ax):
x = self.count_rate_res.arc_length
y = self.count_rate_res.count_rate
ax.plot(x, y, label='Count rate', color='r')
ax.set_title('Count rate')
ax.set_xlabel('Arc length s [m]')
ax.set_ylabel('Counts')
ax.legend()
def _draw_table(self, ax):
ax.set_axis_off()
ax.set_title('Simulation parameters')
cols = ('Parameter', 'Value')
data = [
["Air density (kg/m^3)", round(self.landscape.air_density, 3)],
["Total path length (m)", round(self.landscape.path.length, 3)]
]
ax.table(
cellText=data,
colLabels=cols,
loc='center'
)
return ax
def _draw_source_table(self, ax):
ax.set_axis_off()
ax.set_title('Point sources')
cols = (
'Name',
'Isotope',
'Activity (MBq)',
'Position (m)',
'Dist. to path (m)'
)
# this formats position to tuple
data = [
[
s.name,
s.isotope,
s.activity,
"("+", ".join(f"{val:.2f}" for val in s.position)+")",
round(s.dist_from_path, 2)
]
for s in self.source_res
]
ax.table(
cellText=data,
colLabels=cols,
loc='center'
)
return ax

View File

View File

@ -0,0 +1,63 @@
from typing import List
from pg_rad.landscape.landscape import Landscape
from pg_rad.simulator.outputs import (
CountRateOutput,
SimulationOutput,
SourceOutput
)
from pg_rad.physics.fluence import calculate_fluence_along_path
from pg_rad.utils.projection import minimal_distance_to_path
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
class SimulationEngine:
"""Takes a fully built landscape and produces results."""
def __init__(
self,
landscape: Landscape,
runtime_spec=RuntimeSpec,
sim_spec=SimulationOptionsSpec
):
self.landscape = landscape
self.runtime_spec = runtime_spec
self.sim_spec = sim_spec
def simulate(self) -> SimulationOutput:
"""Compute everything and return structured output."""
count_rate_results = self._calculate_count_rate_along_path()
source_results = self._calculate_point_source_distance_to_path()
return SimulationOutput(
name=self.landscape.name,
count_rate=count_rate_results,
sources=source_results
)
def _calculate_count_rate_along_path(self) -> CountRateOutput:
arc_length, phi = calculate_fluence_along_path(self.landscape)
return CountRateOutput(arc_length, phi)
def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:
path = self.landscape.path
source_output = []
for s in self.landscape.point_sources:
dist_to_path = minimal_distance_to_path(
path.x_list,
path.y_list,
path.z,
s.pos)
source_output.append(
SourceOutput(
s.name,
s.isotope.name,
s.activity,
s.pos,
dist_to_path)
)
return source_output

View File

@ -0,0 +1,25 @@
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class CountRateOutput:
arc_length: List[float]
count_rate: List[float]
@dataclass
class SourceOutput:
name: str
isotope: str
activity: float
position: Tuple[float, float, float]
dist_from_path: float
@dataclass
class SimulationOutput:
name: str
count_rate: CountRateOutput
sources: List[SourceOutput]

View File

View File

@ -0,0 +1,168 @@
from typing import List, Tuple
import numpy as np
from pg_rad.exceptions.exceptions import OutOfBoundsError
def rel_to_abs_source_position(
x_list: List,
y_list: List,
path_z: int | float,
along_path: int | float,
side: str,
dist_from_path: int | float,
z_rel: int | float = 0
):
path = np.column_stack((x_list, y_list))
segments = np.diff(path, axis=0)
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
arc_length = np.cumsum(segment_lengths)
arc_length = np.insert(arc_length, 0, 0)
# find index of the segment where the source should be placed
s_i = np.searchsorted(arc_length, along_path, side='right') - 1
if not 0 <= s_i < len(segments):
raise OutOfBoundsError(
"along_path must be less than the length of the path!"
)
# fraction of segment length where to place the point source
segment_fraction = (
(along_path - arc_length[s_i])
/ segment_lengths[s_i]
)
# tangential vector at point where point source to be placed
point_on_path = path[s_i] + segment_fraction * segments[s_i]
tangent_vec = segments[s_i] / segment_lengths[s_i]
if side not in {"left", "right"}:
raise ValueError("side must be 'left' or 'right'")
orientation = 1 if side == "left" else -1
perp_vec = orientation * np.array([-tangent_vec[1], tangent_vec[0]])
x_abs, y_abs = point_on_path + dist_from_path * perp_vec
z_abs = path_z + z_rel
return x_abs, y_abs, z_abs
def minimal_distance_to_path(
x_list: List[float],
y_list: List[float],
path_z: float,
pos: Tuple[float, float, float]
) -> float:
"""
Compute minimal Euclidean distance from pos (x,y,z)
to interpolated polyline path at height path_z.
"""
x, y, z = pos
path = np.column_stack((x_list, y_list))
point_xy = np.array([x, y])
segments = np.diff(path, axis=0)
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
min_dist = np.inf
for p0, seg, seg_len in zip(path[:-1], segments, segment_lengths):
if seg_len == 0:
continue
t = np.dot(point_xy - p0, seg) / (seg_len ** 2)
t = np.clip(t, 0.0, 1.0)
projection_xy = p0 + t * seg
dx, dy = point_xy - projection_xy
dz = z - path_z
dist = np.sqrt(dx**2 + dy**2 + dz**2)
if dist < min_dist:
min_dist = dist
return min_dist
def abs_to_rel_source_position(
x_list: List,
y_list: List,
path_z: int | float,
pos: Tuple[float, float, float],
) -> Tuple[float, float, str, float]:
"""
Convert absolute (x,y,z) position into:
along_path,
dist_from_path,
side ("left" or "right"),
z_rel
"""
x, y, z = pos
path = np.column_stack((x_list, y_list))
point = np.array([x, y])
segments = np.diff(path, axis=0)
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
arc_length = np.cumsum(segment_lengths)
arc_length = np.insert(arc_length, 0, 0)
min_dist = np.inf
best_projection = None
best_s_i = None
best_fraction = None
best_signed_dist = None
for (i, (p0, seg, seg_len)) in enumerate(
zip(path[:-1], segments, segment_lengths)
):
if seg_len == 0:
continue
# project point onto infinite line
t = np.dot(point - p0, seg) / (seg_len ** 2)
# clamp to segment
t_clamped = np.clip(t, 0.0, 1.0)
projection = p0 + t_clamped * seg
diff_vec = point - projection
dist = np.linalg.norm(diff_vec)
if dist < min_dist:
min_dist = dist
best_projection = projection
best_s_i = i
best_fraction = t_clamped
# tangent and perpendicular
tangent_vec = seg / seg_len
perp_vec = np.array([-tangent_vec[1], tangent_vec[0]])
signed_dist = np.dot(diff_vec, perp_vec)
best_signed_dist = signed_dist
if best_projection is None:
raise ValueError("Could not project point onto path.")
# Compute along_path
along_path = (
arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
)
# Side and distance
side = "left" if best_signed_dist > 0 else "right"
dist_from_path = abs(best_signed_dist)
# z relative
z_rel = z - path_z
return along_path, dist_from_path, side, z_rel

0
src/road_gen/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,55 @@
import secrets
import numpy as np
class BaseRoadGenerator:
"""A base generator object for generating a road of a specified length."""
def __init__(
self,
length: int | float,
ds: int | float,
velocity: int | float,
mu: float = 0.7,
g: float = 9.81,
seed: int | None = None
):
"""Initialize a BaseGenerator with a given or random seed.
Args:
length (int | float): The total length of the road in meters.
ds (int | float): The step size in meters.
velocity (int | float): Velocity in meters per second.
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
seed (int | None, optional): Set a seed for the generator.
Defaults to a random seed.
"""
if seed is None:
seed = secrets.randbits(32)
if not isinstance(seed, int):
raise TypeError("seed must be an integer or None.")
if not isinstance(length, int | float):
raise TypeError("Length must be an integer or float in meters.")
if not isinstance(ds, int | float):
raise TypeError("Step size must be integer or float in meters.")
if not isinstance(velocity, int | float):
raise TypeError(
"Velocity must be integer or float in meters per second."
)
self.length = length
self.ds = ds
self.velocity = velocity
self.min_radius = (velocity ** 2) / (g * mu)
self.seed = seed
self._rng = np.random.default_rng(seed)
def generate(self):
pass

View File

@ -0,0 +1,143 @@
import logging
from typing import Tuple
import numpy as np
from .base_road_generator import BaseRoadGenerator
from road_gen.prefabs import prefabs
from road_gen.integrator.integrator import integrate_road
from pg_rad.configs import defaults
logger = logging.getLogger(__name__)
class SegmentedRoadGenerator(BaseRoadGenerator):
def __init__(
self,
length: int | float | list[int | float],
ds: int | float,
velocity: int | float,
mu: float = 0.7,
g: float = 9.81,
seed: int | None = None
):
"""Initialize a SegmentedRoadGenerator with given or random seed.
Args:
length (int | float): The total length of the road in meters.
ds (int | float): The step size in meters.
velocity (int | float): Velocity in meters per second.
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
seed (int | None, optional): Set a seed for the generator.
Defaults to random seed.
"""
if isinstance(length, list):
length = sum(
[seg_len for seg_len in length if seg_len is not None]
)
super().__init__(length, ds, velocity, mu, g, seed)
def generate(
self,
segments: list[str],
lengths: list[int | float] | None = None,
angles: list[int | float] | None = None,
alpha: float = defaults.DEFAULT_ALPHA,
min_turn_angle: float = defaults.DEFAULT_MIN_TURN_ANGLE,
max_turn_angle: float = defaults.DEFAULT_MAX_TURN_ANGLE
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate a curvature profile from a list of segments.
Args:
segments (list[str]): List of segments.
alpha (float, optional): Dirichlet concentration parameter.
A higher value leads to more uniform apportionment of the
length amongst the segments, while a lower value allows more
random apportionment. Defaults to 1.0.
min_turn_angle (float, optional): Minimum turn angle in degrees for
random sampling of turn radius. Does nothing if `angle_list` is
provided or no `turn_*` segement is specified in the `segments`
list.
min_turn_angle (float, optional): Maximum turn angle in degrees for
random sampling of turn radius. Does nothing if `angle_list` is
provided or no `turn_*` segement is specified in the `segments`
list.
Raises:
ValueError: Raised when a turn
is too tight given its segment length and the velocity.
To fix this, you can try to reduce the amount of segments or
increase length. Increasing alpha
(Dirichlet concentration parameter) can also help because this
reduces the odds of very small lengths being assigned to
turn segments.
Returns:
Tuple[np.ndarray, np.ndarray]: x and y coordinates of the
waypoints describing the random road.
"""
existing_prefabs = prefabs.PREFABS.keys()
if not all(segment in existing_prefabs for segment in segments):
raise ValueError(
"Invalid segment type provided. Available choices"
f"{existing_prefabs}"
)
self.segments = segments
self.alpha = alpha
num_points = np.ceil(self.length / self.ds).astype(int)
# divide num_points into len(segments) randomly sized parts.
if isinstance(self.length, list):
parts = self.length
else:
parts = self._rng.dirichlet(
np.full(len(segments), alpha),
size=1)[0]
parts = parts * num_points
parts = np.round(parts).astype(int)
# correct round off so the sum of parts is still total length L.
if sum(parts) != num_points:
parts[0] += num_points - sum(parts)
curvature = np.zeros(num_points)
current_index = 0
for seg_name, seg_length in zip(segments, parts):
seg_function = prefabs.PREFABS[seg_name]
if seg_name == 'straight':
curvature_s = seg_function(seg_length)
else:
R_min_angle = seg_length / np.deg2rad(max_turn_angle)
R_max_angle = seg_length / np.deg2rad(min_turn_angle)
# physics limit
R_min = max(self.min_radius, R_min_angle)
if R_min > R_max_angle:
raise ValueError(
f"{seg_name} with length {seg_length} does not have "
"a possible radius. The minimum for the provided "
"velocity and friction coefficient is "
f"{self.min_radius}, but the possible range is "
f"({R_min}, {R_max_angle})"
)
rand_radius = self._rng.uniform(R_min, R_max_angle)
if seg_name.startswith("u_turn"):
curvature_s = seg_function(rand_radius)
else:
curvature_s = seg_function(seg_length, rand_radius)
curvature[current_index:(current_index + seg_length)] = curvature_s
current_index += seg_length
x, y = integrate_road(curvature)
return x * self.ds, y * self.ds

View File

View File

@ -0,0 +1,21 @@
import numpy as np
def integrate_road(curvature: np.ndarray, ds: float = 1.0):
"""Integrate along the curvature field to obtain X and Y coordinates.
Args:
curvature (np.ndarray): The curvature field.
ds (float, optional): _description_. Defaults to 1.0.
Returns:
x, y (np.ndarray): X and Y waypoints.
"""
theta = np.zeros(len(curvature))
theta[1:] = np.cumsum(curvature[:-1] * ds)
x = np.cumsum(np.cos(theta) * ds)
y = np.cumsum(np.sin(theta) * ds)
return x, y

View File

View File

@ -0,0 +1,20 @@
import numpy as np
def straight(length: int) -> np.ndarray:
return np.zeros(length)
def turn_left(length: int, radius: float) -> np.ndarray:
return np.full(length, 1.0 / radius)
def turn_right(length: int, radius: float) -> np.ndarray:
return -turn_left(length, radius)
PREFABS = {
'straight': straight,
'turn_left': turn_left,
'turn_right': turn_right,
}

View File

@ -1,34 +1,53 @@
from math import dist, exp, pi
import numpy as np
import pytest
from pg_rad.landscape import LandscapeDirector
from pg_rad.inputparser.parser import ConfigParser
from pg_rad.landscape.director import LandscapeDirector
from pg_rad.physics import calculate_fluence_at
@pytest.fixture
def phi_ref():
A = 100 # MBq
b = 0.851
mu_mass_air = 0.0778 # cm^2/g
air_density = 1.243 # kg/m^3
r = dist((0, 0, 0), (10, 10, 0)) # m
def phi_ref(test_landscape):
source = test_landscape.point_sources[0]
A *= 1E9 # Convert to Bq
mu_mass_air *= 0.1 # Convert to m^2/kg
r = np.linalg.norm(np.array([10, 10, 0]) - np.array(source.pos))
mu_air = mu_mass_air * air_density # [m^2/kg] x [kg/m^3] = [m^-1]
A = source.activity * 1E6
b = source.isotope.b
mu_air = source.isotope.mu_mass_air * test_landscape.air_density
mu_air *= 0.1
# [s^-1] x exp([m^-1] x [m]) / [m^-2] = [s^-1 m^-2]
phi = A * b * exp(-mu_air * r) / (4 * pi * r**2)
return phi
return A * b * np.exp(-mu_air * r) / (4 * np.pi * r**2)
@pytest.fixture
def test_landscape():
landscape = LandscapeDirector().build_test_landscape()
test_yaml = """
name: Test landscape
speed: 8.33
acquisition_time: 1
path:
length: 1000
segments:
- straight
sources:
test_source:
activity_MBq: 100
position: [0, 0, 0]
isotope: CS137
"""
cp = ConfigParser(test_yaml).parse()
landscape = LandscapeDirector.build_from_config(cp)
return landscape
def test_single_source_fluence(phi_ref, test_landscape):
phi = test_landscape.calculate_fluence_at((10, 10, 0))
assert pytest.approx(phi, rel=1E-3) == phi_ref
phi = calculate_fluence_at(
test_landscape,
np.array([10, 10, 0]),
)
assert pytest.approx(phi[0], rel=1E-3) == phi_ref

View File

@ -2,17 +2,16 @@ import numpy as np
import pytest
from pg_rad.objects import PointSource
from pg_rad.isotopes import CS137
@pytest.fixture
def test_sources():
iso = CS137()
iso = "CS137"
pos_a = np.random.rand(3)
pos_b = np.random.rand(3)
a = PointSource(pos=pos_a, activity=None, isotope=iso)
b = PointSource(pos=pos_b, activity=None, isotope=iso)
a = PointSource(position=pos_a, activity_MBq=None, isotope=iso)
b = PointSource(position=pos_b, activity_MBq=None, isotope=iso)
return pos_a, pos_b, a, b