From d53f7c5e2f79989daca3098b598b43feac2bf776 Mon Sep 17 00:00:00 2001 From: Pim Nelissen Date: Fri, 20 Feb 2026 11:40:36 +0100 Subject: [PATCH] Move fluence calcs to physics from landscape. Update LandScapeBuilder to accommodate config and segments --- src/pg_rad/landscape/landscape.py | 103 ++++++++++++++++++++---------- src/pg_rad/physics/__init__.py | 8 ++- src/pg_rad/physics/fluence.py | 51 ++++++++++++++- 3 files changed, 121 insertions(+), 41 deletions(-) diff --git a/src/pg_rad/landscape/landscape.py b/src/pg_rad/landscape/landscape.py index 3af4733..8df0226 100644 --- a/src/pg_rad/landscape/landscape.py +++ b/src/pg_rad/landscape/landscape.py @@ -1,11 +1,14 @@ import logging -from typing import Self +from typing import List, Self from pg_rad.dataloader.dataloader import load_data from pg_rad.exceptions.exceptions import OutOfBoundsError from pg_rad.objects.sources import PointSource from pg_rad.path.path import Path, path_from_RT90 +from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator + + logger = logging.getLogger(__name__) @@ -71,24 +74,77 @@ class LandscapeBuilder: return self + def get_path(self): + return self._path + + def set_path_from_segments( + self, + length: int | float, + speed: int | float, + acquisition_time: int, + segments: List, + angles: List, + alpha: int | float = None + ): + sg = SegmentedRoadGenerator( + length=length, + ds=speed*acquisition_time, + velocity=speed + ) + + x, y = sg.generate( + segments=segments + ) + + self._path = Path(list(zip(x, y))) + self._fit_landscape_to_path() + return self + def set_path_from_experimental_data( self, - filename: str, + file: str, z: int, - east_col: str = "East", - north_col: str = "North" + east_col_name: str = "East", + north_col_name: str = "North" ) -> Self: - df = load_data(filename) + df = load_data(file) self._path = path_from_RT90( df=df, - east_col=east_col, - north_col=north_col, + east_col=east_col_name, + north_col=north_col_name, z=z ) + + self._fit_landscape_to_path() + + return self + + def set_point_sources(self, *sources): + """Add one or more point sources to the world. + + Args: + *sources (pg_rad.sources.PointSource): One or more sources, + passed as Source1, Source2, ... + Raises: + OutOfBoundsError: If any source is outside the boundaries of the + landscape. + """ + if any( + any(p < 0 or p >= s for p, s in zip(source.pos, self._size)) + for source in sources + ): + raise OutOfBoundsError( + "One or more sources attempted to " + "be placed outside the landscape." + ) + + self._point_sources = sources + + def _fit_landscape_to_path(self) -> None: + """The size of the landscape will be updated if + 1) _size is not set, or + 2) _size is too small to contain the path.""" - # The size of the landscape will be updated if - # 1) _size is not set, or - # 2) _size is too small to contain the path. needs_resize = ( not self._size or any(p > s for p, s in zip(self._path.size, self._size)) @@ -104,31 +160,8 @@ class LandscapeBuilder: "Landscape size will be expanded to accommodate path." ) - self.set_landscape_size(self._path.size) - - return self - - def set_point_sources(self, *sources): - """Add one or more point sources to the world. - - Args: - *sources (pg_rad.sources.PointSource): One or more sources, - passed as Source1, Source2, ... - Raises: - OutOfBoundsError: If any source is outside the boundaries of the - landscape. - """ - - if any( - any(p < 0 or p >= s for p, s in zip(source.pos, self._size)) - for source in sources - ): - raise OutOfBoundsError( - "One or more sources attempted to " - "be placed outside the landscape." - ) - - self._point_sources = sources + max_size = max(self._path.size) + self.set_landscape_size((max_size, max_size)) def build(self): landscape = Landscape( diff --git a/src/pg_rad/physics/__init__.py b/src/pg_rad/physics/__init__.py index fe9b62d..dd98ebe 100644 --- a/src/pg_rad/physics/__init__.py +++ b/src/pg_rad/physics/__init__.py @@ -4,7 +4,9 @@ from pg_rad.physics import attenuation from pg_rad.physics import fluence from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,) -from pg_rad.physics.fluence import (phi_single_source,) +from pg_rad.physics.fluence import (calculate_fluence_along_path, + calculate_fluence_at, phi,) -__all__ = ['attenuation', 'fluence', 'get_mass_attenuation_coeff', - 'phi_single_source'] +__all__ = ['attenuation', 'calculate_fluence_along_path', + 'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff', + 'phi'] diff --git a/src/pg_rad/physics/fluence.py b/src/pg_rad/physics/fluence.py index 8bae5f3..09eaf08 100644 --- a/src/pg_rad/physics/fluence.py +++ b/src/pg_rad/physics/fluence.py @@ -1,7 +1,12 @@ +from typing import TYPE_CHECKING + import numpy as np +if TYPE_CHECKING: + from pg_rad.landscape.landscape import Landscape -def phi_single_source( + +def phi( r: float, activity: float | int, branching_ratio: float, @@ -27,11 +32,51 @@ def phi_single_source( mu_mass_air *= 0.1 mu_air = mu_mass_air * air_density - phi = ( + phi_r = ( activity * branching_ratio * np.exp(-mu_air * r) / (4 * np.pi * r**2) ) - return phi + return phi_r + + +def calculate_fluence_at(landscape: "Landscape", pos: tuple): + total_phi = 0. + for source in landscape.point_sources: + r = source.distance_to(pos) + phi_source = phi( + r=r, + activity=source.activity, + branching_ratio=source.isotope.b, + mu_mass_air=source.isotope.mu_mass_air, + air_density=landscape.air_density + ) + total_phi += phi_source + return total_phi + + +def calculate_fluence_along_path( + landscape: "Landscape", + points_per_segment: int = 10 +): + + phi_result = [] + + path = landscape.path + waypoints = list(zip(path.x_list, path.y_list)) + + for w, wp1 in zip(waypoints, waypoints[1:]): + x_ref, y_ref = zip(w, wp1) + + x = np.linspace(x_ref[0], x_ref[1], points_per_segment) + y = np.interp(x, x_ref, y_ref) + z = np.full(x.shape, fill_value=path.z) + + for pos in zip(x, y, z): + phi_segment = calculate_fluence_at(landscape, pos) + phi_result.append(phi_segment) + + return phi_result + \ No newline at end of file