mirror of
https://github.com/pim-n/pg-rad
synced 2026-03-23 21:58:12 +01:00
Add detector architecture + isotropic detectors
This commit is contained in:
@ -16,3 +16,10 @@ DEFAULT_MAX_TURN_ANGLE = 90.
|
||||
DEFAULT_FRICTION_COEFF = 0.7 # dry asphalt
|
||||
DEFAULT_GRAVITATIONAL_ACC = 9.81 # m/s^2
|
||||
DEFAULT_ALPHA = 100.
|
||||
|
||||
# --- Detector efficiencies ---
|
||||
DETECTOR_EFFICIENCIES = {
|
||||
"dummy": 1.0,
|
||||
"NaIR": 0.0216,
|
||||
"NaIF": 0.0254
|
||||
}
|
||||
|
||||
0
src/pg_rad/detector/__init__.py
Normal file
0
src/pg_rad/detector/__init__.py
Normal file
20
src/pg_rad/detector/builder.py
Normal file
20
src/pg_rad/detector/builder.py
Normal file
@ -0,0 +1,20 @@
|
||||
from pg_rad.inputparser.specs import DetectorSpec
|
||||
|
||||
from .detectors import IsotropicDetector, AngularDetector
|
||||
|
||||
|
||||
class DetectorBuilder:
|
||||
def __init__(
|
||||
self,
|
||||
detector_spec: DetectorSpec,
|
||||
):
|
||||
self.detector_spec = detector_spec
|
||||
|
||||
def build(self) -> IsotropicDetector | AngularDetector:
|
||||
if self.detector_spec.is_isotropic:
|
||||
return IsotropicDetector(
|
||||
self.detector_spec.name,
|
||||
self.detector_spec.eff
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Angular detector not supported yet.")
|
||||
38
src/pg_rad/detector/detectors.py
Normal file
38
src/pg_rad/detector/detectors.py
Normal file
@ -0,0 +1,38 @@
|
||||
from abc import ABC
|
||||
|
||||
|
||||
class BaseDetector(ABC):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
eff: float
|
||||
):
|
||||
self.name = name
|
||||
self.eff = eff
|
||||
|
||||
def get_efficiency(self):
|
||||
pass
|
||||
|
||||
|
||||
class IsotropicDetector(BaseDetector):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
eff: float | None = None
|
||||
):
|
||||
super().__init__(name, eff)
|
||||
|
||||
def get_efficiency(self, energy):
|
||||
return self.eff
|
||||
|
||||
|
||||
class AngularDetector(BaseDetector):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
eff: float | None = None
|
||||
):
|
||||
super().__init__(name, eff)
|
||||
|
||||
def get_efficiency(self, angle, energy):
|
||||
pass
|
||||
@ -17,7 +17,8 @@ from .specs import (
|
||||
SourceSpec,
|
||||
AbsolutePointSourceSpec,
|
||||
RelativePointSourceSpec,
|
||||
SimulationSpec,
|
||||
DetectorSpec,
|
||||
SimulationSpec
|
||||
)
|
||||
|
||||
|
||||
@ -31,7 +32,8 @@ class ConfigParser:
|
||||
"acquisition_time",
|
||||
"path",
|
||||
"sources",
|
||||
"options",
|
||||
"detector",
|
||||
"options"
|
||||
}
|
||||
|
||||
def __init__(self, config_source: str):
|
||||
@ -49,6 +51,7 @@ class ConfigParser:
|
||||
options = self._parse_options()
|
||||
path = self._parse_path()
|
||||
sources = self._parse_point_sources()
|
||||
detector = self._parse_detector()
|
||||
|
||||
return SimulationSpec(
|
||||
metadata=metadata,
|
||||
@ -56,6 +59,7 @@ class ConfigParser:
|
||||
options=options,
|
||||
path=path,
|
||||
point_sources=sources,
|
||||
detector=detector
|
||||
)
|
||||
|
||||
def _load_yaml(self, config_source: str) -> Dict[str, Any]:
|
||||
@ -285,6 +289,34 @@ class ConfigParser:
|
||||
|
||||
return specs
|
||||
|
||||
def _parse_detector(self) -> DetectorSpec:
|
||||
det_dict = self.config.get("detector", {})
|
||||
required = {"name", "is_isotropic"}
|
||||
|
||||
missing = required - det_dict.keys()
|
||||
if missing:
|
||||
raise MissingConfigKeyError(missing)
|
||||
|
||||
name = det_dict.get("name")
|
||||
is_isotropic = det_dict.get("is_isotropic")
|
||||
eff = det_dict.get("efficiency")
|
||||
|
||||
default_detectors = defaults.DETECTOR_EFFICIENCIES
|
||||
|
||||
if eff is None and name in default_detectors.keys():
|
||||
eff = default_detectors[name]
|
||||
elif eff is not None:
|
||||
pass
|
||||
else:
|
||||
raise ValueError(
|
||||
f"The detector {name} is not in the library, and no "
|
||||
"efficiency was defined. Either specify detector efficiency "
|
||||
"or choose one from the following list: "
|
||||
f"{default_detectors.keys()}"
|
||||
)
|
||||
|
||||
return DetectorSpec(name=name, eff=eff, is_isotropic=is_isotropic)
|
||||
|
||||
def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
|
||||
unknown = provided - allowed
|
||||
if unknown:
|
||||
|
||||
@ -63,6 +63,13 @@ class RelativePointSourceSpec(SourceSpec):
|
||||
z: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class DetectorSpec:
|
||||
name: str
|
||||
eff: float | None
|
||||
is_isotropic: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimulationSpec:
|
||||
metadata: MetadataSpec
|
||||
@ -70,3 +77,4 @@ class SimulationSpec:
|
||||
options: SimulationOptionsSpec
|
||||
path: PathSpec
|
||||
point_sources: list[SourceSpec]
|
||||
detector: DetectorSpec
|
||||
|
||||
@ -5,6 +5,7 @@ import sys
|
||||
from pandas.errors import ParserError
|
||||
from yaml import YAMLError
|
||||
|
||||
from pg_rad.detector.builder import DetectorBuilder
|
||||
from pg_rad.exceptions.exceptions import (
|
||||
MissingConfigKeyError,
|
||||
OutOfBoundsError,
|
||||
@ -81,9 +82,10 @@ def main():
|
||||
try:
|
||||
cp = ConfigParser(args.config).parse()
|
||||
landscape = LandscapeDirector.build_from_config(cp)
|
||||
|
||||
detector = DetectorBuilder(cp.detector).build()
|
||||
output = SimulationEngine(
|
||||
landscape=landscape,
|
||||
detector=detector,
|
||||
runtime_spec=cp.runtime,
|
||||
sim_spec=cp.options
|
||||
).simulate()
|
||||
|
||||
@ -2,6 +2,9 @@ from typing import Tuple, TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
|
||||
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pg_rad.landscape.landscape import Landscape
|
||||
|
||||
@ -12,6 +15,7 @@ def phi(
|
||||
branching_ratio: float,
|
||||
mu_mass_air: float,
|
||||
air_density: float,
|
||||
eff: float
|
||||
) -> float:
|
||||
"""Compute the contribution of a single point source to the
|
||||
primary photon fluence rate phi at position (x,y,z).
|
||||
@ -34,6 +38,7 @@ def phi(
|
||||
|
||||
phi_r = (
|
||||
activity
|
||||
* eff
|
||||
* branching_ratio
|
||||
* np.exp(-mu_air * r)
|
||||
/ (4 * np.pi * r**2)
|
||||
@ -42,12 +47,20 @@ def phi(
|
||||
return phi_r
|
||||
|
||||
|
||||
def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
|
||||
def calculate_fluence_at(
|
||||
landscape: "Landscape",
|
||||
pos: np.ndarray,
|
||||
detector: IsotropicDetector | AngularDetector,
|
||||
tangent_vectors: np.ndarray,
|
||||
scaling=1E6
|
||||
):
|
||||
"""Compute fluence at an arbitrary position in the landscape.
|
||||
|
||||
Args:
|
||||
landscape (Landscape): The landscape to compute.
|
||||
pos (np.ndarray): (N, 3) array of positions.
|
||||
detector (IsotropicDetector | AngularDetector):
|
||||
Detector object, needed to compute correct efficiency.
|
||||
|
||||
Returns:
|
||||
total_phi (np.ndarray): (N,) array of fluences.
|
||||
@ -56,15 +69,30 @@ def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
|
||||
total_phi = np.zeros(pos.shape[0])
|
||||
|
||||
for source in landscape.point_sources:
|
||||
r = np.linalg.norm(pos - np.array(source.pos), axis=1)
|
||||
source_to_detector = pos - np.array(source.pos)
|
||||
r = np.linalg.norm(source_to_detector, axis=1)
|
||||
r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm
|
||||
|
||||
if isinstance(detector, AngularDetector):
|
||||
cos_theta = (
|
||||
np.sum(tangent_vectors * source_to_detector, axis=1) / (
|
||||
np.linalg.norm(source_to_detector, axis=1) *
|
||||
np.linalg.norm(tangent_vectors, axis=1)
|
||||
)
|
||||
)
|
||||
cos_theta = np.clip(cos_theta, -1, 1)
|
||||
theta = np.arccos(cos_theta)
|
||||
eff = detector.get_efficiency(theta, energy=source.isotope.E)
|
||||
else:
|
||||
eff = detector.get_efficiency(energy=source.isotope.E)
|
||||
|
||||
phi_source = phi(
|
||||
r=r,
|
||||
activity=source.activity * scaling,
|
||||
branching_ratio=source.isotope.b,
|
||||
mu_mass_air=source.isotope.mu_mass_air,
|
||||
air_density=landscape.air_density
|
||||
air_density=landscape.air_density,
|
||||
eff=eff
|
||||
)
|
||||
|
||||
total_phi += phi_source
|
||||
@ -74,6 +102,7 @@ def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
|
||||
|
||||
def calculate_fluence_along_path(
|
||||
landscape: "Landscape",
|
||||
detector: "IsotropicDetector | AngularDetector",
|
||||
points_per_segment: int = 10
|
||||
) -> Tuple[np.ndarray, np.ndarray]:
|
||||
path = landscape.path
|
||||
@ -98,6 +127,17 @@ def calculate_fluence_along_path(
|
||||
z = np.full(xnew.shape, path.z)
|
||||
full_positions = np.c_[xnew, ynew, z]
|
||||
|
||||
phi_result = calculate_fluence_at(landscape, full_positions)
|
||||
# to compute the angle between sources and the direction of travel, we
|
||||
# compute tangent vectors along the path.
|
||||
dx_ds = np.gradient(xnew, s)
|
||||
dy_ds = np.gradient(ynew, s)
|
||||
tangent_vectors = np.c_[dx_ds, dy_ds, np.zeros_like(dx_ds)]
|
||||
tangent_vectors /= np.linalg.norm(tangent_vectors, axis=1, keepdims=True)
|
||||
|
||||
phi_result = calculate_fluence_at(
|
||||
landscape,
|
||||
full_positions,
|
||||
detector,
|
||||
tangent_vectors)
|
||||
|
||||
return s, phi_result
|
||||
|
||||
@ -6,6 +6,7 @@ from pg_rad.simulator.outputs import (
|
||||
SimulationOutput,
|
||||
SourceOutput
|
||||
)
|
||||
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
|
||||
from pg_rad.physics.fluence import calculate_fluence_along_path
|
||||
from pg_rad.utils.projection import minimal_distance_to_path
|
||||
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
|
||||
@ -16,11 +17,13 @@ class SimulationEngine:
|
||||
def __init__(
|
||||
self,
|
||||
landscape: Landscape,
|
||||
runtime_spec=RuntimeSpec,
|
||||
sim_spec=SimulationOptionsSpec
|
||||
detector: IsotropicDetector | AngularDetector,
|
||||
runtime_spec: RuntimeSpec,
|
||||
sim_spec: SimulationOptionsSpec,
|
||||
):
|
||||
|
||||
self.landscape = landscape
|
||||
self.detector = detector
|
||||
self.runtime_spec = runtime_spec
|
||||
self.sim_spec = sim_spec
|
||||
|
||||
@ -37,7 +40,10 @@ class SimulationEngine:
|
||||
)
|
||||
|
||||
def _calculate_count_rate_along_path(self) -> CountRateOutput:
|
||||
arc_length, phi = calculate_fluence_along_path(self.landscape)
|
||||
arc_length, phi = calculate_fluence_along_path(
|
||||
self.landscape,
|
||||
self.detector
|
||||
)
|
||||
return CountRateOutput(arc_length, phi)
|
||||
|
||||
def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:
|
||||
|
||||
@ -7,7 +7,13 @@ from pg_rad.physics import calculate_fluence_at
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def phi_ref(test_landscape):
|
||||
def isotropic_detector():
|
||||
from pg_rad.detector.detectors import IsotropicDetector
|
||||
return IsotropicDetector(name="test_detector", eff=1.0)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def phi_ref(test_landscape, isotropic_detector):
|
||||
source = test_landscape.point_sources[0]
|
||||
|
||||
r = np.linalg.norm(np.array([10, 10, 0]) - np.array(source.pos))
|
||||
@ -17,7 +23,8 @@ def phi_ref(test_landscape):
|
||||
mu_air = source.isotope.mu_mass_air * test_landscape.air_density
|
||||
mu_air *= 0.1
|
||||
|
||||
return A * b * np.exp(-mu_air * r) / (4 * np.pi * r**2)
|
||||
eff = isotropic_detector.get_efficiency(source.isotope.E)
|
||||
return A * eff * b * np.exp(-mu_air * r) / (4 * np.pi * r**2)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -38,6 +45,10 @@ def test_landscape():
|
||||
activity_MBq: 100
|
||||
position: [0, 0, 0]
|
||||
isotope: CS137
|
||||
|
||||
detector:
|
||||
name: dummy
|
||||
is_isotropic: True
|
||||
"""
|
||||
|
||||
cp = ConfigParser(test_yaml).parse()
|
||||
@ -45,9 +56,11 @@ def test_landscape():
|
||||
return landscape
|
||||
|
||||
|
||||
def test_single_source_fluence(phi_ref, test_landscape):
|
||||
def test_single_source_fluence(phi_ref, test_landscape, isotropic_detector):
|
||||
phi = calculate_fluence_at(
|
||||
test_landscape,
|
||||
np.array([10, 10, 0]),
|
||||
isotropic_detector,
|
||||
tangent_vectors=None,
|
||||
)
|
||||
assert pytest.approx(phi[0], rel=1E-3) == phi_ref
|
||||
|
||||
Reference in New Issue
Block a user