mirror of
https://github.com/pim-n/pg-rad
synced 2026-03-23 21:58:12 +01:00
Compare commits
97 Commits
main
...
92789c718f
| Author | SHA1 | Date | |
|---|---|---|---|
| 92789c718f | |||
| 176baa543a | |||
| dba1240e9f | |||
| 9a169da520 | |||
| 5c9841190f | |||
| 561fb1dca1 | |||
| 39572da682 | |||
| a74ea765d7 | |||
| 58de830a39 | |||
| ae0a038948 | |||
| 9944c06466 | |||
| 5615914c7e | |||
| e926338b69 | |||
| bab41c128b | |||
| c18f924348 | |||
| f1bed93ca7 | |||
| 3a92f79a43 | |||
| 80f7b71c38 | |||
| 61dc05073a | |||
| cca514a2ba | |||
| 265d3b0111 | |||
| 8f652875dc | |||
| 347ff4c102 | |||
| 5fc806bd39 | |||
| 0611b943a8 | |||
| d53f7c5e2f | |||
| fdc11b4076 | |||
| 2d5ba0ac3c | |||
| 81d2cd6f9e | |||
| d32fd411bb | |||
| 191c92e176 | |||
| 8c0eeb3127 | |||
| db86ee42b2 | |||
| 79ba5f7c83 | |||
| 66c0b0c6cf | |||
| c2b05c63a8 | |||
| 5684525d0f | |||
| 3fd2eafb2a | |||
| f49b2a5f8a | |||
| 508cad474b | |||
| beb411d456 | |||
| 0db1fe0626 | |||
| 845f006cd3 | |||
| ac8c38592d | |||
| a4fb4a7c57 | |||
| e8bf687563 | |||
| 49a0dcd301 | |||
| 26f96b06fe | |||
| 55258d7727 | |||
| 82331f3bbd | |||
| abc1195c91 | |||
| a95cca26d9 | |||
| 8274b5e371 | |||
| 4f72fe8ff4 | |||
| 9b0c77d254 | |||
| 225287c46a | |||
| 6ceffb4361 | |||
| 08299724e1 | |||
| 3aff764075 | |||
| 05a71c31a8 | |||
| f5cc5218e6 | |||
| 3f7395ed70 | |||
| d9e3f2a209 | |||
| 0971c2bab9 | |||
| 9c1b97d912 | |||
| a1acf95004 | |||
| 016ea6b783 | |||
| 2b85a07aa0 | |||
| d6d9fa6f92 | |||
| 521e5a556e | |||
| 0c81b4df89 | |||
| 2c436c52ad | |||
| d4b67be775 | |||
| c2ddc5bfe2 | |||
| 08a056a32d | |||
| f37db46037 | |||
| c23ea40ec6 | |||
| dcc3be1c22 | |||
| 52b2eaaeb5 | |||
| ead96eb723 | |||
| f5a126b927 | |||
| c1b827c871 | |||
| 85f80ace97 | |||
| a4e965c9d6 | |||
| 15b7e7e65e | |||
| db6f859a60 | |||
| 14e49e63aa | |||
| 2551f854d6 | |||
| caec70b39b | |||
| 21ea25a3d8 | |||
| d7c670d344 | |||
| 85f306a469 | |||
| c459b732bb | |||
| 4632fe35c9 | |||
| 7cfbbb8792 | |||
| 7290e241a2 | |||
| 38318ad822 |
2
.github/workflows/ci-docs.yml
vendored
2
.github/workflows/ci-docs.yml
vendored
@ -17,7 +17,7 @@ jobs:
|
|||||||
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
|
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
|
||||||
- uses: actions/setup-python@v5
|
- uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: 3.x
|
python-version: 3.12.9
|
||||||
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
|
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
|
||||||
- uses: actions/cache@v4
|
- uses: actions/cache@v4
|
||||||
with:
|
with:
|
||||||
|
|||||||
33
.github/workflows/ci-tests.yml
vendored
Normal file
33
.github/workflows/ci-tests.yml
vendored
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
name: Tests
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches: ["main", "dev"]
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
tests:
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
os: [ ubuntu-latest, windows-latest ]
|
||||||
|
python-version: [ '3.12' ]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
- name: Set up Python
|
||||||
|
uses: actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: ${{ matrix.python-version }}
|
||||||
|
cache: pip
|
||||||
|
- name: Install Dependencies
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -e .[dev]
|
||||||
|
- name: Run linting
|
||||||
|
run: |
|
||||||
|
flake8 ./src/
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: |
|
||||||
|
pytest
|
||||||
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,3 +1,6 @@
|
|||||||
|
# Custom
|
||||||
|
dev-tools/
|
||||||
|
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
__pycache__/
|
__pycache__/
|
||||||
*.py[codz]
|
*.py[codz]
|
||||||
@ -173,7 +176,7 @@ cython_debug/
|
|||||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
#.idea/
|
.idea/
|
||||||
|
|
||||||
# Abstra
|
# Abstra
|
||||||
# Abstra is an AI-powered process automation framework.
|
# Abstra is an AI-powered process automation framework.
|
||||||
|
|||||||
30
README.md
30
README.md
@ -1,3 +1,5 @@
|
|||||||
|
[](https://www.python.org/downloads/release/python-312/)
|
||||||
|
[](https://github.com/pim-n/pg-rad/actions/workflows/ci-tests.yml)
|
||||||
# pg-rad
|
# pg-rad
|
||||||
Primary Gamma RADiation landscape - Development
|
Primary Gamma RADiation landscape - Development
|
||||||
|
|
||||||
@ -23,13 +25,35 @@ With Python verion `>=3.12.4` and `<3.13`, create a virtual environment and inst
|
|||||||
```
|
```
|
||||||
python3 -m venv .venv
|
python3 -m venv .venv
|
||||||
source .venv/bin/activate
|
source .venv/bin/activate
|
||||||
(venv) pip install -e .[dev]
|
```
|
||||||
|
|
||||||
|
With the virtual environment activated, run:
|
||||||
|
|
||||||
|
```
|
||||||
|
pip install -e .[dev]
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running example landscape
|
||||||
|
|
||||||
|
The example landscape can be generated using the command-line interface. Still in the virtual environment, run
|
||||||
|
|
||||||
|
```
|
||||||
|
pgrad --test --loglevel DEBUG
|
||||||
```
|
```
|
||||||
|
|
||||||
## Tests
|
## Tests
|
||||||
|
|
||||||
Tests can be run with `pytest` from the root directory of the repository.
|
Tests can be run with `pytest` from the root directory of the repository. With the virtual environment activated, run:
|
||||||
|
|
||||||
```
|
```
|
||||||
(venv) pytest
|
pytest
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Local viewing of documentation
|
||||||
|
|
||||||
|
PG-RAD uses [Material for MkDocs](https://squidfunk.github.io/mkdocs-material/) for generating documentation. It can be locally viewed by (in the venv) running:
|
||||||
|
```
|
||||||
|
mkdocs serve
|
||||||
|
```
|
||||||
|
|
||||||
|
where you can add the `--livereload` flag to automatically update the documentation as you write to the Markdown files.
|
||||||
|
|||||||
253
demo/demo.ipynb
253
demo/demo.ipynb
File diff suppressed because one or more lines are too long
18
docs/javascripts/mathjax.js
Normal file
18
docs/javascripts/mathjax.js
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
window.MathJax = {
|
||||||
|
tex: {
|
||||||
|
inlineMath: [['$', '$'], ["\\(", "\\)"]],
|
||||||
|
displayMath: [['$$', '$$'], ["\\[", "\\]"]],
|
||||||
|
processEscapes: true,
|
||||||
|
processEnvironments: true
|
||||||
|
},
|
||||||
|
options: {
|
||||||
|
processHtmlClass: "arithmatex"
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
document$.subscribe(() => {
|
||||||
|
MathJax.startup.output.clearCache()
|
||||||
|
MathJax.typesetClear()
|
||||||
|
MathJax.texReset()
|
||||||
|
MathJax.typesetPromise()
|
||||||
|
})
|
||||||
272
docs/pg-rad-in-python.ipynb
Normal file
272
docs/pg-rad-in-python.ipynb
Normal file
File diff suppressed because one or more lines are too long
@ -1,5 +0,0 @@
|
|||||||
---
|
|
||||||
title: Using PG-RAD as a module
|
|
||||||
---
|
|
||||||
|
|
||||||
Consult the API documentation in the side bar.
|
|
||||||
@ -28,8 +28,16 @@ markdown_extensions:
|
|||||||
- pymdownx.inlinehilite
|
- pymdownx.inlinehilite
|
||||||
- pymdownx.snippets
|
- pymdownx.snippets
|
||||||
- pymdownx.superfences
|
- pymdownx.superfences
|
||||||
|
- pymdownx.arithmatex:
|
||||||
|
generic: true
|
||||||
|
|
||||||
|
extra_javascript:
|
||||||
|
- javascripts/mathjax.js
|
||||||
|
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js
|
||||||
|
|
||||||
plugins:
|
plugins:
|
||||||
|
- mkdocs-jupyter:
|
||||||
|
execute: false
|
||||||
- mkdocstrings:
|
- mkdocstrings:
|
||||||
enabled: !ENV [ENABLE_MKDOCSTRINGS, true]
|
enabled: !ENV [ENABLE_MKDOCSTRINGS, true]
|
||||||
default_handler: python
|
default_handler: python
|
||||||
|
|||||||
@ -5,9 +5,13 @@ build-backend = "setuptools.build_meta"
|
|||||||
[tool.setuptools.packages.find]
|
[tool.setuptools.packages.find]
|
||||||
where = ["src"]
|
where = ["src"]
|
||||||
|
|
||||||
|
[tool.setuptools.package-data]
|
||||||
|
"pg_rad.data" = ["*.csv"]
|
||||||
|
"pg_rad.configs" = ["*.yml"]
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "pg-rad"
|
name = "pg-rad"
|
||||||
version = "0.2.0"
|
version = "0.2.1"
|
||||||
authors = [
|
authors = [
|
||||||
{ name="Pim Nelissen", email="pi0274ne-s@student.lu.se" },
|
{ name="Pim Nelissen", email="pi0274ne-s@student.lu.se" },
|
||||||
]
|
]
|
||||||
@ -18,15 +22,19 @@ dependencies = [
|
|||||||
"matplotlib>=3.9.2",
|
"matplotlib>=3.9.2",
|
||||||
"numpy>=2",
|
"numpy>=2",
|
||||||
"pandas>=2.3.1",
|
"pandas>=2.3.1",
|
||||||
"piecewise_regression==1.5.0",
|
"pyyaml>=6.0.2",
|
||||||
"pyyaml>=6.0.2"
|
"scipy>=1.15.0"
|
||||||
]
|
]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
license-files = ["LICEN[CS]E*"]
|
license-files = ["LICEN[CS]E*"]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
pgrad = "pg_rad.main:main"
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
Homepage = "https://github.com/pim-n/pg-rad"
|
Homepage = "https://github.com/pim-n/pg-rad"
|
||||||
Issues = "https://github.com/pim-n/pg-rad/issues"
|
Issues = "https://github.com/pim-n/pg-rad/issues"
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
dev = ["pytest", "notebook", "mkdocs-material", "mkdocstrings-python"]
|
|
||||||
|
dev = ["pytest", "mkinit", "notebook", "mkdocs-material", "mkdocstrings-python", "mkdocs-jupyter", "flake8"]
|
||||||
1
src/pg_rad/configs/__init__.py
Normal file
1
src/pg_rad/configs/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
__all__ = []
|
||||||
18
src/pg_rad/configs/defaults.py
Normal file
18
src/pg_rad/configs/defaults.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
# --- Physics defaults ---
|
||||||
|
DEFAULT_AIR_DENSITY = 1.243 # kg/m^3, Skåne
|
||||||
|
|
||||||
|
# --- Simulation defaults ---
|
||||||
|
DEFAULT_SEED = None
|
||||||
|
DEFAULT_ACQUISITION_TIME = 1.0
|
||||||
|
|
||||||
|
# --- Source defaults ---
|
||||||
|
DEFAULT_PATH_HEIGHT = 0.0
|
||||||
|
DEFAULT_SOURCE_HEIGHT = 0.0
|
||||||
|
|
||||||
|
# --- Segmented road defaults ---
|
||||||
|
DEFAULT_MIN_TURN_ANGLE = 30.
|
||||||
|
DEFAULT_MAX_TURN_ANGLE = 90.
|
||||||
|
|
||||||
|
DEFAULT_FRICTION_COEFF = 0.7 # dry asphalt
|
||||||
|
DEFAULT_GRAVITATIONAL_ACC = 9.81 # m/s^2
|
||||||
|
DEFAULT_ALPHA = 100.
|
||||||
3
src/pg_rad/configs/filepaths.py
Normal file
3
src/pg_rad/configs/filepaths.py
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
ATTENUATION_TABLE = 'attenuation_table.csv'
|
||||||
|
TEST_EXP_DATA = 'test_path_coords.csv'
|
||||||
|
LOGGING_CONFIG = 'logging.yml'
|
||||||
1
src/pg_rad/data/__init__.py
Normal file
1
src/pg_rad/data/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
__all__ = []
|
||||||
39
src/pg_rad/data/attenuation_table.csv
Normal file
39
src/pg_rad/data/attenuation_table.csv
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
energy_mev,mu
|
||||||
|
1.00000E-03,3.606E+03
|
||||||
|
1.50000E-03,1.191E+03
|
||||||
|
2.00000E-03,5.279E+02
|
||||||
|
3.00000E-03,1.625E+02
|
||||||
|
3.20290E-03,1.340E+02
|
||||||
|
3.20290E-03,1.485E+02
|
||||||
|
4.00000E-03,7.788E+01
|
||||||
|
5.00000E-03,4.027E+01
|
||||||
|
6.00000E-03,2.341E+01
|
||||||
|
8.00000E-03,9.921E+00
|
||||||
|
1.00000E-02,5.120E+00
|
||||||
|
1.50000E-02,1.614E+00
|
||||||
|
2.00000E-02,7.779E-01
|
||||||
|
3.00000E-02,3.538E-01
|
||||||
|
4.00000E-02,2.485E-01
|
||||||
|
5.00000E-02,2.080E-01
|
||||||
|
6.00000E-02,1.875E-01
|
||||||
|
8.00000E-02,1.662E-01
|
||||||
|
1.00000E-01,1.541E-01
|
||||||
|
1.50000E-01,1.356E-01
|
||||||
|
2.00000E-01,1.233E-01
|
||||||
|
3.00000E-01,1.067E-01
|
||||||
|
4.00000E-01,9.549E-02
|
||||||
|
5.00000E-01,8.712E-02
|
||||||
|
6.00000E-01,8.055E-02
|
||||||
|
8.00000E-01,7.074E-02
|
||||||
|
1.00000E+00,6.358E-02
|
||||||
|
1.25000E+00,5.687E-02
|
||||||
|
1.50000E+00,5.175E-02
|
||||||
|
2.00000E+00,4.447E-02
|
||||||
|
3.00000E+00,3.581E-02
|
||||||
|
4.00000E+00,3.079E-02
|
||||||
|
5.00000E+00,2.751E-02
|
||||||
|
6.00000E+00,2.522E-02
|
||||||
|
8.00000E+00,2.225E-02
|
||||||
|
1.00000E+01,2.045E-02
|
||||||
|
1.50000E+01,1.810E-02
|
||||||
|
2.00000E+01,1.705E-02
|
||||||
|
@ -1,26 +0,0 @@
|
|||||||
import pandas as pd
|
|
||||||
|
|
||||||
from pg_rad.logger import setup_logger
|
|
||||||
from pg_rad.exceptions import DataLoadError, InvalidCSVError
|
|
||||||
|
|
||||||
logger = setup_logger(__name__)
|
|
||||||
|
|
||||||
def load_data(filename: str) -> pd.DataFrame:
|
|
||||||
logger.debug(f"Attempting to load data from {filename}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
df = pd.read_csv(filename, delimiter=',')
|
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
logger.error(f"File not found: {filename}")
|
|
||||||
raise DataLoadError(f"File does not exist: {filename}") from e
|
|
||||||
|
|
||||||
except pd.errors.ParserError as e:
|
|
||||||
logger.error(f"Invalid CSV format: {filename}")
|
|
||||||
raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(f"Unexpected error while loading {filename}")
|
|
||||||
raise DataLoadError("Unexpected error while loading data") from e
|
|
||||||
|
|
||||||
return df
|
|
||||||
8
src/pg_rad/dataloader/__init__.py
Normal file
8
src/pg_rad/dataloader/__init__.py
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
|
from pg_rad.dataloader import dataloader
|
||||||
|
|
||||||
|
from pg_rad.dataloader.dataloader import (load_data,)
|
||||||
|
|
||||||
|
__all__ = ['dataloader', 'load_data']
|
||||||
22
src/pg_rad/dataloader/dataloader.py
Normal file
22
src/pg_rad/dataloader/dataloader.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def load_data(filename: str) -> pd.DataFrame:
|
||||||
|
logger.debug(f"Attempting to load file: {filename}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
df = pd.read_csv(filename, delimiter=',')
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
logger.critical(e)
|
||||||
|
raise
|
||||||
|
except pd.errors.ParserError as e:
|
||||||
|
logger.critical(e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
logger.debug(f"File loaded: {filename}")
|
||||||
|
return df
|
||||||
@ -1,8 +0,0 @@
|
|||||||
class ConvergenceError(Exception):
|
|
||||||
"""Raised when an algorithm fails to converge."""
|
|
||||||
|
|
||||||
class DataLoadError(Exception):
|
|
||||||
"""Base class for data loading errors."""
|
|
||||||
|
|
||||||
class InvalidCSVError(DataLoadError):
|
|
||||||
"""Raised when a file is not a valid CSV."""
|
|
||||||
10
src/pg_rad/exceptions/__init__.py
Normal file
10
src/pg_rad/exceptions/__init__.py
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
|
from pg_rad.exceptions import exceptions
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import (ConvergenceError, DataLoadError,
|
||||||
|
InvalidCSVError, OutOfBoundsError,)
|
||||||
|
|
||||||
|
__all__ = ['ConvergenceError', 'DataLoadError', 'InvalidCSVError',
|
||||||
|
'OutOfBoundsError', 'exceptions']
|
||||||
33
src/pg_rad/exceptions/exceptions.py
Normal file
33
src/pg_rad/exceptions/exceptions.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
class ConvergenceError(Exception):
|
||||||
|
"""Raised when an algorithm fails to converge."""
|
||||||
|
|
||||||
|
|
||||||
|
class DataLoadError(Exception):
|
||||||
|
"""Base class for data loading errors."""
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidCSVError(DataLoadError):
|
||||||
|
"""Raised when a file is not a valid CSV."""
|
||||||
|
|
||||||
|
|
||||||
|
class OutOfBoundsError(Exception):
|
||||||
|
"""Raised when an object is attempted to be placed out of bounds."""
|
||||||
|
|
||||||
|
|
||||||
|
class MissingConfigKeyError(KeyError):
|
||||||
|
"""Raised when a (nested) config key is missing in the config."""
|
||||||
|
def __init__(self, key, subkey=None):
|
||||||
|
if subkey:
|
||||||
|
self.message = f"Missing key in {key}: {', '.join(list(subkey))}"
|
||||||
|
else:
|
||||||
|
self.message = f"Missing key: {key}"
|
||||||
|
|
||||||
|
super().__init__(self.message)
|
||||||
|
|
||||||
|
|
||||||
|
class DimensionError(ValueError):
|
||||||
|
"""Raised if dimensions or coordinates do not match the system."""
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidIsotopeError(ValueError):
|
||||||
|
"""Raised if attempting to load an isotope that is not valid."""
|
||||||
316
src/pg_rad/inputparser/parser.py
Normal file
316
src/pg_rad/inputparser/parser.py
Normal file
@ -0,0 +1,316 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, List, Union
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import MissingConfigKeyError, DimensionError
|
||||||
|
from pg_rad.configs import defaults
|
||||||
|
|
||||||
|
from .specs import (
|
||||||
|
MetadataSpec,
|
||||||
|
RuntimeSpec,
|
||||||
|
SimulationOptionsSpec,
|
||||||
|
SegmentSpec,
|
||||||
|
PathSpec,
|
||||||
|
ProceduralPathSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
SourceSpec,
|
||||||
|
AbsolutePointSourceSpec,
|
||||||
|
RelativePointSourceSpec,
|
||||||
|
SimulationSpec,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigParser:
|
||||||
|
_ALLOWED_ROOT_KEYS = {
|
||||||
|
"name",
|
||||||
|
"speed",
|
||||||
|
"acquisition_time",
|
||||||
|
"path",
|
||||||
|
"sources",
|
||||||
|
"options",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, config_source: str):
|
||||||
|
self.config = self._load_yaml(config_source)
|
||||||
|
|
||||||
|
def parse(self) -> SimulationSpec:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="global",
|
||||||
|
provided=set(self.config.keys()),
|
||||||
|
allowed=self._ALLOWED_ROOT_KEYS,
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata = self._parse_metadata()
|
||||||
|
runtime = self._parse_runtime()
|
||||||
|
options = self._parse_options()
|
||||||
|
path = self._parse_path()
|
||||||
|
sources = self._parse_point_sources()
|
||||||
|
|
||||||
|
return SimulationSpec(
|
||||||
|
metadata=metadata,
|
||||||
|
runtime=runtime,
|
||||||
|
options=options,
|
||||||
|
path=path,
|
||||||
|
point_sources=sources,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load_yaml(self, config_source: str) -> Dict[str, Any]:
|
||||||
|
if os.path.exists(config_source):
|
||||||
|
with open(config_source) as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
else:
|
||||||
|
data = yaml.safe_load(config_source)
|
||||||
|
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
raise ValueError(
|
||||||
|
"Provided path or string is not a valid YAML representation."
|
||||||
|
)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _parse_metadata(self) -> MetadataSpec:
|
||||||
|
try:
|
||||||
|
return MetadataSpec(
|
||||||
|
name=self.config["name"]
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise MissingConfigKeyError("global", {"name"}) from e
|
||||||
|
|
||||||
|
def _parse_runtime(self) -> RuntimeSpec:
|
||||||
|
required = {"speed", "acquisition_time"}
|
||||||
|
missing = required - self.config.keys()
|
||||||
|
if missing:
|
||||||
|
raise MissingConfigKeyError("global", missing)
|
||||||
|
|
||||||
|
return RuntimeSpec(
|
||||||
|
speed=float(self.config["speed"]),
|
||||||
|
acquisition_time=float(self.config["acquisition_time"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_options(self) -> SimulationOptionsSpec:
|
||||||
|
options = self.config.get("options", {})
|
||||||
|
|
||||||
|
allowed = {"air_density", "seed"}
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="options",
|
||||||
|
provided=set(options.keys()),
|
||||||
|
allowed=allowed,
|
||||||
|
)
|
||||||
|
|
||||||
|
air_density = options.get("air_density", defaults.DEFAULT_AIR_DENSITY)
|
||||||
|
seed = options.get("seed")
|
||||||
|
|
||||||
|
if not isinstance(air_density, float) or air_density <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
"options.air_density must be a positive float in kg/m^3."
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
seed is not None or
|
||||||
|
(isinstance(seed, int) and seed <= 0)
|
||||||
|
):
|
||||||
|
raise ValueError("Seed must be a positive integer value.")
|
||||||
|
|
||||||
|
return SimulationOptionsSpec(
|
||||||
|
air_density=air_density,
|
||||||
|
seed=seed,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_path(self) -> PathSpec:
|
||||||
|
allowed_csv = {"file", "east_col_name", "north_col_name", "z"}
|
||||||
|
allowed_proc = {"segments", "length", "z"}
|
||||||
|
|
||||||
|
path = self.config.get("path")
|
||||||
|
if path is None:
|
||||||
|
raise MissingConfigKeyError("global", {"path"})
|
||||||
|
|
||||||
|
if not isinstance(path, dict):
|
||||||
|
raise ValueError("Path must be a dictionary.")
|
||||||
|
|
||||||
|
if "file" in path:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="path (csv)",
|
||||||
|
provided=set(path.keys()),
|
||||||
|
allowed=allowed_csv,
|
||||||
|
)
|
||||||
|
|
||||||
|
return CSVPathSpec(
|
||||||
|
file=path["file"],
|
||||||
|
east_col_name=path["east_col_name"],
|
||||||
|
north_col_name=path["north_col_name"],
|
||||||
|
z=path.get("z", 0),
|
||||||
|
)
|
||||||
|
|
||||||
|
if "segments" in path:
|
||||||
|
self._warn_unknown_keys(
|
||||||
|
section="path (procedural)",
|
||||||
|
provided=set(path.keys()),
|
||||||
|
allowed=allowed_proc,
|
||||||
|
)
|
||||||
|
return self._parse_procedural_path(path)
|
||||||
|
|
||||||
|
raise ValueError("Invalid path configuration.")
|
||||||
|
|
||||||
|
def _parse_procedural_path(
|
||||||
|
self,
|
||||||
|
path: Dict[str, Any]
|
||||||
|
) -> ProceduralPathSpec:
|
||||||
|
raw_segments = path.get("segments")
|
||||||
|
|
||||||
|
if not isinstance(raw_segments, List):
|
||||||
|
raise ValueError("path.segments must be a list of segments.")
|
||||||
|
|
||||||
|
raw_length = path.get("length")
|
||||||
|
|
||||||
|
if raw_length is None:
|
||||||
|
raise MissingConfigKeyError("path", {"length"})
|
||||||
|
|
||||||
|
if isinstance(raw_length, int | float):
|
||||||
|
raw_length = [float(raw_length)]
|
||||||
|
|
||||||
|
segments = self._process_segment_angles(raw_segments)
|
||||||
|
lengths = self._process_segment_lengths(raw_length, len(segments))
|
||||||
|
resolved_segments = self._combine_segments_lengths(segments, lengths)
|
||||||
|
|
||||||
|
return ProceduralPathSpec(
|
||||||
|
segments=resolved_segments,
|
||||||
|
z=path.get("z", defaults.DEFAULT_PATH_HEIGHT),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_segment_angles(
|
||||||
|
self,
|
||||||
|
raw_segments: List[Union[str, dict]]
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
|
||||||
|
normalized = []
|
||||||
|
|
||||||
|
for segment in raw_segments:
|
||||||
|
|
||||||
|
if isinstance(segment, str):
|
||||||
|
normalized.append({"type": segment, "angle": None})
|
||||||
|
|
||||||
|
elif isinstance(segment, dict):
|
||||||
|
if len(segment) != 1:
|
||||||
|
raise ValueError("Invalid segment definition.")
|
||||||
|
seg_type, angle = list(segment.items())[0]
|
||||||
|
normalized.append({"type": seg_type, "angle": angle})
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid segment entry format.")
|
||||||
|
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
def _process_segment_lengths(
|
||||||
|
self,
|
||||||
|
raw_length_list: List[Union[int, float]],
|
||||||
|
num_segments: int
|
||||||
|
) -> List[float]:
|
||||||
|
num_lengths = len(raw_length_list)
|
||||||
|
|
||||||
|
if num_lengths == num_segments or num_lengths == 1:
|
||||||
|
return raw_length_list
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Path length must either be a single number or a list with "
|
||||||
|
"number of elements equal to the number of segments."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _combine_segments_lengths(
|
||||||
|
self,
|
||||||
|
segments: List[Dict[str, Any]],
|
||||||
|
lengths: List[float],
|
||||||
|
) -> List[SegmentSpec]:
|
||||||
|
|
||||||
|
resolved = []
|
||||||
|
|
||||||
|
for seg, length in zip(segments, lengths):
|
||||||
|
angle = seg["angle"]
|
||||||
|
|
||||||
|
if angle is not None and not self._is_turn(seg["type"]):
|
||||||
|
raise ValueError(
|
||||||
|
f"A {seg['type']} segment does not support an angle."
|
||||||
|
)
|
||||||
|
|
||||||
|
resolved.append(
|
||||||
|
SegmentSpec(
|
||||||
|
type=seg["type"],
|
||||||
|
length=length,
|
||||||
|
angle=angle,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_turn(segment_type: str) -> bool:
|
||||||
|
return segment_type in {"turn_left", "turn_right"}
|
||||||
|
|
||||||
|
def _parse_point_sources(self) -> List[SourceSpec]:
|
||||||
|
source_dict = self.config.get("sources", {})
|
||||||
|
specs: List[SourceSpec] = []
|
||||||
|
|
||||||
|
for name, params in source_dict.items():
|
||||||
|
|
||||||
|
required = {"activity_MBq", "isotope", "position"}
|
||||||
|
missing = required - params.keys()
|
||||||
|
if missing:
|
||||||
|
raise MissingConfigKeyError(name, missing)
|
||||||
|
|
||||||
|
activity = params.get("activity_MBq")
|
||||||
|
isotope = params.get("isotope")
|
||||||
|
|
||||||
|
if not isinstance(activity, int | float) or activity <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
f"sources.{name}.activity_MBq must be positive value "
|
||||||
|
"in MegaBequerels."
|
||||||
|
)
|
||||||
|
|
||||||
|
position = params.get("position")
|
||||||
|
|
||||||
|
if isinstance(position, list):
|
||||||
|
if len(position) != 3:
|
||||||
|
raise DimensionError(
|
||||||
|
"Absolute position must be [x, y, z]."
|
||||||
|
)
|
||||||
|
|
||||||
|
specs.append(
|
||||||
|
AbsolutePointSourceSpec(
|
||||||
|
name=name,
|
||||||
|
activity_MBq=float(activity),
|
||||||
|
isotope=isotope,
|
||||||
|
x=float(position[0]),
|
||||||
|
y=float(position[1]),
|
||||||
|
z=float(position[2]),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
elif isinstance(position, dict):
|
||||||
|
specs.append(
|
||||||
|
RelativePointSourceSpec(
|
||||||
|
name=name,
|
||||||
|
activity_MBq=float(activity),
|
||||||
|
isotope=isotope,
|
||||||
|
along_path=float(position["along_path"]),
|
||||||
|
dist_from_path=float(position["dist_from_path"]),
|
||||||
|
side=position["side"],
|
||||||
|
z=position.get("z", defaults.DEFAULT_SOURCE_HEIGHT)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid position format for source '{name}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
return specs
|
||||||
|
|
||||||
|
def _warn_unknown_keys(self, section: str, provided: set, allowed: set):
|
||||||
|
unknown = provided - allowed
|
||||||
|
if unknown:
|
||||||
|
logger.warning(
|
||||||
|
f"Unknown keys in '{section}' section: {unknown}"
|
||||||
|
)
|
||||||
76
src/pg_rad/inputparser/specs.py
Normal file
76
src/pg_rad/inputparser/specs.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
from abc import ABC
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MetadataSpec:
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RuntimeSpec:
|
||||||
|
speed: float
|
||||||
|
acquisition_time: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationOptionsSpec:
|
||||||
|
air_density: float = 1.243
|
||||||
|
seed: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SegmentSpec:
|
||||||
|
type: str
|
||||||
|
length: float
|
||||||
|
angle: float | None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PathSpec(ABC):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ProceduralPathSpec(PathSpec):
|
||||||
|
segments: list[SegmentSpec]
|
||||||
|
z: int | float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CSVPathSpec(PathSpec):
|
||||||
|
file: str
|
||||||
|
east_col_name: str
|
||||||
|
north_col_name: str
|
||||||
|
z: int | float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SourceSpec(ABC):
|
||||||
|
activity_MBq: float
|
||||||
|
isotope: str
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AbsolutePointSourceSpec(SourceSpec):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RelativePointSourceSpec(SourceSpec):
|
||||||
|
along_path: float
|
||||||
|
dist_from_path: float
|
||||||
|
side: str
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationSpec:
|
||||||
|
metadata: MetadataSpec
|
||||||
|
runtime: RuntimeSpec
|
||||||
|
options: SimulationOptionsSpec
|
||||||
|
path: PathSpec
|
||||||
|
point_sources: list[SourceSpec]
|
||||||
@ -1,23 +0,0 @@
|
|||||||
class Isotope:
|
|
||||||
"""Represents the essential information of an isotope.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name (str): Full name (e.g. Caesium-137).
|
|
||||||
E (float): Energy of the primary gamma in keV.
|
|
||||||
b (float): Branching ratio for the gamma at energy E.
|
|
||||||
"""
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
name: str,
|
|
||||||
E: float,
|
|
||||||
b: float
|
|
||||||
):
|
|
||||||
if E <= 0:
|
|
||||||
raise ValueError("primary_gamma must be a positive energy (keV).")
|
|
||||||
|
|
||||||
if not (0 <= b <= 1):
|
|
||||||
raise ValueError("branching_ratio_pg must be a ratio (0 <= b <= 1)")
|
|
||||||
|
|
||||||
self.name = name
|
|
||||||
self.E = E
|
|
||||||
self.b = b
|
|
||||||
9
src/pg_rad/isotopes/__init__.py
Normal file
9
src/pg_rad/isotopes/__init__.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
|
from pg_rad.isotopes import isotope
|
||||||
|
|
||||||
|
from pg_rad.isotopes.isotope import (CS137, Isotope, get_isotope,
|
||||||
|
preset_isotopes,)
|
||||||
|
|
||||||
|
__all__ = ['CS137', 'Isotope', 'get_isotope', 'isotope', 'preset_isotopes']
|
||||||
51
src/pg_rad/isotopes/isotope.py
Normal file
51
src/pg_rad/isotopes/isotope.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
from typing import Dict, Type
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import InvalidIsotopeError
|
||||||
|
from pg_rad.physics.attenuation import get_mass_attenuation_coeff
|
||||||
|
|
||||||
|
|
||||||
|
class Isotope:
|
||||||
|
"""Represents the essential information of an isotope.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): Full name (e.g. Caesium-137).
|
||||||
|
E (float): Energy of the primary gamma in keV.
|
||||||
|
b (float): Branching ratio for the gamma at energy E.
|
||||||
|
"""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
E: float,
|
||||||
|
b: float
|
||||||
|
):
|
||||||
|
if E <= 0:
|
||||||
|
raise ValueError("primary_gamma must be a positive energy (keV).")
|
||||||
|
|
||||||
|
if not (0 <= b <= 1):
|
||||||
|
raise ValueError("branching_ratio_pg must be a ratio b in [0,1]")
|
||||||
|
|
||||||
|
self.name = name
|
||||||
|
self.E = E
|
||||||
|
self.b = b
|
||||||
|
self.mu_mass_air = get_mass_attenuation_coeff(E / 1000)
|
||||||
|
|
||||||
|
|
||||||
|
class CS137(Isotope):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(
|
||||||
|
name="Cs-137",
|
||||||
|
E=661.66,
|
||||||
|
b=0.851
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
preset_isotopes: Dict[str, Type[Isotope]] = {
|
||||||
|
"CS137": CS137
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_isotope(isotope_str: str) -> Isotope:
|
||||||
|
"""Lazy factory function to create isotope objects."""
|
||||||
|
if isotope_str not in preset_isotopes:
|
||||||
|
raise InvalidIsotopeError(f"Unknown isotope: {isotope_str}")
|
||||||
|
return preset_isotopes[isotope_str]()
|
||||||
@ -1,126 +0,0 @@
|
|||||||
from matplotlib import pyplot as plt
|
|
||||||
from matplotlib.patches import Circle
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
from pg_rad.path import Path
|
|
||||||
from pg_rad.sources import PointSource
|
|
||||||
|
|
||||||
class Landscape:
|
|
||||||
"""A generic Landscape that can contain a Path and sources.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
air_density (float, optional): Air density in kg / m^3. Defaults to 1.243.
|
|
||||||
size (int | tuple[int, int, int], optional): Size of the world. Defaults to 500.
|
|
||||||
scale (str, optional): The scale of the size argument passed. Defaults to 'meters'.
|
|
||||||
"""
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
air_density: float = 1.243,
|
|
||||||
size: int | tuple[int, int, int] = 500,
|
|
||||||
scale = 'meters',
|
|
||||||
):
|
|
||||||
if isinstance(size, int):
|
|
||||||
self.world = np.zeros((size, size, size))
|
|
||||||
elif isinstance(size, tuple) and len(size) == 3:
|
|
||||||
self.world = np.zeros(size)
|
|
||||||
else:
|
|
||||||
raise TypeError("size must be an integer or a tuple of 3 integers.")
|
|
||||||
|
|
||||||
self.air_density = air_density
|
|
||||||
self.scale = scale
|
|
||||||
|
|
||||||
self.path: Path = None
|
|
||||||
self.sources: list[PointSource] = []
|
|
||||||
|
|
||||||
def plot(self, z = 0):
|
|
||||||
"""Plot a slice of the world at a height `z`.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
z (int, optional): Height of slice. Defaults to 0.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
fig, ax: Matplotlib figure objects.
|
|
||||||
"""
|
|
||||||
x_lim, y_lim, _ = self.world.shape
|
|
||||||
|
|
||||||
fig, ax = plt.subplots()
|
|
||||||
ax.set_xlim(right=x_lim)
|
|
||||||
ax.set_ylim(top=y_lim)
|
|
||||||
ax.set_xlabel(f"X [{self.scale}]")
|
|
||||||
ax.set_ylabel(f"Y [{self.scale}]")
|
|
||||||
|
|
||||||
if not self.path == None:
|
|
||||||
ax.plot(self.path.x_list, self.path.y_list, 'bo-')
|
|
||||||
|
|
||||||
for s in self.sources:
|
|
||||||
if np.isclose(s.z, z):
|
|
||||||
dot = Circle(
|
|
||||||
(s.x, s.y),
|
|
||||||
radius=5,
|
|
||||||
color=s.color,
|
|
||||||
zorder=5
|
|
||||||
)
|
|
||||||
|
|
||||||
ax.text(
|
|
||||||
s.x + 0.06,
|
|
||||||
s.y + 0.06,
|
|
||||||
s.name,
|
|
||||||
color=s.color,
|
|
||||||
fontsize=10,
|
|
||||||
ha="left",
|
|
||||||
va="bottom",
|
|
||||||
zorder=6
|
|
||||||
)
|
|
||||||
|
|
||||||
ax.add_patch(dot)
|
|
||||||
|
|
||||||
return fig, ax
|
|
||||||
|
|
||||||
def add_sources(self, *sources: PointSource):
|
|
||||||
"""Add one or more point sources to the world.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
*sources (pg_rad.sources.PointSource): One or more sources, passed as
|
|
||||||
Source1, Source2, ...
|
|
||||||
Raises:
|
|
||||||
ValueError: If the source is outside the boundaries of the landscape.
|
|
||||||
"""
|
|
||||||
|
|
||||||
max_x, max_y, max_z = self.world.shape[:3]
|
|
||||||
|
|
||||||
if any(
|
|
||||||
not (0 <= source.x < max_x and
|
|
||||||
0 <= source.y < max_y and
|
|
||||||
0 <= source.z < max_z)
|
|
||||||
for source in sources
|
|
||||||
):
|
|
||||||
raise ValueError("One or more sources are outside the landscape!")
|
|
||||||
|
|
||||||
self.sources.extend(sources)
|
|
||||||
|
|
||||||
def set_path(self, path: Path):
|
|
||||||
"""
|
|
||||||
Set the path in the landscape.
|
|
||||||
"""
|
|
||||||
self.path = path
|
|
||||||
|
|
||||||
def create_landscape_from_path(path: Path, max_z = 500):
|
|
||||||
"""Generate a landscape from a path, using its dimensions to determine
|
|
||||||
the size of the landscape.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path (Path): A Path object describing the trajectory.
|
|
||||||
max_z (int, optional): Height of the world. Defaults to 500 meters.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
landscape (pg_rad.landscape.Landscape): A landscape with dimensions based on the provided Path.
|
|
||||||
"""
|
|
||||||
max_x = np.ceil(max(path.x_list))
|
|
||||||
max_y = np.ceil(max(path.y_list))
|
|
||||||
|
|
||||||
landscape = Landscape(
|
|
||||||
size = (max_x, max_y, max_z)
|
|
||||||
)
|
|
||||||
|
|
||||||
landscape.path = path
|
|
||||||
return landscape
|
|
||||||
0
src/pg_rad/landscape/__init__.py
Normal file
0
src/pg_rad/landscape/__init__.py
Normal file
172
src/pg_rad/landscape/builder.py
Normal file
172
src/pg_rad/landscape/builder.py
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Self
|
||||||
|
|
||||||
|
from .landscape import Landscape
|
||||||
|
from pg_rad.dataloader.dataloader import load_data
|
||||||
|
from pg_rad.objects.sources import PointSource
|
||||||
|
from pg_rad.exceptions.exceptions import OutOfBoundsError
|
||||||
|
from pg_rad.utils.projection import rel_to_abs_source_position
|
||||||
|
from pg_rad.inputparser.specs import (
|
||||||
|
SimulationSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
AbsolutePointSourceSpec,
|
||||||
|
RelativePointSourceSpec
|
||||||
|
)
|
||||||
|
|
||||||
|
from pg_rad.path.path import Path, path_from_RT90
|
||||||
|
|
||||||
|
from road_gen.generators.segmented_road_generator import SegmentedRoadGenerator
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class LandscapeBuilder:
|
||||||
|
def __init__(self, name: str = "Unnamed landscape"):
|
||||||
|
self.name = name
|
||||||
|
self._path = None
|
||||||
|
self._point_sources = []
|
||||||
|
self._size = None
|
||||||
|
self._air_density = 1.243
|
||||||
|
|
||||||
|
logger.debug(f"LandscapeBuilder initialized: {self.name}")
|
||||||
|
|
||||||
|
def set_air_density(self, air_density: float | None) -> Self:
|
||||||
|
"""Set the air density of the world."""
|
||||||
|
if air_density and isinstance(air_density, float):
|
||||||
|
self._air_density = air_density
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
|
||||||
|
"""Set the size of the landscape in meters (x,y,z)."""
|
||||||
|
if self._path and any(p > s for p, s in zip(self._path.size, size)):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"Cannot set landscape size smaller than the path."
|
||||||
|
)
|
||||||
|
|
||||||
|
self._size = size
|
||||||
|
logger.debug("Size of the landscape has been updated.")
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_path(self):
|
||||||
|
return self._path
|
||||||
|
|
||||||
|
def set_path_from_segments(
|
||||||
|
self,
|
||||||
|
sim_spec: SimulationSpec
|
||||||
|
):
|
||||||
|
|
||||||
|
segments = sim_spec.path.segments
|
||||||
|
types = [s.type for s in segments]
|
||||||
|
lengths = [s.length for s in segments]
|
||||||
|
angles = [s.angle for s in segments]
|
||||||
|
|
||||||
|
sg = SegmentedRoadGenerator(
|
||||||
|
length=lengths,
|
||||||
|
ds=sim_spec.runtime.speed * sim_spec.runtime.acquisition_time,
|
||||||
|
velocity=sim_spec.runtime.speed,
|
||||||
|
seed=sim_spec.options.seed
|
||||||
|
)
|
||||||
|
|
||||||
|
x, y = sg.generate(
|
||||||
|
segments=types,
|
||||||
|
lengths=lengths,
|
||||||
|
angles=angles
|
||||||
|
)
|
||||||
|
|
||||||
|
self._path = Path(list(zip(x, y)))
|
||||||
|
self._fit_landscape_to_path()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_path_from_experimental_data(
|
||||||
|
self,
|
||||||
|
spec: CSVPathSpec
|
||||||
|
) -> Self:
|
||||||
|
df = load_data(spec.file)
|
||||||
|
self._path = path_from_RT90(
|
||||||
|
df=df,
|
||||||
|
east_col=spec.east_col_name,
|
||||||
|
north_col=spec.north_col_name,
|
||||||
|
z=spec.z
|
||||||
|
)
|
||||||
|
|
||||||
|
self._fit_landscape_to_path()
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def set_point_sources(
|
||||||
|
self,
|
||||||
|
*sources: AbsolutePointSourceSpec | RelativePointSourceSpec
|
||||||
|
):
|
||||||
|
"""Add one or more point sources to the world.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
*sources (AbsolutePointSourceSpec | RelativePointSourceSpec):
|
||||||
|
One or more sources, passed as SourceSpecs tuple
|
||||||
|
Raises:
|
||||||
|
OutOfBoundsError: If any source is outside the boundaries of the
|
||||||
|
landscape.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for s in sources:
|
||||||
|
if isinstance(s, AbsolutePointSourceSpec):
|
||||||
|
pos = (s.x, s.y, s.z)
|
||||||
|
elif isinstance(s, RelativePointSourceSpec):
|
||||||
|
path = self.get_path()
|
||||||
|
pos = rel_to_abs_source_position(
|
||||||
|
x_list=path.x_list,
|
||||||
|
y_list=path.y_list,
|
||||||
|
path_z=path.z,
|
||||||
|
along_path=s.along_path,
|
||||||
|
side=s.side,
|
||||||
|
dist_from_path=s.dist_from_path)
|
||||||
|
if any(
|
||||||
|
p < 0 or p >= s for p, s in zip(pos, self._size)
|
||||||
|
):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"One or more sources attempted to "
|
||||||
|
"be placed outside the landscape."
|
||||||
|
)
|
||||||
|
|
||||||
|
self._point_sources.append(PointSource(
|
||||||
|
activity_MBq=s.activity_MBq,
|
||||||
|
isotope=s.isotope,
|
||||||
|
position=pos,
|
||||||
|
name=s.name
|
||||||
|
))
|
||||||
|
|
||||||
|
def _fit_landscape_to_path(self) -> None:
|
||||||
|
"""The size of the landscape will be updated if
|
||||||
|
1) _size is not set, or
|
||||||
|
2) _size is too small to contain the path."""
|
||||||
|
|
||||||
|
needs_resize = (
|
||||||
|
not self._size
|
||||||
|
or any(p > s for p, s in zip(self._path.size, self._size))
|
||||||
|
)
|
||||||
|
|
||||||
|
if needs_resize:
|
||||||
|
if not self._size:
|
||||||
|
logger.debug("Because no Landscape size was set, "
|
||||||
|
"it will now set to path dimensions.")
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Path exceeds current landscape size. "
|
||||||
|
"Landscape size will be expanded to accommodate path."
|
||||||
|
)
|
||||||
|
|
||||||
|
max_size = max(self._path.size)
|
||||||
|
self.set_landscape_size((max_size, max_size))
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
landscape = Landscape(
|
||||||
|
name=self.name,
|
||||||
|
path=self._path,
|
||||||
|
point_sources=self._point_sources,
|
||||||
|
size=self._size,
|
||||||
|
air_density=self._air_density
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Landscape built successfully: {landscape.name}")
|
||||||
|
return landscape
|
||||||
49
src/pg_rad/landscape/director.py
Normal file
49
src/pg_rad/landscape/director.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
from importlib.resources import files
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from pg_rad.configs.filepaths import TEST_EXP_DATA
|
||||||
|
from .builder import LandscapeBuilder
|
||||||
|
from pg_rad.inputparser.specs import (
|
||||||
|
SimulationSpec,
|
||||||
|
CSVPathSpec,
|
||||||
|
ProceduralPathSpec
|
||||||
|
)
|
||||||
|
from pg_rad.objects.sources import PointSource
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class LandscapeDirector:
|
||||||
|
def __init__(self):
|
||||||
|
logger.debug("LandscapeDirector initialized.")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_test_landscape():
|
||||||
|
fp = files('pg_rad.data').joinpath(TEST_EXP_DATA)
|
||||||
|
source = PointSource(
|
||||||
|
activity_MBq=100E9,
|
||||||
|
isotope="CS137",
|
||||||
|
position=(0, 0, 0)
|
||||||
|
)
|
||||||
|
lb = LandscapeBuilder("Test landscape")
|
||||||
|
lb.set_air_density(1.243)
|
||||||
|
lb.set_path_from_experimental_data(fp, z=0)
|
||||||
|
lb.set_point_sources(source)
|
||||||
|
landscape = lb.build()
|
||||||
|
return landscape
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_from_config(config: SimulationSpec):
|
||||||
|
lb = LandscapeBuilder(config.metadata.name)
|
||||||
|
lb.set_air_density(config.options.air_density)
|
||||||
|
|
||||||
|
if isinstance(config.path, CSVPathSpec):
|
||||||
|
lb.set_path_from_experimental_data(spec=config.path)
|
||||||
|
elif isinstance(config.path, ProceduralPathSpec):
|
||||||
|
lb.set_path_from_segments(
|
||||||
|
sim_spec=config,
|
||||||
|
)
|
||||||
|
lb.set_point_sources(*config.point_sources)
|
||||||
|
landscape = lb.build()
|
||||||
|
return landscape
|
||||||
38
src/pg_rad/landscape/landscape.py
Normal file
38
src/pg_rad/landscape/landscape.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from pg_rad.path.path import Path
|
||||||
|
from pg_rad.objects.sources import PointSource
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Landscape:
|
||||||
|
"""
|
||||||
|
A generic Landscape that can contain a Path and sources.
|
||||||
|
"""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
path: Path,
|
||||||
|
air_density: float,
|
||||||
|
point_sources: list[PointSource],
|
||||||
|
size: tuple[int, int, int]
|
||||||
|
):
|
||||||
|
"""Initialize a landscape.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): Name of the landscape.
|
||||||
|
path (Path): The path of the detector.
|
||||||
|
air_density (float): Air density in kg/m^3.
|
||||||
|
point_sources (list[PointSource]): List of point sources.
|
||||||
|
size (tuple[int, int, int]): Size of the world.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.name = name
|
||||||
|
self.path = path
|
||||||
|
self.point_sources = point_sources
|
||||||
|
self.size = size
|
||||||
|
self.air_density = air_density
|
||||||
|
|
||||||
|
logger.debug(f"Landscape created: {self.name}")
|
||||||
@ -1,17 +0,0 @@
|
|||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
import pathlib
|
|
||||||
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
def setup_logger(name):
|
|
||||||
logger = logging.getLogger(name)
|
|
||||||
|
|
||||||
base_dir = pathlib.Path(__file__).resolve().parent
|
|
||||||
config_file = base_dir / "configs" / "logging.yml"
|
|
||||||
|
|
||||||
with open(config_file) as f:
|
|
||||||
config = yaml.safe_load(f)
|
|
||||||
|
|
||||||
logging.config.dictConfig(config)
|
|
||||||
return logger
|
|
||||||
5
src/pg_rad/logger/__init__.py
Normal file
5
src/pg_rad/logger/__init__.py
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
from pg_rad.logger import logger
|
||||||
|
|
||||||
|
from pg_rad.logger.logger import (setup_logger,)
|
||||||
|
|
||||||
|
__all__ = ['logger', 'setup_logger']
|
||||||
22
src/pg_rad/logger/logger.py
Normal file
22
src/pg_rad/logger/logger.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
import logging.config
|
||||||
|
from importlib.resources import files
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from pg_rad.configs.filepaths import LOGGING_CONFIG
|
||||||
|
|
||||||
|
|
||||||
|
def setup_logger(log_level: str = "WARNING"):
|
||||||
|
levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
|
||||||
|
|
||||||
|
if log_level not in levels:
|
||||||
|
raise ValueError(f"Log level must be one of {levels}.")
|
||||||
|
|
||||||
|
config_file = files('pg_rad.configs').joinpath(LOGGING_CONFIG)
|
||||||
|
|
||||||
|
with open(config_file) as f:
|
||||||
|
config = yaml.safe_load(f)
|
||||||
|
|
||||||
|
config["loggers"]["root"]["level"] = log_level
|
||||||
|
|
||||||
|
logging.config.dictConfig(config)
|
||||||
125
src/pg_rad/main.py
Normal file
125
src/pg_rad/main.py
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from pandas.errors import ParserError
|
||||||
|
from yaml import YAMLError
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import (
|
||||||
|
MissingConfigKeyError,
|
||||||
|
OutOfBoundsError,
|
||||||
|
DimensionError,
|
||||||
|
InvalidIsotopeError
|
||||||
|
)
|
||||||
|
from pg_rad.logger.logger import setup_logger
|
||||||
|
from pg_rad.inputparser.parser import ConfigParser
|
||||||
|
from pg_rad.landscape.director import LandscapeDirector
|
||||||
|
from pg_rad.plotting.result_plotter import ResultPlotter
|
||||||
|
from pg_rad.simulator.engine import SimulationEngine
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog="pg-rad",
|
||||||
|
description="Primary Gamma RADiation landscape tool"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--config",
|
||||||
|
help="Build from a config file."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--test",
|
||||||
|
action="store_true",
|
||||||
|
help="Load and run the test landscape."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--loglevel",
|
||||||
|
default="INFO",
|
||||||
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--saveplot",
|
||||||
|
action="store_true",
|
||||||
|
help="Save the plot or not."
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
setup_logger(args.loglevel)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
if args.test:
|
||||||
|
test_yaml = """
|
||||||
|
name: Test landscape
|
||||||
|
speed: 8.33
|
||||||
|
acquisition_time: 1
|
||||||
|
|
||||||
|
path:
|
||||||
|
length: 1000
|
||||||
|
segments:
|
||||||
|
- straight
|
||||||
|
|
||||||
|
sources:
|
||||||
|
test_source:
|
||||||
|
activity_MBq: 1000
|
||||||
|
position: [500, 100, 0]
|
||||||
|
isotope: CS137
|
||||||
|
"""
|
||||||
|
|
||||||
|
cp = ConfigParser(test_yaml).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
|
|
||||||
|
output = SimulationEngine(
|
||||||
|
landscape=landscape,
|
||||||
|
runtime_spec=cp.runtime,
|
||||||
|
sim_spec=cp.options
|
||||||
|
).simulate()
|
||||||
|
|
||||||
|
plotter = ResultPlotter(landscape, output)
|
||||||
|
plotter.plot()
|
||||||
|
|
||||||
|
elif args.config:
|
||||||
|
try:
|
||||||
|
cp = ConfigParser(args.config).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
|
|
||||||
|
output = SimulationEngine(
|
||||||
|
landscape=landscape,
|
||||||
|
runtime_spec=cp.runtime,
|
||||||
|
sim_spec=cp.options
|
||||||
|
).simulate()
|
||||||
|
|
||||||
|
plotter = ResultPlotter(landscape, output)
|
||||||
|
plotter.plot()
|
||||||
|
except (
|
||||||
|
MissingConfigKeyError,
|
||||||
|
KeyError,
|
||||||
|
YAMLError,
|
||||||
|
):
|
||||||
|
logger.critical(
|
||||||
|
"The provided config file is invalid. "
|
||||||
|
"Check the log above. You can consult the documentation for "
|
||||||
|
"an explanation of how to define a config file."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
except (
|
||||||
|
OutOfBoundsError,
|
||||||
|
DimensionError,
|
||||||
|
InvalidIsotopeError,
|
||||||
|
ValueError
|
||||||
|
) as e:
|
||||||
|
logger.critical(e)
|
||||||
|
logger.critical(
|
||||||
|
"One or more items in config are not specified correctly. "
|
||||||
|
"Please consult this log and fix the problem."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
except (
|
||||||
|
FileNotFoundError,
|
||||||
|
ParserError
|
||||||
|
):
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -1,33 +0,0 @@
|
|||||||
import math
|
|
||||||
from typing import Self
|
|
||||||
|
|
||||||
class Object:
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
x: float,
|
|
||||||
y: float,
|
|
||||||
z: float,
|
|
||||||
name: str = "Unnamed object",
|
|
||||||
color: str = 'grey'):
|
|
||||||
"""
|
|
||||||
A generic object.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
x (float): X coordinate.
|
|
||||||
y (float): Y coordinate.
|
|
||||||
z (float): Z coordinate.
|
|
||||||
name (str, optional): Name for the object. Defaults to "Unnamed object".
|
|
||||||
color (str, optional): Matplotlib compatible color string. Defaults to "red".
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.x = x
|
|
||||||
self.y = y
|
|
||||||
self.z = z
|
|
||||||
self.name = name
|
|
||||||
self.color = color
|
|
||||||
|
|
||||||
def distance_to(self, other: Self) -> float:
|
|
||||||
return math.dist(
|
|
||||||
(self.x, self.y, self.z),
|
|
||||||
(other.x, other.y, other.z),
|
|
||||||
)
|
|
||||||
11
src/pg_rad/objects/__init__.py
Normal file
11
src/pg_rad/objects/__init__.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
|
from pg_rad.objects import objects
|
||||||
|
from pg_rad.objects import sources
|
||||||
|
|
||||||
|
from pg_rad.objects.objects import (BaseObject,)
|
||||||
|
from pg_rad.objects.sources import (PointSource,)
|
||||||
|
|
||||||
|
__all__ = ['BaseObject', 'PointSource', 'objects',
|
||||||
|
'sources']
|
||||||
44
src/pg_rad/objects/objects.py
Normal file
44
src/pg_rad/objects/objects.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
from typing import Self
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import DimensionError
|
||||||
|
|
||||||
|
|
||||||
|
class BaseObject:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
pos: tuple[float, float, float] = (0, 0, 0),
|
||||||
|
name: str = "Unnamed object",
|
||||||
|
color: str = 'grey'):
|
||||||
|
"""
|
||||||
|
A generic object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pos (tuple[float, float, float]): Position vector (x,y,z).
|
||||||
|
name (str, optional): Name for the object.
|
||||||
|
Defaults to "Unnamed object".
|
||||||
|
color (str, optional): Matplotlib compatible color string.
|
||||||
|
Defaults to "red".
|
||||||
|
"""
|
||||||
|
|
||||||
|
if len(pos) != 3:
|
||||||
|
raise DimensionError("Position must be tuple of length 3 (x,y,z).")
|
||||||
|
self.pos = pos
|
||||||
|
self.name = name
|
||||||
|
self.color = color
|
||||||
|
|
||||||
|
def distance_to(self, other: Self | tuple) -> float:
|
||||||
|
if isinstance(other, tuple) and len(other) == 3:
|
||||||
|
r = np.linalg.norm(
|
||||||
|
np.subtract(self.pos, other)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
r = np.linalg.norm(
|
||||||
|
np.subtract(self.pos, other.pos)
|
||||||
|
)
|
||||||
|
except AttributeError as e:
|
||||||
|
raise e("other must be an object in the world \
|
||||||
|
or a position tuple (x,y,z).")
|
||||||
|
return r
|
||||||
54
src/pg_rad/objects/sources.py
Normal file
54
src/pg_rad/objects/sources.py
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from .objects import BaseObject
|
||||||
|
from pg_rad.isotopes.isotope import Isotope, get_isotope
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PointSource(BaseObject):
|
||||||
|
_id_counter = 1
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
activity_MBq: int,
|
||||||
|
isotope: str,
|
||||||
|
position: tuple[float, float, float] = (0, 0, 0),
|
||||||
|
name: str | None = None,
|
||||||
|
color: str = 'red'
|
||||||
|
):
|
||||||
|
"""A point source.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
activity_MBq (int): Activity A in MBq.
|
||||||
|
isotope (Isotope): The isotope.
|
||||||
|
pos (tuple[float, float, float], optional):
|
||||||
|
Position of the PointSource.
|
||||||
|
name (str, optional): Can give the source a unique name.
|
||||||
|
If not provided, point sources are sequentially
|
||||||
|
named: Source1, Source2, ...
|
||||||
|
color (str, optional): Matplotlib compatible color string
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.id = PointSource._id_counter
|
||||||
|
PointSource._id_counter += 1
|
||||||
|
|
||||||
|
# default name derived from ID if not provided
|
||||||
|
if name is None:
|
||||||
|
name = f"Source {self.id}"
|
||||||
|
|
||||||
|
super().__init__(position, name, color)
|
||||||
|
|
||||||
|
self.activity = activity_MBq
|
||||||
|
self.isotope: Isotope = get_isotope(isotope)
|
||||||
|
|
||||||
|
logger.debug(f"Source created: {self.name}")
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
x, y, z = self.position
|
||||||
|
repr_str = (f"PointSource(name={self.name}, "
|
||||||
|
+ f"pos={(x, y, z)}, "
|
||||||
|
+ f"A={self.activity} MBq), "
|
||||||
|
+ f"isotope={self.isotope.name}.")
|
||||||
|
|
||||||
|
return repr_str
|
||||||
@ -1,187 +0,0 @@
|
|||||||
from collections.abc import Sequence
|
|
||||||
import math
|
|
||||||
|
|
||||||
from matplotlib import pyplot as plt
|
|
||||||
import numpy as np
|
|
||||||
import pandas as pd
|
|
||||||
import piecewise_regression
|
|
||||||
|
|
||||||
from pg_rad.exceptions import ConvergenceError
|
|
||||||
from pg_rad.logger import setup_logger
|
|
||||||
|
|
||||||
logger = setup_logger(__name__)
|
|
||||||
|
|
||||||
class PathSegment:
|
|
||||||
def __init__(self, a: tuple[float, float], b: tuple[float, float]):
|
|
||||||
"""A straight Segment of a Path, from (x_a, y_a) to (x_b, y_b).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
a (tuple[float, float]): The starting point (x_a, y_a).
|
|
||||||
b (tuple[float, float]): The final point (x_b, y_b).
|
|
||||||
"""
|
|
||||||
self.a = a
|
|
||||||
self.b = b
|
|
||||||
|
|
||||||
def get_length(self) -> float:
|
|
||||||
return math.dist(self.a, self.b)
|
|
||||||
|
|
||||||
length = property(get_length)
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
return str(f"({self.a}, {self.b})")
|
|
||||||
|
|
||||||
def __getitem__(self, index) -> float:
|
|
||||||
if index == 0:
|
|
||||||
return self.a
|
|
||||||
elif index == 1:
|
|
||||||
return self.b
|
|
||||||
else:
|
|
||||||
raise IndexError
|
|
||||||
|
|
||||||
class Path:
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
coord_list: Sequence[tuple[float, float]],
|
|
||||||
z: float = 0,
|
|
||||||
path_simplify = False
|
|
||||||
):
|
|
||||||
"""Construct a path of sequences based on a list of coordinates.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
coord_list (Sequence[tuple[float, float]]): List of x,y coordinates.
|
|
||||||
z (float, optional): Height of the path. Defaults to 0.
|
|
||||||
path_simplify (bool, optional): Whether to pg_rad.path.simplify_path(). Defaults to False.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if len(coord_list) < 2:
|
|
||||||
raise ValueError("Must provide at least two coordinates as a list of tuples, e.g. [(x1, y1), (x2, y2)]")
|
|
||||||
|
|
||||||
x, y = tuple(zip(*coord_list))
|
|
||||||
|
|
||||||
if path_simplify:
|
|
||||||
try:
|
|
||||||
x, y = simplify_path(list(x), list(y))
|
|
||||||
except ConvergenceError:
|
|
||||||
logger.warning("Continuing without simplifying path.")
|
|
||||||
|
|
||||||
self.x_list = list(x)
|
|
||||||
self.y_list = list(y)
|
|
||||||
|
|
||||||
coord_list = list(zip(x, y))
|
|
||||||
|
|
||||||
self.segments = [PathSegment(i, ip1) for i, ip1 in zip(coord_list, coord_list[1:])]
|
|
||||||
|
|
||||||
self.z = z
|
|
||||||
|
|
||||||
def get_length(self) -> float:
|
|
||||||
return sum([s.length for s in self.segments])
|
|
||||||
|
|
||||||
length = property(get_length)
|
|
||||||
|
|
||||||
def __getitem__(self, index) -> PathSegment:
|
|
||||||
return self.segments[index]
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
return str([str(s) for s in self.segments])
|
|
||||||
|
|
||||||
def plot(self, **kwargs):
|
|
||||||
"""
|
|
||||||
Plot the path using matplotlib.
|
|
||||||
"""
|
|
||||||
plt.plot(self.x_list, self.y_list, **kwargs)
|
|
||||||
|
|
||||||
def simplify_path(
|
|
||||||
x: Sequence[float],
|
|
||||||
y: Sequence[float],
|
|
||||||
keep_endpoints_equal: bool = False,
|
|
||||||
n_breakpoints: int = 3
|
|
||||||
):
|
|
||||||
"""From full resolution x and y arrays, return a piecewise linearly approximated/simplified pair of x and y arrays.
|
|
||||||
|
|
||||||
This function uses the `piecewise_regression` package. From a full set of
|
|
||||||
coordinate pairs, the function fits linear sections, automatically finding
|
|
||||||
the number of breakpoints and their positions.
|
|
||||||
|
|
||||||
On why the default value of n_breakpoints is 3, from the `piecewise_regression`
|
|
||||||
docs:
|
|
||||||
"If you do not have (or do not want to use) initial guesses for the number
|
|
||||||
of breakpoints, you can set it to n_breakpoints=3, and the algorithm will
|
|
||||||
randomly generate start_values. With a 50% chance, the bootstrap restarting
|
|
||||||
algorithm will either use the best currently converged breakpoints or
|
|
||||||
randomly generate new start_values, escaping the local optima in two ways in
|
|
||||||
order to find better global optima."
|
|
||||||
|
|
||||||
Args:
|
|
||||||
x (Sequence[float]): Full list of x coordinates.
|
|
||||||
y (Sequence[float]): Full list of y coordinates.
|
|
||||||
keep_endpoints_equal (bool, optional): Whether or not to force start
|
|
||||||
and end to be exactly equal to the original. This will worsen the linear
|
|
||||||
approximation at the beginning and end of path. Defaults to False.
|
|
||||||
n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
x (list[float]): Reduced list of x coordinates.
|
|
||||||
y (list[float]): Reduced list of y coordinates.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConvergenceError: If the fitting algorithm failed to simplify the path.
|
|
||||||
|
|
||||||
Reference:
|
|
||||||
Pilgrim, C., (2021). piecewise-regression (aka segmented regression) in Python. Journal of Open Source Software, 6(68), 3859, https://doi.org/10.21105/joss.03859.
|
|
||||||
"""
|
|
||||||
|
|
||||||
logger.debug(f"Attempting piecewise regression on path.")
|
|
||||||
|
|
||||||
pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
|
|
||||||
pw_res = pw_fit.get_results()
|
|
||||||
|
|
||||||
if pw_res == None:
|
|
||||||
logger.error("Piecewise regression failed to converge.")
|
|
||||||
raise ConvergenceError("Piecewise regression failed to converge.")
|
|
||||||
|
|
||||||
est = pw_res['estimates']
|
|
||||||
|
|
||||||
# extract and sort breakpoints
|
|
||||||
breakpoints_x = sorted(
|
|
||||||
v['estimate'] for k, v in est.items() if k.startswith('breakpoint')
|
|
||||||
)
|
|
||||||
|
|
||||||
x_points = [x[0]] + breakpoints_x + [x[-1]]
|
|
||||||
|
|
||||||
y_points = pw_fit.predict(x_points)
|
|
||||||
|
|
||||||
if keep_endpoints_equal:
|
|
||||||
logger.debug("Forcing endpoint equality.")
|
|
||||||
y_points[0] = y[0]
|
|
||||||
y_points[-1] = y[-1]
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"Piecewise regression reduced path from {len(x)-1} to {len(x_points)-1} segments."
|
|
||||||
)
|
|
||||||
|
|
||||||
return x_points, y_points
|
|
||||||
|
|
||||||
def path_from_RT90(
|
|
||||||
df: pd.DataFrame,
|
|
||||||
east_col: str = "East",
|
|
||||||
north_col: str = "North",
|
|
||||||
**kwargs
|
|
||||||
) -> Path:
|
|
||||||
"""Construct a path from East and North formatted coordinates (RT90) in a Pandas DataFrame.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
df (pandas.DataFrame): DataFrame containing at least the two columns noted in the cols argument.
|
|
||||||
east_col (str): The column name for the East coordinates.
|
|
||||||
north_col (str): The column name for the North coordinates.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Path: A Path object built from the aquisition coordinates in the DataFrame.
|
|
||||||
"""
|
|
||||||
|
|
||||||
east_arr = np.array(df[east_col]) - min(df[east_col])
|
|
||||||
north_arr = np.array(df[north_col]) - min(df[north_col])
|
|
||||||
|
|
||||||
coord_pairs = list(zip(east_arr, north_arr))
|
|
||||||
|
|
||||||
path = Path(coord_pairs, **kwargs)
|
|
||||||
return path
|
|
||||||
8
src/pg_rad/path/__init__.py
Normal file
8
src/pg_rad/path/__init__.py
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
|
||||||
|
from pg_rad.path import path
|
||||||
|
|
||||||
|
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,)
|
||||||
|
|
||||||
|
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90']
|
||||||
130
src/pg_rad/path/path.py
Normal file
130
src/pg_rad/path/path.py
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
from collections.abc import Sequence
|
||||||
|
import logging
|
||||||
|
import math
|
||||||
|
|
||||||
|
from matplotlib import pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PathSegment:
|
||||||
|
def __init__(self, a: tuple[float, float], b: tuple[float, float]):
|
||||||
|
"""A straight Segment of a Path, from (x_a, y_a) to (x_b, y_b).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
a (tuple[float, float]): The starting point (x_a, y_a).
|
||||||
|
b (tuple[float, float]): The final point (x_b, y_b).
|
||||||
|
"""
|
||||||
|
self.a = a
|
||||||
|
self.b = b
|
||||||
|
|
||||||
|
def get_length(self) -> float:
|
||||||
|
return math.dist(self.a, self.b)
|
||||||
|
|
||||||
|
length = property(get_length)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return str(f"({self.a}, {self.b})")
|
||||||
|
|
||||||
|
def __getitem__(self, index) -> float:
|
||||||
|
if index == 0:
|
||||||
|
return self.a
|
||||||
|
elif index == 1:
|
||||||
|
return self.b
|
||||||
|
else:
|
||||||
|
raise IndexError
|
||||||
|
|
||||||
|
|
||||||
|
class Path:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
coord_list: Sequence[tuple[float, float]],
|
||||||
|
z: float = 0.,
|
||||||
|
z_box: float = 50.
|
||||||
|
):
|
||||||
|
"""Construct a path of sequences based on a list of coordinates.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
coord_list (Sequence[tuple[float, float]]): List of x,y
|
||||||
|
coordinates.
|
||||||
|
z (float, optional): position of the path in z-direction in meters.
|
||||||
|
Defaults to 0 meters.
|
||||||
|
z_box (float, optional): How much empty space to set
|
||||||
|
above the path in meters. Defaults to 50 meters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if len(coord_list) < 2:
|
||||||
|
raise ValueError("Must provide at least two coordinates as a \
|
||||||
|
of tuples, e.g. [(x1, y1), (x2, y2)]")
|
||||||
|
|
||||||
|
x, y = tuple(zip(*coord_list))
|
||||||
|
|
||||||
|
self.x_list = list(x)
|
||||||
|
self.y_list = list(y)
|
||||||
|
|
||||||
|
coord_list = list(zip(x, y))
|
||||||
|
|
||||||
|
self.segments = [
|
||||||
|
PathSegment(i, ip1)
|
||||||
|
for i, ip1 in
|
||||||
|
zip(coord_list, coord_list[1:])
|
||||||
|
]
|
||||||
|
|
||||||
|
self.z = z
|
||||||
|
self.size = (
|
||||||
|
np.ceil(max(self.x_list)),
|
||||||
|
np.ceil(max(self.y_list)),
|
||||||
|
z + z_box
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug("Path created.")
|
||||||
|
|
||||||
|
def get_length(self) -> float:
|
||||||
|
return sum([s.length for s in self.segments])
|
||||||
|
|
||||||
|
length = property(get_length)
|
||||||
|
|
||||||
|
def __getitem__(self, index) -> PathSegment:
|
||||||
|
return self.segments[index]
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return str([str(s) for s in self.segments])
|
||||||
|
|
||||||
|
def plot(self, **kwargs):
|
||||||
|
"""
|
||||||
|
Plot the path using matplotlib.
|
||||||
|
"""
|
||||||
|
plt.plot(self.x_list, self.y_list, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def path_from_RT90(
|
||||||
|
df: pd.DataFrame,
|
||||||
|
east_col: str = "East",
|
||||||
|
north_col: str = "North",
|
||||||
|
**kwargs
|
||||||
|
) -> Path:
|
||||||
|
"""Construct a path from East and North formatted coordinates (RT90)
|
||||||
|
in a Pandas DataFrame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
df (pandas.DataFrame): DataFrame containing at least the two columns
|
||||||
|
noted in the cols argument.
|
||||||
|
east_col (str): The column name for the East coordinates.
|
||||||
|
north_col (str): The column name for the North coordinates.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path: A Path object built from the aquisition coordinates in the
|
||||||
|
DataFrame.
|
||||||
|
"""
|
||||||
|
|
||||||
|
east_arr = np.array(df[east_col]) - min(df[east_col])
|
||||||
|
north_arr = np.array(df[north_col]) - min(df[north_col])
|
||||||
|
|
||||||
|
coord_pairs = list(zip(east_arr, north_arr))
|
||||||
|
|
||||||
|
path = Path(coord_pairs, **kwargs)
|
||||||
|
logger.debug("Loaded path from provided RT90 coordinates.")
|
||||||
|
return path
|
||||||
12
src/pg_rad/physics/__init__.py
Normal file
12
src/pg_rad/physics/__init__.py
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
from pg_rad.physics import attenuation
|
||||||
|
from pg_rad.physics import fluence
|
||||||
|
|
||||||
|
from pg_rad.physics.attenuation import (get_mass_attenuation_coeff,)
|
||||||
|
from pg_rad.physics.fluence import (calculate_fluence_along_path,
|
||||||
|
calculate_fluence_at, phi,)
|
||||||
|
|
||||||
|
__all__ = ['attenuation', 'calculate_fluence_along_path',
|
||||||
|
'calculate_fluence_at', 'fluence', 'get_mass_attenuation_coeff',
|
||||||
|
'phi']
|
||||||
17
src/pg_rad/physics/attenuation.py
Normal file
17
src/pg_rad/physics/attenuation.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
from importlib.resources import files
|
||||||
|
|
||||||
|
from pandas import read_csv
|
||||||
|
from scipy.interpolate import interp1d
|
||||||
|
|
||||||
|
from pg_rad.configs.filepaths import ATTENUATION_TABLE
|
||||||
|
|
||||||
|
|
||||||
|
def get_mass_attenuation_coeff(
|
||||||
|
*args
|
||||||
|
) -> float:
|
||||||
|
csv = files('pg_rad.data').joinpath(ATTENUATION_TABLE)
|
||||||
|
data = read_csv(csv)
|
||||||
|
x = data["energy_mev"].to_numpy()
|
||||||
|
y = data["mu"].to_numpy()
|
||||||
|
f = interp1d(x, y)
|
||||||
|
return f(*args)
|
||||||
99
src/pg_rad/physics/fluence.py
Normal file
99
src/pg_rad/physics/fluence.py
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
from typing import Tuple, TYPE_CHECKING
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
|
|
||||||
|
def phi(
|
||||||
|
r: float,
|
||||||
|
activity: float | int,
|
||||||
|
branching_ratio: float,
|
||||||
|
mu_mass_air: float,
|
||||||
|
air_density: float,
|
||||||
|
) -> float:
|
||||||
|
"""Compute the contribution of a single point source to the
|
||||||
|
primary photon fluence rate phi at position (x,y,z).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
r (float): [m] Distance to the point source.
|
||||||
|
activity (float | int): [Bq] Activity of the point source.
|
||||||
|
branching_ratio (float): Branching ratio for the photon energy E_gamma.
|
||||||
|
mu_mass_air (float): [cm^2/g] Mass attenuation coefficient for air.
|
||||||
|
air_density (float): [kg/m^3] Air density.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
phi (float): [s^-1 m^-2] Primary photon fluence rate at distance r from
|
||||||
|
a point source.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Linear photon attenuation coefficient in m^-1.
|
||||||
|
mu_mass_air *= 0.1
|
||||||
|
mu_air = mu_mass_air * air_density
|
||||||
|
|
||||||
|
phi_r = (
|
||||||
|
activity
|
||||||
|
* branching_ratio
|
||||||
|
* np.exp(-mu_air * r)
|
||||||
|
/ (4 * np.pi * r**2)
|
||||||
|
)
|
||||||
|
|
||||||
|
return phi_r
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_fluence_at(landscape: "Landscape", pos: np.ndarray, scaling=1E6):
|
||||||
|
"""Compute fluence at an arbitrary position in the landscape.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
landscape (Landscape): The landscape to compute.
|
||||||
|
pos (np.ndarray): (N, 3) array of positions.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
total_phi (np.ndarray): (N,) array of fluences.
|
||||||
|
"""
|
||||||
|
pos = np.atleast_2d(pos)
|
||||||
|
total_phi = np.zeros(pos.shape[0])
|
||||||
|
|
||||||
|
for source in landscape.point_sources:
|
||||||
|
r = np.linalg.norm(pos - np.array(source.pos), axis=1)
|
||||||
|
r = np.maximum(r, 1E-3) # enforce minimum distance of 1cm
|
||||||
|
|
||||||
|
phi_source = phi(
|
||||||
|
r=r,
|
||||||
|
activity=source.activity * scaling,
|
||||||
|
branching_ratio=source.isotope.b,
|
||||||
|
mu_mass_air=source.isotope.mu_mass_air,
|
||||||
|
air_density=landscape.air_density
|
||||||
|
)
|
||||||
|
|
||||||
|
total_phi += phi_source
|
||||||
|
|
||||||
|
return total_phi
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_fluence_along_path(
|
||||||
|
landscape: "Landscape",
|
||||||
|
points_per_segment: int = 10
|
||||||
|
) -> Tuple[np.ndarray, np.ndarray]:
|
||||||
|
path = landscape.path
|
||||||
|
num_segments = len(path.segments)
|
||||||
|
|
||||||
|
xnew = np.linspace(
|
||||||
|
path.x_list[0],
|
||||||
|
path.x_list[-1],
|
||||||
|
num=num_segments*points_per_segment)
|
||||||
|
|
||||||
|
ynew = np.interp(xnew, path.x_list, path.y_list)
|
||||||
|
|
||||||
|
z = np.full(xnew.shape, path.z)
|
||||||
|
full_positions = np.c_[xnew, ynew, z]
|
||||||
|
phi_result = calculate_fluence_at(landscape, full_positions)
|
||||||
|
|
||||||
|
dist_travelled = np.linspace(
|
||||||
|
full_positions[0, 0],
|
||||||
|
path.length,
|
||||||
|
len(phi_result)
|
||||||
|
)
|
||||||
|
|
||||||
|
return dist_travelled, phi_result
|
||||||
7
src/pg_rad/plotting/__init__.py
Normal file
7
src/pg_rad/plotting/__init__.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
# do not expose internal logger when running mkinit
|
||||||
|
__ignore__ = ["logger"]
|
||||||
|
from pg_rad.plotting import landscape_plotter
|
||||||
|
|
||||||
|
from pg_rad.plotting.landscape_plotter import (LandscapeSlicePlotter,)
|
||||||
|
|
||||||
|
__all__ = ['LandscapeSlicePlotter', 'landscape_plotter']
|
||||||
115
src/pg_rad/plotting/landscape_plotter.py
Normal file
115
src/pg_rad/plotting/landscape_plotter.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from matplotlib import pyplot as plt
|
||||||
|
from matplotlib.axes import Axes
|
||||||
|
from matplotlib.patches import Circle
|
||||||
|
|
||||||
|
from numpy import median
|
||||||
|
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class LandscapeSlicePlotter:
|
||||||
|
def plot(
|
||||||
|
self,
|
||||||
|
landscape: Landscape,
|
||||||
|
z: int = 0,
|
||||||
|
show: bool = True,
|
||||||
|
save: bool = False,
|
||||||
|
ax: Axes | None = None
|
||||||
|
):
|
||||||
|
"""Plot a top-down slice of the landscape at a height z.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
landscape (Landscape): the landscape to plot
|
||||||
|
z (int, optional): Height at which to plot slice. Defaults to 0.
|
||||||
|
show (bool, optional): Show the plot. Defaults to True.
|
||||||
|
save (bool, optional): Save the plot. Defaults to False.
|
||||||
|
""" """
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.z = z
|
||||||
|
|
||||||
|
if not ax:
|
||||||
|
fig, ax = plt.subplots()
|
||||||
|
|
||||||
|
self._draw_base(ax, landscape)
|
||||||
|
self._draw_path(ax, landscape)
|
||||||
|
self._draw_point_sources(ax, landscape)
|
||||||
|
|
||||||
|
ax.set_aspect("equal")
|
||||||
|
|
||||||
|
if save and not ax:
|
||||||
|
landscape_name = landscape.name.lower().replace(' ', '_')
|
||||||
|
filename = f"{landscape_name}_z{self.z}.png"
|
||||||
|
plt.savefig(filename)
|
||||||
|
logger.info("Plot saved to file: "+filename)
|
||||||
|
|
||||||
|
if show and not ax:
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
return ax
|
||||||
|
|
||||||
|
def _draw_base(self, ax, landscape: Landscape):
|
||||||
|
width, height = landscape.size[:2]
|
||||||
|
|
||||||
|
ax.set_xlim(right=max(width, .5*height))
|
||||||
|
|
||||||
|
# if the road is very flat, we center it vertically (looks better)
|
||||||
|
if median(landscape.path.y_list) == 0:
|
||||||
|
h = max(height, .5*width)
|
||||||
|
ax.set_ylim(bottom=-h//2,
|
||||||
|
top=h//2)
|
||||||
|
else:
|
||||||
|
ax.set_ylim(top=max(height, .5*width))
|
||||||
|
|
||||||
|
ax.set_xlabel("X [m]")
|
||||||
|
ax.set_ylabel("Y [m]")
|
||||||
|
ax.set_title(f"Landscape (top-down, z = {self.z})")
|
||||||
|
|
||||||
|
def _draw_path(self, ax, landscape):
|
||||||
|
if landscape.path.z <= self.z:
|
||||||
|
ax.plot(
|
||||||
|
landscape.path.x_list,
|
||||||
|
landscape.path.y_list,
|
||||||
|
linestyle='-',
|
||||||
|
marker='|',
|
||||||
|
markersize=3,
|
||||||
|
linewidth=1
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Path is above the slice height z."
|
||||||
|
"It will not show on the plot."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _draw_point_sources(self, ax, landscape):
|
||||||
|
for s in landscape.point_sources:
|
||||||
|
x, y, z = s.pos
|
||||||
|
if z <= self.z:
|
||||||
|
dot = Circle(
|
||||||
|
(x, y),
|
||||||
|
radius=5,
|
||||||
|
color=s.color,
|
||||||
|
zorder=5
|
||||||
|
)
|
||||||
|
|
||||||
|
ax.text(
|
||||||
|
x + 0.06,
|
||||||
|
y + 0.06,
|
||||||
|
s.name+", z="+str(z),
|
||||||
|
color=s.color,
|
||||||
|
fontsize=10,
|
||||||
|
ha="left",
|
||||||
|
va="bottom",
|
||||||
|
zorder=6
|
||||||
|
)
|
||||||
|
|
||||||
|
ax.add_patch(dot)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
f"Source {s.name} is above slice height z."
|
||||||
|
"It will not show on the plot."
|
||||||
|
)
|
||||||
100
src/pg_rad/plotting/result_plotter.py
Normal file
100
src/pg_rad/plotting/result_plotter.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
from matplotlib import pyplot as plt
|
||||||
|
from matplotlib.gridspec import GridSpec
|
||||||
|
|
||||||
|
from .landscape_plotter import LandscapeSlicePlotter
|
||||||
|
from pg_rad.simulator.outputs import SimulationOutput
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
|
||||||
|
|
||||||
|
class ResultPlotter:
|
||||||
|
def __init__(self, landscape: Landscape, output: SimulationOutput):
|
||||||
|
self.landscape = landscape
|
||||||
|
self.count_rate_res = output.count_rate
|
||||||
|
self.source_res = output.sources
|
||||||
|
|
||||||
|
def plot(self, landscape_z: float = 0):
|
||||||
|
fig = plt.figure(figsize=(12, 10), constrained_layout=True)
|
||||||
|
fig.suptitle(self.landscape.name)
|
||||||
|
gs = GridSpec(
|
||||||
|
3,
|
||||||
|
2,
|
||||||
|
width_ratios=[0.5, 0.5],
|
||||||
|
height_ratios=[0.7, 0.15, 0.15],
|
||||||
|
hspace=0.2)
|
||||||
|
|
||||||
|
ax1 = fig.add_subplot(gs[0, 0])
|
||||||
|
self._draw_count_rate(ax1)
|
||||||
|
|
||||||
|
ax2 = fig.add_subplot(gs[0, 1])
|
||||||
|
self._plot_landscape(ax2, landscape_z)
|
||||||
|
|
||||||
|
ax3 = fig.add_subplot(gs[1, :])
|
||||||
|
self._draw_table(ax3)
|
||||||
|
|
||||||
|
ax4 = fig.add_subplot(gs[2, :])
|
||||||
|
self._draw_source_table(ax4)
|
||||||
|
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
def _plot_landscape(self, ax, z):
|
||||||
|
lp = LandscapeSlicePlotter()
|
||||||
|
ax = lp.plot(landscape=self.landscape, z=z, ax=ax, show=False)
|
||||||
|
return ax
|
||||||
|
|
||||||
|
def _draw_count_rate(self, ax):
|
||||||
|
x = self.count_rate_res.arc_length
|
||||||
|
y = self.count_rate_res.count_rate
|
||||||
|
ax.plot(x, y, label='Count rate', color='r')
|
||||||
|
ax.set_title('Count rate')
|
||||||
|
ax.set_xlabel('Arc length s [m]')
|
||||||
|
ax.set_ylabel('Counts')
|
||||||
|
ax.legend()
|
||||||
|
|
||||||
|
def _draw_table(self, ax):
|
||||||
|
ax.set_axis_off()
|
||||||
|
ax.set_title('Simulation parameters')
|
||||||
|
cols = ('Parameter', 'Value')
|
||||||
|
data = [
|
||||||
|
["Air density (kg/m^3)", round(self.landscape.air_density, 3)],
|
||||||
|
["Total path length (m)", round(self.landscape.path.length, 3)]
|
||||||
|
]
|
||||||
|
|
||||||
|
ax.table(
|
||||||
|
cellText=data,
|
||||||
|
colLabels=cols,
|
||||||
|
loc='center'
|
||||||
|
)
|
||||||
|
|
||||||
|
return ax
|
||||||
|
|
||||||
|
def _draw_source_table(self, ax):
|
||||||
|
ax.set_axis_off()
|
||||||
|
ax.set_title('Point sources')
|
||||||
|
|
||||||
|
cols = (
|
||||||
|
'Name',
|
||||||
|
'Isotope',
|
||||||
|
'Activity (MBq)',
|
||||||
|
'Position (m)',
|
||||||
|
'Dist. to path (m)'
|
||||||
|
)
|
||||||
|
|
||||||
|
# this formats position to tuple
|
||||||
|
data = [
|
||||||
|
[
|
||||||
|
s.name,
|
||||||
|
s.isotope,
|
||||||
|
s.activity,
|
||||||
|
"("+", ".join(f"{val:.2f}" for val in s.position)+")",
|
||||||
|
round(s.dist_from_path, 2)
|
||||||
|
]
|
||||||
|
for s in self.source_res
|
||||||
|
]
|
||||||
|
ax.table(
|
||||||
|
cellText=data,
|
||||||
|
colLabels=cols,
|
||||||
|
loc='center'
|
||||||
|
)
|
||||||
|
|
||||||
|
return ax
|
||||||
0
src/pg_rad/simulator/__init__.py
Normal file
0
src/pg_rad/simulator/__init__.py
Normal file
63
src/pg_rad/simulator/engine.py
Normal file
63
src/pg_rad/simulator/engine.py
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
from typing import List
|
||||||
|
|
||||||
|
from pg_rad.landscape.landscape import Landscape
|
||||||
|
from pg_rad.simulator.outputs import (
|
||||||
|
CountRateOutput,
|
||||||
|
SimulationOutput,
|
||||||
|
SourceOutput
|
||||||
|
)
|
||||||
|
from pg_rad.physics.fluence import calculate_fluence_along_path
|
||||||
|
from pg_rad.utils.projection import minimal_distance_to_path
|
||||||
|
from pg_rad.inputparser.specs import RuntimeSpec, SimulationOptionsSpec
|
||||||
|
|
||||||
|
|
||||||
|
class SimulationEngine:
|
||||||
|
"""Takes a fully built landscape and produces results."""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
landscape: Landscape,
|
||||||
|
runtime_spec=RuntimeSpec,
|
||||||
|
sim_spec=SimulationOptionsSpec
|
||||||
|
):
|
||||||
|
|
||||||
|
self.landscape = landscape
|
||||||
|
self.runtime_spec = runtime_spec
|
||||||
|
self.sim_spec = sim_spec
|
||||||
|
|
||||||
|
def simulate(self) -> SimulationOutput:
|
||||||
|
"""Compute everything and return structured output."""
|
||||||
|
|
||||||
|
count_rate_results = self._calculate_count_rate_along_path()
|
||||||
|
source_results = self._calculate_point_source_distance_to_path()
|
||||||
|
|
||||||
|
return SimulationOutput(
|
||||||
|
name=self.landscape.name,
|
||||||
|
count_rate=count_rate_results,
|
||||||
|
sources=source_results
|
||||||
|
)
|
||||||
|
|
||||||
|
def _calculate_count_rate_along_path(self) -> CountRateOutput:
|
||||||
|
arc_length, phi = calculate_fluence_along_path(self.landscape)
|
||||||
|
return CountRateOutput(arc_length, phi)
|
||||||
|
|
||||||
|
def _calculate_point_source_distance_to_path(self) -> List[SourceOutput]:
|
||||||
|
|
||||||
|
path = self.landscape.path
|
||||||
|
source_output = []
|
||||||
|
for s in self.landscape.point_sources:
|
||||||
|
dist_to_path = minimal_distance_to_path(
|
||||||
|
path.x_list,
|
||||||
|
path.y_list,
|
||||||
|
path.z,
|
||||||
|
s.pos)
|
||||||
|
|
||||||
|
source_output.append(
|
||||||
|
SourceOutput(
|
||||||
|
s.name,
|
||||||
|
s.isotope.name,
|
||||||
|
s.activity,
|
||||||
|
s.pos,
|
||||||
|
dist_to_path)
|
||||||
|
)
|
||||||
|
|
||||||
|
return source_output
|
||||||
25
src/pg_rad/simulator/outputs.py
Normal file
25
src/pg_rad/simulator/outputs.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CountRateOutput:
|
||||||
|
arc_length: List[float]
|
||||||
|
count_rate: List[float]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SourceOutput:
|
||||||
|
name: str
|
||||||
|
isotope: str
|
||||||
|
activity: float
|
||||||
|
position: Tuple[float, float, float]
|
||||||
|
dist_from_path: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimulationOutput:
|
||||||
|
name: str
|
||||||
|
count_rate: CountRateOutput
|
||||||
|
sources: List[SourceOutput]
|
||||||
@ -1,43 +0,0 @@
|
|||||||
from pg_rad.objects import Object
|
|
||||||
from pg_rad.isotope import Isotope
|
|
||||||
|
|
||||||
class PointSource(Object):
|
|
||||||
_id_counter = 1
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
x: float,
|
|
||||||
y: float,
|
|
||||||
z: float,
|
|
||||||
activity: int,
|
|
||||||
isotope: Isotope,
|
|
||||||
name: str | None = None,
|
|
||||||
color: str = "red"):
|
|
||||||
"""A point source.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
x (float): X coordinate.
|
|
||||||
y (float): Y coordinate.
|
|
||||||
z (float): Z coordinate.
|
|
||||||
activity (int): Activity A in MBq.
|
|
||||||
isotope (Isotope): The isotope.
|
|
||||||
name (str | None, optional): Can give the source a unique name.
|
|
||||||
Defaults to None, making the name sequential.
|
|
||||||
(Source-1, Source-2, etc.).
|
|
||||||
color (str, optional): Matplotlib compatible color string. Defaults to "red".
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.id = PointSource._id_counter
|
|
||||||
PointSource._id_counter += 1
|
|
||||||
|
|
||||||
# default name derived from ID if not provided
|
|
||||||
if name is None:
|
|
||||||
name = f"Source {self.id}"
|
|
||||||
|
|
||||||
super().__init__(x, y, z, name, color)
|
|
||||||
|
|
||||||
self.activity = activity
|
|
||||||
self.isotope = isotope
|
|
||||||
self.color = color
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"PointSource(name={self.name}, pos={(self.x, self.y, self.z)}, isotope={self.isotope.name}, A={self.activity} MBq)"
|
|
||||||
0
src/pg_rad/utils/__init__.py
Normal file
0
src/pg_rad/utils/__init__.py
Normal file
168
src/pg_rad/utils/projection.py
Normal file
168
src/pg_rad/utils/projection.py
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pg_rad.exceptions.exceptions import OutOfBoundsError
|
||||||
|
|
||||||
|
|
||||||
|
def rel_to_abs_source_position(
|
||||||
|
x_list: List,
|
||||||
|
y_list: List,
|
||||||
|
path_z: int | float,
|
||||||
|
along_path: int | float,
|
||||||
|
side: str,
|
||||||
|
dist_from_path: int | float,
|
||||||
|
z_rel: int | float = 0
|
||||||
|
):
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
arc_length = np.cumsum(segment_lengths)
|
||||||
|
arc_length = np.insert(arc_length, 0, 0)
|
||||||
|
|
||||||
|
# find index of the segment where the source should be placed
|
||||||
|
s_i = np.searchsorted(arc_length, along_path, side='right') - 1
|
||||||
|
if not 0 <= s_i < len(segments):
|
||||||
|
raise OutOfBoundsError(
|
||||||
|
"along_path must be less than the length of the path!"
|
||||||
|
)
|
||||||
|
|
||||||
|
# fraction of segment length where to place the point source
|
||||||
|
segment_fraction = (
|
||||||
|
(along_path - arc_length[s_i])
|
||||||
|
/ segment_lengths[s_i]
|
||||||
|
)
|
||||||
|
|
||||||
|
# tangential vector at point where point source to be placed
|
||||||
|
point_on_path = path[s_i] + segment_fraction * segments[s_i]
|
||||||
|
tangent_vec = segments[s_i] / segment_lengths[s_i]
|
||||||
|
|
||||||
|
if side not in {"left", "right"}:
|
||||||
|
raise ValueError("side must be 'left' or 'right'")
|
||||||
|
|
||||||
|
orientation = 1 if side == "left" else -1
|
||||||
|
perp_vec = orientation * np.array([-tangent_vec[1], tangent_vec[0]])
|
||||||
|
|
||||||
|
x_abs, y_abs = point_on_path + dist_from_path * perp_vec
|
||||||
|
z_abs = path_z + z_rel
|
||||||
|
|
||||||
|
return x_abs, y_abs, z_abs
|
||||||
|
|
||||||
|
|
||||||
|
def minimal_distance_to_path(
|
||||||
|
x_list: List[float],
|
||||||
|
y_list: List[float],
|
||||||
|
path_z: float,
|
||||||
|
pos: Tuple[float, float, float]
|
||||||
|
) -> float:
|
||||||
|
"""
|
||||||
|
Compute minimal Euclidean distance from pos (x,y,z)
|
||||||
|
to interpolated polyline path at height path_z.
|
||||||
|
"""
|
||||||
|
x, y, z = pos
|
||||||
|
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
point_xy = np.array([x, y])
|
||||||
|
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
min_dist = np.inf
|
||||||
|
|
||||||
|
for p0, seg, seg_len in zip(path[:-1], segments, segment_lengths):
|
||||||
|
if seg_len == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
t = np.dot(point_xy - p0, seg) / (seg_len ** 2)
|
||||||
|
t = np.clip(t, 0.0, 1.0)
|
||||||
|
|
||||||
|
projection_xy = p0 + t * seg
|
||||||
|
|
||||||
|
dx, dy = point_xy - projection_xy
|
||||||
|
dz = z - path_z
|
||||||
|
|
||||||
|
dist = np.sqrt(dx**2 + dy**2 + dz**2)
|
||||||
|
|
||||||
|
if dist < min_dist:
|
||||||
|
min_dist = dist
|
||||||
|
|
||||||
|
return min_dist
|
||||||
|
|
||||||
|
|
||||||
|
def abs_to_rel_source_position(
|
||||||
|
x_list: List,
|
||||||
|
y_list: List,
|
||||||
|
path_z: int | float,
|
||||||
|
pos: Tuple[float, float, float],
|
||||||
|
) -> Tuple[float, float, str, float]:
|
||||||
|
"""
|
||||||
|
Convert absolute (x,y,z) position into:
|
||||||
|
along_path,
|
||||||
|
dist_from_path,
|
||||||
|
side ("left" or "right"),
|
||||||
|
z_rel
|
||||||
|
"""
|
||||||
|
|
||||||
|
x, y, z = pos
|
||||||
|
|
||||||
|
path = np.column_stack((x_list, y_list))
|
||||||
|
point = np.array([x, y])
|
||||||
|
|
||||||
|
segments = np.diff(path, axis=0)
|
||||||
|
segment_lengths = np.hypot(segments[:, 0], segments[:, 1])
|
||||||
|
|
||||||
|
arc_length = np.cumsum(segment_lengths)
|
||||||
|
arc_length = np.insert(arc_length, 0, 0)
|
||||||
|
|
||||||
|
min_dist = np.inf
|
||||||
|
best_projection = None
|
||||||
|
best_s_i = None
|
||||||
|
best_fraction = None
|
||||||
|
best_signed_dist = None
|
||||||
|
|
||||||
|
for (i, (p0, seg, seg_len)) in enumerate(
|
||||||
|
zip(path[:-1], segments, segment_lengths)
|
||||||
|
):
|
||||||
|
if seg_len == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# project point onto infinite line
|
||||||
|
t = np.dot(point - p0, seg) / (seg_len ** 2)
|
||||||
|
|
||||||
|
# clamp to segment
|
||||||
|
t_clamped = np.clip(t, 0.0, 1.0)
|
||||||
|
|
||||||
|
projection = p0 + t_clamped * seg
|
||||||
|
diff_vec = point - projection
|
||||||
|
dist = np.linalg.norm(diff_vec)
|
||||||
|
|
||||||
|
if dist < min_dist:
|
||||||
|
min_dist = dist
|
||||||
|
best_projection = projection
|
||||||
|
best_s_i = i
|
||||||
|
best_fraction = t_clamped
|
||||||
|
|
||||||
|
# tangent and perpendicular
|
||||||
|
tangent_vec = seg / seg_len
|
||||||
|
perp_vec = np.array([-tangent_vec[1], tangent_vec[0]])
|
||||||
|
|
||||||
|
signed_dist = np.dot(diff_vec, perp_vec)
|
||||||
|
best_signed_dist = signed_dist
|
||||||
|
|
||||||
|
if best_projection is None:
|
||||||
|
raise ValueError("Could not project point onto path.")
|
||||||
|
|
||||||
|
# Compute along_path
|
||||||
|
along_path = (
|
||||||
|
arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Side and distance
|
||||||
|
side = "left" if best_signed_dist > 0 else "right"
|
||||||
|
dist_from_path = abs(best_signed_dist)
|
||||||
|
|
||||||
|
# z relative
|
||||||
|
z_rel = z - path_z
|
||||||
|
|
||||||
|
return along_path, dist_from_path, side, z_rel
|
||||||
0
src/road_gen/__init__.py
Normal file
0
src/road_gen/__init__.py
Normal file
0
src/road_gen/generators/__init__.py
Normal file
0
src/road_gen/generators/__init__.py
Normal file
55
src/road_gen/generators/base_road_generator.py
Normal file
55
src/road_gen/generators/base_road_generator.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import secrets
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class BaseRoadGenerator:
|
||||||
|
"""A base generator object for generating a road of a specified length."""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
length: int | float,
|
||||||
|
ds: int | float,
|
||||||
|
velocity: int | float,
|
||||||
|
mu: float = 0.7,
|
||||||
|
g: float = 9.81,
|
||||||
|
seed: int | None = None
|
||||||
|
):
|
||||||
|
"""Initialize a BaseGenerator with a given or random seed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length (int | float): The total length of the road in meters.
|
||||||
|
ds (int | float): The step size in meters.
|
||||||
|
velocity (int | float): Velocity in meters per second.
|
||||||
|
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
|
||||||
|
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
|
||||||
|
seed (int | None, optional): Set a seed for the generator.
|
||||||
|
Defaults to a random seed.
|
||||||
|
"""
|
||||||
|
if seed is None:
|
||||||
|
seed = secrets.randbits(32)
|
||||||
|
|
||||||
|
if not isinstance(seed, int):
|
||||||
|
raise TypeError("seed must be an integer or None.")
|
||||||
|
|
||||||
|
if not isinstance(length, int | float):
|
||||||
|
raise TypeError("Length must be an integer or float in meters.")
|
||||||
|
|
||||||
|
if not isinstance(ds, int | float):
|
||||||
|
raise TypeError("Step size must be integer or float in meters.")
|
||||||
|
|
||||||
|
if not isinstance(velocity, int | float):
|
||||||
|
raise TypeError(
|
||||||
|
"Velocity must be integer or float in meters per second."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.length = length
|
||||||
|
self.ds = ds
|
||||||
|
|
||||||
|
self.velocity = velocity
|
||||||
|
self.min_radius = (velocity ** 2) / (g * mu)
|
||||||
|
|
||||||
|
self.seed = seed
|
||||||
|
self._rng = np.random.default_rng(seed)
|
||||||
|
|
||||||
|
def generate(self):
|
||||||
|
pass
|
||||||
143
src/road_gen/generators/segmented_road_generator.py
Normal file
143
src/road_gen/generators/segmented_road_generator.py
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .base_road_generator import BaseRoadGenerator
|
||||||
|
from road_gen.prefabs import prefabs
|
||||||
|
from road_gen.integrator.integrator import integrate_road
|
||||||
|
|
||||||
|
from pg_rad.configs import defaults
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SegmentedRoadGenerator(BaseRoadGenerator):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
length: int | float | list[int | float],
|
||||||
|
ds: int | float,
|
||||||
|
velocity: int | float,
|
||||||
|
mu: float = 0.7,
|
||||||
|
g: float = 9.81,
|
||||||
|
seed: int | None = None
|
||||||
|
):
|
||||||
|
"""Initialize a SegmentedRoadGenerator with given or random seed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length (int | float): The total length of the road in meters.
|
||||||
|
ds (int | float): The step size in meters.
|
||||||
|
velocity (int | float): Velocity in meters per second.
|
||||||
|
mu (float): Coefficient of friction. Defaults to 0.7 (dry asphalt).
|
||||||
|
g (float): Acceleration due to gravity (m/s^2). Defaults to 9.81.
|
||||||
|
seed (int | None, optional): Set a seed for the generator.
|
||||||
|
Defaults to random seed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if isinstance(length, list):
|
||||||
|
length = sum(
|
||||||
|
[seg_len for seg_len in length if seg_len is not None]
|
||||||
|
)
|
||||||
|
super().__init__(length, ds, velocity, mu, g, seed)
|
||||||
|
|
||||||
|
def generate(
|
||||||
|
self,
|
||||||
|
segments: list[str],
|
||||||
|
lengths: list[int | float] | None = None,
|
||||||
|
angles: list[int | float] | None = None,
|
||||||
|
alpha: float = defaults.DEFAULT_ALPHA,
|
||||||
|
min_turn_angle: float = defaults.DEFAULT_MIN_TURN_ANGLE,
|
||||||
|
max_turn_angle: float = defaults.DEFAULT_MAX_TURN_ANGLE
|
||||||
|
) -> Tuple[np.ndarray, np.ndarray]:
|
||||||
|
"""Generate a curvature profile from a list of segments.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
segments (list[str]): List of segments.
|
||||||
|
alpha (float, optional): Dirichlet concentration parameter.
|
||||||
|
A higher value leads to more uniform apportionment of the
|
||||||
|
length amongst the segments, while a lower value allows more
|
||||||
|
random apportionment. Defaults to 1.0.
|
||||||
|
min_turn_angle (float, optional): Minimum turn angle in degrees for
|
||||||
|
random sampling of turn radius. Does nothing if `angle_list` is
|
||||||
|
provided or no `turn_*` segement is specified in the `segments`
|
||||||
|
list.
|
||||||
|
min_turn_angle (float, optional): Maximum turn angle in degrees for
|
||||||
|
random sampling of turn radius. Does nothing if `angle_list` is
|
||||||
|
provided or no `turn_*` segement is specified in the `segments`
|
||||||
|
list.
|
||||||
|
Raises:
|
||||||
|
ValueError: Raised when a turn
|
||||||
|
is too tight given its segment length and the velocity.
|
||||||
|
To fix this, you can try to reduce the amount of segments or
|
||||||
|
increase length. Increasing alpha
|
||||||
|
(Dirichlet concentration parameter) can also help because this
|
||||||
|
reduces the odds of very small lengths being assigned to
|
||||||
|
turn segments.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple[np.ndarray, np.ndarray]: x and y coordinates of the
|
||||||
|
waypoints describing the random road.
|
||||||
|
"""
|
||||||
|
existing_prefabs = prefabs.PREFABS.keys()
|
||||||
|
if not all(segment in existing_prefabs for segment in segments):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid segment type provided. Available choices"
|
||||||
|
f"{existing_prefabs}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.segments = segments
|
||||||
|
self.alpha = alpha
|
||||||
|
num_points = np.ceil(self.length / self.ds).astype(int)
|
||||||
|
|
||||||
|
# divide num_points into len(segments) randomly sized parts.
|
||||||
|
if isinstance(self.length, list):
|
||||||
|
parts = self.length
|
||||||
|
else:
|
||||||
|
parts = self._rng.dirichlet(
|
||||||
|
np.full(len(segments), alpha),
|
||||||
|
size=1)[0]
|
||||||
|
parts = parts * num_points
|
||||||
|
parts = np.round(parts).astype(int)
|
||||||
|
|
||||||
|
# correct round off so the sum of parts is still total length L.
|
||||||
|
if sum(parts) != num_points:
|
||||||
|
parts[0] += num_points - sum(parts)
|
||||||
|
|
||||||
|
curvature = np.zeros(num_points)
|
||||||
|
current_index = 0
|
||||||
|
|
||||||
|
for seg_name, seg_length in zip(segments, parts):
|
||||||
|
seg_function = prefabs.PREFABS[seg_name]
|
||||||
|
|
||||||
|
if seg_name == 'straight':
|
||||||
|
curvature_s = seg_function(seg_length)
|
||||||
|
else:
|
||||||
|
R_min_angle = seg_length / np.deg2rad(max_turn_angle)
|
||||||
|
R_max_angle = seg_length / np.deg2rad(min_turn_angle)
|
||||||
|
|
||||||
|
# physics limit
|
||||||
|
R_min = max(self.min_radius, R_min_angle)
|
||||||
|
|
||||||
|
if R_min > R_max_angle:
|
||||||
|
raise ValueError(
|
||||||
|
f"{seg_name} with length {seg_length} does not have "
|
||||||
|
"a possible radius. The minimum for the provided "
|
||||||
|
"velocity and friction coefficient is "
|
||||||
|
f"{self.min_radius}, but the possible range is "
|
||||||
|
f"({R_min}, {R_max_angle})"
|
||||||
|
)
|
||||||
|
|
||||||
|
rand_radius = self._rng.uniform(R_min, R_max_angle)
|
||||||
|
|
||||||
|
if seg_name.startswith("u_turn"):
|
||||||
|
curvature_s = seg_function(rand_radius)
|
||||||
|
else:
|
||||||
|
curvature_s = seg_function(seg_length, rand_radius)
|
||||||
|
|
||||||
|
curvature[current_index:(current_index + seg_length)] = curvature_s
|
||||||
|
current_index += seg_length
|
||||||
|
|
||||||
|
x, y = integrate_road(curvature)
|
||||||
|
|
||||||
|
return x * self.ds, y * self.ds
|
||||||
0
src/road_gen/integrator/__init__.py
Normal file
0
src/road_gen/integrator/__init__.py
Normal file
21
src/road_gen/integrator/integrator.py
Normal file
21
src/road_gen/integrator/integrator.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def integrate_road(curvature: np.ndarray, ds: float = 1.0):
|
||||||
|
"""Integrate along the curvature field to obtain X and Y coordinates.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
curvature (np.ndarray): The curvature field.
|
||||||
|
ds (float, optional): _description_. Defaults to 1.0.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
x, y (np.ndarray): X and Y waypoints.
|
||||||
|
"""
|
||||||
|
|
||||||
|
theta = np.zeros(len(curvature))
|
||||||
|
theta[1:] = np.cumsum(curvature[:-1] * ds)
|
||||||
|
|
||||||
|
x = np.cumsum(np.cos(theta) * ds)
|
||||||
|
y = np.cumsum(np.sin(theta) * ds)
|
||||||
|
|
||||||
|
return x, y
|
||||||
0
src/road_gen/prefabs/__init__.py
Normal file
0
src/road_gen/prefabs/__init__.py
Normal file
20
src/road_gen/prefabs/prefabs.py
Normal file
20
src/road_gen/prefabs/prefabs.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def straight(length: int) -> np.ndarray:
|
||||||
|
return np.zeros(length)
|
||||||
|
|
||||||
|
|
||||||
|
def turn_left(length: int, radius: float) -> np.ndarray:
|
||||||
|
return np.full(length, 1.0 / radius)
|
||||||
|
|
||||||
|
|
||||||
|
def turn_right(length: int, radius: float) -> np.ndarray:
|
||||||
|
return -turn_left(length, radius)
|
||||||
|
|
||||||
|
|
||||||
|
PREFABS = {
|
||||||
|
'straight': straight,
|
||||||
|
'turn_left': turn_left,
|
||||||
|
'turn_right': turn_right,
|
||||||
|
}
|
||||||
29
tests/test_attenuation_functions.py
Normal file
29
tests/test_attenuation_functions.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from pg_rad.physics import get_mass_attenuation_coeff
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("energy,mu", [
|
||||||
|
(1.00000E-03, 3.606E+03),
|
||||||
|
(1.00000E-02, 5.120E+00),
|
||||||
|
(1.00000E-01, 1.541E-01),
|
||||||
|
(1.00000E+00, 6.358E-02),
|
||||||
|
(1.00000E+01, 2.045E-02)
|
||||||
|
])
|
||||||
|
def test_exact_attenuation_retrieval(energy, mu):
|
||||||
|
"""
|
||||||
|
Test if retrieval of values that are exactly in the table is correct.
|
||||||
|
"""
|
||||||
|
func_mu = get_mass_attenuation_coeff(energy)
|
||||||
|
assert pytest.approx(func_mu, rel=1E-6) == mu
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("energy,mu", [
|
||||||
|
(0.662, 0.0778)
|
||||||
|
])
|
||||||
|
def test_attenuation_interpolation(energy, mu):
|
||||||
|
"""
|
||||||
|
Test retreival for Cs-137 mass attenuation coefficient.
|
||||||
|
"""
|
||||||
|
interp_mu = get_mass_attenuation_coeff(energy)
|
||||||
|
assert pytest.approx(interp_mu, rel=1E-2) == mu
|
||||||
53
tests/test_fluence_rate.py
Normal file
53
tests/test_fluence_rate.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from pg_rad.inputparser.parser import ConfigParser
|
||||||
|
from pg_rad.landscape.director import LandscapeDirector
|
||||||
|
from pg_rad.physics import calculate_fluence_at
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def phi_ref(test_landscape):
|
||||||
|
source = test_landscape.point_sources[0]
|
||||||
|
|
||||||
|
r = np.linalg.norm(np.array([10, 10, 0]) - np.array(source.pos))
|
||||||
|
|
||||||
|
A = source.activity * 1E6
|
||||||
|
b = source.isotope.b
|
||||||
|
mu_air = source.isotope.mu_mass_air * test_landscape.air_density
|
||||||
|
mu_air *= 0.1
|
||||||
|
|
||||||
|
return A * b * np.exp(-mu_air * r) / (4 * np.pi * r**2)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_landscape():
|
||||||
|
|
||||||
|
test_yaml = """
|
||||||
|
name: Test landscape
|
||||||
|
speed: 8.33
|
||||||
|
acquisition_time: 1
|
||||||
|
|
||||||
|
path:
|
||||||
|
length: 1000
|
||||||
|
segments:
|
||||||
|
- straight
|
||||||
|
|
||||||
|
sources:
|
||||||
|
test_source:
|
||||||
|
activity_MBq: 100
|
||||||
|
position: [0, 0, 0]
|
||||||
|
isotope: CS137
|
||||||
|
"""
|
||||||
|
|
||||||
|
cp = ConfigParser(test_yaml).parse()
|
||||||
|
landscape = LandscapeDirector.build_from_config(cp)
|
||||||
|
return landscape
|
||||||
|
|
||||||
|
|
||||||
|
def test_single_source_fluence(phi_ref, test_landscape):
|
||||||
|
phi = calculate_fluence_at(
|
||||||
|
test_landscape,
|
||||||
|
np.array([10, 10, 0]),
|
||||||
|
)
|
||||||
|
assert pytest.approx(phi[0], rel=1E-3) == phi_ref
|
||||||
@ -1,47 +0,0 @@
|
|||||||
import pathlib
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from pg_rad.dataloader import load_data
|
|
||||||
from pg_rad.path import Path, path_from_RT90
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def test_df():
|
|
||||||
csv_path = pathlib.Path(__file__).parent / "data/coordinates.csv"
|
|
||||||
return load_data(csv_path)
|
|
||||||
|
|
||||||
def test_piecewise_regression(test_df):
|
|
||||||
"""_Verify whether the intermediate points deviate less than 0.1 SD._"""
|
|
||||||
|
|
||||||
p_full = path_from_RT90(
|
|
||||||
test_df,
|
|
||||||
east_col="East",
|
|
||||||
north_col="North",
|
|
||||||
simplify_path=False
|
|
||||||
)
|
|
||||||
|
|
||||||
p_simpl = path_from_RT90(
|
|
||||||
test_df,
|
|
||||||
east_col="East",
|
|
||||||
north_col="North",
|
|
||||||
simplify_path=True
|
|
||||||
)
|
|
||||||
|
|
||||||
x_f = np.array(p_full.x_list)
|
|
||||||
y_f = np.array(p_full.y_list)
|
|
||||||
|
|
||||||
x_s = np.array(p_simpl.x_list)
|
|
||||||
y_s = np.array(p_simpl.y_list)
|
|
||||||
|
|
||||||
sd = np.std(y_f)
|
|
||||||
|
|
||||||
for xb, yb in zip(x_s[1:-1], y_s[1:-1]):
|
|
||||||
# find nearest original x index
|
|
||||||
idx = np.argmin(np.abs(x_f - xb))
|
|
||||||
deviation = abs(yb - y_f[idx])
|
|
||||||
|
|
||||||
assert deviation < 0.1 * sd, (
|
|
||||||
f"Breakpoint deviation too large: {deviation:.4f} "
|
|
||||||
f"(threshold {0.1 * sd:.4f}) at x={xb:.2f}"
|
|
||||||
)
|
|
||||||
@ -1,28 +1,35 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from pg_rad.objects import Source
|
from pg_rad.objects import PointSource
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def test_sources():
|
def test_sources():
|
||||||
|
iso = "CS137"
|
||||||
pos_a = np.random.rand(3)
|
pos_a = np.random.rand(3)
|
||||||
pos_b = np.random.rand(3)
|
pos_b = np.random.rand(3)
|
||||||
|
|
||||||
a = Source(*tuple(pos_a), strength = None)
|
a = PointSource(position=pos_a, activity_MBq=None, isotope=iso)
|
||||||
b = Source(*tuple(pos_b), strength = None)
|
b = PointSource(position=pos_b, activity_MBq=None, isotope=iso)
|
||||||
|
|
||||||
return pos_a, pos_b, a, b
|
return pos_a, pos_b, a, b
|
||||||
|
|
||||||
|
|
||||||
def test_if_distances_equal(test_sources):
|
def test_if_distances_equal(test_sources):
|
||||||
"""_Verify whether from object A to object B is the same as B to A._"""
|
"""
|
||||||
|
Verify whether distance from PointSource A to B is the same as B to A.
|
||||||
|
"""
|
||||||
|
|
||||||
_, _, a, b = test_sources
|
_, _, a, b = test_sources
|
||||||
|
|
||||||
assert a.distance_to(b) == b.distance_to(a)
|
assert a.distance_to(b) == b.distance_to(a)
|
||||||
|
|
||||||
|
|
||||||
def test_distance_calculation(test_sources):
|
def test_distance_calculation(test_sources):
|
||||||
"""_Verify whether distance between two static objects (e.g. sources)
|
"""
|
||||||
is calculated correctly._"""
|
Verify whether distance between PointSources is calculated correctly.
|
||||||
|
"""
|
||||||
|
|
||||||
pos_a, pos_b, a, b = test_sources
|
pos_a, pos_b, a, b = test_sources
|
||||||
|
|
||||||
Reference in New Issue
Block a user