Compare commits

111 Commits

Author SHA1 Message Date
09e74051f0 improve naming of integrated count calculation function 2026-03-11 09:51:11 +01:00
7dec54fa1c update plotter to add direction of travel arrow and detector info 2026-03-10 20:45:35 +01:00
938b3a7afc update exceptions and main.py (including test case) 2026-03-10 20:44:45 +01:00
b82196e431 Add flip direction. Change mean to Trapezoidal rule for integration along path. Scale count rate properly with acquisition time 2026-03-10 20:44:18 +01:00
b882f20358 add basic colouring 2026-03-03 21:42:19 +01:00
7e2d6076fd update docs 2026-03-03 21:04:46 +01:00
cdd6d3a8b4 improve logging. update test case for detector. 2026-03-03 21:03:26 +01:00
1c8cc41e3c Improve error handling. Add alignment feature for point sources 2026-03-03 21:01:51 +01:00
7612f74bcb rename eff to efficiency 2026-03-03 20:59:15 +01:00
b69b7455f1 fix isotope typo 2026-03-03 20:58:34 +01:00
c98000dfd8 Add detector architecture + isotropic detectors 2026-03-03 09:48:20 +01:00
41a8ca95b3 remove print statement 2026-03-03 09:32:45 +01:00
bb781ed082 let segmented road generator take specific angles and lengths for segments 2026-03-02 13:00:14 +01:00
8e429fe636 fix interpolator to properly interpolate in 2D space 2026-03-02 12:58:12 +01:00
92789c718f Merge pull request #38 from pim-n/feature-plotter
Config parser, plotting, simulation engine
2026-03-02 11:42:07 +01:00
176baa543a flake8 + fix bug in seed type checking 2026-03-02 08:51:02 +01:00
dba1240e9f improve error handling and PEP8 2026-03-02 08:38:18 +01:00
9a169da520 update tests to follow new architecture 2026-02-25 15:05:22 +01:00
5c9841190f update main.py --test case to conform to new architecture. Update error handling in --config case. 2026-02-25 14:37:35 +01:00
561fb1dca1 update SegmentedRoadGenerator 2026-02-25 14:31:42 +01:00
39572da682 improve landscape architecture. builder is separate file. cleaned up hardcoded defaults 2026-02-25 14:26:49 +01:00
a74ea765d7 move ConfigParser to inputparser module and introduce config specs classes for structured config handling 2026-02-25 14:25:32 +01:00
58de830a39 add default values to central location 2026-02-25 14:22:46 +01:00
ae0a038948 add SimulationEngine and SimulationOutputs to compute results and pass standardized objects on to plotter 2026-02-25 14:21:59 +01:00
9944c06466 update plotting 2026-02-25 14:21:03 +01:00
5615914c7e update projection utlity functions 2026-02-25 14:20:11 +01:00
e926338b69 add InvalidIsotopeError and DimensionError 2026-02-25 14:17:14 +01:00
bab41c128b vectorize fluence calculation 2026-02-25 14:13:27 +01:00
c18f924348 add init for road_gen package 2026-02-20 12:02:28 +01:00
f1bed93ca7 add integrator for road generation 2026-02-20 12:01:33 +01:00
3a92f79a43 fix dataloader exception handling 2026-02-20 12:01:12 +01:00
80f7b71c38 Integrate modules from road-gen that are needed for segmented road generation 2026-02-20 11:59:09 +01:00
61dc05073a improve plotting visuals for path 2026-02-20 11:47:57 +01:00
cca514a2ba add MissingSubKeyError for config loading. update main.py for --config flag 2026-02-20 11:47:38 +01:00
265d3b0111 Add isotope lookup dictionary, so isotopes can be loaded from string in config. 2026-02-20 11:46:45 +01:00
8f652875dc update LandscapeDirector to be able to build from config. 2026-02-20 11:45:21 +01:00
347ff4c102 Add badges for Python version and CI tests
Added badges for Python version and CI tests to README.
2026-02-20 11:44:54 +01:00
5fc806bd39 Add function to go from relative source position to absolute position. 2026-02-20 11:43:13 +01:00
0611b943a8 Merge pull request #35 from pim-n/33-add-ci-workflow-for-running-tests
33 add ci workflow for running tests
2026-02-20 11:42:15 +01:00
d53f7c5e2f Move fluence calcs to physics from landscape. Update LandScapeBuilder to accommodate config and segments 2026-02-20 11:40:36 +01:00
fdc11b4076 Add landscape config parser 2026-02-20 11:39:20 +01:00
2d5ba0ac3c Updated config. Third test 2026-02-20 11:34:21 +01:00
81d2cd6f9e Fix indentation for checkout action in CI workflow 2026-02-20 11:30:52 +01:00
d32fd411bb Updated config. Second test 2026-02-20 11:25:51 +01:00
191c92e176 Merge branch '33-add-ci-workflow-for-running-tests' of github.com:pim-n/pg-rad into 33-add-ci-workflow-for-running-tests
Fetched changes from github. Fast-forward changes
2026-02-20 11:17:55 +01:00
8c0eeb3127 Updated config file for ci. Added dispatch 2026-02-20 11:17:36 +01:00
db86ee42b2 Updated config file for ci. Added dispatch 2026-02-20 11:16:37 +01:00
79ba5f7c83 Fix event name for pull request trigger 2026-02-20 11:00:22 +01:00
66c0b0c6cf Added ci for tests. First testing 2026-02-20 10:57:28 +01:00
c2b05c63a8 change intra-package import statements to have absolute path. this to avoid circular importing. the imports specified in __init__ of each module are only intended to be used outside of src (e.g. tests, API usage). 2026-02-17 10:11:03 +01:00
5684525d0f add plotting to test case. Add saveplot argument. Improve logging in LandscapeSlicePlotter 2026-02-17 09:46:30 +01:00
3fd2eafb2a add save and show options to Plotter 2026-02-17 09:41:54 +01:00
f49b2a5f8a Merge pull request #19 from pim-n/feature-fluence-rate
- Added mass attenuation interpolator
- PG fluence rate function that can be called at any position in the landscape
- Rewrote landscape module to pseudo-builder pattern (LandscapeBuilder & LandscapeConstructor)
- Added entry point (CLI)
- Verified installation process and updated instructions accordingly
- Updated tests
- various bugfixes
- various minor improvements
2026-02-13 16:18:52 +01:00
508cad474b Add scipy to requirements (due to interpolation of attenuation coeffs) 2026-02-13 16:13:55 +01:00
beb411d456 update tests to reflect new Landscape design 2026-02-13 16:07:42 +01:00
0db1fe0626 manually remove detector imports (not implemented yet) 2026-02-13 16:01:07 +01:00
845f006cd3 Update pyproject.toml and README.md to reflect new conditions. 2026-02-13 15:02:53 +01:00
ac8c38592d Add CLI entry point with --test flag for building test landscape using LandscapeDirector 2026-02-13 14:58:11 +01:00
a4fb4a7c57 Add LandscapeDirector with a test case to build landscape using LandscapeBuilder 2026-02-13 14:57:37 +01:00
e8bf687563 add default box height to ensure non-zero z dimension in the landscape 2026-02-13 14:56:27 +01:00
49a0dcd301 Improve isotopes and sources 2026-02-13 14:55:42 +01:00
26f96b06fe code file paths/names in config folder for more centralized definition of filenames 2026-02-13 14:50:02 +01:00
55258d7727 rename pg_rad logging->logger to avoid import conflict with default logging library. 2026-02-13 14:46:51 +01:00
82331f3bbd add default value for object position (0,0,0) 2026-02-12 16:48:16 +01:00
abc1195c91 Rewrite preset isotope 2026-02-12 14:45:24 +01:00
a95cca26d9 Move landscape construction to LandscapeBuilder object 2026-02-12 14:43:41 +01:00
8274b5e371 add size of path as attribute to Path 2026-02-12 14:41:05 +01:00
4f72fe8ff4 Add OutOfBoundsError 2026-02-12 14:02:13 +01:00
9b0c77d254 Merge branch 'feature-fluence-rate' of github.com:pim-n/pg-rad into feature-fluence-rate 2026-02-12 13:33:36 +01:00
225287c46a Merge pull request #27 from pim-n/dev
pull latest dev
2026-02-12 13:32:59 +01:00
6ceffb4361 Move plotting functionality out of Landscape to LandscapeSlicePlotter 2026-02-12 09:28:37 +01:00
08299724e1 Merge pull request #20 from pim-n/remove-path-simplify
Remove path simplify
2026-02-10 14:15:10 +01:00
3aff764075 update __init__.py to remove simplify_path 2026-02-10 14:11:12 +01:00
05a71c31a8 remove piecewise_regression from requirements 2026-02-10 14:08:57 +01:00
f5cc5218e6 remove simplify_path functionality 2026-02-10 14:08:40 +01:00
3f7395ed70 add fluence function to landscape and update to new position system 2026-02-10 13:53:57 +01:00
d9e3f2a209 remove simplify path tests 2026-02-10 13:49:09 +01:00
0971c2bab9 fix typo in unit conversion 2026-02-10 11:28:38 +01:00
9c1b97d912 Update sources and objects to new position system 2026-02-10 11:24:40 +01:00
a1acf95004 update sources tests to accomodate new pos attribute 2026-02-09 15:36:23 +01:00
016ea6b783 add tests for fluence rate and attenuation interpolation 2026-02-09 15:35:22 +01:00
2b85a07aa0 add attenuation table 2026-02-09 15:34:05 +01:00
d6d9fa6f92 update init for isotopes module 2026-02-09 15:32:49 +01:00
521e5a556e Add isotope presets file 2026-02-09 15:32:19 +01:00
0c81b4df89 Add mu_mass_air to isotope automatically calculated from energy and fix error msg 2026-02-09 15:31:53 +01:00
2c436c52ad generate init for physics module 2026-02-09 15:30:27 +01:00
d4b67be775 Add primary photon fluence at distance R from point source 2026-02-09 15:29:46 +01:00
c2ddc5bfe2 Add attenuation interpolator 2026-02-09 15:29:30 +01:00
08a056a32d fix Object -> BaseOject reference 2026-02-05 14:37:37 +01:00
f37db46037 Merge pull request #16 from pim-n/fix-pep8
Improve PEP8 adherance
2026-02-05 14:21:31 +01:00
c23ea40ec6 Improve PEP8 adherance using flake8 linter 2026-02-05 14:19:49 +01:00
dcc3be1c22 Merge pull request #14 from pim-n/fix-object-name
rename Object to BaseObject. Addresses #8
2026-02-02 11:38:04 +01:00
52b2eaaeb5 rename Object to BaseObject 2026-02-02 11:33:20 +01:00
ead96eb723 Merge pull request #13 from pim-n/fix-imports-and-refactor
Fix imports and refactor
2026-01-31 10:14:39 +01:00
f5a126b927 bump version 2026-01-31 10:10:14 +01:00
c1b827c871 ignore dev tools folder 2026-01-31 10:06:12 +01:00
85f80ace97 resolve circular imports 2026-01-31 10:01:54 +01:00
a4e965c9d6 Add init files for modularization, generated using mkinit (now a dependency on dev) 2026-01-31 09:44:18 +01:00
15b7e7e65e refactor code into modules 2026-01-31 09:42:21 +01:00
db6f859a60 improving logging setup 2026-01-28 15:54:34 +01:00
14e49e63aa fix to make mathjax work in notebook page 2026-01-28 15:07:58 +01:00
2551f854d6 Merge branch 'dev' of github.com:pim-n/pg-rad into dev 2026-01-28 13:50:35 +01:00
caec70b39b move test_objects.py to test_sources.py and update to reflect refactor 2026-01-28 13:50:16 +01:00
21ea25a3d8 fix typo in README.md 2026-01-28 13:40:30 +01:00
d7c670d344 Update README.md 2026-01-28 13:39:38 +01:00
85f306a469 Merge branch 'dev' of github.com:pim-n/pg-rad into dev 2026-01-28 13:33:12 +01:00
c459b732bb update demo notebook 2026-01-28 13:32:25 +01:00
4632fe35c9 add mathjax javascript code 2026-01-28 13:24:23 +01:00
7cfbbb8792 move demo within docs 2026-01-28 13:22:42 +01:00
7290e241a2 Update ci-docs.yml 2026-01-28 12:34:53 +01:00
38318ad822 Merge pull request #2 from pim-n/main
to dev
2026-01-28 12:33:32 +01:00
85 changed files with 3525 additions and 840 deletions

View File

@ -17,7 +17,7 @@ jobs:
git config user.email 41898282+github-actions[bot]@users.noreply.github.com git config user.email 41898282+github-actions[bot]@users.noreply.github.com
- uses: actions/setup-python@v5 - uses: actions/setup-python@v5
with: with:
python-version: 3.x python-version: 3.12.9
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV - run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
- uses: actions/cache@v4 - uses: actions/cache@v4
with: with:

33
.github/workflows/ci-tests.yml vendored Normal file
View File

@ -0,0 +1,33 @@
name: Tests
on:
pull_request:
branches: ["main", "dev"]
workflow_dispatch:
jobs:
tests:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest, windows-latest ]
python-version: [ '3.12' ]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev]
- name: Run linting
run: |
flake8 ./src/
- name: Run tests
run: |
pytest

5
.gitignore vendored
View File

@ -1,3 +1,6 @@
# Custom
dev-tools/
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[codz] *.py[codz]
@ -173,7 +176,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ .idea/
# Abstra # Abstra
# Abstra is an AI-powered process automation framework. # Abstra is an AI-powered process automation framework.

View File

@ -1,3 +1,5 @@
[![Python 3.12](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/release/python-312/)
[![Tests](https://github.com/pim-n/pg-rad/actions/workflows/ci-tests.yml/badge.svg)](https://github.com/pim-n/pg-rad/actions/workflows/ci-tests.yml)
# pg-rad # pg-rad
Primary Gamma RADiation landscape - Development Primary Gamma RADiation landscape - Development
@ -23,13 +25,35 @@ With Python version `>=3.12.4` and `<3.13`, create a virtual environment and inst
``` ```
python3 -m venv .venv python3 -m venv .venv
source .venv/bin/activate source .venv/bin/activate
(venv) pip install -e .[dev] ```
With the virtual environment activated, run:
```
pip install -e .[dev]
```
## Running example landscape
The example landscape can be generated using the command-line interface. Still in the virtual environment, run
```
pgrad --test --loglevel DEBUG
``` ```
## Tests ## Tests
Tests can be run with `pytest` from the root directory of the repository. Tests can be run with `pytest` from the root directory of the repository. With the virtual environment activated, run:
``` ```
(venv) pytest pytest
``` ```
## Local viewing of documentation
PG-RAD uses [Material for MkDocs](https://squidfunk.github.io/mkdocs-material/) for generating documentation. It can be locally viewed by (in the venv) running:
```
mkdocs serve
```
where you can add the `--livereload` flag to automatically update the documentation as you write to the Markdown files.

File diff suppressed because one or more lines are too long

View File

@ -1,4 +0,0 @@
---
title: pg_rad.landscape.create_landscape_from_path
---
::: pg_rad.landscape.create_landscape_from_path

View File

@ -1,4 +0,0 @@
---
title: pg_rad.landscape.Landscape
---
::: pg_rad.landscape.Landscape

View File

@ -1,5 +0,0 @@
---
title: pg_rad.objects.Object
---
::: pg_rad.objects.Object

View File

@ -1,4 +0,0 @@
---
title: pg_rad.path.Path
---
::: pg_rad.path.Path

View File

@ -1,5 +0,0 @@
---
title: pg_rad.path.path_from_RT90
---
::: pg_rad.path.path_from_RT90

View File

@ -1,4 +0,0 @@
---
title: pg_rad.path.simplify_path
---
::: pg_rad.path.simplify_path

View File

@ -1,5 +0,0 @@
---
title: pg_rad.sources.PointSource
---
::: pg_rad.sources.PointSource

218
docs/config-spec.md Normal file
View File

@ -0,0 +1,218 @@
!!! note
To get started quickly, you may copy and modify one of the example configs found [here](quickstart.md#example-configs).
The config file must be a [YAML](https://yaml.org/) file. YAML is a serialization language that works with key-value pairs, but in a syntax more readable than some other alternatives. In YAML, the indentation matters.
The remainder of this chapter will explain the different required and optional keys, what they represent, and allowed values.
## Required keys
### Simulation options
The first step is to name the simulation, and define the speed of the vehicle (assumed constant) and acquisition time.
#### Landscape name
The name is a string, which may include spaces, numbers and special characters.
Examples:
```yaml
name: test_landscape
```
```yaml
name: Test Landscape 1
```
#### Acquisition time
The acquisition time of the detector in seconds.
Example:
```yaml
acquisition_time: 1
```
!!! note
All units in the config file must be specified in SI units, e.g. meters and seconds, unless the key contains a unit itself (e.g. `activity_MBq` means activity in megabecquerels).
#### Vehicle speed
The speed of the vehicle in m/s. Currently, the vehicle speed must be assumed constant. An example could be
```yaml
speed: 13.89 # this is approximately 50 km/h
```
!!! note
The text after the `#` signifies a comment. PG-RAD will ignore this, but it can be helpful for yourself to write notes.
### Path
The `path` keyword is used to create a path for the detector to travel along. There are two ways to specify a path; from experimental data or by specifying a procedural path.
#### Path - Experimental data
Currently the only supported coordinate format is the RT90 (East, North) coordinate system. If you have experimental data in CSV format with columns for these coordinates, then you can load that path into PG-RAD as follows:
```yaml
path:
file: path/to/experimental_data.csv
east_col_name: East
north_col_name: North
```
#### Path - Procedural path
Alternatively, you can let PG-RAD generate a path for you. A procedural path can be specified with at least two subkeys: `length` and `segments`.
Currently supported segments are: `straight`, `turn_left` and `turn_right`, and are provided in a list under the `segments` subkey as follows:
```yaml
path:
segments:
- straight
- turn_left
- straight
```
The length must also be specified, using the `length` subkey. `length` can be specified in two ways: a list with the same length as the `segments` list
```yaml
path:
segments:
- straight
- turn_left
- straight
length:
- 500
- 250
- 500
```
which will assign that length (meters) to each segment. Alternatively, a single number can be passed:
```yaml
path:
segments:
- straight
- turn_left
- straight
length: 1250
```
Setting the length for the total path will cause PG-RAD to *randomly assign* portions of the total length to each segment.
Finally, there is also an option to specify the turn angle in degrees:
```yaml
path:
segments:
- straight
- turn_left: 90
- straight
length: 1250
```
Like with the lengths, if a turn segment has no angle specified, a random one (within pre-defined limits) will be taken.
!!! warning
Letting PG-RAD randomly assign lengths and angles can cause (expected) issues. That is because of physics restrictions. If the combination of length, angle (radius) and velocity of the vehicle is such that the centrifugal force makes it impossible to take this turn, PG-RAD will raise an error. To fix it, you can 1) reduce the speed; 2) define a smaller angle for the turn; or 3) assign more length to the turn segment.
!!! info
For more information about how procedural roads are generated, including the random sampling of lengths and angles, see X
### Sources
Currently, the only type of source supported is a point source. Point sources can be added under the `sources` key, where the **subkey is the name** of the source:
```yaml
sources:
my_source: ...
```
the source name should not contain spaces or special characters other than `_` or `-`. There are three required subkeys under `sources.my_source`, which are: `activity_MBq`, `isotope` and `position`.
#### Source activity
The source activity is in megabecquerels and must be a strictly positive number:
```yaml
sources:
my_source:
activity_MBq: 100
```
#### Source isotope
The isotope for the point source. This must be a string, following the naming convention of the symbol followed by the number of nucleons, e.g. `Cs137`:
```yaml
sources:
my_source:
activity_MBq: 100
isotope: Cs137
```
!!! info
Currently the following isotopes are supported: `Cs137`
#### Source position
There are two ways to specify the source position. Either with absolute (x,y,z) coordinates
```yaml
sources:
my_source:
activity_MBq: 100
isotope: Cs137
position: [0, 0, 0]
```
or relative to the path, using the subkeys `along_path`, `dist_from_path` and `side`
```yaml
sources:
my_source:
activity_MBq: 100
isotope: Cs137
position:
along_path: 100
dist_from_path: 50
side: left
```
Note that side is relative to the direction of travel. The path will by default start at (x,y) = (0,0) and initial heading is parallel to the x-axis.
### Detector
The final required key is the `detector`. Currently, only isotropic detectors are supported. Nonetheless, you must specify it with `name`, `is_isotropic` and `efficiency`:
```yaml
detector:
name: test
is_isotropic: True
efficiency: 0.02
```
Note there are some existing detectors available, where efficiency is not required and will be looked up by PG-RAD itself:
```yaml
detector:
name: NaIR
is_isotropic: True
```
## Optional keys
The following subkeys are optional and should be put under the `options` key.
```yaml
options:
air_density_kg_per_m3: 1.243
seed: 1234
```

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,118 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "1a063d05",
"metadata": {},
"source": [
"# Pseudo-random procedural roads\n",
"\n",
"Suppose one wishes to describe a road between A and B in terms of segments\n",
"\n",
"$$\n",
"\\text{straight, turn left, straight, turn right, straight}\n",
"$$\n",
"\n",
"Let's see how we can create a random road of length $L$ from a pre-determined set of prefabs.\n",
"\n",
"#### Random apportionment of total length\n",
"\n",
"Suppose we want to build a road of length $L$ out of $K$ segments. The total number of waypoints $N$ depends on the step size $\\Delta s$:\n",
"\n",
"$$\n",
"N = \\frac{L}{\\Delta s}.\n",
"$$\n",
"\n",
"Let $\\left( p_1, p_2, \\dots, p_K \\right)$ represent the proportion of $N$ that each prefab will be assigned, where $\\sum p_i = 1$. One useful distribution here is the [Dirichlet distribution](https://en.wikipedia.org/wiki/Dirichlet_distribution), which is parametrized by a vector $\\mathbf{\\alpha} = \\left(\\alpha_1, \\alpha_2, \\dots, \\alpha_K \\right)$. In the special case where all $\\alpha_i$ are equal, the scalar parameter $\\alpha$ is called a *concentration parameter*. Setting the same $\\alpha$ across the entire parameter space makes the distribution symmetric, meaning no prior assumptions are made regarding the proportion of $N$ that will be assigned to each segment. $\\alpha = 1$ leads to what is known as a flat Dirichlet distribution, whereas higher values lead to more dense and evenly distributed $\\left( p_1, p_2, \\dots, p_K \\right)$. On the other hand, keeping $\\alpha \\leq 1$ gives a sparser distribution which can lead to larger variance in apportioned number of waypoints to $\\left( p_1, p_2, \\dots, p_K \\right)$.\n",
"\n",
"#### Expectation value and variance of Dirichlet distribution\n",
"\n",
"Suppose we draw our samples for proportion of length from the Dirichlet distribution\n",
"\n",
"$$\n",
"(p_1, p_2, \\ldots, p_K) \\sim \\text{Dirichlet}(\\alpha, \\alpha, \\ldots, \\alpha)\n",
"$$\n",
"\n",
"with $\\alpha _{0}=\\sum _{i=1}^{K}\\alpha _{i}$, the mean and variance are then\n",
"\n",
"$$\n",
"\\operatorname {E} [p_{i}]={\\frac {\\alpha _{i}}{\\alpha _{0}}}, \\; \\operatorname {Var} [p_{i}]={\\frac {\\alpha _{i}(\\alpha _{0}-\\alpha _{i})}{\\alpha _{0}^{2}(\\alpha _{0}+1)}}.\n",
"$$\n",
"\n",
"If $\\alpha$ is a scalar, then $\\alpha _{0}= K \\alpha$ and the above simplifies to\n",
"\n",
"$$\n",
"\\operatorname {E} [p_{i}]={\\frac {\\alpha}{K \\alpha}}={\\frac {1}{K}}, \\; \\operatorname {Var} [p_{i}]={\\frac {\\alpha(K \\alpha -\\alpha)}{(K \\alpha)^{2}(K \\alpha +1)}}.\n",
"$$\n",
"\n",
"We see that $\\operatorname {Var} [p_{i}] \\propto \\frac{1}{\\alpha}$ meaning that the variance reduces with increasing $\\alpha$. We can simply scale the proportions\n",
"\n",
"$$\n",
"(N \\cdot p_1, N \\cdot p_2, \\ldots, N \\cdot p_K)\n",
"$$\n",
"\n",
"to get the randomly assigned number of waypoints for each prefab. We now have a distribution which can give randomly assigned lengths to a given list of prefabs, with a parameter to control the degree of randomness. With a large concentration parameter $\\alpha$, the distribution of lengths will be more uniform, with each prefab getting $N \\cdot \\operatorname {E} [p_{i}]={\\frac {N}{K}}$ waypoints assigned to it. Likewise, keeping $\\alpha$ low increases variance and allows for a more random assignment of proportions of waypoints to each prefab segment.\n",
"\n",
"#### Random angles\n",
"\n",
"Suppose a turn of a pre-defined arc length $l$ made of $N/K$ waypoints. If one wants to create a random angle, one has to keep in mind that the minimum radius $R_{min}$ depends on the speed of the vehicle and the weather conditions:\n",
"\n",
"$$\n",
"R_{\\text{min,vehicle}} = \\frac{v^2}{g\\mu},\n",
"$$\n",
"\n",
"where\n",
"- $v$ is the velocity of the vehicle in $\\text{m/s}$,\n",
"- $g$ is the gravitational acceleration (about $9.8$ $\\text{m/s}^{2}$), and\n",
"- $\\mu$ is the friction coefficient (about $0.7$ for dry asphalt).\n",
"\n",
"A regular turn (not a U-turn or roundabout) should also have a lower and an upper limit on the angle, say, 30 degrees to 90 degrees for a conservative estimate. In terms of radii, it becomes\n",
"\n",
"$$\n",
"R_{\\text{min}} = \\max\\left(R_{\\text{min,vehicle}}, \\frac{l}{\\pi/2}\\right)\n",
"$$\n",
"\n",
"and\n",
"\n",
"$$\n",
"R_{\\text{max}} = \\frac{l}{\\pi/6}.\n",
"$$\n",
"\n",
"We then sample\n",
"\n",
"$$\n",
"R \\sim \\text{Uniform}\\left(R_{\\text{min}}, R_{\\text{max}}\\right)\n",
"$$\n",
"\n",
"and obtain a random radius for a turn of arc length $l$ with limits to ensure the radius is large enough given the velocity of the vehicle. Finally, the curvature profile is related to the radius by\n",
"\n",
"$$\n",
"\\kappa = \\frac{1}{R}\n",
"$$\n",
"\n",
"which means that the curvature profile of a turn is simply a vector $\\mathbf{\\kappa} = (1/R, \\dots, 1/R)$ with a length of $N/K$ waypoints."
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -2,29 +2,17 @@
Primary Gamma RADiation Landscapes (PG-RAD) is a Python package for research in source localization. It can simulate mobile gamma spectrometry data acquired from vehicle-borne detectors along a predefined path (e.g. a road). Primary Gamma RADiation Landscapes (PG-RAD) is a Python package for research in source localization. It can simulate mobile gamma spectrometry data acquired from vehicle-borne detectors along a predefined path (e.g. a road).
## Requirements ## About
PG-RAD requires Python `3.12`. The guides below assume a unix-like system. This software has been developed as part of dissertation work for the degree of master of Computational Science and Physics at Lund University, Sweden. The work has been done at the department of Medical Radiation Physics (MSF), Faculty of Medicine. The radiological emergency preparedness research group of MSF is assigned by the Swedish Radiation Safety Authority (SSM) to aid in preparation for effective mitigation of radiological or nuclear disasters on Swedish soil.
## Installation (CLI) ## Value proposition
<!--pipx seems like a possible option to install python package in a contained environment on unix--> PG-RAD is a toolbox that allows for simulation of detector response for a wide variety of source localization scenarios. The strength of the software lies in its simple and minimal configuration and user input, while its flexibility allows for reconstruction of specific scenarios with relative ease. PG-RAD is also general enough that novel methods such as UAV-borne detectors can be simulated and evaluated.
Lorem ipsum User input takes the form of an input file (YAML), describing the path, detector and source(s), and optional parameters. The output of the program is visualizations of the world (the path and sources), as well as the detector count rate as a function of distance travelled along the path.
## Installation (Python module) Users can provide experimental / geographical coordinates representing real roads. Alternatively, users can let PG-RAD generate a procedural road, where the user can easily control what that road should look like. The user can specify a single point source, several point sources, as well as a field of radioactive material covering a large area.
If you are interested in using PG-RAD in another Python project, create a virtual environment first:
```
python3 -m venv .venv
```
Then install PG-RAD in it:
```
source .venv/bin/activate
(.venv) pip install git+https://github.com/pim-n/pg-rad
``` ```
See how to get started with PG-RAD with your own Python code [here](pg-rad-in-python). See how to get started with PG-RAD with your own Python code [here](pg-rad-in-python).

47
docs/installation.md Normal file
View File

@ -0,0 +1,47 @@
## Requirements
PG-RAD requires Python `>=3.12.4` and `<3.13`. It has been tested on `3.12.9`. The guides below assume a unix-like system. You can check the Python version you have installed as follows:
```
python --version
```
If you don't have the right version installed there are various ways to get a compatible version, such as [pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation).
## Installation (CLI)
<!--pipx seems like a possible option to install python package in a contained environment on unix-->
Lorem ipsum
## Installation (Python module)
If you are interested in using PG-RAD in another Python project, create a virtual environment first:
```
python -m venv .venv
```
Then install PG-RAD in it:
```
source .venv/bin/activate
(.venv) pip install git+https://github.com/pim-n/pg-rad
```
See how to get started with PG-RAD with your own Python code [here](pg-rad-in-python).
## For developers
```
git clone https://github.com/pim-n/pg-rad
cd pg-rad
git checkout dev
```
or
```
git@github.com:pim-n/pg-rad.git
cd pg-rad
git checkout dev
```

View File

@ -0,0 +1,18 @@
window.MathJax = {
tex: {
inlineMath: [['$', '$'], ["\\(", "\\)"]],
displayMath: [['$$', '$$'], ["\\[", "\\]"]],
processEscapes: true,
processEnvironments: true
},
options: {
processHtmlClass: "arithmatex"
}
};
document$.subscribe(() => {
MathJax.startup.output.clearCache()
MathJax.typesetClear()
MathJax.texReset()
MathJax.typesetPromise()
})

View File

@ -1,4 +0,0 @@
---
title: Using PG-RAD in CLI
---
Lorem ipsum.

View File

@ -0,0 +1,35 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "5e30f59a",
"metadata": {},
"source": [
"# The design of PG-RAD\n",
"\n",
"This discusses the overall design of the code by using an interactive notebook demo."
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -1,5 +0,0 @@
---
title: Using PG-RAD as a module
---
Consult the API documentation in the side bar.

187
docs/quickstart.md Normal file
View File

@ -0,0 +1,187 @@
## Installation
See the [installation guide](installation.md).
## Test your installation
First, see if PG-RAD is available on your system by typing
```
pgrad --help
```
You should get output along the lines of
```
usage: pg-rad [-h] ...
Primary Gamma RADiation landscape tool
...
```
If you get something like `pgrad: command not found`, please consult the [installation guide](installation.md).
You can run a quick test scenario as follows:
```
pgrad --test
```
This should produce a plot of a scenario containing a single point source and a path.
## Running PG-RAD
In order to use the CLI for your own simulations, you need to provide a *config file*. To run with your config, run
```
pgrad --config path/to/my_config.yml
```
where `path/to/my_config.yml` points to your config file.
## Example configs
The easiest way is to take one of these example configs, and adjust them as needed. Alternatively, there is a detailed guide on how to write your own config file [here](config-spec.md).
=== "Example 1"
The position can be defined relative to the path. `along_path` means at what distance traveled along the path the source is found. If the path is 200 meters long and `along_path` is `100` then the source is halfway along the path. `dist_from_path` is the distance in meters from the path. `side` is the side of the path the source is located. This is relative to the direction the path is traveled.
``` yaml
name: Example 1
speed: 13.89
acquisition_time: 1
path:
file: path/to/exp_coords.csv
east_col_name: East
north_col_name: North
sources:
source1:
activity_MBq: 1000
isotope: CS137
position:
along_path: 100
dist_from_path: 50
side: left
detector:
name: dummy
is_isotropic: True
```
=== "Example 2"
The position can also just be defined with (x,y,z) coordinates.
``` yaml
name: Example 2
speed: 13.89
acquisition_time: 1
path:
file: path/to/exp_coords.csv
east_col_name: East
north_col_name: North
sources:
source1:
activity_MBq: 1000
isotope: CS137
position: [104.3, 32.5, 0]
source2:
activity_MBq: 100
isotope: CS137
position: [0, 0, 0]
detector:
name: dummy
is_isotropic: True
```
=== "Example 3"
This is an example of a procedural path with random apportionment of total length and random angles being assigned to turns. The parameter `alpha` is optional, and is related to randomness. A higher value leads to more uniform apportionment of lengths and a lower value to more random apportionment. More information about `alpha` can be found [here](config-spec.md).
``` yaml
name: Example 3
speed: 8.33
acquisition_time: 1
path:
length: 1000
segments:
- straight
- turn_left
- straight
alpha: 100
sources:
source1:
activity_MBq: 1000
isotope: CS137
position: [0, 0, 0]
detector:
name: dummy
is_isotropic: True
```
=== "Example 4"
This is an example of a procedural path that is partially specified. Note that `turn_left` is now a key whose value is the corresponding turn angle of 45 degrees. The length is still divided randomly.
``` yaml
name: Example 4
speed: 8.33
acquisition_time: 1
path:
length: 1000
segments:
- straight
- turn_left: 45
- straight
sources:
source1:
activity_MBq: 1000
isotope: CS137
position: [0, 0, 0]
detector:
name: dummy
is_isotropic: True
```
=== "Example 5"
This is an example of a procedural path that is fully specified. See how length is now a list matching the length of the segments.
``` yaml
name: Example 5
speed: 8.33
acquisition_time: 1
path:
length:
- 400
- 200
- 400
segments:
- straight
- turn_left: 45
- straight
sources:
source1:
activity_MBq: 1000
isotope: CS137
position: [0, 0, 0]
detector:
name: dummy
is_isotropic: True
```

View File

@ -28,8 +28,21 @@ markdown_extensions:
- pymdownx.inlinehilite - pymdownx.inlinehilite
- pymdownx.snippets - pymdownx.snippets
- pymdownx.superfences - pymdownx.superfences
- pymdownx.arithmatex:
generic: true
- admonition
- pymdownx.details
- pymdownx.tabbed:
alternate_style: true
combine_header_slug: true
extra_javascript:
- javascripts/mathjax.js
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js
plugins: plugins:
- mkdocs-jupyter:
execute: false
- mkdocstrings: - mkdocstrings:
enabled: !ENV [ENABLE_MKDOCSTRINGS, true] enabled: !ENV [ENABLE_MKDOCSTRINGS, true]
default_handler: python default_handler: python
@ -39,3 +52,12 @@ plugins:
options: options:
show_source: false show_source: false
show_root_heading: false show_root_heading: false
nav:
- Home: index.md
- Installation Guide: installation.md
- Quickstart Guide: quickstart.md
- 'Tutorial: Writing a Config File': config-spec.md
- Explainers:
- explainers/planar_curve.ipynb
- explainers/prefab_roads.ipynb

View File

@ -5,9 +5,13 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
[tool.setuptools.package-data]
"pg_rad.data" = ["*.csv"]
"pg_rad.configs" = ["*.yml"]
[project] [project]
name = "pg-rad" name = "pg-rad"
version = "0.2.0" version = "0.2.1"
authors = [ authors = [
{ name="Pim Nelissen", email="pi0274ne-s@student.lu.se" }, { name="Pim Nelissen", email="pi0274ne-s@student.lu.se" },
] ]
@ -18,15 +22,19 @@ dependencies = [
"matplotlib>=3.9.2", "matplotlib>=3.9.2",
"numpy>=2", "numpy>=2",
"pandas>=2.3.1", "pandas>=2.3.1",
"piecewise_regression==1.5.0", "pyyaml>=6.0.2",
"pyyaml>=6.0.2" "scipy>=1.15.0"
] ]
license = "MIT" license = "MIT"
license-files = ["LICEN[CS]E*"] license-files = ["LICEN[CS]E*"]
[project.scripts]
pgrad = "pg_rad.main:main"
[project.urls] [project.urls]
Homepage = "https://github.com/pim-n/pg-rad" Homepage = "https://github.com/pim-n/pg-rad"
Issues = "https://github.com/pim-n/pg-rad/issues" Issues = "https://github.com/pim-n/pg-rad/issues"
[project.optional-dependencies] [project.optional-dependencies]
dev = ["pytest", "notebook", "mkdocs-material", "mkdocstrings-python"]
dev = ["pytest", "mkinit", "notebook", "mkdocs-material", "mkdocstrings-python", "mkdocs-jupyter", "flake8"]

View File

@ -0,0 +1 @@
__all__ = []  # no public names re-exported from this package (yet)

View File

@ -0,0 +1,25 @@
# --- Physics defaults ---
DEFAULT_AIR_DENSITY = 1.243 # kg/m^3, Skåne
# --- Simulation defaults ---
DEFAULT_SEED = None
DEFAULT_ACQUISITION_TIME = 1.0
# --- Source defaults ---
DEFAULT_PATH_HEIGHT = 0.0
DEFAULT_SOURCE_HEIGHT = 0.0
# --- Segmented road defaults ---
DEFAULT_MIN_TURN_ANGLE = 30.
DEFAULT_MAX_TURN_ANGLE = 90.
DEFAULT_FRICTION_COEFF = 0.7 # dry asphalt
DEFAULT_GRAVITATIONAL_ACC = 9.81 # m/s^2
DEFAULT_ALPHA = 100.
# --- Detector efficiencies ---
DETECTOR_EFFICIENCIES = {
"dummy": 1.0,
"NaIR": 0.0216,
"NaIF": 0.0254
}

View File

@ -0,0 +1,3 @@
ATTENUATION_TABLE = 'attenuation_table.csv'
TEST_EXP_DATA = 'test_path_coords.csv'
LOGGING_CONFIG = 'logging.yml'

View File

@ -3,10 +3,13 @@ disable_existing_loggers: false
formatters: formatters:
simple: simple:
format: '%(asctime)s - %(levelname)s: %(message)s' format: '%(asctime)s - %(levelname)s: %(message)s'
colored:
'()': pg_rad.logger.logger.ColorFormatter
format: '%(asctime)s - %(levelname)s: %(message)s'
handlers: handlers:
stdout: stdout:
class: logging.StreamHandler class: logging.StreamHandler
formatter: simple formatter: colored
stream: ext://sys.stdout stream: ext://sys.stdout
loggers: loggers:
root: root:

View File

@ -0,0 +1 @@
__all__ = []  # no public names re-exported from this package (yet)

View File

@ -0,0 +1,39 @@
energy_mev,mu
1.00000E-03,3.606E+03
1.50000E-03,1.191E+03
2.00000E-03,5.279E+02
3.00000E-03,1.625E+02
3.20290E-03,1.340E+02
3.20290E-03,1.485E+02
4.00000E-03,7.788E+01
5.00000E-03,4.027E+01
6.00000E-03,2.341E+01
8.00000E-03,9.921E+00
1.00000E-02,5.120E+00
1.50000E-02,1.614E+00
2.00000E-02,7.779E-01
3.00000E-02,3.538E-01
4.00000E-02,2.485E-01
5.00000E-02,2.080E-01
6.00000E-02,1.875E-01
8.00000E-02,1.662E-01
1.00000E-01,1.541E-01
1.50000E-01,1.356E-01
2.00000E-01,1.233E-01
3.00000E-01,1.067E-01
4.00000E-01,9.549E-02
5.00000E-01,8.712E-02
6.00000E-01,8.055E-02
8.00000E-01,7.074E-02
1.00000E+00,6.358E-02
1.25000E+00,5.687E-02
1.50000E+00,5.175E-02
2.00000E+00,4.447E-02
3.00000E+00,3.581E-02
4.00000E+00,3.079E-02
5.00000E+00,2.751E-02
6.00000E+00,2.522E-02
8.00000E+00,2.225E-02
1.00000E+01,2.045E-02
1.50000E+01,1.810E-02
2.00000E+01,1.705E-02
1 energy_mev mu
2 1.00000E-03 3.606E+03
3 1.50000E-03 1.191E+03
4 2.00000E-03 5.279E+02
5 3.00000E-03 1.625E+02
6 3.20290E-03 1.340E+02
7 3.20290E-03 1.485E+02
8 4.00000E-03 7.788E+01
9 5.00000E-03 4.027E+01
10 6.00000E-03 2.341E+01
11 8.00000E-03 9.921E+00
12 1.00000E-02 5.120E+00
13 1.50000E-02 1.614E+00
14 2.00000E-02 7.779E-01
15 3.00000E-02 3.538E-01
16 4.00000E-02 2.485E-01
17 5.00000E-02 2.080E-01
18 6.00000E-02 1.875E-01
19 8.00000E-02 1.662E-01
20 1.00000E-01 1.541E-01
21 1.50000E-01 1.356E-01
22 2.00000E-01 1.233E-01
23 3.00000E-01 1.067E-01
24 4.00000E-01 9.549E-02
25 5.00000E-01 8.712E-02
26 6.00000E-01 8.055E-02
27 8.00000E-01 7.074E-02
28 1.00000E+00 6.358E-02
29 1.25000E+00 5.687E-02
30 1.50000E+00 5.175E-02
31 2.00000E+00 4.447E-02
32 3.00000E+00 3.581E-02
33 4.00000E+00 3.079E-02
34 5.00000E+00 2.751E-02
35 6.00000E+00 2.522E-02
36 8.00000E+00 2.225E-02
37 1.00000E+01 2.045E-02
38 1.50000E+01 1.810E-02
39 2.00000E+01 1.705E-02

View File

@ -1,26 +0,0 @@
import pandas as pd
from pg_rad.logger import setup_logger
from pg_rad.exceptions import DataLoadError, InvalidCSVError
logger = setup_logger(__name__)
def load_data(filename: str) -> pd.DataFrame:
    """Load a comma-separated CSV file into a DataFrame.

    Args:
        filename (str): Path to the CSV file to load.

    Returns:
        pd.DataFrame: The parsed file contents.

    Raises:
        DataLoadError: If the file does not exist or loading fails
            unexpectedly.
        InvalidCSVError: If the file cannot be parsed as CSV.
    """
    # BUG FIX: the f-strings had no placeholders, so log and error
    # messages never named the offending file. Interpolate the filename.
    logger.debug(f"Attempting to load data from {filename}")
    try:
        df = pd.read_csv(filename, delimiter=',')
    except FileNotFoundError as e:
        logger.error(f"File not found: {filename}")
        raise DataLoadError(f"File does not exist: {filename}") from e
    except pd.errors.ParserError as e:
        logger.error(f"Invalid CSV format: {filename}")
        raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
    except Exception as e:
        logger.exception(f"Unexpected error while loading {filename}")
        raise DataLoadError("Unexpected error while loading data") from e
    return df

View File

@ -0,0 +1,8 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.dataloader import dataloader
from pg_rad.dataloader.dataloader import (load_data,)
__all__ = ['dataloader', 'load_data']

View File

@ -0,0 +1,22 @@
import logging
import pandas as pd
logger = logging.getLogger(__name__)


def load_data(filename: str) -> pd.DataFrame:
    """Load a comma-separated CSV file into a DataFrame.

    Args:
        filename (str): Path to the CSV file to load.

    Returns:
        pd.DataFrame: The parsed file contents.

    Raises:
        FileNotFoundError: If `filename` does not exist.
        pd.errors.ParserError: If the file is not valid CSV.
    """
    # BUG FIX: the f-strings had no placeholders, so debug logs never
    # identified which file was being loaded. Interpolate the filename.
    logger.debug(f"Attempting to load file: {filename}")
    try:
        df = pd.read_csv(filename, delimiter=',')
    except FileNotFoundError as e:
        logger.critical(e)
        raise
    except pd.errors.ParserError as e:
        logger.critical(e)
        raise
    logger.debug(f"File loaded: {filename}")
    return df

View File

View File

@ -0,0 +1,20 @@
from pg_rad.inputparser.specs import DetectorSpec
from .detectors import IsotropicDetector, AngularDetector
class DetectorBuilder:
    """Builds a concrete detector object from a DetectorSpec."""

    def __init__(
        self,
        detector_spec: DetectorSpec,
    ):
        self.detector_spec = detector_spec

    def build(self) -> IsotropicDetector | AngularDetector:
        """Construct the detector described by the stored spec.

        Raises:
            NotImplementedError: For non-isotropic (angular) detectors.
        """
        spec = self.detector_spec
        # Guard clause: angular detectors are not implemented yet.
        if not spec.is_isotropic:
            raise NotImplementedError("Angular detector not supported yet.")
        return IsotropicDetector(spec.name, spec.efficiency)

View File

@ -0,0 +1,38 @@
from abc import ABC
class BaseDetector(ABC):
    """Abstract base for detector models.

    Args:
        name (str): Identifier of the detector (e.g. a library key).
        efficiency (float): Detection efficiency used by subclasses.
    """
    def __init__(
        self,
        name: str,
        efficiency: float
    ):
        self.name = name
        self.efficiency = efficiency
    def get_efficiency(self):
        # Intentionally a no-op placeholder: subclasses override this with
        # their own signatures (energy, or angle + energy). Calling it on
        # the base class returns None.
        pass
class IsotropicDetector(BaseDetector):
    """Detector whose efficiency is independent of direction and energy.

    The previous pass-through ``__init__`` (which only forwarded its
    arguments to the base class) was redundant and has been removed;
    ``BaseDetector.__init__(name, efficiency)`` is inherited unchanged.
    """

    def get_efficiency(self, energy):
        """Return the constant efficiency.

        Args:
            energy: Accepted for interface compatibility but unused.

        Returns:
            float: The efficiency supplied at construction time.
        """
        return self.efficiency
class AngularDetector(BaseDetector):
    """Detector whose efficiency depends on incidence angle (stub).

    Args:
        name (str): Identifier of the detector.
        efficiency (float): Nominal efficiency value.
    """
    def __init__(
        self,
        name: str,
        efficiency: float
    ):
        super().__init__(name, efficiency)
    def get_efficiency(self, angle, energy):
        # Not implemented yet: currently returns None regardless of input.
        pass

View File

@ -1,8 +0,0 @@
class ConvergenceError(Exception):
    """Raised when an iterative algorithm fails to converge."""
class DataLoadError(Exception):
    """Base class for errors raised while loading input data files."""
class InvalidCSVError(DataLoadError):
    """Raised when a file cannot be parsed as valid CSV."""

View File

@ -0,0 +1,10 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.exceptions import exceptions
from pg_rad.exceptions.exceptions import (ConvergenceError, DataLoadError,
InvalidCSVError, OutOfBoundsError,)
__all__ = ['ConvergenceError', 'DataLoadError', 'InvalidCSVError',
'OutOfBoundsError', 'exceptions']

View File

@ -0,0 +1,45 @@
class ConvergenceError(Exception):
    """Raised when an iterative algorithm fails to converge."""
class DataLoadError(Exception):
    """Base class for errors raised while loading input data files."""
class InvalidCSVError(DataLoadError):
    """Raised when a file cannot be parsed as valid CSV."""
class InvalidYAMLError(DataLoadError):
    """Raised when a file or string cannot be parsed as valid YAML."""
class OutOfBoundsError(Exception):
    """Raised when an object is attempted to be placed outside the landscape bounds."""
class MissingConfigKeyError(KeyError):
    """Raised when a (nested) config key is missing in the config.

    Args:
        key: The config section (or key) that is incomplete/missing.
        subkey: Optional missing subkey, given as a string or a set of
            strings (sets are joined into a comma-separated list).
    """
    def __init__(self, key, subkey=None):
        if not subkey:
            # No subkey given: the key itself is missing.
            self.message = f"Missing key: {key}"
        else:
            # A set of subkeys is flattened to "a, b, c"; a plain string
            # is used as-is.
            if isinstance(subkey, set):
                subkey = ', '.join(list(subkey))
            self.message = f"Missing key in {key}: {subkey}"
        super().__init__(self.message)
class DimensionError(ValueError):
    """Raised if dimensions or coordinates do not match the system
    (e.g. an absolute position that is not a 3-component [x, y, z])."""
class InvalidIsotopeError(ValueError):
    """Raised if attempting to load an isotope that is not in the preset library."""
class InvalidConfigValueError(ValueError):
    """Raised if a config key has an incorrect type or value."""

View File

@ -0,0 +1,391 @@
import logging
import os
from typing import Any, Dict, List, Union
import yaml
from pg_rad.exceptions.exceptions import (
MissingConfigKeyError,
DimensionError,
InvalidConfigValueError,
InvalidYAMLError
)
from pg_rad.configs import defaults
from .specs import (
MetadataSpec,
RuntimeSpec,
SimulationOptionsSpec,
PathSpec,
ProceduralPathSpec,
CSVPathSpec,
PointSourceSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec,
DetectorSpec,
SimulationSpec
)
logger = logging.getLogger(__name__)
class ConfigParser:
    """Parse a YAML simulation config into a structured SimulationSpec.

    The constructor accepts either a filesystem path to a YAML file or a
    raw YAML string. Unknown keys produce warnings; structural problems
    raise the config exception family.
    """

    _ALLOWED_ROOT_KEYS = {
        "name",
        "speed",
        "acquisition_time",
        "path",
        "sources",
        "detector",
        "options"
    }

    def __init__(self, config_source: str):
        self.config = self._load_yaml(config_source)

    def parse(self) -> SimulationSpec:
        """Parse every config section and assemble the SimulationSpec."""
        self._warn_unknown_keys(
            section="root",
            provided=set(self.config.keys()),
            allowed=self._ALLOWED_ROOT_KEYS,
        )
        metadata = self._parse_metadata()
        runtime = self._parse_runtime()
        options = self._parse_options()
        path = self._parse_path()
        sources = self._parse_point_sources()
        detector = self._parse_detector()
        return SimulationSpec(
            metadata=metadata,
            runtime=runtime,
            options=options,
            path=path,
            point_sources=sources,
            detector=detector
        )

    def _load_yaml(self, config_source: str) -> Dict[str, Any]:
        """Load YAML from a path if it exists, else parse it as a string.

        Raises:
            InvalidYAMLError: If the parsed document is not a mapping.
        """
        if os.path.exists(config_source):
            with open(config_source) as f:
                data = yaml.safe_load(f)
        else:
            data = yaml.safe_load(config_source)
        if not isinstance(data, dict):
            raise InvalidYAMLError(
                "Provided path or string is not a valid YAML representation."
            )
        return data

    def _parse_metadata(self) -> MetadataSpec:
        """Parse the mandatory root-level `name` key."""
        try:
            return MetadataSpec(
                name=self.config["name"]
            )
        except KeyError as e:
            raise MissingConfigKeyError("root", {"name"}) from e

    def _parse_runtime(self) -> RuntimeSpec:
        """Parse the mandatory `speed` and `acquisition_time` keys."""
        required = {"speed", "acquisition_time"}
        missing = required - self.config.keys()
        if missing:
            raise MissingConfigKeyError("root", missing)
        return RuntimeSpec(
            speed=float(self.config["speed"]),
            acquisition_time=float(self.config["acquisition_time"]),
        )

    def _parse_options(self) -> SimulationOptionsSpec:
        """Parse the optional `options` section (air density and seed)."""
        options = self.config.get("options", {})
        allowed = {"air_density_kg_per_m3", "seed"}
        self._warn_unknown_keys(
            section="options",
            provided=set(options.keys()),
            allowed=allowed,
        )
        air_density = options.get(
            "air_density_kg_per_m3",
            defaults.DEFAULT_AIR_DENSITY
        )
        seed = options.get("seed")
        # Generalization: accept ints as well as floats (YAML `1` parses
        # as int), but reject bools, which are an int subclass in Python.
        if (
            isinstance(air_density, bool)
            or not isinstance(air_density, (int, float))
            or air_density <= 0
        ):
            raise InvalidConfigValueError(
                "options.air_density_kg_per_m3 must be a positive float "
                "in kg/m^3."
            )
        # BUG FIX: the previous check used `or`, which rejected every
        # non-None seed. Raise only when a seed is given and it is not a
        # positive integer.
        if seed is not None and (
            isinstance(seed, bool)
            or not isinstance(seed, int)
            or seed <= 0
        ):
            raise InvalidConfigValueError(
                "Seed must be a positive integer value."
            )
        return SimulationOptionsSpec(
            air_density=float(air_density),
            seed=seed,
        )

    def _parse_path(self) -> PathSpec:
        """Parse the `path` section into a CSV or procedural path spec."""
        # `direction` is read for both path kinds below, so it is allowed
        # in both key sets (previously it triggered a bogus warning for
        # CSV paths).
        allowed_csv = {"file", "east_col_name", "north_col_name", "z",
                       "direction"}
        allowed_proc = {"segments", "length", "z", "alpha", "direction"}
        path = self.config.get("path")
        # BUG FIX: validate the section before reading keys from it; the
        # previous code called path.get(...) first and crashed with
        # AttributeError when `path` was missing or not a mapping.
        if path is None:
            raise MissingConfigKeyError("path")
        if not isinstance(path, dict):
            raise InvalidConfigValueError("Path must be a dictionary.")
        direction = path.get("direction", 'positive')
        if direction == 'positive':
            opposite_direction = False
        elif direction == 'negative':
            opposite_direction = True
        else:
            raise InvalidConfigValueError(
                "Direction must be positive or negative."
            )
        if "file" in path:
            self._warn_unknown_keys(
                section="path (csv)",
                provided=set(path.keys()),
                allowed=allowed_csv,
            )
            return CSVPathSpec(
                file=path["file"],
                east_col_name=path["east_col_name"],
                north_col_name=path["north_col_name"],
                z=path.get("z", 0),
                opposite_direction=opposite_direction
            )
        elif "segments" in path:
            self._warn_unknown_keys(
                section="path (procedural)",
                provided=set(path.keys()),
                allowed=allowed_proc,
            )
            return self._parse_procedural_path(path, opposite_direction)
        else:
            raise InvalidConfigValueError("Invalid path configuration.")

    def _parse_procedural_path(
        self,
        path: Dict[str, Any],
        opposite_direction: bool
    ) -> ProceduralPathSpec:
        """Build a ProceduralPathSpec from `segments`, `length`, `alpha`."""
        raw_segments = path.get("segments")
        if not isinstance(raw_segments, List):
            raise InvalidConfigValueError(
                "path.segments must be a list of segments."
            )
        raw_length = path.get("length")
        if raw_length is None:
            raise MissingConfigKeyError("path", {"length"})
        if isinstance(raw_length, int | float):
            # A scalar total length becomes a one-element list.
            raw_length = [float(raw_length)]
        segments, angles = self._process_segment_angles(raw_segments)
        lengths = self._process_segment_lengths(raw_length, len(segments))
        return ProceduralPathSpec(
            segments=segments,
            angles=angles,
            lengths=lengths,
            z=path.get("z", defaults.DEFAULT_PATH_HEIGHT),
            alpha=path.get("alpha", defaults.DEFAULT_ALPHA),
            opposite_direction=opposite_direction
        )

    def _process_segment_angles(
        self,
        raw_segments: List[Union[str, dict]]
    ) -> tuple[List[str], List[Any]]:
        """Split raw segment entries into (segment types, turn angles).

        A bare string segment gets an angle of None (randomized later);
        a one-entry mapping (`turn_left: 45`) supplies an explicit angle.
        (Return annotation fixed: this returns two lists, not a list of
        dicts.)
        """
        segments, angles = [], []
        for segment in raw_segments:
            if isinstance(segment, str):
                segments.append(segment)
                angles.append(None)
            elif isinstance(segment, dict):
                if len(segment) != 1:
                    raise InvalidConfigValueError(
                        "Invalid segment definition."
                    )
                seg_type, angle = list(segment.items())[0]
                segments.append(seg_type)
                angles.append(angle)
            else:
                raise InvalidConfigValueError(
                    "Invalid segment entry format."
                )
        return segments, angles

    def _process_segment_lengths(
        self,
        raw_length_list: List[Union[int, float]],
        num_segments: int
    ) -> List[float]:
        """Validate the length list: one total, or one entry per segment."""
        num_lengths = len(raw_length_list)
        if num_lengths == num_segments or num_lengths == 1:
            return raw_length_list
        # Consistency: raise the config-specific ValueError subclass used
        # everywhere else in this parser (still a ValueError for callers).
        raise InvalidConfigValueError(
            "Path length must either be a single number or a list with "
            "number of elements equal to the number of segments."
        )

    @staticmethod
    def _is_turn(segment_type: str) -> bool:
        """Return True for segment types that represent a turn."""
        return segment_type in {"turn_left", "turn_right"}

    def _parse_point_sources(self) -> List[PointSourceSpec]:
        """Parse `sources` into absolute/relative point source specs."""
        source_dict = self.config.get("sources")
        if source_dict is None:
            raise MissingConfigKeyError("sources")
        if not isinstance(source_dict, dict):
            raise InvalidConfigValueError(
                "sources must have subkeys representing point source names."
            )
        specs: List[PointSourceSpec] = []
        for name, params in source_dict.items():
            required = {"activity_MBq", "isotope", "position"}
            if not isinstance(params, dict):
                raise InvalidConfigValueError(
                    f"sources.{name} is not defined correctly."
                    f" Must have subkeys {required}"
                )
            missing = required - params.keys()
            if missing:
                raise MissingConfigKeyError(name, missing)
            activity = params.get("activity_MBq")
            isotope = params.get("isotope")
            if not isinstance(activity, int | float) or activity <= 0:
                raise InvalidConfigValueError(
                    f"sources.{name}.activity_MBq must be positive value "
                    "in MegaBequerels."
                )
            position = params.get("position")
            if isinstance(position, list):
                if len(position) != 3:
                    raise DimensionError(
                        "Absolute position must be [x, y, z]."
                    )
                specs.append(
                    AbsolutePointSourceSpec(
                        name=name,
                        activity_MBq=float(activity),
                        isotope=isotope,
                        x=float(position[0]),
                        y=float(position[1]),
                        z=float(position[2]),
                    )
                )
            elif isinstance(position, dict):
                alignment = position.get("acquisition_alignment")
                if alignment not in {'best', 'worst', None}:
                    raise InvalidConfigValueError(
                        f"sources.{name}.acquisition_alignment must be "
                        "'best' or 'worst', with 'best' aligning source "
                        f"{name} in the middle of the two nearest acquisition "
                        "points, and 'worst' aligning exactly perpendicular "
                        "to the nearest acquisition point."
                    )
                # Robustness: report missing relative-position keys as a
                # structured config error instead of a bare KeyError
                # (MissingConfigKeyError subclasses KeyError, so existing
                # callers are unaffected).
                missing_pos = (
                    {"along_path", "dist_from_path", "side"} - position.keys()
                )
                if missing_pos:
                    raise MissingConfigKeyError(
                        f"sources.{name}.position", missing_pos
                    )
                specs.append(
                    RelativePointSourceSpec(
                        name=name,
                        activity_MBq=float(activity),
                        isotope=isotope,
                        along_path=float(position["along_path"]),
                        dist_from_path=float(position["dist_from_path"]),
                        side=position["side"],
                        z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT),
                        alignment=alignment
                    )
                )
            else:
                raise InvalidConfigValueError(
                    f"Invalid position format for source '{name}'."
                )
        return specs

    def _parse_detector(self) -> DetectorSpec:
        """Parse `detector`, falling back to library efficiencies by name."""
        det_dict = self.config.get("detector")
        required = {"name", "is_isotropic"}
        if not isinstance(det_dict, dict):
            # BUG FIX: added the missing space between "at least" and
            # "the subkeys" in the error message.
            raise InvalidConfigValueError(
                "detector is not specified correctly. Must contain at least "
                f"the subkeys {required}"
            )
        missing = required - det_dict.keys()
        if missing:
            raise MissingConfigKeyError("detector", missing)
        name = det_dict.get("name")
        is_isotropic = det_dict.get("is_isotropic")
        eff = det_dict.get("efficiency")
        default_detectors = defaults.DETECTOR_EFFICIENCIES
        if not eff:
            # No (truthy) efficiency supplied: look the detector up in the
            # library, or fail with the list of known detectors.
            if name in default_detectors:
                eff = default_detectors[name]
            else:
                raise InvalidConfigValueError(
                    f"The detector {name} not found in library. Either "
                    f"specify {name}.efficiency or "
                    "choose a detector from the following list: "
                    f"{default_detectors.keys()}."
                )
        return DetectorSpec(
            name=name,
            efficiency=eff,
            is_isotropic=is_isotropic
        )

    def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
        """Log a warning for keys in `section` that are not recognised."""
        unknown = provided - allowed
        if unknown:
            logger.warning(
                f"Unknown keys in '{section}' section: {unknown}"
            )

View File

@ -0,0 +1,81 @@
from abc import ABC
from dataclasses import dataclass
from typing import Literal
@dataclass
class MetadataSpec:
    """Run metadata from the config root (currently just the run name)."""
    name: str
@dataclass
class RuntimeSpec:
    """Detector platform runtime parameters (speed and acquisition time).

    NOTE(review): example configs use speed ~13.89 and acquisition_time 1,
    presumably m/s and seconds — confirm units against the simulation code.
    """
    speed: float
    acquisition_time: float
@dataclass
class SimulationOptionsSpec:
    """Optional simulation settings from the `options` config section."""
    air_density: float  # kg/m^3 (config key: air_density_kg_per_m3)
    seed: int | None = None  # RNG seed for procedural path generation
@dataclass
class PathSpec(ABC):
    """Abstract base for path specifications."""
    z: int | float  # height of the path
    opposite_direction: bool  # True when config `direction` is 'negative'
@dataclass
class ProceduralPathSpec(PathSpec):
    """Path built from named segments (straight / turn_left / turn_right)."""
    segments: list[str]
    angles: list[float]  # per-segment turn angle; None entries mean random
    lengths: list[int | None]  # NOTE(review): parser may supply floats — confirm annotation
    alpha: float  # randomness of length apportionment (higher = more uniform)
@dataclass
class CSVPathSpec(PathSpec):
    """Path loaded from a CSV file of coordinates."""
    file: str  # path to the CSV file
    east_col_name: str  # CSV column holding east coordinates
    north_col_name: str  # CSV column holding north coordinates
@dataclass
class PointSourceSpec(ABC):
    """Abstract base for point source specifications."""
    activity_MBq: float  # source activity in MBq
    isotope: str  # isotope identifier (resolved by the isotope factory)
    name: str
@dataclass
class AbsolutePointSourceSpec(PointSourceSpec):
    """Point source at absolute (x, y, z) coordinates."""
    x: float
    y: float
    z: float
@dataclass
class RelativePointSourceSpec(PointSourceSpec):
    """Point source positioned relative to the path."""
    along_path: float  # arc length along the path (m)
    dist_from_path: float  # perpendicular distance from the path (m)
    side: str  # side of the path relative to travel direction (e.g. 'left')
    z: float
    alignment: Literal["best", "worst"] | None  # optional waypoint alignment
@dataclass
class DetectorSpec:
    """Detector description: name, efficiency and isotropy flag."""
    name: str
    efficiency: float
    is_isotropic: bool
@dataclass
class SimulationSpec:
    """Top-level container aggregating every parsed config section."""
    metadata: MetadataSpec
    runtime: RuntimeSpec
    options: SimulationOptionsSpec
    path: PathSpec
    point_sources: list[PointSourceSpec]
    detector: DetectorSpec

View File

@ -1,23 +0,0 @@
class Isotope:
    """Represents the essential information of an isotope.

    Args:
        name (str): Full name (e.g. Caesium-137).
        E (float): Energy of the primary gamma in keV.
        b (float): Branching ratio for the gamma at energy E.

    Raises:
        ValueError: If E is not positive, or b is outside [0, 1].
    """

    def __init__(self, name: str, E: float, b: float):
        # Validate the physics constraints before storing anything.
        if E <= 0:
            raise ValueError("primary_gamma must be a positive energy (keV).")
        if not (0 <= b <= 1):
            raise ValueError("branching_ratio_pg must be a ratio (0 <= b <= 1)")
        self.name, self.E, self.b = name, E, b

View File

@ -0,0 +1,9 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.isotopes import isotope
from pg_rad.isotopes.isotope import (CS137, Isotope, get_isotope,
preset_isotopes,)
__all__ = ['CS137', 'Isotope', 'get_isotope', 'isotope', 'preset_isotopes']

View File

@ -0,0 +1,51 @@
from typing import Dict, Type
from pg_rad.exceptions.exceptions import InvalidIsotopeError
from pg_rad.physics.attenuation import get_mass_attenuation_coeff
class Isotope:
    """Represents the essential information of an isotope.

    Args:
        name (str): Full name (e.g. Caesium-137).
        E (float): Energy of the primary gamma in keV.
        b (float): Branching ratio for the gamma at energy E.

    Raises:
        ValueError: If E is not positive, or b is outside [0, 1].
    """
    def __init__(
        self,
        name: str,
        E: float,
        b: float
    ):
        if E <= 0:
            raise ValueError("primary_gamma must be a positive energy (keV).")
        if not (0 <= b <= 1):
            raise ValueError("branching_ratio_pg must be a ratio b in [0,1]")
        self.name = name
        self.E = E
        self.b = b
        # Mass attenuation coefficient of air at this gamma energy; the
        # attenuation table is indexed in MeV, hence the keV -> MeV /1000.
        self.mu_mass_air = get_mass_attenuation_coeff(E / 1000)
class CS137(Isotope):
    """Cs-137 preset: 661.66 keV primary gamma with branching ratio 0.851."""
    def __init__(self):
        super().__init__(
            name="Cs-137",
            E=661.66,
            b=0.851
        )
preset_isotopes: Dict[str, Type[Isotope]] = {
    "Cs137": CS137
}


def get_isotope(isotope_str: str) -> Isotope:
    """Lazy factory function to create isotope objects.

    The lookup is case-insensitive: configs and tests elsewhere in the
    project spell the key as "CS137" while the library key is "Cs137",
    which previously always raised InvalidIsotopeError.

    Args:
        isotope_str (str): Isotope identifier, e.g. "Cs137" or "CS137".

    Returns:
        Isotope: A freshly constructed isotope instance.

    Raises:
        InvalidIsotopeError: If no preset matches the identifier.
    """
    wanted = isotope_str.casefold()
    for name, isotope_cls in preset_isotopes.items():
        if name.casefold() == wanted:
            return isotope_cls()
    raise InvalidIsotopeError(f"Unknown isotope: {isotope_str}")

View File

@ -1,126 +0,0 @@
from matplotlib import pyplot as plt
from matplotlib.patches import Circle
import numpy as np
from pg_rad.path import Path
from pg_rad.sources import PointSource
class Landscape:
    """A generic Landscape that can contain a Path and sources.

    Args:
        air_density (float, optional): Air density in kg / m^3. Defaults to 1.243.
        size (int | tuple[int, int, int], optional): Size of the world. Defaults to 500.
        scale (str, optional): The scale of the size argument passed. Defaults to 'meters'.

    Raises:
        TypeError: If `size` is neither an int nor a 3-tuple.
    """
    def __init__(
        self,
        air_density: float = 1.243,
        size: int | tuple[int, int, int] = 500,
        scale: str = 'meters',
    ):
        # A scalar size means a cubic world; otherwise expect (x, y, z).
        if isinstance(size, int):
            self.world = np.zeros((size, size, size))
        elif isinstance(size, tuple) and len(size) == 3:
            self.world = np.zeros(size)
        else:
            raise TypeError("size must be an integer or a tuple of 3 integers.")
        self.air_density = air_density
        self.scale = scale
        self.path: Path = None
        self.sources: list[PointSource] = []

    def plot(self, z: int = 0):
        """Plot a slice of the world at a height `z`.

        Args:
            z (int, optional): Height of slice. Defaults to 0.

        Returns:
            fig, ax: Matplotlib figure objects.
        """
        x_lim, y_lim, _ = self.world.shape
        fig, ax = plt.subplots()
        ax.set_xlim(right=x_lim)
        ax.set_ylim(top=y_lim)
        ax.set_xlabel(f"X [{self.scale}]")
        ax.set_ylabel(f"Y [{self.scale}]")
        # BUG FIX (idiom): compare to None with `is not None` (PEP 8),
        # not `not ... == None`.
        if self.path is not None:
            ax.plot(self.path.x_list, self.path.y_list, 'bo-')
        # Only draw sources that lie (approximately) in this z-slice.
        for s in self.sources:
            if np.isclose(s.z, z):
                dot = Circle(
                    (s.x, s.y),
                    radius=5,
                    color=s.color,
                    zorder=5
                )
                ax.text(
                    s.x + 0.06,
                    s.y + 0.06,
                    s.name,
                    color=s.color,
                    fontsize=10,
                    ha="left",
                    va="bottom",
                    zorder=6
                )
                ax.add_patch(dot)
        return fig, ax

    def add_sources(self, *sources: PointSource):
        """Add one or more point sources to the world.

        Args:
            *sources (pg_rad.sources.PointSource): One or more sources, passed as
                Source1, Source2, ...

        Raises:
            ValueError: If the source is outside the boundaries of the landscape.
        """
        max_x, max_y, max_z = self.world.shape[:3]
        # Validate every source before mutating state, so a failure leaves
        # the landscape unchanged.
        if any(
            not (0 <= source.x < max_x and
                 0 <= source.y < max_y and
                 0 <= source.z < max_z)
            for source in sources
        ):
            raise ValueError("One or more sources are outside the landscape!")
        self.sources.extend(sources)

    def set_path(self, path: Path):
        """
        Set the path in the landscape.
        """
        self.path = path
def create_landscape_from_path(path: Path, max_z: int = 500):
    """Generate a landscape from a path, using its dimensions to determine
    the size of the landscape.

    Args:
        path (Path): A Path object describing the trajectory.
        max_z (int, optional): Height of the world. Defaults to 500 meters.

    Returns:
        landscape (pg_rad.landscape.Landscape): A landscape with dimensions based on the provided Path.
    """
    # BUG FIX: np.ceil returns numpy floats, and np.zeros rejects float
    # dimensions, so the computed extents must be cast to int before use.
    max_x = int(np.ceil(max(path.x_list)))
    max_y = int(np.ceil(max(path.y_list)))
    landscape = Landscape(
        size=(max_x, max_y, int(max_z))
    )
    landscape.path = path
    return landscape

View File

View File

@ -0,0 +1,243 @@
import logging
from typing import Literal, Self
import numpy as np
from .landscape import Landscape
from pg_rad.dataloader.dataloader import load_data
from pg_rad.objects.sources import PointSource
from pg_rad.exceptions.exceptions import OutOfBoundsError
from pg_rad.utils.projection import rel_to_abs_source_position
from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
AbsolutePointSourceSpec,
RelativePointSourceSpec,
DetectorSpec
)
from pg_rad.path.path import Path, path_from_RT90
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
logger = logging.getLogger(__name__)
class LandscapeBuilder:
    """Fluent builder assembling a Landscape from parsed config specs.

    Setter methods return ``self`` so calls can be chained; call
    :meth:`build` last to obtain the finished Landscape.
    """

    def __init__(self, name: str = "Unnamed landscape"):
        self.name = name
        self._path = None
        self._point_sources = []
        self._size = None
        self._air_density = 1.243  # kg/m^3 fallback if never overridden
        self._detector = None
        logger.debug(f"LandscapeBuilder initialized: {self.name}")

    def set_air_density(self, air_density: float | None) -> Self:
        """Set the air density of the world (kg/m^3); None/invalid ignored."""
        # Generalization: accept ints as well as floats (YAML `1` parses
        # as int), but never bools (an int subclass).
        if (
            air_density
            and not isinstance(air_density, bool)
            and isinstance(air_density, (int, float))
        ):
            self._air_density = float(air_density)
        return self

    def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
        """Set the size of the landscape in meters (x,y,z).

        Raises:
            OutOfBoundsError: If the requested size cannot contain the
                already-set path.
        """
        if self._path and any(p > s for p, s in zip(self._path.size, size)):
            raise OutOfBoundsError(
                "Cannot set landscape size smaller than the path."
            )
        self._size = size
        logger.debug("Size of the landscape has been updated.")
        return self

    def get_path(self):
        """Return the currently configured Path (or None)."""
        return self._path

    def set_path_from_segments(
        self,
        sim_spec: SimulationSpec
    ) -> Self:
        """Generate a procedural path from the spec's segment description."""
        path = sim_spec.path
        segments = path.segments
        lengths = path.lengths
        angles = path.angles
        alpha = path.alpha
        z = path.z
        # One sample every speed * acquisition_time meters of travel.
        sg = SegmentedRoadGenerator(
            ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
            velocity=sim_spec.runtime.speed,
            seed=sim_spec.options.seed
        )
        x, y = sg.generate(
            segments=segments,
            lengths=lengths,
            angles=angles,
            alpha=alpha
        )
        self._path = Path(
            list(zip(x, y)),
            z=z,
            opposite_direction=path.opposite_direction
        )
        self._fit_landscape_to_path()
        return self

    def set_path_from_experimental_data(
        self,
        spec: CSVPathSpec
    ) -> Self:
        """Load path waypoints from a CSV file of RT90 coordinates."""
        df = load_data(spec.file)
        self._path = path_from_RT90(
            df=df,
            east_col=spec.east_col_name,
            north_col=spec.north_col_name,
            z=spec.z,
            opposite_direction=spec.opposite_direction
        )
        self._fit_landscape_to_path()
        return self

    def set_point_sources(
        self,
        *sources: AbsolutePointSourceSpec | RelativePointSourceSpec
    ) -> Self:
        """Add one or more point sources to the world.

        Args:
            *sources (AbsolutePointSourceSpec | RelativePointSourceSpec):
                One or more sources, passed as SourceSpecs tuple

        Raises:
            OutOfBoundsError: If any source is outside the boundaries of the
                landscape, or if no landscape size/path has been set yet.
            TypeError: If a spec is neither absolute nor relative.
        """
        # Robustness: fail with a clear error instead of a TypeError from
        # zip(..., None) when no size has been set yet.
        if self._size is None:
            raise OutOfBoundsError(
                "Cannot place sources before a path or landscape size is set."
            )
        for s in sources:
            if isinstance(s, AbsolutePointSourceSpec):
                pos = (s.x, s.y, s.z)
            elif isinstance(s, RelativePointSourceSpec):
                path = self.get_path()
                if s.alignment:
                    along_path = self._align_relative_source(
                        s.along_path,
                        path,
                        s.alignment
                    )
                    logger.info(
                        f"Because source {s.name} was set to align with path "
                        f"({s.alignment} alignment), it was moved to be at "
                        f"{along_path} m along the path from {s.along_path} m."
                    )
                else:
                    along_path = s.along_path
                pos = rel_to_abs_source_position(
                    x_list=path.x_list,
                    y_list=path.y_list,
                    path_z=path.z,
                    along_path=along_path,
                    side=s.side,
                    dist_from_path=s.dist_from_path)
            else:
                # BUG FIX: an unknown spec type previously fell through to
                # the bounds check with `pos` undefined (NameError).
                raise TypeError(f"Unsupported source spec: {type(s)!r}")
            # Renamed the bound variable (was `s`, shadowing the source
            # spec inside the generator expression).
            if any(
                p < 0 or p >= bound for p, bound in zip(pos, self._size)
            ):
                raise OutOfBoundsError(
                    "One or more sources attempted to "
                    "be placed outside the landscape."
                )
            self._point_sources.append(PointSource(
                activity_MBq=s.activity_MBq,
                isotope=s.isotope,
                position=pos,
                name=s.name
            ))
        # Consistency: return self like the other setters to allow chaining.
        return self

    def set_detector(self, spec: DetectorSpec) -> Self:
        """Store the detector spec to embed in the built Landscape."""
        self._detector = spec
        return self

    def _fit_landscape_to_path(self) -> None:
        """The size of the landscape will be updated if
        1) _size is not set, or
        2) _size is too small to contain the path."""
        needs_resize = (
            not self._size
            or any(p > s for p, s in zip(self._path.size, self._size))
        )
        if needs_resize:
            if not self._size:
                logger.debug("Because no Landscape size was set, "
                             "it will now set to path dimensions.")
            else:
                logger.warning(
                    "Path exceeds current landscape size. "
                    "Landscape size will be expanded to accommodate path."
                )
            max_size = max(self._path.size)
            # BUG FIX: size is documented as (x, y, z); the previous
            # 2-tuple left the z extent missing, so source z coordinates
            # were never bounds-checked in set_point_sources.
            self.set_landscape_size((max_size, max_size, max_size))

    def _align_relative_source(
        self,
        along_path: float,
        path: "Path",
        mode: Literal["best", "worst"],
    ) -> float:
        """Given the arc length at which the point source is placed,
        align the source relative to the waypoints of the path. Here,
        'best' means the point source is moved such that it is
        perpendicular to the midpoint between two acquisition points.
        'worst' means the point source is moved such that it is
        perpendicular to the nearest acquisition point.

        The distance to the path is not affected by this algorithm.
        For more details on alignment, see
        Fig. 4 and page 24 in Bukartas (2021).

        Args:
            along_path (float): Current arc length position of the source.
            path (Path): The path to align to.
            mode (Literal["best", "worst"]): Alignment mode.

        Returns:
            along_new (float): The updated arc length position.
        """
        # (Return annotation fixed: a single float is returned, not a
        # 3-tuple.) Waypoint spacing is taken from the first two points
        # only — assumes uniformly spaced acquisition points.
        ds = np.hypot(
            path.x_list[1] - path.x_list[0],
            path.y_list[1] - path.y_list[0],
        )
        if mode == "worst":
            # Snap to the nearest acquisition point.
            along_new = round(along_path / ds) * ds
        elif mode == "best":
            # Snap to the midpoint between the two nearest points.
            along_new = (round(along_path / ds - 0.5) + 0.5) * ds
        else:
            raise ValueError(f"Unknown alignment mode: {mode}")
        return along_new

    def build(self):
        """Assemble and return the configured Landscape."""
        landscape = Landscape(
            name=self.name,
            path=self._path,
            point_sources=self._point_sources,
            detector=self._detector,
            size=self._size,
            air_density=self._air_density
        )
        logger.info(f"Landscape built successfully: {landscape.name}")
        return landscape

View File

@ -0,0 +1,50 @@
from importlib.resources import files
import logging
from pg_rad.configs.filepaths import TEST_EXP_DATA
from .builder import LandscapeBuilder
from pg_rad.inputparser.specs import (
SimulationSpec,
CSVPathSpec,
ProceduralPathSpec
)
from pg_rad.objects.sources import PointSource
logger = logging.getLogger(__name__)
class LandscapeDirector:
    """Convenience factory that drives LandscapeBuilder for common cases."""

    def __init__(self):
        logger.debug("LandscapeDirector initialized.")

    @staticmethod
    def build_test_landscape():
        """Build a small hard-coded landscape for quick testing.

        Returns:
            Landscape: Landscape with one Cs-137 point source and the
            bundled experimental test path.
        """
        fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
        # NOTE(review): 100E6 in a field named activity_MBq looks like a Bq
        # value pasted into an MBq field (100E6 MBq = 100 TBq) — confirm.
        # NOTE(review): a built PointSource is passed to set_point_sources,
        # which elsewhere expects SourceSpec types — confirm this still works.
        source = PointSource(
            activity_MBq=100E6,
            isotope="CS137",
            position=(0, 0, 0)
        )
        lb = LandscapeBuilder("Test landscape")
        lb.set_air_density(1.243)
        lb.set_path_from_experimental_data(fp, z=0)
        lb.set_point_sources(source)
        landscape = lb.build()
        return landscape

    @staticmethod
    def build_from_config(config: SimulationSpec):
        """Build a landscape from a parsed simulation config.

        Args:
            config (SimulationSpec): Fully parsed simulation specification.

        Returns:
            Landscape: The built landscape.
        """
        lb = LandscapeBuilder(config.metadata.name)
        lb.set_air_density(config.options.air_density)
        # Dispatch on the path spec type: CSV data vs procedural segments.
        if isinstance(config.path, CSVPathSpec):
            lb.set_path_from_experimental_data(spec=config.path)
        elif isinstance(config.path, ProceduralPathSpec):
            lb.set_path_from_segments(
                sim_spec=config,
            )
        lb.set_point_sources(*config.point_sources)
        lb.set_detector(config.detector)
        landscape = lb.build()
        return landscape

View File

@ -0,0 +1,41 @@
import logging
from pg_rad.path.path import Path
from pg_rad.detector.detectors import AngularDetector, IsotropicDetector
from pg_rad.objects.sources import PointSource
logger = logging.getLogger(__name__)
class Landscape:
    """
    A generic Landscape that can contain a Path and sources.
    """
    def __init__(
        self,
        name: str,
        path: Path,
        air_density: float,
        point_sources: list[PointSource],
        detector: IsotropicDetector | AngularDetector,
        size: tuple[int, int, int]
    ):
        """Initialize a landscape.

        Args:
            name (str): Name of the landscape.
            path (Path): The path of the detector.
            air_density (float): Air density in kg/m^3.
            point_sources (list[PointSource]): List of point sources.
            detector (IsotropicDetector | AngularDetector): The detector
                travelling along the path.
            size (tuple[int, int, int]): Size of the world.
        """
        self.name = name
        self.path = path
        self.point_sources = point_sources
        self.size = size
        self.air_density = air_density
        self.detector = detector
        logger.debug(f"Landscape created: {self.name}")

View File

@ -1,17 +0,0 @@
import logging
import logging.config
import pathlib
import yaml
def setup_logger(name):
    """Load configs/logging.yml, apply it, and return a logger for *name*.

    Args:
        name: Logger name, typically __name__ of the calling module.

    Returns:
        logging.Logger: The configured logger.
    """
    # Fetch the logger before applying dictConfig, matching the previous
    # initialisation order.
    target = logging.getLogger(name)
    cfg_path = pathlib.Path(__file__).resolve().parent / "configs" / "logging.yml"
    with open(cfg_path) as stream:
        logging.config.dictConfig(yaml.safe_load(stream))
    return target

View File

@ -0,0 +1,5 @@
from pg_rad.logger import logger
from pg_rad.logger.logger import (setup_logger,)
__all__ = ['logger', 'setup_logger']

View File

@ -0,0 +1,40 @@
import logging.config
from importlib.resources import files
import yaml
from pg_rad.configs.filepaths import LOGGING_CONFIG
def setup_logger(log_level: str = "WARNING"):
    """Configure application logging from the bundled YAML config.

    Args:
        log_level (str): Level for the root logger; one of DEBUG, INFO,
            WARNING, ERROR or CRITICAL. Defaults to "WARNING".

    Raises:
        ValueError: If *log_level* is not a recognised level name.
    """
    levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if log_level not in levels:
        raise ValueError(f"Log level must be one of {levels}.")
    config_file = files('pg_rad.configs').joinpath(LOGGING_CONFIG)
    with open(config_file) as stream:
        loaded = yaml.safe_load(stream)
    # Override the root level with the requested one before applying.
    loaded["loggers"]["root"]["level"] = log_level
    logging.config.dictConfig(loaded)
class ColorFormatter(logging.Formatter):
    """Formatter that wraps the level name in ANSI color codes."""

    # ANSI escape codes
    COLORS = {
        logging.DEBUG: "\033[36m",     # Cyan
        logging.INFO: "\033[32m",      # Green
        logging.WARNING: "\033[33m",   # Yellow
        logging.ERROR: "\033[31m",     # Red
        logging.CRITICAL: "\033[41m",  # Red background
    }
    RESET = "\033[0m"

    def format(self, record):
        """Format *record* with a colorized level name.

        The previous implementation permanently mutated record.levelname
        (leaking color codes into other handlers formatting the same
        record) and contained a no-op `record.msg = f"{record.msg}"`.
        The level name is now restored after formatting.
        """
        color = self.COLORS.get(record.levelno, self.RESET)
        original_levelname = record.levelname
        record.levelname = f"{color}{original_levelname}{self.RESET}"
        try:
            return super().format(record)
        finally:
            # Restore so other handlers see the plain level name.
            record.levelname = original_levelname

137
src/pg_rad/main.py Normal file
View File

@ -0,0 +1,137 @@
import argparse
import logging
import sys
from pandas.errors import ParserError
from pg_rad.detector.builder import DetectorBuilder
from pg_rad.exceptions.exceptions import (
MissingConfigKeyError,
OutOfBoundsError,
DimensionError,
InvalidConfigValueError,
InvalidIsotopeError,
InvalidYAMLError
)
from pg_rad.logger.logger import setup_logger
from pg_rad.inputparser.parser import ConfigParser
from pg_rad.landscape.director import LandscapeDirector
from pg_rad.plotting.result_plotter import ResultPlotter
from pg_rad.simulator.engine import SimulationEngine
def main():
    """Command-line entry point for the pg-rad tool.

    Parses CLI arguments, configures logging, then either runs a built-in
    test configuration (--test) or builds and simulates a landscape from a
    YAML config file (--config), plotting the results.
    """
    parser = argparse.ArgumentParser(
        prog="pg-rad",
        description="Primary Gamma RADiation landscape tool"
    )
    parser.add_argument(
        "--config",
        help="Build from a config file."
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Load and run the test landscape."
    )
    parser.add_argument(
        "--loglevel",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    )
    parser.add_argument(
        "--saveplot",
        action="store_true",
        help="Save the plot or not."
    )
    args = parser.parse_args()
    setup_logger(args.loglevel)
    logger = logging.getLogger(__name__)
    # NOTE(review): args.saveplot is parsed but never used below — wire it
    # into the plotter call or remove the option.
    if args.test:
        # Inline YAML config exercising the full pipeline end to end.
        # NOTE(review): unlike the --config branch, this branch has no
        # error handling — parse/build errors surface as raw tracebacks.
        test_yaml = """
name: Test landscape
speed: 8.33
acquisition_time: 1
path:
  length: 1000
  segments:
    - straight
    - turn_left
  direction: negative
sources:
  test_source:
    activity_MBq: 100
    position: [250, 100, 0]
    isotope: Cs137
detector:
  name: dummy
  is_isotropic: True
"""
        cp = ConfigParser(test_yaml).parse()
        landscape = LandscapeDirector.build_from_config(cp)
        detector = DetectorBuilder(cp.detector).build()
        output = SimulationEngine(
            landscape=landscape,
            runtime_spec=cp.runtime,
            sim_spec=cp.options,
            detector=detector
        ).simulate()
        plotter = ResultPlotter(landscape, output)
        plotter.plot()
    elif args.config:
        try:
            cp = ConfigParser(args.config).parse()
            landscape = LandscapeDirector.build_from_config(cp)
            detector = DetectorBuilder(cp.detector).build()
            output = SimulationEngine(
                landscape=landscape,
                detector=detector,
                runtime_spec=cp.runtime,
                sim_spec=cp.options
            ).simulate()
            plotter = ResultPlotter(landscape, output)
            plotter.plot()
        # Missing/invalid keys: point the user at the docs.
        except (
            MissingConfigKeyError,
            KeyError
        ) as e:
            logger.critical(e)
            logger.critical(
                "The config file is missing required keys or may be an "
                "invalid YAML file. Check the log above. Consult the "
                "documentation for examples of how to write a config file."
            )
            sys.exit(1)
        # Structurally valid config with semantically bad values.
        except (
            OutOfBoundsError,
            DimensionError,
            InvalidIsotopeError,
            InvalidConfigValueError
        ) as e:
            logger.critical(e)
            logger.critical(
                "One or more items in config are not specified correctly. "
                "Please consult this log and fix the problem."
            )
            sys.exit(1)
        # File-level problems: missing file or unparseable content.
        except (
            FileNotFoundError,
            ParserError,
            InvalidYAMLError
        ) as e:
            logger.critical(e)
            sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,33 +0,0 @@
import math
from typing import Self
class Object:
def __init__(
self,
x: float,
y: float,
z: float,
name: str = "Unnamed object",
color: str = 'grey'):
"""
A generic object.
Args:
x (float): X coordinate.
y (float): Y coordinate.
z (float): Z coordinate.
name (str, optional): Name for the object. Defaults to "Unnamed object".
color (str, optional): Matplotlib compatible color string. Defaults to "red".
"""
self.x = x
self.y = y
self.z = z
self.name = name
self.color = color
def distance_to(self, other: Self) -> float:
return math.dist(
(self.x, self.y, self.z),
(other.x, other.y, other.z),
)

View File

@ -0,0 +1,11 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.objects import objects
from pg_rad.objects import sources
from pg_rad.objects.objects import (BaseObject,)
from pg_rad.objects.sources import (PointSource,)
__all__ = ['BaseObject', 'PointSource', 'objects',
'sources']

View File

@ -0,0 +1,44 @@
from typing import Self
import numpy as np
from pg_rad.exceptions.exceptions import DimensionError
class BaseObject:
def __init__(
self,
pos: tuple[float, float, float] = (0, 0, 0),
name: str = "Unnamed object",
color: str = 'grey'):
"""
A generic object.
Args:
pos (tuple[float, float, float]): Position vector (x,y,z).
name (str, optional): Name for the object.
Defaults to "Unnamed object".
color (str, optional): Matplotlib compatible color string.
Defaults to "red".
"""
if len(pos) != 3:
raise DimensionError("Position must be tuple of length 3 (x,y,z).")
self.pos = pos
self.name = name
self.color = color
def distance_to(self, other: Self | tuple) -> float:
if isinstance(other, tuple) and len(other) == 3:
r = np.linalg.norm(
np.subtract(self.pos, other)
)
else:
try:
r = np.linalg.norm(
np.subtract(self.pos, other.pos)
)
except AttributeError as e:
raise e("other must be an object in the world \
or a position tuple (x,y,z).")
return r

View File

@ -0,0 +1,54 @@
import logging
from .objects import BaseObject
from pg_rad.isotopes.isotope import Isotope, get_isotope
logger = logging.getLogger(__name__)
class PointSource(BaseObject):
    # Class-level counter used to auto-generate sequential source names.
    _id_counter = 1

    def __init__(
        self,
        activity_MBq: float,
        isotope: str,
        position: tuple[float, float, float] = (0, 0, 0),
        name: str | None = None,
        color: str = 'red'
    ):
        """A point source.

        Args:
            activity_MBq (float): Activity A in MBq.
            isotope (str): Isotope name, resolved via get_isotope().
            position (tuple[float, float, float], optional):
                Position of the PointSource. Defaults to (0, 0, 0).
            name (str, optional): Can give the source a unique name.
                If not provided, point sources are sequentially
                named: Source1, Source2, ...
            color (str, optional): Matplotlib compatible color string
        """
        self.id = PointSource._id_counter
        PointSource._id_counter += 1
        # default name derived from ID if not provided
        if name is None:
            name = f"Source {self.id}"
        super().__init__(position, name, color)
        self.activity = activity_MBq
        self.isotope: Isotope = get_isotope(isotope)
        logger.debug(f"Source created: {self.name}")

    def __repr__(self):
        # Bug fix: BaseObject stores the position as self.pos; the old
        # code read self.position and raised AttributeError.
        x, y, z = self.pos
        repr_str = (f"PointSource(name={self.name}, "
                    + f"pos={(x, y, z)}, "
                    + f"A={self.activity} MBq), "
                    + f"isotope={self.isotope.name}.")
        return repr_str

View File

@ -1,187 +0,0 @@
from collections.abc import Sequence
import math
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import piecewise_regression
from pg_rad.exceptions import ConvergenceError
from pg_rad.logger import setup_logger
logger = setup_logger(__name__)
class PathSegment:
    def __init__(self, a: tuple[float, float], b: tuple[float, float]):
        """A straight Segment of a Path, from (x_a, y_a) to (x_b, y_b).

        Args:
            a (tuple[float, float]): The starting point (x_a, y_a).
            b (tuple[float, float]): The final point (x_b, y_b).
        """
        self.a = a
        self.b = b

    def get_length(self) -> float:
        """Euclidean length of the segment."""
        return math.dist(self.a, self.b)

    # Expose get_length() as a read-only attribute.
    length = property(get_length)

    def __str__(self) -> str:
        return str(f"({self.a}, {self.b})")

    def __getitem__(self, index) -> float:
        # Tuple-like access: seg[0] -> start, seg[1] -> end.
        if index == 0:
            return self.a
        if index == 1:
            return self.b
        raise IndexError
class Path:
    def __init__(
        self,
        coord_list: Sequence[tuple[float, float]],
        z: float = 0,
        path_simplify=False
    ):
        """Construct a path of sequences based on a list of coordinates.

        Args:
            coord_list (Sequence[tuple[float, float]]): List of x,y
                coordinates.
            z (float, optional): Height of the path. Defaults to 0.
            path_simplify (bool, optional): Whether to
                pg_rad.path.simplify_path(). Defaults to False.

        Raises:
            ValueError: If fewer than two coordinates are given.
        """
        if len(coord_list) < 2:
            raise ValueError("Must provide at least two coordinates as a list of tuples, e.g. [(x1, y1), (x2, y2)]")
        x, y = tuple(zip(*coord_list))
        if path_simplify:
            try:
                x, y = simplify_path(list(x), list(y))
            except ConvergenceError:
                # Best effort: fall back to the raw coordinates.
                logger.warning("Continuing without simplifying path.")
        self.x_list = list(x)
        self.y_list = list(y)
        pairs = list(zip(x, y))
        # Consecutive coordinate pairs become straight segments.
        self.segments = [
            PathSegment(start, stop) for start, stop in zip(pairs, pairs[1:])
        ]
        self.z = z

    def get_length(self) -> float:
        """Total arc length: sum of all segment lengths."""
        return sum(seg.length for seg in self.segments)

    length = property(get_length)

    def __getitem__(self, index) -> PathSegment:
        return self.segments[index]

    def __str__(self) -> str:
        return str([str(seg) for seg in self.segments])

    def plot(self, **kwargs):
        """
        Plot the path using matplotlib.
        """
        plt.plot(self.x_list, self.y_list, **kwargs)
def simplify_path(
    x: Sequence[float],
    y: Sequence[float],
    keep_endpoints_equal: bool = False,
    n_breakpoints: int = 3
):
    """From full resolution x and y arrays, return a piecewise linearly approximated/simplified pair of x and y arrays.

    This function uses the `piecewise_regression` package. From a full set of
    coordinate pairs, the function fits linear sections, automatically finding
    the number of breakpoints and their positions.

    On why the default value of n_breakpoints is 3, from the `piecewise_regression`
    docs:
    "If you do not have (or do not want to use) initial guesses for the number
    of breakpoints, you can set it to n_breakpoints=3, and the algorithm will
    randomly generate start_values. With a 50% chance, the bootstrap restarting
    algorithm will either use the best currently converged breakpoints or
    randomly generate new start_values, escaping the local optima in two ways in
    order to find better global optima."

    Args:
        x (Sequence[float]): Full list of x coordinates.
        y (Sequence[float]): Full list of y coordinates.
        keep_endpoints_equal (bool, optional): Whether or not to force start
            and end to be exactly equal to the original. This will worsen the linear
            approximation at the beginning and end of path. Defaults to False.
        n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.

    Returns:
        x (list[float]): Reduced list of x coordinates.
        y (list[float]): Reduced list of y coordinates.

    Raises:
        ConvergenceError: If the fitting algorithm failed to simplify the path.

    Reference:
        Pilgrim, C., (2021). piecewise-regression (aka segmented regression) in Python. Journal of Open Source Software, 6(68), 3859, https://doi.org/10.21105/joss.03859.
    """
    # Dropped the pointless f-prefix (no placeholders in the message).
    logger.debug("Attempting piecewise regression on path.")
    pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
    pw_res = pw_fit.get_results()
    # Identity check against None instead of `== None` (PEP 8).
    if pw_res is None:
        logger.error("Piecewise regression failed to converge.")
        raise ConvergenceError("Piecewise regression failed to converge.")
    est = pw_res['estimates']
    # extract and sort breakpoints
    breakpoints_x = sorted(
        v['estimate'] for k, v in est.items() if k.startswith('breakpoint')
    )
    x_points = [x[0]] + breakpoints_x + [x[-1]]
    y_points = pw_fit.predict(x_points)
    if keep_endpoints_equal:
        logger.debug("Forcing endpoint equality.")
        y_points[0] = y[0]
        y_points[-1] = y[-1]
    logger.info(
        f"Piecewise regression reduced path from {len(x)-1} to {len(x_points)-1} segments."
    )
    return x_points, y_points
def path_from_RT90(
    df: pd.DataFrame,
    east_col: str = "East",
    north_col: str = "North",
    **kwargs
) -> Path:
    """Construct a path from East and North formatted coordinates (RT90) in a Pandas DataFrame.

    Coordinates are shifted so the smallest East/North values become the
    origin; any extra keyword arguments are forwarded to Path().

    Args:
        df (pandas.DataFrame): DataFrame containing at least the two columns noted in the cols argument.
        east_col (str): The column name for the East coordinates.
        north_col (str): The column name for the North coordinates.

    Returns:
        Path: A Path object built from the acquisition coordinates in the DataFrame.
    """
    east = np.array(df[east_col]) - min(df[east_col])
    north = np.array(df[north_col]) - min(df[north_col])
    pairs = list(zip(east, north))
    return Path(pairs, **kwargs)

View File

@ -0,0 +1,8 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.path import path
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,)
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90']

133
src/pg_rad/path/path.py Normal file
View File

@ -0,0 +1,133 @@
from collections.abc import Sequence
import logging
import math
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
class PathSegment:
    def __init__(self, a: tuple[float, float], b: tuple[float, float]):
        """A straight Segment of a Path, from (x_a, y_a) to (x_b, y_b).

        Args:
            a (tuple[float, float]): The starting point (x_a, y_a).
            b (tuple[float, float]): The final point (x_b, y_b).
        """
        self.a = a
        self.b = b

    def get_length(self) -> float:
        """Euclidean distance between the two endpoints."""
        return math.dist(self.a, self.b)

    # Read-only attribute view of get_length().
    length = property(get_length)

    def __str__(self) -> str:
        return str(f"({self.a}, {self.b})")

    def __getitem__(self, index) -> float:
        # Tuple-like indexing: 0 -> start point, 1 -> end point.
        if index == 1:
            return self.b
        if index == 0:
            return self.a
        raise IndexError
class Path:
    def __init__(
        self,
        coord_list: Sequence[tuple[float, float]],
        opposite_direction: bool,
        z: float = 0.,
        z_box: float = 50.
    ):
        """Construct a path of sequences based on a list of coordinates.

        Args:
            coord_list (Sequence[tuple[float, float]]): List of x,y
                coordinates.
            opposite_direction (bool): If True, the path is traversed from
                its last coordinate toward its first.
            z (float, optional): position of the path in z-direction in meters.
                Defaults to 0 meters.
            z_box (float, optional): How much empty space to set
                above the path in meters. Defaults to 50 meters.

        Raises:
            ValueError: If fewer than two coordinates are provided.
        """
        if len(coord_list) < 2:
            # Fixed garbled message: the old backslash continuation dropped
            # the word "list" and embedded raw indentation whitespace.
            raise ValueError(
                "Must provide at least two coordinates as a list "
                "of tuples, e.g. [(x1, y1), (x2, y2)]"
            )
        x, y = tuple(zip(*coord_list))
        self.x_list = list(x)
        self.y_list = list(y)
        coord_list = list(zip(x, y))
        self.segments = [
            PathSegment(i, ip1)
            for i, ip1 in
            zip(coord_list, coord_list[1:])
        ]
        self.z = z
        self.opposite_direction = opposite_direction
        # NOTE(review): size assumes all coordinates are non-negative and
        # anchored at the origin — confirm for paths with negative coords.
        self.size = (
            np.ceil(max(self.x_list)),
            np.ceil(max(self.y_list)),
            z + z_box
        )
        logger.debug("Path created.")

    def get_length(self) -> float:
        """Total arc length of the path (sum of segment lengths)."""
        return sum([s.length for s in self.segments])

    length = property(get_length)

    def __getitem__(self, index) -> PathSegment:
        return self.segments[index]

    def __str__(self) -> str:
        return str([str(s) for s in self.segments])

    def plot(self, **kwargs):
        """
        Plot the path using matplotlib.
        """
        plt.plot(self.x_list, self.y_list, **kwargs)
def path_from_RT90(
    df: pd.DataFrame,
    east_col: str = "East",
    north_col: str = "North",
    **kwargs
) -> Path:
    """Construct a path from East and North formatted coordinates (RT90)
    in a Pandas DataFrame.

    Coordinates are shifted so the smallest East/North values become the
    origin; extra keyword arguments are forwarded to Path().

    Args:
        df (pandas.DataFrame): DataFrame containing at least the two columns
            noted in the cols argument.
        east_col (str): The column name for the East coordinates.
        north_col (str): The column name for the North coordinates.

    Returns:
        Path: A Path object built from the acquisition coordinates in the
        DataFrame.
    """
    east = np.array(df[east_col]) - min(df[east_col])
    north = np.array(df[north_col]) - min(df[north_col])
    built = Path(list(zip(east, north)), **kwargs)
    logger.debug("Loaded path from provided RT90 coordinates.")
    return built

View File

@ -0,0 +1,12 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.physics import attenuation
from pg_rad.physics import fluence
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
from pg_rad.physics.fluence import (calculate_fluence_along_path,
calculate_fluence_at, phi,)
__all__ = ['attenuation', 'calculate_fluence_along_path',
'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff',
'phi']

View File

@ -0,0 +1,17 @@
from importlib.resources import files
from pandas import read_csv
from scipy.interpolate import interp1d
from pg_rad.configs.filepaths import ATTENUATION_TABLE
def get_mass_attenuation_coeff(
    *args
) -> float:
    """Interpolate the mass attenuation coefficient from the bundled table.

    Args:
        *args: Photon energy (or energies) in MeV, forwarded to the
            interpolator built over the table's "energy_mev" column.

    Returns:
        float: Interpolated "mu" value from the attenuation table.
            (NOTE(review): units presumably cm^2/g — confirm against the
            table.)
    """
    # Build the interpolator once and memoise it on the function object, so
    # the CSV is not re-read and re-parsed on every call.
    interp = getattr(get_mass_attenuation_coeff, "_interp", None)
    if interp is None:
        csv = files('pg_rad.data').joinpath(ATTENUATION_TABLE)
        data = read_csv(csv)
        x = data["energy_mev"].to_numpy()
        y = data["mu"].to_numpy()
        interp = interp1d(x, y)
        get_mass_attenuation_coeff._interp = interp
    return interp(*args)

View File

@ -0,0 +1,165 @@
from typing import Tuple, TYPE_CHECKING
import numpy as np
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
if TYPE_CHECKING:
from pg_rad.landscape.landscape import Landscape
def phi(
r: float,
activity: float | int,
branching_ratio: float,
mu_mass_air: float,
air_density: float,
eff: float
) -> float:
"""Compute the contribution of a single point source to the
primary photon fluence rate phi at position (x,y,z).
Args:
r (float): [m] Distance to the point source.
activity (float | int): [Bq] Activity of the point source.
branching_ratio (float): Branching ratio for the photon energy E_gamma.
mu_mass_air (float): [cm^2/g] Mass attenuation coefficient for air.
air_density (float): [kg/m^3] Air density.
Returns:
phi (float): [s^-1 m^-2] Primary photon fluence rate at distance r from
a point source.
"""
# Linear photon attenuation coefficient in m^-1.
mu_mass_air *= 0.1
mu_air = mu_mass_air * air_density
phi_r = (
activity
* eff
* branching_ratio
* np.exp(-mu_air * r)
/ (4 * np.pi * r**2)
)
return phi_r
def calculate_count_rate_per_second(
    landscape: "Landscape",
    pos: np.ndarray,
    detector: IsotropicDetector | AngularDetector,
    tangent_vectors: np.ndarray,
    scaling=1E6
):
    """Compute count rate in s^-1 m^-2 at a position in the landscape.

    Args:
        landscape (Landscape): The landscape to compute.
        pos (np.ndarray): (N, 3) array of positions.
        detector (IsotropicDetector | AngularDetector):
            Detector object, needed to compute correct efficiency.
        tangent_vectors (np.ndarray): (N, 3) path tangents at each position;
            only used for an AngularDetector, to get the source angle.
        scaling (float, optional): Factor applied to source activity.
            Defaults to 1E6 (presumably MBq -> Bq — confirm).

    Returns:
        total_phi (np.ndarray): (N,) array of count rates per second.
    """
    pos = np.atleast_2d(pos)
    total_phi = np.zeros(pos.shape[0])
    # Sum the fluence contribution of every point source at every position.
    for source in landscape.point_sources:
        source_to_detector = pos - np.array(source.pos)
        r = np.linalg.norm(source_to_detector, axis=1)
        r = np.maximum(r, 1E-3)  # enforce minimum distance of 1 mm
        if isinstance(detector, AngularDetector):
            # Angle between the direction of travel and the source vector.
            cos_theta = (
                np.sum(tangent_vectors * source_to_detector, axis=1) / (
                    np.linalg.norm(source_to_detector, axis=1) *
                    np.linalg.norm(tangent_vectors, axis=1)
                )
            )
            # Guard against floating-point drift outside [-1, 1].
            cos_theta = np.clip(cos_theta, -1, 1)
            theta = np.arccos(cos_theta)
            eff = detector.get_efficiency(theta, energy=source.isotope.E)
        else:
            eff = detector.get_efficiency(energy=source.isotope.E)
        phi_source = phi(
            r=r,
            activity=source.activity * scaling,
            branching_ratio=source.isotope.b,
            mu_mass_air=source.isotope.mu_mass_air,
            air_density=landscape.air_density,
            eff=eff
        )
        total_phi += phi_source
    return total_phi
def calculate_counts_along_path(
    landscape: "Landscape",
    detector: "IsotropicDetector | AngularDetector",
    acquisition_time: int,
    points_per_segment: int = 10,
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute the counts recorded in each acquisition period in the landscape.

    The path is resampled at points_per_segment samples per segment, the
    count rate is evaluated at every sample, and the samples are integrated
    over each segment with the trapezoidal rule.

    Args:
        landscape (Landscape): Landscape holding the path and the sources.
        detector (IsotropicDetector | AngularDetector): Detector used to
            compute efficiencies.
        acquisition_time (int): Acquisition time per point, in seconds.
        points_per_segment (int, optional): Number of sample points per
            path segment. Defaults to 10.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Array of acquisition points and
        integrated count rates.
    """
    path = landscape.path
    num_points = len(path.x_list)
    num_segments = len(path.segments)
    segment_lengths = np.array([seg.length for seg in path.segments])
    # NOTE(review): ds is taken from the first segment only — this assumes
    # all segments are equally long; confirm for irregular paths.
    ds = segment_lengths[0]
    original_distances = np.zeros(num_points)
    original_distances[1:] = np.cumsum(segment_lengths)
    # arc lengths at which to evaluate the path
    total_subpoints = num_segments * points_per_segment
    s = np.linspace(0, original_distances[-1], total_subpoints)
    # Interpolate x and y as functions of arc length
    xnew = np.interp(s, original_distances, path.x_list)
    ynew = np.interp(s, original_distances, path.y_list)
    z = np.full_like(xnew, path.z)
    full_positions = np.c_[xnew, ynew, z]
    if path.opposite_direction:
        # Traverse the path from the far end instead.
        full_positions = np.flip(full_positions, axis=0)
    # to compute the angle between sources and the direction of travel, we
    # compute tangent vectors along the path.
    dx_ds = np.gradient(xnew, s)
    dy_ds = np.gradient(ynew, s)
    tangent_vectors = np.c_[dx_ds, dy_ds, np.zeros_like(dx_ds)]
    tangent_vectors /= np.linalg.norm(tangent_vectors, axis=1, keepdims=True)
    count_rate = calculate_count_rate_per_second(
        landscape, full_positions, detector, tangent_vectors
    )
    # Scale the rate by the time spent at each sample point.
    count_rate *= (acquisition_time / points_per_segment)
    count_rate_segs = count_rate.reshape(num_segments, points_per_segment)
    # Trapezoidal integration over each segment's samples.
    integrated = np.trapezoid(
        count_rate_segs,
        dx=ds/points_per_segment,
        axis=1
    )
    # result[0] stays 0: counts are attributed to the end of each segment.
    result = np.zeros(num_points)
    result[1:] = integrated
    return original_distances, result

View File

@ -0,0 +1,7 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.plotting import landscape_plotter
from pg_rad.plotting.landscape_plotter import (LandscapeSlicePlotter,)
__all__ = ['LandscapeSlicePlotter', 'landscape_plotter']

View File

@ -0,0 +1,150 @@
import logging
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.patches import Circle
from numpy import median
from pg_rad.landscape.landscape import Landscape
logger = logging.getLogger(__name__)
class LandscapeSlicePlotter:
    """Draws a top-down (x, y) slice of a Landscape at a given height z."""

    def plot(
        self,
        landscape: Landscape,
        z: int = 0,
        show: bool = True,
        save: bool = False,
        ax: Axes | None = None
    ):
        """Plot a top-down slice of the landscape at a height z.

        Args:
            landscape (Landscape): the landscape to plot
            z (int, optional): Height at which to plot slice. Defaults to 0.
            show (bool, optional): Show the plot. Defaults to True.
            save (bool, optional): Save the plot. Defaults to False.
            ax (Axes | None, optional): Existing axes to draw into. When
                given, saving/showing is left to the caller.

        Returns:
            Axes: The axes the slice was drawn on.
        """
        self.z = z
        # Bug fix: the old code tested `not ax` *after* reassigning ax via
        # plt.subplots(), so the save/show branches could never run.
        owns_figure = ax is None
        if owns_figure:
            fig, ax = plt.subplots()
        self._draw_base(ax, landscape)
        self._draw_path(ax, landscape)
        self._draw_point_sources(ax, landscape)
        ax.set_aspect("equal")
        if save and owns_figure:
            landscape_name = landscape.name.lower().replace(' ', '_')
            filename = f"{landscape_name}_z{self.z}.png"
            plt.savefig(filename)
            logger.info("Plot saved to file: "+filename)
        if show and owns_figure:
            plt.show()
        return ax

    def _draw_base(self, ax, landscape: Landscape):
        # Axis limits, labels and title for the slice.
        width, height = landscape.size[:2]
        ax.set_xlim(right=max(width, .5*height))
        # if the road is very flat, we center it vertically (looks better)
        if median(landscape.path.y_list) == 0:
            h = max(height, .5*width)
            ax.set_ylim(bottom=-h//2,
                        top=h//2)
        else:
            ax.set_ylim(top=max(height, .5*width))
        ax.set_xlabel("X [m]")
        ax.set_ylabel("Y [m]")
        ax.set_title(f"Landscape (top-down, z = {self.z})")

    def _draw_path(self, ax, landscape: Landscape):
        # Only draw the path when it lies at or below the slice height.
        if landscape.path.z <= self.z:
            ax.plot(
                landscape.path.x_list,
                landscape.path.y_list,
                linestyle='-',
                marker='|',
                markersize=3,
                linewidth=1
            )
            if len(landscape.path.x_list) >= 2:
                ax = self._draw_path_direction_arrow(ax, landscape.path)
        else:
            logger.warning(
                "Path is above the slice height z."
                "It will not show on the plot."
            )

    def _draw_point_sources(self, ax, landscape):
        # Mark each visible source with a dot and a name label.
        for s in landscape.point_sources:
            x, y, z = s.pos
            if z <= self.z:
                dot = Circle(
                    (x, y),
                    radius=5,
                    color=s.color,
                    zorder=5
                )
                ax.text(
                    x + 0.06,
                    y + 0.06,
                    s.name+", z="+str(z),
                    color=s.color,
                    fontsize=10,
                    ha="left",
                    va="bottom",
                    zorder=6
                )
                ax.add_patch(dot)
            else:
                logger.warning(
                    f"Source {s.name} is above slice height z."
                    "It will not show on the plot."
                )

    def _draw_path_direction_arrow(self, ax, path) -> Axes:
        # Small inset in the lower right showing the direction of travel,
        # derived from the first two path points.
        inset_ax = ax.inset_axes([0.8, 0.1, 0.15, 0.15])
        x_start, y_start = path.x_list[0], path.y_list[0]
        x_end, y_end = path.x_list[1], path.y_list[1]
        dx = x_end - x_start
        dy = y_end - y_start
        if path.opposite_direction:
            # Travelling from the far end: flip the arrow.
            dx = -dx
            dy = -dy
        length = 10
        dx_norm = dx / (dx**2 + dy**2)**0.5 * length
        dy_norm = dy / (dx**2 + dy**2)**0.5 * length
        inset_ax.arrow(
            0, 0,
            dx_norm, dy_norm,
            head_width=5, head_length=5,
            fc='red', ec='red',
            zorder=4, linewidth=1
        )
        inset_ax.set_xlim(-2*length, 2*length)
        inset_ax.set_ylim(-2*length, 2*length)
        inset_ax.set_title("Direction", fontsize=8)
        inset_ax.set_xticks([])
        inset_ax.set_yticks([])
        return ax

View File

@ -0,0 +1,121 @@
from matplotlib import pyplot as plt
from matplotlib.gridspec import GridSpec
from .landscape_plotter import LandscapeSlicePlotter
from pg_rad.simulator.outputs import SimulationOutput
from pg_rad.landscape.landscape import Landscape
class ResultPlotter:
    """Lays out simulation results — count-rate curve, landscape slice and
    summary tables — in a single figure."""

    def __init__(self, landscape: Landscape, output: SimulationOutput):
        """
        Args:
            landscape (Landscape): The simulated landscape.
            output (SimulationOutput): Results from the SimulationEngine.
        """
        self.landscape = landscape
        self.count_rate_res = output.count_rate
        self.source_res = output.sources

    def plot(self, landscape_z: float = 0):
        """Draw the full results figure and show it.

        Args:
            landscape_z (float): Height of the landscape slice. Defaults
                to 0.
        """
        fig = plt.figure(figsize=(12, 10), constrained_layout=True)
        fig.suptitle(self.landscape.name)
        gs = GridSpec(
            4,
            2,
            width_ratios=[0.5, 0.5],
            height_ratios=[0.7, 0.1, 0.1, 0.1],
            hspace=0.2)
        ax1 = fig.add_subplot(gs[0, 0])
        self._draw_count_rate(ax1)
        ax2 = fig.add_subplot(gs[0, 1])
        self._plot_landscape(ax2, landscape_z)
        ax3 = fig.add_subplot(gs[1, :])
        self._draw_table(ax3)
        ax4 = fig.add_subplot(gs[2, :])
        self._draw_detector_table(ax4)
        ax5 = fig.add_subplot(gs[3, :])
        self._draw_source_table(ax5)
        plt.show()

    def _plot_landscape(self, ax, z):
        # Reuse the standalone slice plotter inside this figure.
        lp = LandscapeSlicePlotter()
        ax = lp.plot(landscape=self.landscape, z=z, ax=ax, show=False)
        return ax

    def _draw_count_rate(self, ax):
        # Counts per acquisition as a function of arc length.
        x = self.count_rate_res.arc_length
        y = self.count_rate_res.count_rate
        ax.plot(x, y, label='Count rate', color='r')
        ax.set_title('Count rate')
        ax.set_xlabel('Arc length s [m]')
        ax.set_ylabel('Counts')
        ax.legend()

    def _draw_table(self, ax):
        # Key simulation parameters as a small table.
        ax.set_axis_off()
        ax.set_title('Simulation parameters')
        cols = ('Parameter', 'Value')
        data = [
            ["Air density (kg/m^3)", round(self.landscape.air_density, 3)],
            ["Total path length (m)", round(self.landscape.path.length, 3)],
        ]
        ax.table(
            cellText=data,
            colLabels=cols,
            loc='center'
        )
        return ax

    def _draw_detector_table(self, ax):
        # Removed a stray debug print(det) that was left in here.
        det = self.landscape.detector
        ax.set_axis_off()
        ax.set_title('Detector')
        cols = ('Parameter', 'Value')
        # NOTE(review): the "Type" row displays det.efficiency — looks like
        # it should show the detector type instead; confirm.
        data = [
            ["Name", det.name],
            ["Type", det.efficiency],
        ]
        ax.table(
            cellText=data,
            colLabels=cols,
            loc='center'
        )
        return ax

    def _draw_source_table(self, ax):
        # One row per point source with its distance to the path.
        ax.set_axis_off()
        ax.set_title('Point sources')
        cols = (
            'Name',
            'Isotope',
            'Activity (MBq)',
            'Position (m)',
            'Dist. to path (m)'
        )
        # this formats position to tuple
        data = [
            [
                s.name,
                s.isotope,
                s.activity,
                "("+", ".join(f"{val:.2f}" for val in s.position)+")",
                round(s.dist_from_path, 2)
            ]
            for s in self.source_res
        ]
        ax.table(
            cellText=data,
            colLabels=cols,
            loc='center'
        )
        return ax

View File

View File

@ -0,0 +1,70 @@
from typing import List
from pg_rad.landscape.landscape import Landscape
from pg_rad.simulator.outputs import (
CountRateOutput,
SimulationOutput,
SourceOutput
)
from pg_rad.detector.detectors import IsotropicDetector, AngularDetector
from pg_rad.physics.fluence import calculate_counts_along_path
from pg_rad.utils.projection import minimal_distance_to_path
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
class SimulationEngine:
    """Takes a fully built landscape and produces results."""

    def __init__(
        self,
        landscape: Landscape,
        detector: IsotropicDetector | AngularDetector,
        runtime_spec: RuntimeSpec,
        sim_spec: SimulationOptionsSpec,
    ):
        """Store the inputs needed for a simulation run.

        Args:
            landscape (Landscape): Fully built landscape to simulate.
            detector (IsotropicDetector | AngularDetector): Detector model
                used to compute efficiencies.
            runtime_spec (RuntimeSpec): Runtime options; acquisition_time
                is read from here.
            sim_spec (SimulationOptionsSpec): Additional simulation options.
        """
        self.landscape = landscape
        self.detector = detector
        self.runtime_spec = runtime_spec
        self.sim_spec = sim_spec

    def simulate(self) -> SimulationOutput:
        """Compute everything and return structured output."""
        count_rate_results = self._calculate_count_rate_along_path()
        source_results = self._calculate_point_source_distance_to_path()
        return SimulationOutput(
            name=self.landscape.name,
            count_rate=count_rate_results,
            sources=source_results
        )

    def _calculate_count_rate_along_path(self) -> CountRateOutput:
        # Integrate the count rate along the resampled path.
        arc_length, phi = calculate_counts_along_path(
            self.landscape,
            self.detector,
            acquisition_time=self.runtime_spec.acquisition_time
        )
        return CountRateOutput(arc_length, phi)

    def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:
        # Report each source together with its minimal distance to the path.
        path = self.landscape.path
        source_output = []
        for s in self.landscape.point_sources:
            dist_to_path = minimal_distance_to_path(
                path.x_list,
                path.y_list,
                path.z,
                s.pos)
            source_output.append(
                SourceOutput(
                    s.name,
                    s.isotope.name,
                    s.activity,
                    s.pos,
                    dist_to_path)
            )
        return source_output

View File

@ -0,0 +1,25 @@
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class CountRateOutput:
    """Count-rate profile sampled along the travel path."""
    # cumulative distance along the path (presumably meters, per the
    # project's path conventions — confirm against calculate_counts_along_path)
    arc_length: List[float]
    # detector count rate at each arc-length sample
    count_rate: List[float]
@dataclass
class SourceOutput:
    """Per-source summary reported in the simulation output."""
    # source name
    name: str
    # isotope identifier, e.g. "CS137"
    isotope: str
    # activity in MBq (matches the "Activity (MBq)" column in the plotter)
    activity: float
    # absolute (x, y, z) position
    position: Tuple[float, float, float]
    # minimal Euclidean distance from the source to the travel path
    dist_from_path: float
@dataclass
class SimulationOutput:
    """Top-level result bundle produced by a simulation run."""
    # landscape name
    name: str
    # count rate along the path
    count_rate: CountRateOutput
    # one entry per point source in the landscape
    sources: List[SourceOutput]

View File

@ -1,43 +0,0 @@
from pg_rad.objects import Object
from pg_rad.isotope import Isotope
class PointSource(Object):
    # Class-wide counter used to hand out sequential IDs (and default names).
    _id_counter = 1

    def __init__(
            self,
            x: float,
            y: float,
            z: float,
            activity: int | float,
            isotope: Isotope,
            name: str | None = None,
            color: str = "red"):
        """A point source.

        Args:
            x (float): X coordinate.
            y (float): Y coordinate.
            z (float): Z coordinate.
            activity (int | float): Activity A in MBq.
            isotope (Isotope): The isotope.
            name (str | None, optional): Can give the source a unique name.
                Defaults to None, making the name sequential
                ("Source 1", "Source 2", etc.).
            color (str, optional): Matplotlib compatible color string.
                Defaults to "red".
        """
        self.id = PointSource._id_counter
        PointSource._id_counter += 1
        # default name derived from ID if not provided
        if name is None:
            name = f"Source {self.id}"
        super().__init__(x, y, z, name, color)
        self.activity = activity
        self.isotope = isotope
        # NOTE(review): color is also passed to Object.__init__ above;
        # it is stored again here — possibly redundant, confirm against Object.
        self.color = color

    def __repr__(self):
        return f"PointSource(name={self.name}, pos={(self.x, self.y, self.z)}, isotope={self.isotope.name}, A={self.activity} MBq)"

View File

View File

@ -0,0 +1,168 @@
from typing import List, Tuple
import numpy as np
from pg_rad.exceptions.exceptions import OutOfBoundsError
def rel_to_abs_source_position(
x_list: List,
y_list: List,
path_z: int | float,
along_path: int | float,
side: str,
dist_from_path: int | float,
z_rel: int | float = 0
):
path = np.column_stack((x_list, y_list))
segments = np.diff(path, axis=0)
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
arc_length = np.cumsum(segment_lengths)
arc_length = np.insert(arc_length, 0, 0)
# find index of the segment where the source should be placed
s_i = np.searchsorted(arc_length, along_path, side='right') - 1
if not 0 <= s_i < len(segments):
raise OutOfBoundsError(
"along_path must be less than the length of the path!"
)
# fraction of segment length where to place the point source
segment_fraction = (
(along_path - arc_length[s_i])
/ segment_lengths[s_i]
)
# tangential vector at point where point source to be placed
point_on_path = path[s_i] + segment_fraction * segments[s_i]
tangent_vec = segments[s_i] / segment_lengths[s_i]
if side not in {"left", "right"}:
raise ValueError("side must be 'left' or 'right'")
orientation = 1 if side == "left" else -1
perp_vec = orientation * np.array([-tangent_vec[1], tangent_vec[0]])
x_abs, y_abs = point_on_path + dist_from_path * perp_vec
z_abs = path_z + z_rel
return x_abs, y_abs, z_abs
def minimal_distance_to_path(
    x_list: List[float],
    y_list: List[float],
    path_z: float,
    pos: Tuple[float, float, float]
) -> float:
    """
    Compute minimal Euclidean distance from pos (x,y,z)
    to interpolated polyline path at height path_z.

    Vectorized over all segments: the point is projected onto each segment
    (clamped to the segment endpoints) and the smallest 3D distance is
    returned. Zero-length segments are skipped; if the path has no usable
    segment at all, ``inf`` is returned (matching the previous behavior).
    """
    x, y, z = pos
    path = np.column_stack((x_list, y_list))
    point_xy = np.array([x, y])

    starts = path[:-1]
    segments = np.diff(path, axis=0)
    # squared segment lengths, computed row-wise
    lengths_sq = np.einsum('ij,ij->i', segments, segments)

    usable = lengths_sq > 0
    if not np.any(usable):
        # degenerate path: no segments, or all of them zero-length
        return np.inf
    starts = starts[usable]
    segments = segments[usable]
    lengths_sq = lengths_sq[usable]

    # clamped projection parameter of the point onto each segment
    t = np.clip(
        np.sum((point_xy - starts) * segments, axis=1) / lengths_sq,
        0.0,
        1.0,
    )
    projections = starts + t[:, None] * segments
    dist_xy_sq = np.sum((point_xy - projections) ** 2, axis=1)
    # the vertical offset is the same for every segment, so it can be
    # added once after taking the planar minimum
    dz = z - path_z
    return float(np.sqrt(np.min(dist_xy_sq) + dz ** 2))
def abs_to_rel_source_position(
x_list: List,
y_list: List,
path_z: int | float,
pos: Tuple[float, float, float],
) -> Tuple[float, float, str, float]:
"""
Convert absolute (x,y,z) position into:
along_path,
dist_from_path,
side ("left" or "right"),
z_rel
"""
x, y, z = pos
path = np.column_stack((x_list, y_list))
point = np.array([x, y])
segments = np.diff(path, axis=0)
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
arc_length = np.cumsum(segment_lengths)
arc_length = np.insert(arc_length, 0, 0)
min_dist = np.inf
best_projection = None
best_s_i = None
best_fraction = None
best_signed_dist = None
for (i, (p0, seg, seg_len)) in enumerate(
zip(path[:-1], segments, segment_lengths)
):
if seg_len == 0:
continue
# project point onto infinite line
t = np.dot(point - p0, seg) / (seg_len ** 2)
# clamp to segment
t_clamped = np.clip(t, 0.0, 1.0)
projection = p0 + t_clamped * seg
diff_vec = point - projection
dist = np.linalg.norm(diff_vec)
if dist < min_dist:
min_dist = dist
best_projection = projection
best_s_i = i
best_fraction = t_clamped
# tangent and perpendicular
tangent_vec = seg / seg_len
perp_vec = np.array([-tangent_vec[1], tangent_vec[0]])
signed_dist = np.dot(diff_vec, perp_vec)
best_signed_dist = signed_dist
if best_projection is None:
raise ValueError("Could not project point onto path.")
# Compute along_path
along_path = (
arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
)
# Side and distance
side = "left" if best_signed_dist > 0 else "right"
dist_from_path = abs(best_signed_dist)
# z relative
z_rel = z - path_z
return along_path, dist_from_path, side, z_rel

0
src/road_gen/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,49 @@
import secrets
import numpy as np
class BaseRoadGenerator:
"""A base generator object for generating a road of a specified length."""
def __init__(
self,
ds: int | float,
velocity: int | float,
mu: float = 0.7,
g: float = 9.81,
seed: int | None = None
):
"""Initialize a BaseGenerator with a given or random seed.
Args:
ds (int | float): The step size in meters.
velocity (int | float): Velocity in meters per second.
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
seed (int | None, optional): Set a seed for the generator.
Defaults to a random seed.
"""
if seed is None:
seed = secrets.randbits(32)
if not isinstance(seed, int):
raise TypeError("seed must be an integer or None.")
if not isinstance(ds, int | float):
raise TypeError("Step size must be integer or float in meters.")
if not isinstance(velocity, int | float):
raise TypeError(
"Velocity must be integer or float in meters per second."
)
self.ds = ds
self.velocity = velocity
self.min_radius = (velocity ** 2) / (g * mu)
self.seed = seed
self._rng = np.random.default_rng(seed)
def generate(self):
pass

View File

@ -0,0 +1,144 @@
import logging
from typing import Tuple
import numpy as np
from .base_road_generator import BaseRoadGenerator
from road_gen.prefabs import prefabs
from road_gen.integrator.integrator import integrate_road
from pg_rad.configs import defaults
logger = logging.getLogger(__name__)
class SegmentedRoadGenerator(BaseRoadGenerator):
    """Road generator that concatenates segment prefabs (straights and
    turns) into one curvature profile and integrates it into waypoints."""

    def __init__(
        self,
        ds: int | float,
        velocity: int | float,
        mu: float = 0.7,
        g: float = 9.81,
        seed: int | None = None
    ):
        """Initialize a SegmentedRoadGenerator with given or random seed.

        Args:
            ds (int | float): The step size in meters.
            velocity (int | float): Velocity in meters per second.
            mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
            g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
            seed (int | None, optional): Set a seed for the generator.
                Defaults to random seed.
        """
        super().__init__(ds, velocity, mu, g, seed)

    def generate(
        self,
        segments: list[str],
        lengths: list[int | float],
        angles: list[float | None],
        alpha: float = defaults.DEFAULT_ALPHA,
        min_turn_angle: float = defaults.DEFAULT_MIN_TURN_ANGLE,
        max_turn_angle: float = defaults.DEFAULT_MAX_TURN_ANGLE
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Generate a curvature profile from a list of segments.

        Args:
            segments (list[str]): List of segments.
            lengths (list[int | float]): List of segment lengths.
            angles (list[float | None]): List of angles.
            alpha (float, optional): Dirichlet concentration parameter.
                A higher value leads to more uniform apportionment of the
                length amongst the segments, while a lower value allows more
                random apportionment. Defaults to 1.0.
            min_turn_angle (float, optional): Minimum turn angle in degrees for
                random sampling of turn radius. Does nothing if `angle_list` is
                provided or no `turn_*` segment is specified in the `segments`
                list.
            max_turn_angle (float, optional): Maximum turn angle in degrees for
                random sampling of turn radius. Does nothing if `angle_list` is
                provided or no `turn_*` segment is specified in the `segments`
                list.

        Raises:
            ValueError: Raised when a turn
                is too tight given its segment length and the velocity.
                To fix this, you can try to reduce the amount of segments or
                increase length. Increasing alpha
                (Dirichlet concentration parameter) can also help because this
                reduces the odds of very small lengths being assigned to
                turn segments.

        Returns:
            Tuple[np.ndarray, np.ndarray]: x and y coordinates of the
                waypoints describing the random road.
        """
        existing_prefabs = prefabs.PREFABS.keys()
        if not all(segment in existing_prefabs for segment in segments):
            # NOTE(review): this message is missing a separator between
            # "choices" and the listed prefab names.
            raise ValueError(
                "Invalid segment type provided. Available choices"
                f"{existing_prefabs}"
            )
        self.segments = segments
        self.alpha = alpha
        total_length = sum(lengths)
        num_points = np.ceil(total_length / self.ds).astype(int)
        # divide num_points into len(segments) randomly sized parts.
        if len(lengths) == len(segments):
            # explicit lengths given: apportion points proportionally
            parts = np.array([seg_len / total_length for seg_len in lengths])
        else:
            # otherwise sample random proportions from a Dirichlet
            parts = self._rng.dirichlet(
                np.full(len(segments), alpha),
                size=1)[0]
        parts = parts * num_points
        parts = np.round(parts).astype(int)
        # correct round off so the sum of parts is still total length L.
        if sum(parts) != num_points:
            parts[0] += num_points - sum(parts)
        curvature = np.zeros(num_points)
        current_index = 0
        for seg_name, seg_length, seg_angle in zip(segments, parts, angles):
            seg_function = prefabs.PREFABS[seg_name]
            if seg_name == 'straight':
                curvature_s = seg_function(seg_length)
            else:
                # radius bounds implied by the allowed turn-angle range
                R_min_angle = seg_length / np.deg2rad(max_turn_angle)
                R_max_angle = seg_length / np.deg2rad(min_turn_angle)
                # physics limit
                R_min = max(self.min_radius, R_min_angle)
                if R_min > R_max_angle:
                    raise ValueError(
                        f"{seg_name} with length {seg_length} does not have "
                        "a possible radius. The minimum for the provided "
                        "velocity and friction coefficient is "
                        f"{self.min_radius}, but the possible range is "
                        f"({R_min}, {R_max_angle})"
                    )
                # NOTE(review): a seg_angle of 0 is falsy and falls through
                # to random sampling; use `is not None` if an explicit 0
                # should be honored.
                if seg_angle:
                    radius = seg_length / np.deg2rad(seg_angle)
                else:
                    radius = self._rng.uniform(R_min, R_max_angle)
                if seg_name.startswith("u_turn"):
                    curvature_s = seg_function(radius)
                else:
                    curvature_s = seg_function(seg_length, radius)
            curvature[current_index:(current_index + seg_length)] = curvature_s
            current_index += seg_length
        x, y = integrate_road(curvature)
        # curvature was built on a unit grid; scale waypoints by the step size
        return x * self.ds, y * self.ds

View File

View File

@ -0,0 +1,21 @@
import numpy as np
def integrate_road(curvature: np.ndarray, ds: float = 1.0):
    """Integrate along the curvature field to obtain X and Y coordinates.

    The heading angle is the running integral of curvature; the waypoints
    are the running integrals of (cos(heading), sin(heading)) * ds.

    Args:
        curvature (np.ndarray): The curvature field, one value per step.
        ds (float, optional): Arc-length step between samples. Defaults to 1.0.

    Returns:
        x, y (np.ndarray): X and Y waypoints.
    """
    n = len(curvature)
    heading = np.zeros(n)
    # heading at sample i accumulates curvature of all previous samples
    heading[1:] = np.cumsum(curvature[:-1] * ds)
    return np.cumsum(np.cos(heading) * ds), np.cumsum(np.sin(heading) * ds)

View File

View File

@ -0,0 +1,20 @@
import numpy as np
def straight(length: int) -> np.ndarray:
    """Curvature profile of a straight segment: zero at every sample."""
    return np.full(length, 0.0)
def turn_left(length: int, radius: float) -> np.ndarray:
    """Constant positive curvature (1 / radius) over `length` samples."""
    kappa = 1.0 / radius
    return np.full(length, kappa)
def turn_right(length: int, radius: float) -> np.ndarray:
    """Constant negative curvature (-1 / radius) over `length` samples."""
    return np.full(length, -1.0 / radius)
# Registry mapping segment names (as used in road configs) to the prefab
# functions that produce their curvature profiles.
# NOTE(review): SegmentedRoadGenerator special-cases "u_turn*" names, but no
# u_turn prefab is registered here — confirm whether one is defined elsewhere.
PREFABS = {
    'straight': straight,
    'turn_left': turn_left,
    'turn_right': turn_right,
}

View File

@ -0,0 +1,29 @@
import pytest
from pg_rad.physics import get_mass_attenuation_coeff
@pytest.mark.parametrize("energy,mu", [
    (1.00000E-03, 3.606E+03),
    (1.00000E-02, 5.120E+00),
    (1.00000E-01, 1.541E-01),
    (1.00000E+00, 6.358E-02),
    (1.00000E+01, 2.045E-02)
])
def test_exact_attenuation_retrieval(energy, mu):
    """Retrieval of energies present verbatim in the table is exact."""
    assert pytest.approx(get_mass_attenuation_coeff(energy), rel=1E-6) == mu
@pytest.mark.parametrize("energy,mu", [
    (0.662, 0.0778)
])
def test_attenuation_interpolation(energy, mu):
    """Retrieval of the Cs-137 mass attenuation coefficient between
    tabulated energies (exercises the interpolation path)."""
    assert pytest.approx(get_mass_attenuation_coeff(energy), rel=1E-2) == mu

View File

@ -0,0 +1,66 @@
import numpy as np
import pytest
from pg_rad.inputparser.parser import ConfigParser
from pg_rad.landscape.director import LandscapeDirector
from pg_rad.physics import calculate_fluence_at
@pytest.fixture
def isotropic_detector():
    """A unit-efficiency isotropic detector for reference calculations."""
    from pg_rad.detector.detectors import IsotropicDetector
    detector = IsotropicDetector(name="test_detector", eff=1.0)
    return detector
@pytest.fixture
def phi_ref(test_landscape, isotropic_detector):
    """Analytic point-source reference value at the probe point (10, 10, 0):
    inverse-square law with exponential air attenuation."""
    src = test_landscape.point_sources[0]
    # source-to-probe distance
    dist = np.linalg.norm(np.array([10, 10, 0]) - np.array(src.pos))
    activity = src.activity * 1E6  # MBq -> Bq
    branching = src.isotope.b
    # linear attenuation coefficient of air; the 0.1 factor presumably
    # converts units between the tabulated coefficient and the density —
    # TODO confirm against the physics module's conventions
    attenuation = src.isotope.mu_mass_air * test_landscape.air_density
    attenuation *= 0.1
    eff = isotropic_detector.get_efficiency(src.isotope.E)
    return activity * eff * branching * np.exp(-attenuation * dist) / (4 * np.pi * dist**2)
@pytest.fixture
def test_landscape():
    """Minimal single-source landscape built from an inline YAML config."""
    test_yaml = """
name: Test landscape
speed: 8.33
acquisition_time: 1
path:
  length: 1000
  segments:
    - straight
sources:
  test_source:
    activity_MBq: 100
    position: [0, 0, 0]
    isotope: CS137
detector:
  name: dummy
  is_isotropic: True
"""
    parsed = ConfigParser(test_yaml).parse()
    return LandscapeDirector.build_from_config(parsed)
def test_single_source_fluence(phi_ref, test_landscape, isotropic_detector):
    """Fluence at a fixed probe point matches the analytic reference."""
    probe = np.array([10, 10, 0])
    phi = calculate_fluence_at(
        test_landscape,
        probe,
        isotropic_detector,
        tangent_vectors=None,
    )
    assert pytest.approx(phi[0], rel=1E-3) == phi_ref

View File

@ -1,47 +0,0 @@
import pathlib
import numpy as np
import pytest
from pg_rad.dataloader import load_data
from pg_rad.path import Path, path_from_RT90
@pytest.fixture
def test_df():
    """Load the RT90 coordinate fixture CSV shipped next to the tests."""
    return load_data(pathlib.Path(__file__).parent / "data/coordinates.csv")
def test_piecewise_regression(test_df):
    """Verify that the intermediate simplified-path breakpoints deviate
    less than 0.1 SD from the full-resolution path."""
    full = path_from_RT90(
        test_df,
        east_col="East",
        north_col="North",
        simplify_path=False
    )
    simplified = path_from_RT90(
        test_df,
        east_col="East",
        north_col="North",
        simplify_path=True
    )
    x_full = np.array(full.x_list)
    y_full = np.array(full.y_list)
    x_simpl = np.array(simplified.x_list)
    y_simpl = np.array(simplified.y_list)
    sd = np.std(y_full)
    # skip the endpoints; they are shared by construction
    for xb, yb in zip(x_simpl[1:-1], y_simpl[1:-1]):
        # compare each breakpoint against the nearest full-resolution sample
        idx = np.argmin(np.abs(x_full - xb))
        deviation = abs(yb - y_full[idx])
        assert deviation < 0.1 * sd, (
            f"Breakpoint deviation too large: {deviation:.4f} "
            f"(threshold {0.1 * sd:.4f}) at x={xb:.2f}"
        )

View File

@ -1,28 +1,35 @@
import numpy as np import numpy as np
import pytest import pytest
from pg_rad.objects import Source from pg_rad.objects import PointSource
@pytest.fixture @pytest.fixture
def test_sources(): def test_sources():
iso = "CS137"
pos_a = np.random.rand(3) pos_a = np.random.rand(3)
pos_b = np.random.rand(3) pos_b = np.random.rand(3)
a = Source(*tuple(pos_a), strength = None) a = PointSource(position=pos_a, activity_MBq=None, isotope=iso)
b = Source(*tuple(pos_b), strength = None) b = PointSource(position=pos_b, activity_MBq=None, isotope=iso)
return pos_a, pos_b, a, b return pos_a, pos_b, a, b
def test_if_distances_equal(test_sources): def test_if_distances_equal(test_sources):
"""_Verify whether from object A to object B is the same as B to A._""" """
Verify whether distance from PointSource A to B is the same as B to A.
"""
_, _, a, b = test_sources _, _, a, b = test_sources
assert a.distance_to(b) == b.distance_to(a) assert a.distance_to(b) == b.distance_to(a)
def test_distance_calculation(test_sources): def test_distance_calculation(test_sources):
"""_Verify whether distance between two static objects (e.g. sources) """
is calculated correctly._""" Verify whether distance between PointSources is calculated correctly.
"""
pos_a, pos_b, a, b = test_sources pos_a, pos_b, a, b = test_sources
@ -33,4 +40,4 @@ def test_distance_calculation(test_sources):
assert np.isclose( assert np.isclose(
a.distance_to(b), a.distance_to(b),
np.sqrt(dx**2 + dy**2 + dz**2), np.sqrt(dx**2 + dy**2 + dz**2),
rtol = 1e-12) rtol=1e-12)