Add flip direction. Change mean to Trapezoidal rule for integration along path. Scale count rate properly with acquisition time

This commit is contained in:
Pim Nelissen
2026-03-10 20:44:18 +01:00
parent b882f20358
commit b82196e431
8 changed files with 137 additions and 52 deletions

View File

@ -7,7 +7,8 @@ import yaml
from pg_rad.exceptions.exceptions import (
MissingConfigKeyError,
DimensionError,
InvalidConfigValueError
InvalidConfigValueError,
InvalidYAMLError
)
from pg_rad.configs import defaults
@ -45,7 +46,7 @@ class ConfigParser:
def parse(self) -> SimulationSpec:
self._warn_unknown_keys(
section="global",
section="root",
provided=set(self.config.keys()),
allowed=self._ALLOWED_ROOT_KEYS,
)
@ -74,7 +75,7 @@ class ConfigParser:
data = yaml.safe_load(config_source)
if not isinstance(data, dict):
raise ValueError(
raise InvalidYAMLError(
"Provided path or string is not a valid YAML representation."
)
@ -86,13 +87,13 @@ class ConfigParser:
name=self.config["name"]
)
except KeyError as e:
raise MissingConfigKeyError("global", {"name"}) from e
raise MissingConfigKeyError("root", {"name"}) from e
def _parse_runtime(self) -> RuntimeSpec:
required = {"speed", "acquisition_time"}
missing = required - self.config.keys()
if missing:
raise MissingConfigKeyError("global", missing)
raise MissingConfigKeyError("root", missing)
return RuntimeSpec(
speed=float(self.config["speed"]),
@ -116,7 +117,7 @@ class ConfigParser:
seed = options.get("seed")
if not isinstance(air_density, float) or air_density <= 0:
raise ValueError(
raise InvalidConfigValueError(
"options.air_density_kg_per_m3 must be a positive float "
"in kg/m^3."
)
@ -124,7 +125,9 @@ class ConfigParser:
seed is not None or
(isinstance(seed, int) and seed <= 0)
):
raise ValueError("Seed must be a positive integer value.")
raise InvalidConfigValueError(
"Seed must be a positive integer value."
)
return SimulationOptionsSpec(
air_density=air_density,
@ -133,14 +136,26 @@ class ConfigParser:
def _parse_path(self) -> PathSpec:
allowed_csv = {"file", "east_col_name", "north_col_name", "z"}
allowed_proc = {"segments", "length", "z", "alpha"}
allowed_proc = {"segments", "length", "z", "alpha", "direction"}
path = self.config.get("path")
direction = path.get("direction", 'positive')
if direction == 'positive':
opposite_direction = False
elif direction == 'negative':
opposite_direction = True
else:
raise InvalidConfigValueError(
"Direction must be positive or negative."
)
if path is None:
raise MissingConfigKeyError("global", {"path"})
raise MissingConfigKeyError("path")
if not isinstance(path, dict):
raise ValueError("Path must be a dictionary.")
raise InvalidConfigValueError("Path must be a dictionary.")
if "file" in path:
self._warn_unknown_keys(
@ -154,26 +169,31 @@ class ConfigParser:
east_col_name=path["east_col_name"],
north_col_name=path["north_col_name"],
z=path.get("z", 0),
opposite_direction=opposite_direction
)
if "segments" in path:
elif "segments" in path:
self._warn_unknown_keys(
section="path (procedural)",
provided=set(path.keys()),
allowed=allowed_proc,
)
return self._parse_procedural_path(path)
return self._parse_procedural_path(path, opposite_direction)
raise ValueError("Invalid path configuration.")
else:
raise InvalidConfigValueError("Invalid path configuration.")
def _parse_procedural_path(
self,
path: Dict[str, Any]
path: Dict[str, Any],
opposite_direction: bool
) -> ProceduralPathSpec:
raw_segments = path.get("segments")
if not isinstance(raw_segments, List):
raise ValueError("path.segments must be a list of segments.")
raise InvalidConfigValueError(
"path.segments must be a list of segments."
)
raw_length = path.get("length")
@ -191,7 +211,8 @@ class ConfigParser:
angles=angles,
lengths=lengths,
z=path.get("z", defaults.DEFAULT_PATH_HEIGHT),
alpha=path.get("alpha", defaults.DEFAULT_ALPHA)
alpha=path.get("alpha", defaults.DEFAULT_ALPHA),
opposite_direction=opposite_direction
)
def _process_segment_angles(
@ -209,13 +230,17 @@ class ConfigParser:
elif isinstance(segment, dict):
if len(segment) != 1:
raise ValueError("Invalid segment definition.")
raise InvalidConfigValueError(
"Invalid segment definition."
)
seg_type, angle = list(segment.items())[0]
segments.append(seg_type)
angles.append(angle)
else:
raise ValueError("Invalid segment entry format.")
raise InvalidConfigValueError(
"Invalid segment entry format."
)
return segments, angles
@ -239,12 +264,25 @@ class ConfigParser:
return segment_type in {"turn_left", "turn_right"}
def _parse_point_sources(self) -> List[PointSourceSpec]:
source_dict = self.config.get("sources", {})
source_dict = self.config.get("sources")
if source_dict is None:
raise MissingConfigKeyError("sources")
if not isinstance(source_dict, dict):
raise InvalidConfigValueError(
"sources must have subkeys representing point source names."
)
specs: List[PointSourceSpec] = []
for name, params in source_dict.items():
required = {"activity_MBq", "isotope", "position"}
if not isinstance(params, dict):
raise InvalidConfigValueError(
f"sources.{name} is not defined correctly."
f" Must have subkeys {required}"
)
missing = required - params.keys()
if missing:
raise MissingConfigKeyError(name, missing)
@ -309,8 +347,13 @@ class ConfigParser:
return specs
def _parse_detector(self) -> DetectorSpec:
det_dict = self.config.get("detector", {})
det_dict = self.config.get("detector")
required = {"name", "is_isotropic"}
if not isinstance(det_dict, dict):
raise InvalidConfigValueError(
"detector is not specified correctly. Must contain at least"
f"the subkeys {required}"
)
missing = required - det_dict.keys()
if missing:
@ -327,7 +370,7 @@ class ConfigParser:
elif eff:
pass
else:
raise ValueError(
raise InvalidConfigValueError(
f"The detector {name} not found in library. Either "
f"specify {name}.efficiency or "
"choose a detector from the following list: "

View File

@ -22,7 +22,8 @@ class SimulationOptionsSpec:
@dataclass
class PathSpec(ABC):
pass
z: int | float
opposite_direction: bool
@dataclass
@ -30,7 +31,6 @@ class ProceduralPathSpec(PathSpec):
segments: list[str]
angles: list[float]
lengths: list[int | None]
z: int | float
alpha: float
@ -39,7 +39,6 @@ class CSVPathSpec(PathSpec):
file: str
east_col_name: str
north_col_name: str
z: int | float
@dataclass

View File

@ -12,7 +12,8 @@ from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec
RelativePointSourceSpec,
DetectorSpec
)
from pg_rad.path.path import Path, path_from_RT90
@ -30,6 +31,7 @@ class LandscapeBuilder:
self._point_sources = []
self._size = None
self._air_density = 1.243
self._detector = None
logger.debug(f"LandscapeBuilder initialized: {self.name}")
@ -58,10 +60,12 @@ class LandscapeBuilder:
self,
sim_spec: SimulationSpec
):
segments = sim_spec.path.segments
lengths = sim_spec.path.lengths
angles = sim_spec.path.angles
alpha = sim_spec.path.alpha
path = sim_spec.path
segments = path.segments
lengths = path.lengths
angles = path.angles
alpha = path.alpha
z = path.z
sg = SegmentedRoadGenerator(
ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
@ -76,7 +80,11 @@ class LandscapeBuilder:
alpha=alpha
)
self._path = Path(list(zip(x, y)))
self._path = Path(
list(zip(x, y)),
z=z,
opposite_direction=path.opposite_direction
)
self._fit_landscape_to_path()
return self
@ -89,7 +97,8 @@ class LandscapeBuilder:
df=df,
east_col=spec.east_col_name,
north_col=spec.north_col_name,
z=spec.z
z=spec.z,
opposite_direction=spec.opposite_direction
)
self._fit_landscape_to_path()
@ -151,6 +160,10 @@ class LandscapeBuilder:
name=s.name
))
def set_detector(self, spec: DetectorSpec) -> Self:
self._detector = spec
return self
def _fit_landscape_to_path(self) -> None:
"""The size of the landscape will be updated if
1) _size is not set, or
@ -221,6 +234,7 @@ class LandscapeBuilder:
name=self.name,
path=self._path,
point_sources=self._point_sources,
detector=self._detector,
size=self._size,
air_density=self._air_density
)

View File

@ -45,5 +45,6 @@ class LandscapeDirector:
sim_spec=config,
)
lb.set_point_sources(*config.point_sources)
lb.set_detector(config.detector)
landscape = lb.build()
return landscape

View File

@ -1,6 +1,7 @@
import logging
from pg_rad.path.path import Path
from pg_rad.detector.detectors import AngularDetector, IsotropicDetector
from pg_rad.objects.sources import PointSource
@ -17,6 +18,7 @@ class Landscape:
path: Path,
air_density: float,
point_sources: list[PointSource],
detector: IsotropicDetector | AngularDetector,
size: tuple[int, int, int]
):
"""Initialize a landscape.
@ -34,5 +36,6 @@ class Landscape:
self.point_sources = point_sources
self.size = size
self.air_density = air_density
self.detector = detector
logger.debug(f"Landscape created: {self.name}")

View File

@ -42,6 +42,7 @@ class Path:
def __init__(
self,
coord_list: Sequence[tuple[float, float]],
opposite_direction: bool,
z: float = 0.,
z_box: float = 50.
):
@ -74,6 +75,8 @@ class Path:
]
self.z = z
self.opposite_direction = opposite_direction
self.size = (
np.ceil(max(self.x_list)),
np.ceil(max(self.y_list)),

View File

@ -47,14 +47,14 @@ def phi(
return phi_r
def calculate_fluence_at(
def calculate_count_rate_per_second(
landscape: "Landscape",
pos: np.ndarray,
detector: IsotropicDetector | AngularDetector,
tangent_vectors: np.ndarray,
scaling=1E6
):
"""Compute fluence at an arbitrary position in the landscape.
"""Compute count rate in s^-1 m^-2 at a position in the landscape.
Args:
landscape (Landscape): The landscape to compute.
@ -63,7 +63,7 @@ def calculate_fluence_at(
Detector object, needed to compute correct efficiency.
Returns:
total_phi (np.ndarray): (N,) array of fluences.
total_phi (np.ndarray): (N,) array of count rates per second.
"""
pos = np.atleast_2d(pos)
total_phi = np.zeros(pos.shape[0])
@ -103,30 +103,42 @@ def calculate_fluence_at(
def calculate_fluence_along_path(
landscape: "Landscape",
detector: "IsotropicDetector | AngularDetector",
points_per_segment: int = 10
acquisition_time: int,
points_per_segment: int = 10,
) -> Tuple[np.ndarray, np.ndarray]:
"""Compute the fluence along a full path in the landscape.
Args:
landscape (Landscape): _description_
detector (IsotropicDetector | AngularDetector): _description_
points_per_segment (int, optional): _description_. Defaults to 100.
Returns:
Tuple[np.ndarray, np.ndarray]: _description_
"""
path = landscape.path
num_points = len(path.x_list)
num_segments = len(path.segments)
dx = np.diff(path.x_list)
dy = np.diff(path.y_list)
segment_lengths = np.sqrt(dx**2 + dy**2)
segment_lengths = np.array([seg.length for seg in path.segments])
ds = segment_lengths[0]
original_distances = np.zeros(num_points)
original_distances[1:] = np.cumsum(segment_lengths)
# arc lengths at which to evaluate the path
s = np.linspace(
0,
original_distances[-1],
num=num_points * points_per_segment)
total_subpoints = num_segments * points_per_segment
s = np.linspace(0, original_distances[-1], total_subpoints)
# Interpolate x and y as functions of arc length
xnew = np.interp(s, original_distances, path.x_list)
ynew = np.interp(s, original_distances, path.y_list)
z = np.full(xnew.shape, path.z)
z = np.full_like(xnew, path.z)
full_positions = np.c_[xnew, ynew, z]
if path.opposite_direction:
full_positions = np.flip(full_positions, axis=0)
# to compute the angle between sources and the direction of travel, we
# compute tangent vectors along the path.
dx_ds = np.gradient(xnew, s)
@ -134,10 +146,19 @@ def calculate_fluence_along_path(
tangent_vectors = np.c_[dx_ds, dy_ds, np.zeros_like(dx_ds)]
tangent_vectors /= np.linalg.norm(tangent_vectors, axis=1, keepdims=True)
phi_result = calculate_fluence_at(
landscape,
full_positions,
detector,
tangent_vectors)
count_rate = calculate_count_rate_per_second(
landscape, full_positions, detector, tangent_vectors
)
return s, phi_result
count_rate *= (acquisition_time / points_per_segment)
count_rate_segs = count_rate.reshape(num_segments, points_per_segment)
integrated = np.trapezoid(
count_rate_segs,
dx=ds/points_per_segment,
axis=1
)
result = np.zeros(num_points)
result[1:] = integrated
return original_distances, result

View File

@ -42,7 +42,8 @@ class SimulationEngine:
def _calculate_count_rate_along_path(self) -> CountRateOutput:
arc_length, phi = calculate_fluence_along_path(
self.landscape,
self.detector
self.detector,
acquisition_time=self.runtime_spec.acquisition_time
)
return CountRateOutput(arc_length, phi)