Add path generation functionality.

This commit is contained in:
Pim Nelissen
2026-01-27 15:18:21 +01:00
parent afabff6e73
commit bb2f81fc20
6 changed files with 253 additions and 0 deletions

0
src/pg_rad/__init__.py Normal file
View File

View File

@ -0,0 +1,15 @@
# Logging configuration in the dictionary schema used by Python's
# logging.config.dictConfig.
# NOTE(review): presumably this is the configs/logging.yml file read by
# setup_logger -- the file name is not visible here; confirm.
version: 1
# Loggers created before this configuration is applied stay enabled.
disable_existing_loggers: false
formatters:
simple:
format: '%(asctime)s - %(levelname)s: %(message)s'
handlers:
stdout:
class: logging.StreamHandler
formatter: simple
# The ext:// prefix makes dictConfig resolve the value to the sys.stdout object.
stream: ext://sys.stdout
loggers:
# NOTE(review): dictConfig documents a top-level `root` key for the root
# logger; here `root` appears as a named entry under `loggers`. Confirm that
# this reaches the root logger on every supported Python version.
root:
level: INFO
handlers:
- stdout

26
src/pg_rad/dataloader.py Normal file
View File

@ -0,0 +1,26 @@
import pandas as pd
from pg_rad.logger import setup_logger
from pg_rad.exceptions import DataLoadError, InvalidCSVError
logger = setup_logger(__name__)
def load_data(filename: str) -> pd.DataFrame:
    """Load a comma-separated file into a DataFrame.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        The parsed contents of the file.

    Raises:
        DataLoadError: If the file does not exist, or if reading fails for
            a reason not covered by the other cases.
        InvalidCSVError: If pandas cannot parse the file as CSV, or the
            file holds no data to parse.
    """
    logger.debug("Attempting to load data from %s", filename)

    try:
        df = pd.read_csv(filename, delimiter=',')
    except FileNotFoundError as e:
        logger.error("File not found: %s", filename)
        raise DataLoadError(f"File does not exist: {filename}") from e
    except (pd.errors.ParserError, pd.errors.EmptyDataError) as e:
        # EmptyDataError is not a ParserError subclass; without listing it
        # here an empty file would be reported as an "unexpected" error.
        logger.error("Invalid CSV format: %s", filename)
        raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
    except Exception as e:
        # Boundary handler: log the traceback and re-raise as the package's
        # own error type so callers only need to catch DataLoadError.
        logger.exception("Unexpected error while loading %s", filename)
        raise DataLoadError("Unexpected error while loading data") from e

    return df

8
src/pg_rad/exceptions.py Normal file
View File

@ -0,0 +1,8 @@
class ConvergenceError(Exception):
    """Signals that an iterative algorithm did not reach convergence."""
class DataLoadError(Exception):
    """Common parent of every error raised while loading data."""
class InvalidCSVError(DataLoadError):
    """Signals that a file could not be interpreted as valid CSV."""

17
src/pg_rad/logger.py Normal file
View File

@ -0,0 +1,17 @@
import logging
import logging.config
import pathlib
import yaml
# Set once the YAML configuration has been applied, so that later calls do
# not reset logging settings changed by the application in the meantime.
_logging_configured = False


def setup_logger(name):
    """Return the logger called ``name``, configuring logging on first use.

    The first call reads ``configs/logging.yml`` located next to this
    module and applies it with ``logging.config.dictConfig``. Subsequent
    calls only look up the requested logger.

    Args:
        name: Name of the logger to return, typically ``__name__``.

    Returns:
        logging.Logger: The logger registered under ``name``.
    """
    global _logging_configured

    if not _logging_configured:
        base_dir = pathlib.Path(__file__).resolve().parent
        config_file = base_dir / "configs" / "logging.yml"

        # Explicit encoding so parsing does not depend on the platform's
        # default locale.
        with open(config_file, encoding="utf-8") as f:
            config = yaml.safe_load(f)

        logging.config.dictConfig(config)
        _logging_configured = True

    return logging.getLogger(name)

187
src/pg_rad/path.py Normal file
View File

@ -0,0 +1,187 @@
from collections.abc import Sequence
import math
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import piecewise_regression
from pg_rad.exceptions import ConvergenceError
from pg_rad.logger import setup_logger
logger = setup_logger(__name__)
class PathSegment:
    """A straight segment of a path between two 2D points.

    The segment can be indexed like a pair: ``segment[0]`` is the starting
    point and ``segment[1]`` is the final point.
    """

    def __init__(self, a: tuple[float, float], b: tuple[float, float]):
        """Create a segment from (x_a, y_a) to (x_b, y_b).

        Args:
            a (tuple[float, float]): The starting point (x_a, y_a).
            b (tuple[float, float]): The final point (x_b, y_b).
        """
        self.a = a
        self.b = b

    def get_length(self) -> float:
        """Return the Euclidean distance between the two end points."""
        return math.dist(self.a, self.b)

    # Exposes the same value as an attribute; get_length() stays available
    # for callers that use the method form.
    length = property(get_length)

    def __str__(self) -> str:
        return f"({self.a}, {self.b})"

    def __getitem__(self, index: int) -> tuple[float, float]:
        """Return the starting point for index 0 and the final point for 1.

        Raises:
            IndexError: For any other index. Raising IndexError at index 2
                is also what lets ``for point in segment`` terminate.
        """
        if index == 0:
            return self.a
        if index == 1:
            return self.b
        raise IndexError(f"PathSegment index out of range: {index!r}")
class Path:
    """A 2D path stored as a chain of consecutive PathSegment objects."""

    def __init__(
        self,
        coord_list: Sequence[tuple[float, float]],
        z: float = 0,
        simplify_path: bool = False
    ):
        """Build the path segments from an ordered list of coordinates.

        Args:
            coord_list (Sequence[tuple[float, float]]): Ordered (x, y)
                points; each consecutive pair becomes one segment.
            z (float, optional): Stored unchanged as ``self.z``; it is not
                used elsewhere in this class. Defaults to 0.
            simplify_path (bool, optional): When true, the points are first
                reduced with ``piecewise_regression_on_path``. If that
                raises ConvergenceError a warning is logged and the
                original points are kept. Defaults to False.

        Raises:
            ValueError: If fewer than two coordinates are given.
        """
        if len(coord_list) < 2:
            raise ValueError("Must provide at least two coordinates as a list of tuples, e.g. [(x1, y1), (x2, y2)]")

        # Transpose [(x1, y1), (x2, y2), ...] into one tuple per axis.
        xs, ys = tuple(zip(*coord_list))

        if simplify_path:
            try:
                xs, ys = piecewise_regression_on_path(list(xs), list(ys))
            except ConvergenceError:
                logger.warning("Continuing without simplifying path.")

        self.x_list = list(xs)
        self.y_list = list(ys)

        # Pair every vertex with its successor to form the segments.
        vertices = list(zip(xs, ys))
        self.segments = [
            PathSegment(start, end)
            for start, end in zip(vertices[:-1], vertices[1:])
        ]

        self.z = z

    def get_length(self) -> float:
        """Return the summed length of all segments."""
        total = 0
        for segment in self.segments:
            total += segment.length
        return total

    length = property(get_length)

    def __getitem__(self, index) -> PathSegment:
        """Return the segment(s) selected by ``index`` from the segment list."""
        return self.segments[index]

    def __str__(self) -> str:
        return str(list(map(str, self.segments)))

    def plot(self, **kwargs):
        """Draw the path vertices with ``plt.plot``.

        Args:
            **kwargs: Passed through unchanged to ``plt.plot``.
        """
        plt.plot(self.x_list, self.y_list, **kwargs)
def piecewise_regression_on_path(
    x: Sequence[float],
    y: Sequence[float],
    keep_endpoints_equal: bool = False,
    n_breakpoints: int = 3
):
    """Reduce a list of path coordinates to a piece-wise linear approximation.

    This function uses the `piecewise_regression` package. It fits linear
    sections to the full set of coordinate pairs, using ``n_breakpoints``
    breakpoints, and returns only the first x, the fitted breakpoints and
    the last x, together with the fitted y value at each of them.

    On why the default value of n_breakpoints is 3, from the
    `piecewise_regression` docs:

        "If you do not have (or do not want to use) initial guesses for the
        number of breakpoints, you can set it to n_breakpoints=3, and the
        algorithm will randomly generate start_values. With a 50% chance,
        the bootstrap restarting algorithm will either use the best
        currently converged breakpoints or randomly generate new
        start_values, escaping the local optima in two ways in order to
        find better global optima."

    Args:
        x (Sequence[float]): Full list of x coordinates.
        y (Sequence[float]): Full list of y coordinates.
        keep_endpoints_equal (bool, optional): Whether or not to force start
            and end to be exactly equal to the original. This will worsen
            the linear approximation at the beginning and end of path.
            Defaults to False.
        n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.

    Returns:
        x (list[float]): Reduced list of x coordinates.
        y: Fitted y coordinates for the reduced x list, in the form returned
            by ``Fit.predict``.

    Raises:
        ConvergenceError: If the fit produced no breakpoint estimates.

    Reference:
        Pilgrim, C., (2021). piecewise-regression (aka segmented regression)
        in Python. Journal of Open Source Software, 6(68), 3859,
        https://doi.org/10.21105/joss.03859.
    """
    logger.debug("Attempting piecewise regression on path.")

    pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
    pw_res = pw_fit.get_results()

    # A failed fit can be reported either as no result at all or as a result
    # without estimates; treat both as non-convergence instead of crashing
    # later on a missing estimates mapping.
    est = pw_res.get('estimates') if pw_res else None
    if est is None:
        logger.error("Piecewise regression failed to converge.")
        raise ConvergenceError("Piecewise regression failed to converge.")

    # Extract the breakpoints and order them along the direction of travel:
    # ascending when the path runs toward larger x, descending otherwise.
    # Always sorting ascending would make a path that runs toward smaller x
    # zig-zag between its end points.
    breakpoints_x = sorted(
        (v['estimate'] for k, v in est.items() if k.startswith('breakpoint')),
        reverse=x[0] > x[-1],
    )

    x_points = [x[0]] + breakpoints_x + [x[-1]]
    y_points = pw_fit.predict(x_points)

    if keep_endpoints_equal:
        logger.debug("Forcing endpoint equality.")
        y_points[0] = y[0]
        y_points[-1] = y[-1]

    logger.info(
        "Piecewise regression reduced path from %d to %d segments.",
        len(x) - 1,
        len(x_points) - 1,
    )

    return x_points, y_points
def path_from_RT90(
    df: pd.DataFrame,
    east_col: str = "East",
    north_col: str = "North",
    **kwargs
) -> Path:
    """Construct a path from East and North formatted coordinates (RT90).

    Both coordinate columns are shifted so that their smallest value becomes
    0 before the path is built.

    Args:
        df (pd.DataFrame): DataFrame containing at least the columns named
            by ``east_col`` and ``north_col``.
        east_col (str): The column name for the East coordinates.
        north_col (str): The column name for the North coordinates.
        **kwargs: Passed through unchanged to the ``Path`` constructor.

    Returns:
        Path: A Path object built from the shifted coordinates in the
            DataFrame.
    """
    east_arr = np.asarray(df[east_col])
    north_arr = np.asarray(df[north_col])

    # Use the array's own minimum rather than the builtin min(), which
    # iterates element by element in Python and gives order-dependent
    # results when NaN is present.
    east_arr = east_arr - east_arr.min()
    north_arr = north_arr - north_arr.min()

    coord_pairs = list(zip(east_arr, north_arr))
    return Path(coord_pairs, **kwargs)