update fluence and simulation to work with detectors and correct count rates

This commit is contained in:
Pim Nelissen
2026-03-20 09:25:09 +01:00
parent a133b8b1c7
commit b1d781714b
5 changed files with 47 additions and 70 deletions

View File

@ -4,7 +4,7 @@ from pandas import read_csv
from pg_rad.configs.filepaths import ISOTOPE_TABLE from pg_rad.configs.filepaths import ISOTOPE_TABLE
from pg_rad.exceptions.exceptions import InvalidIsotopeError from pg_rad.exceptions.exceptions import InvalidIsotopeError
from pg_rad.physics.attenuation import get_mass_attenuation_coeff from pg_rad.utils.interpolators import get_mass_attenuation_coeff
class Isotope: class Isotope:

View File

@ -1,17 +0,0 @@
from importlib.resources import files
from pandas import read_csv
from scipy.interpolate import interp1d
from pg_rad.configs.filepaths import ATTENUATION_TABLE
def get_mass_attenuation_coeff(
*args
) -> float:
csv = files('pg_rad.data').joinpath(ATTENUATION_TABLE)
data = read_csv(csv)
x = data["energy_mev"].to_numpy()
y = data["mu"].to_numpy()
f = interp1d(x, y)
return f(*args)

View File

@ -2,11 +2,9 @@ from typing import Tuple, TYPE_CHECKING
import numpy as np import numpy as np
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
if TYPE_CHECKING: if TYPE_CHECKING:
from pg_rad.landscape.landscape import Landscape from pg_rad.landscape.landscape import Landscape
from pg_rad.detector.detector import Detector
def phi( def phi(
@ -33,16 +31,14 @@ def phi(
""" """
# Linear photon attenuation coefficient in m^-1. # Linear photon attenuation coefficient in m^-1.
mu_mass_air *= 0.1 mu_air = 0.1 * mu_mass_air * air_density
mu_air = mu_mass_air * air_density
phi_r = ( phi_r = (
activity activity
* eff * eff
* branching_ratio * branching_ratio
* np.exp(-mu_air * r) * np.exp(-mu_air * r)
/ (4 * np.pi * r**2) ) / (4 * np.pi * r**2)
)
return phi_r return phi_r
@ -50,8 +46,7 @@ def phi(
def calculate_count_rate_per_second( def calculate_count_rate_per_second(
landscape: "Landscape", landscape: "Landscape",
pos: np.ndarray, pos: np.ndarray,
detector: IsotropicDetector | AngularDetector, detector: "Detector",
tangent_vectors: np.ndarray,
scaling=1E6 scaling=1E6
): ):
"""Compute count rate in s^-1 m^-2 at a position in the landscape. """Compute count rate in s^-1 m^-2 at a position in the landscape.
@ -59,7 +54,7 @@ def calculate_count_rate_per_second(
Args: Args:
landscape (Landscape): The landscape to compute. landscape (Landscape): The landscape to compute.
pos (np.ndarray): (N, 3) array of positions. pos (np.ndarray): (N, 3) array of positions.
detector (IsotropicDetector | AngularDetector): detector (Detector):
Detector object, needed to compute correct efficiency. Detector object, needed to compute correct efficiency.
Returns: Returns:
@ -69,22 +64,29 @@ def calculate_count_rate_per_second(
total_phi = np.zeros(pos.shape[0]) total_phi = np.zeros(pos.shape[0])
for source in landscape.point_sources: for source in landscape.point_sources:
# See Bukartas (2021) page 25 for incidence angle math
source_to_detector = pos - np.array(source.pos) source_to_detector = pos - np.array(source.pos)
r = np.linalg.norm(source_to_detector, axis=1) r = np.linalg.norm(source_to_detector, axis=1)
r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm
if isinstance(detector, AngularDetector): if not detector.is_isotropic:
cos_theta = ( v = np.zeros_like(pos)
np.sum(tangent_vectors * source_to_detector, axis=1) / ( v[1:] = pos[1:] - pos[:-1]
np.linalg.norm(source_to_detector, axis=1) * v[0] = v[1] # handle first point
np.linalg.norm(tangent_vectors, axis=1) vx, vy = v[:, 0], v[:, 1]
)
) r_vec = pos - np.array(source.pos)
cos_theta = np.clip(cos_theta, -1, 1) rx, ry = r_vec[:, 0], r_vec[:, 1]
theta = np.arccos(cos_theta)
eff = detector.get_efficiency(theta, energy=source.isotope.E) theta = np.arctan2(vy, vx) - np.arctan2(ry, rx)
# normalise to [-pi, pi] and convert to degrees
theta = (theta + np.pi) % (2 * np.pi) - np.pi
theta_deg = np.degrees(theta)
eff = detector.get_efficiency(source.isotope.E, theta_deg)
else: else:
eff = detector.get_efficiency(energy=source.isotope.E) eff = detector.get_efficiency(source.isotope.E)
phi_source = phi( phi_source = phi(
r=r, r=r,
@ -102,15 +104,15 @@ def calculate_count_rate_per_second(
def calculate_counts_along_path( def calculate_counts_along_path(
landscape: "Landscape", landscape: "Landscape",
detector: "IsotropicDetector | AngularDetector", detector: "Detector",
acquisition_time: int, velocity: float,
points_per_segment: int = 10, points_per_segment: int = 10,
) -> Tuple[np.ndarray, np.ndarray]: ) -> Tuple[np.ndarray, np.ndarray]:
"""Compute the counts recorded in each acquisition period in the landscape. """Compute the counts recorded in each acquisition period in the landscape.
Args: Args:
landscape (Landscape): _description_ landscape (Landscape): _description_
detector (IsotropicDetector | AngularDetector): _description_ detector (Detector): _description_
points_per_segment (int, optional): _description_. Defaults to 100. points_per_segment (int, optional): _description_. Defaults to 100.
Returns: Returns:
@ -123,7 +125,6 @@ def calculate_counts_along_path(
num_segments = len(path.segments) num_segments = len(path.segments)
segment_lengths = np.array([seg.length for seg in path.segments]) segment_lengths = np.array([seg.length for seg in path.segments])
ds = segment_lengths[0]
original_distances = np.zeros(num_points) original_distances = np.zeros(num_points)
original_distances[1:] = np.cumsum(segment_lengths) original_distances[1:] = np.cumsum(segment_lengths)
@ -140,26 +141,17 @@ def calculate_counts_along_path(
if path.opposite_direction: if path.opposite_direction:
full_positions = np.flip(full_positions, axis=0) full_positions = np.flip(full_positions, axis=0)
# to compute the angle between sources and the direction of travel, we # [counts/s]
# compute tangent vectors along the path. cps = calculate_count_rate_per_second(
dx_ds = np.gradient(xnew, s) landscape, full_positions, detector
dy_ds = np.gradient(ynew, s)
tangent_vectors = np.c_[dx_ds, dy_ds, np.zeros_like(dx_ds)]
tangent_vectors /= np.linalg.norm(tangent_vectors, axis=1, keepdims=True)
count_rate = calculate_count_rate_per_second(
landscape, full_positions, detector, tangent_vectors
) )
count_rate *= (acquisition_time / points_per_segment) # reshape so each segment is on a row
cps_per_seg = cps.reshape(num_segments, points_per_segment)
count_rate_segs = count_rate.reshape(num_segments, points_per_segment) du = s[1] - s[0]
integrated = np.trapezoid( integrated_counts = np.trapezoid(cps_per_seg, dx=du, axis=1) / velocity
count_rate_segs, int_counts_result = np.zeros(num_points)
dx=ds/points_per_segment, int_counts_result[1:] = integrated_counts
axis=1
)
result = np.zeros(num_points) return original_distances, s, cps, int_counts_result
result[1:] = integrated
return original_distances, result

View File

@ -6,7 +6,7 @@ from pg_rad.simulator.outputs import (
SimulationOutput, SimulationOutput,
SourceOutput SourceOutput
) )
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
from pg_rad.physics.fluence import calculate_counts_along_path from pg_rad.physics.fluence import calculate_counts_along_path
from pg_rad.utils.projection import minimal_distance_to_path from pg_rad.utils.projection import minimal_distance_to_path
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
@ -17,13 +17,12 @@ class SimulationEngine:
def __init__( def __init__(
self, self,
landscape: Landscape, landscape: Landscape,
detector: IsotropicDetector | AngularDetector,
runtime_spec: RuntimeSpec, runtime_spec: RuntimeSpec,
sim_spec: SimulationOptionsSpec, sim_spec: SimulationOptionsSpec,
): ):
self.landscape = landscape self.landscape = landscape
self.detector = detector self.detector = self.landscape.detector
self.runtime_spec = runtime_spec self.runtime_spec = runtime_spec
self.sim_spec = sim_spec self.sim_spec = sim_spec
@ -40,12 +39,13 @@ class SimulationEngine:
) )
def _calculate_count_rate_along_path(self) -> CountRateOutput: def _calculate_count_rate_along_path(self) -> CountRateOutput:
arc_length, phi = calculate_counts_along_path( acq_points, sub_points, cps, int_counts = calculate_counts_along_path(
self.landscape, self.landscape,
self.detector, self.detector,
acquisition_time=self.runtime_spec.acquisition_time velocity=self.runtime_spec.speed
) )
return CountRateOutput(arc_length, phi)
return CountRateOutput(acq_points, sub_points, cps, int_counts)
def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]: def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:

View File

@ -5,8 +5,10 @@ from dataclasses import dataclass
@dataclass @dataclass
class CountRateOutput: class CountRateOutput:
arc_length: List[float] acquisition_points: List[float]
count_rate: List[float] sub_points: List[float]
cps: List[float]
integrated_counts: List[float]
@dataclass @dataclass