Compare commits

..

12 Commits

11 changed files with 217 additions and 256 deletions

View File

@ -5,6 +5,9 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
"pg_rad.data" = ["*.csv"]
[project]
name = "pg-rad"
version = "0.2.1"
@ -18,7 +21,6 @@ dependencies = [
"matplotlib>=3.9.2",
"numpy>=2",
"pandas>=2.3.1",
"piecewise_regression==1.5.0",
"pyyaml>=6.0.2"
]
license = "MIT"

View File

@ -4,7 +4,7 @@ __ignore__ = ["logger"]
from pg_rad.exceptions import exceptions
from pg_rad.exceptions.exceptions import (ConvergenceError, DataLoadError,
InvalidCSVError,)
InvalidCSVError, OutOfBoundsError,)
__all__ = ['ConvergenceError', 'DataLoadError', 'InvalidCSVError',
'exceptions']
'OutOfBoundsError', 'exceptions']

View File

@ -8,3 +8,7 @@ class DataLoadError(Exception):
class InvalidCSVError(DataLoadError):
"""Raised when a file is not a valid CSV."""
class OutOfBoundsError(Exception):
"""Raised when an object is attempted to be placed out of bounds."""

View File

@ -1,4 +1,10 @@
from .isotope import Isotope
CS137 = Isotope("Cs-137", E=661.66, b=0.851)
class CS137(Isotope):
def __init__(self):
super.__init__(
name="Cs-137",
E=661.66,
b=0.851
)

View File

@ -3,6 +3,6 @@ __ignore__ = ["logger"]
from pg_rad.landscape import landscape
from pg_rad.landscape.landscape import (Landscape, create_landscape_from_path,)
from pg_rad.landscape.landscape import (Landscape, LandscapeBuilder,)
__all__ = ['Landscape', 'create_landscape_from_path', 'landscape']
__all__ = ['Landscape', 'LandscapeBuilder', 'landscape']

View File

@ -1,117 +1,46 @@
import logging
from typing import Self
from matplotlib import pyplot as plt
from matplotlib.patches import Circle
import numpy as np
from numpy.typing import ArrayLike
from pg_rad.path import Path
from pg_rad.dataloader import load_data
from pg_rad.exceptions import OutOfBoundsError
from pg_rad.objects import PointSource
from pg_rad.path import Path, path_from_RT90
from pg_rad.physics.fluence import phi_single_source
logger = logging.getLogger(__name__)
class Landscape:
"""A generic Landscape that can contain a Path and sources.
Args:
air_density (float, optional): Air density, kg/m^3. Defaults to 1.243.
size (int | tuple[int, int, int], optional): Size of the world.
Defaults to 500.
scale (str, optional): The scale of the size argument passed.
Defaults to 'meters'.
"""
A generic Landscape that can contain a Path and sources.
"""
def __init__(
self,
air_density: float = 1.243,
size: int | tuple[int, int, int] = 500,
scale: str = 'meters'
path: Path,
point_sources: list[PointSource] = [],
size: tuple[int, int, int] = [500, 500, 50],
air_density: float = 1.243
):
if isinstance(size, int):
self.world = np.zeros((size, size, size))
elif isinstance(size, tuple) and len(size) == 3:
self.world = np.zeros(size)
else:
raise TypeError("size must be integer or a tuple of 3 integers.")
self.air_density = air_density
self.scale = scale
self.path: Path = None
self.sources: list[PointSource] = []
logger.debug("Landscape initialized.")
def plot(self, z: float | int = 0):
"""Plot a slice of the world at a height `z`.
"""Initialize a landscape.
Args:
z (int, optional): Height of slice. Defaults to 0.
path (Path): A Path object.
point_sources (list[PointSource], optional): List of point sources.
air_density (float, optional): Air density in kg/m^3.
Defaults to 1.243.
size (tuple[int, int, int], optional): (x,y,z) dimensions of world
in meters. Defaults to [500, 500, 50].
Returns:
fig, ax: Matplotlib figure objects.
"""
x_lim, y_lim, _ = self.world.shape
fig, ax = plt.subplots()
ax.set_xlim(right=x_lim)
ax.set_ylim(top=y_lim)
ax.set_xlabel(f"X [{self.scale}]")
ax.set_ylabel(f"Y [{self.scale}]")
if self.path is not None:
ax.plot(self.path.x_list, self.path.y_list, 'bo-')
for s in self.sources:
if np.isclose(s.z, z):
dot = Circle(
(s.x, s.y),
radius=5,
color=s.color,
zorder=5
)
ax.text(
s.x + 0.06,
s.y + 0.06,
s.name,
color=s.color,
fontsize=10,
ha="left",
va="bottom",
zorder=6
)
ax.add_patch(dot)
return fig, ax
def add_sources(self, *sources: PointSource):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
ValueError: If the source is outside the boundaries of the
landscape.
TypeError: _description_
"""
if not any(
(0 <= source.pos[0] <= self.world.shape[0] or
0 <= source.pos[1] <= self.world.shape[1] or
0 <= source.pos[2] <= self.world.shape[2])
for source in sources
):
raise ValueError("One or more sources are outside the landscape!")
self.sources.extend(sources)
def set_path(self, path: Path):
"""
Set the path in the landscape.
"""
if not isinstance(path, Path):
raise TypeError("path must be of type Path.")
self.path = path
self.point_sources = point_sources
self.size = size
self.air_density = air_density
logger.debug("Landscape initialized.")
def calculate_fluence_at(self, pos: tuple):
total_phi = 0.
@ -128,25 +57,94 @@ class Landscape:
return total_phi
def calculate_fluence_along_path(self):
if self.path is None:
raise ValueError("Path is not set!")
pass
def create_landscape_from_path(path: Path, max_z: float | int = 50):
"""Generate a landscape from a path, using its dimensions to determine
the size of the landscape.
class LandscapeBuilder:
def __init__(self):
self._path = None
self._point_sources = []
self._size = None
self._air_density = None
Args:
path (Path): A Path object describing the trajectory.
max_z (int, optional): Height of the world. Defaults to 50 meters.
def set_air_density(self, air_density) -> Self:
"""Set the air density of the world."""
self._air_density = air_density
return self
Returns:
landscape (pg_rad.landscape.Landscape): A landscape with dimensions
based on the provided Path.
"""
max_x = np.ceil(max(path.x_list))
max_y = np.ceil(max(path.y_list))
def set_landscape_size(self, size: tuple[int, int, int]) -> Self:
"""Set the size of the landscape in meters (x,y,z)."""
if self._path and any(p > s for p, s in zip(self._path.size, size)):
raise OutOfBoundsError(
"Cannot set landscape size smaller than the path."
)
landscape = Landscape(size=(max_x, max_y, max_z))
landscape.path = path
return landscape
self._size = size
logger.debug("Size of the landscape has been updated.")
return self
def set_path_from_experimental_data(
self,
filename: str,
z: int,
east_col: str = "East",
north_col: str = "North"
) -> Self:
df = load_data(filename)
self._path = path_from_RT90(
df=df,
east_col=east_col,
north_col=north_col
)
# The size of the landscape will be updated if
# 1) _size is not set, or
# 2) _size is too small to contain the path.
needs_resize = (
not self._size
or any(p > s for p, s in zip(self._path.size, self._size))
)
if needs_resize:
if not self._size:
logger.info("Landscape size set to path dimensions.")
else:
logger.warning(
"Path exceeds current landscape size. "
"Expanding landscape to accommodate it."
)
self.set_landscape_size(self._path.size)
return self
def set_point_sources(self, *sources):
"""Add one or more point sources to the world.
Args:
*sources (pg_rad.sources.PointSource): One or more sources,
passed as Source1, Source2, ...
Raises:
OutOfBoundsError: If any source is outside the boundaries of the
landscape.
"""
if any(
any(p < 0 or p >= s for p, s in zip(source.pos, self._size))
for source in sources
):
raise OutOfBoundsError(
"One or more sources attempted to "
"be placed outside the landscape."
)
self._point_sources = sources
def build(self):
return Landscape(
path=self._path,
point_sources=self._point_sources,
size=self._size,
air_density=self._air_density
)

View File

@ -3,7 +3,6 @@ __ignore__ = ["logger"]
from pg_rad.path import path
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,
simplify_path,)
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,)
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90', 'simplify_path']
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90']

View File

@ -5,9 +5,7 @@ import math
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import piecewise_regression
from pg_rad.exceptions import ConvergenceError
logger = logging.getLogger(__name__)
@ -44,8 +42,7 @@ class Path:
def __init__(
self,
coord_list: Sequence[tuple[float, float]],
z: float = 0,
path_simplify: bool = False
z: float = 0
):
"""Construct a path of sequences based on a list of coordinates.
@ -53,8 +50,6 @@ class Path:
coord_list (Sequence[tuple[float, float]]): List of x,y
coordinates.
z (float, optional): Height of the path. Defaults to 0.
path_simplify (bool, optional): Whether to
pg_rad.path.simplify_path(). Defaults to False.
"""
if len(coord_list) < 2:
@ -63,12 +58,6 @@ class Path:
x, y = tuple(zip(*coord_list))
if path_simplify:
try:
x, y = simplify_path(list(x), list(y))
except ConvergenceError:
logger.warning("Continuing without simplifying path.")
self.x_list = list(x)
self.y_list = list(y)
@ -81,6 +70,11 @@ class Path:
]
self.z = z
self.size = (
np.ceil(max(self.x_list)),
np.ceil(max(self.y_list)),
z
)
logger.debug("Path created.")
@ -102,83 +96,6 @@ class Path:
plt.plot(self.x_list, self.y_list, **kwargs)
def simplify_path(
x: Sequence[float],
y: Sequence[float],
keep_endpoints_equal: bool = False,
n_breakpoints: int = 3
):
"""From full resolution x and y arrays, return a piecewise linearly
approximated/simplified pair of x and y arrays.
This function uses the `piecewise_regression` package. From a full set of
coordinate pairs, the function fits linear sections, automatically finding
the number of breakpoints and their positions.
On why the default value of n_breakpoints is 3, from the
`piecewise_regression` docs:
"If you do not have (or do not want to use) initial guesses for the number
of breakpoints, you can set it to n_breakpoints=3, and the algorithm will
randomly generate start_values. With a 50% chance, the bootstrap restarting
algorithm will either use the best currently converged breakpoints or
randomly generate new start_values, escaping the local optima in two ways
in order to find better global optima."
Args:
x (Sequence[float]): Full list of x coordinates.
y (Sequence[float]): Full list of y coordinates.
keep_endpoints_equal (bool, optional): Whether or not to force start
and end to be exactly equal to the original. This will worsen the
linear approximation at the beginning and end of path. Defaults to
False.
n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.
Returns:
x (list[float]): Reduced list of x coordinates.
y (list[float]): Reduced list of y coordinates.
Raises:
ConvergenceError: If the fitting algorithm failed to simplify the path.
Reference:
Pilgrim, C., (2021). piecewise-regression (aka segmented regression)
in Python. Journal of Open Source Software,
6(68), 3859, https://doi.org/10.21105/joss.03859.
"""
logger.debug("Attempting piecewise regression on path.")
pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
pw_res = pw_fit.get_results()
if pw_res is None:
logger.warning("Piecewise regression failed to converge.")
raise ConvergenceError("Piecewise regression failed to converge.")
est = pw_res['estimates']
# extract and sort breakpoints
breakpoints_x = sorted(
v['estimate'] for k, v in est.items() if k.startswith('breakpoint')
)
x_points = [x[0]] + breakpoints_x + [x[-1]]
y_points = pw_fit.predict(x_points)
if keep_endpoints_equal:
logger.debug("Forcing endpoint equality.")
y_points[0] = y[0]
y_points[-1] = y[-1]
logger.info(
f"Piecewise regression reduced path from \
{len(x)-1} to {len(x_points)-1} segments."
)
return x_points, y_points
def path_from_RT90(
df: pd.DataFrame,
east_col: str = "East",

View File

@ -0,0 +1,7 @@
# do not expose internal logger when running mkinit
__ignore__ = ["logger"]
from pg_rad.plotting import landscape_plotter
from pg_rad.plotting.landscape_plotter import (LandscapeSlicePlotter,)
__all__ = ['LandscapeSlicePlotter', 'landscape_plotter']

View File

@ -0,0 +1,75 @@
import logging
from matplotlib import pyplot as plt
from matplotlib.patches import Circle
from pg_rad.landscape import Landscape
logger = logging.getLogger(__name__)
class LandscapeSlicePlotter:
def plot(self, landscape: Landscape, z: int = 0):
"""Plot a top-down slice of the landscape at a height z.
Args:
landscape (Landscape): the landscape to plot
z (int, optional): Height at which to plot slice. Defaults to 0.
""" """
"""
self.z = z
fig, ax = plt.subplots()
self._draw_base(ax, landscape)
self._draw_path(ax, landscape)
self._draw_point_sources(ax, landscape)
ax.set_aspect("equal")
plt.show()
def _draw_base(self, ax, landscape):
width, height = landscape.size[:2]
ax.set_xlim(right=width)
ax.set_ylim(top=height)
ax.set_xlabel("X [m]")
ax.set_ylabel("Y [m]")
ax.set_title(f"Landscape (top-down, z = {self.z})")
def _draw_path(self, ax, landscape):
if landscape.path.z < self.z:
ax.plot(landscape.path.x_list, landscape.path.y_list, 'bo-')
else:
logger.warning(
"Path is above the slice height z."
"It will not show on the plot."
)
def _draw_point_sources(self, ax, landscape):
for s in landscape.point_sources:
if s.z <= self.z:
dot = Circle(
(s.x, s.y),
radius=5,
color=s.color,
zorder=5
)
ax.text(
s.x + 0.06,
s.y + 0.06,
s.name,
color=s.color,
fontsize=10,
ha="left",
va="bottom",
zorder=6
)
ax.add_patch(dot)
else:
logger.warning(
f"Source {s.name} is above slice height z."
"It will not show on the plot."
)

View File

@ -1,47 +0,0 @@
import pathlib
import numpy as np
import pytest
from pg_rad.dataloader import load_data
from pg_rad.path import Path, path_from_RT90
@pytest.fixture
def test_df():
csv_path = pathlib.Path(__file__).parent / "data/coordinates.csv"
return load_data(csv_path)
def test_piecewise_regression(test_df):
"""_Verify whether the intermediate points deviate less than 0.1 SD._"""
p_full = path_from_RT90(
test_df,
east_col="East",
north_col="North",
simplify_path=False
)
p_simpl = path_from_RT90(
test_df,
east_col="East",
north_col="North",
simplify_path=True
)
x_f = np.array(p_full.x_list)
y_f = np.array(p_full.y_list)
x_s = np.array(p_simpl.x_list)
y_s = np.array(p_simpl.y_list)
sd = np.std(y_f)
for xb, yb in zip(x_s[1:-1], y_s[1:-1]):
# find nearest original x index
idx = np.argmin(np.abs(x_f - xb))
deviation = abs(yb - y_f[idx])
assert deviation < 0.1 * sd, (
f"Breakpoint deviation too large: {deviation:.4f} "
f"(threshold {0.1 * sd:.4f}) at x={xb:.2f}"
)