Move fluence calcs to physics from landscape. Update LandScapeBuilder to accommodate config and segments

This commit is contained in:
Pim Nelissen
2026-02-20 11:40:36 +01:00
parent fdc11b4076
commit d53f7c5e2f
3 changed files with 121 additions and 41 deletions

View File

@ -1,11 +1,14 @@
import logging import logging
from typing import Self from typing import List, Self
from pg_rad.dataloader.dataloader import load_data from pg_rad.dataloader.dataloader import load_data
from pg_rad.exceptions.exceptions import OutOfBoundsError from pg_rad.exceptions.exceptions import OutOfBoundsError
from pg_rad.objects.sources import PointSource from pg_rad.objects.sources import PointSource
from pg_rad.path.path import Path, path_from_RT90 from pg_rad.path.path import Path, path_from_RT90
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -71,24 +74,77 @@ class LandscapeBuilder:
return self return self
def get_path(self):
return self._path
def set_path_from_segments(
self,
length: int | float,
speed: int | float,
acquisition_time: int,
segments: List,
angles: List,
alpha: int | float = None
):
sg = SegmentedRoadGenerator(
length=length,
ds=speed*acquisition_time,
velocity=speed
)
x, y = sg.generate(
segments=segments
)
self._path = Path(list(zip(x, y)))
self._fit_landscape_to_path()
return self
def set_path_from_experimental_data( def set_path_from_experimental_data(
self, self,
filename: str, file: str,
z: int, z: int,
east_col: str = "East", east_col_name: str = "East",
north_col: str = "North" north_col_name: str = "North"
) -> Self: ) -> Self:
df = load_data(filename) df = load_data(file)
self._path = path_from_RT90( self._path = path_from_RT90(
df=df, df=df,
east_col=east_col, east_col=east_col_name,
north_col=north_col, north_col=north_col_name,
z=z z=z
) )
# The size of the landscape will be updated if self._fit_landscape_to_path()
# 1) _size is not set, or
# 2) _size is too small to contain the path. return self
def set_point_sources(self, *sources):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
if any(
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
for source in sources
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources = sources
def _fit_landscape_to_path(self) -> None:
"""The size of the landscape will be updated if
1) _size is not set, or
2) _size is too small to contain the path."""
needs_resize = ( needs_resize = (
not self._size not self._size
or any(p > s for p, s in zip(self._path.size, self._size)) or any(p > s for p, s in zip(self._path.size, self._size))
@ -104,31 +160,8 @@ class LandscapeBuilder:
"Landscape size will be expanded to accommodate path." "Landscape size will be expanded to accommodate path."
) )
self.set_landscape_size(self._path.size) max_size = max(self._path.size)
self.set_landscape_size((max_size, max_size))
return self
def set_point_sources(self, *sources):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
if any(
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
for source in sources
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources = sources
def build(self): def build(self):
landscape = Landscape( landscape = Landscape(

View File

@ -4,7 +4,9 @@ from pg_rad.physics import attenuation
from pg_rad.physics import fluence from pg_rad.physics import fluence
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,) from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
from pg_rad.physics.fluence import (phi_single_source,) from pg_rad.physics.fluence import (calculate_fluence_along_path,
calculate_fluence_at, phi,)
__all__ = ['attenuation', 'fluence', 'get_mass_attenuation_coeff', __all__ = ['attenuation', 'calculate_fluence_along_path',
'phi_single_source'] 'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff',
'phi']

View File

@ -1,7 +1,12 @@
from typing import TYPE_CHECKING
import numpy as np import numpy as np
if TYPE_CHECKING:
from pg_rad.landscape.landscape import Landscape
def phi_single_source(
def phi(
r: float, r: float,
activity: float | int, activity: float | int,
branching_ratio: float, branching_ratio: float,
@ -27,11 +32,51 @@ def phi_single_source(
mu_mass_air *= 0.1 mu_mass_air *= 0.1
mu_air = mu_mass_air * air_density mu_air = mu_mass_air * air_density
phi = ( phi_r = (
activity activity
* branching_ratio * branching_ratio
* np.exp(-mu_air * r) * np.exp(-mu_air * r)
/ (4 * np.pi * r**2) / (4 * np.pi * r**2)
) )
return phi return phi_r
def calculate_fluence_at(landscape: "Landscape", pos: tuple):
total_phi = 0.
for source in landscape.point_sources:
r = source.distance_to(pos)
phi_source = phi(
r=r,
activity=source.activity,
branching_ratio=source.isotope.b,
mu_mass_air=source.isotope.mu_mass_air,
air_density=landscape.air_density
)
total_phi += phi_source
return total_phi
def calculate_fluence_along_path(
landscape: "Landscape",
points_per_segment: int = 10
):
phi_result = []
path = landscape.path
waypoints = list(zip(path.x_list, path.y_list))
for w, wp1 in zip(waypoints, waypoints[1:]):
x_ref, y_ref = zip(w, wp1)
x = np.linspace(x_ref[0], x_ref[1], points_per_segment)
y = np.interp(x, x_ref, y_ref)
z = np.full(x.shape, fill_value=path.z)
for pos in zip(x, y, z):
phi_segment = calculate_fluence_at(landscape, pos)
phi_result.append(phi_segment)
return phi_result