mirror of
https://github.com/pim-n/pg-rad
synced 2026-03-10 19:48:12 +01:00
@ -5,6 +5,9 @@ build-backend = "setuptools.build_meta"
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
"pg_rad.data" = ["*.csv"]
|
||||
|
||||
[project]
|
||||
name = "pg-rad"
|
||||
version = "0.2.1"
|
||||
@ -18,7 +21,6 @@ dependencies = [
|
||||
"matplotlib>=3.9.2",
|
||||
"numpy>=2",
|
||||
"pandas>=2.3.1",
|
||||
"piecewise_regression==1.5.0",
|
||||
"pyyaml>=6.0.2"
|
||||
]
|
||||
license = "MIT"
|
||||
|
||||
@ -3,7 +3,6 @@ __ignore__ = ["logger"]
|
||||
|
||||
from pg_rad.path import path
|
||||
|
||||
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,
|
||||
simplify_path,)
|
||||
from pg_rad.path.path import (Path, PathSegment, path_from_RT90,)
|
||||
|
||||
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90', 'simplify_path']
|
||||
__all__ = ['Path', 'PathSegment', 'path', 'path_from_RT90']
|
||||
|
||||
@ -5,9 +5,7 @@ import math
|
||||
from matplotlib import pyplot as plt
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import piecewise_regression
|
||||
|
||||
from pg_rad.exceptions import ConvergenceError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -44,8 +42,7 @@ class Path:
|
||||
def __init__(
|
||||
self,
|
||||
coord_list: Sequence[tuple[float, float]],
|
||||
z: float = 0,
|
||||
path_simplify: bool = False
|
||||
z: float = 0
|
||||
):
|
||||
"""Construct a path of sequences based on a list of coordinates.
|
||||
|
||||
@ -53,8 +50,6 @@ class Path:
|
||||
coord_list (Sequence[tuple[float, float]]): List of x,y
|
||||
coordinates.
|
||||
z (float, optional): Height of the path. Defaults to 0.
|
||||
path_simplify (bool, optional): Whether to
|
||||
pg_rad.path.simplify_path(). Defaults to False.
|
||||
"""
|
||||
|
||||
if len(coord_list) < 2:
|
||||
@ -63,12 +58,6 @@ class Path:
|
||||
|
||||
x, y = tuple(zip(*coord_list))
|
||||
|
||||
if path_simplify:
|
||||
try:
|
||||
x, y = simplify_path(list(x), list(y))
|
||||
except ConvergenceError:
|
||||
logger.warning("Continuing without simplifying path.")
|
||||
|
||||
self.x_list = list(x)
|
||||
self.y_list = list(y)
|
||||
|
||||
@ -102,83 +91,6 @@ class Path:
|
||||
plt.plot(self.x_list, self.y_list, **kwargs)
|
||||
|
||||
|
||||
def simplify_path(
|
||||
x: Sequence[float],
|
||||
y: Sequence[float],
|
||||
keep_endpoints_equal: bool = False,
|
||||
n_breakpoints: int = 3
|
||||
):
|
||||
"""From full resolution x and y arrays, return a piecewise linearly
|
||||
approximated/simplified pair of x and y arrays.
|
||||
|
||||
This function uses the `piecewise_regression` package. From a full set of
|
||||
coordinate pairs, the function fits linear sections, automatically finding
|
||||
the number of breakpoints and their positions.
|
||||
|
||||
On why the default value of n_breakpoints is 3, from the
|
||||
`piecewise_regression` docs:
|
||||
"If you do not have (or do not want to use) initial guesses for the number
|
||||
of breakpoints, you can set it to n_breakpoints=3, and the algorithm will
|
||||
randomly generate start_values. With a 50% chance, the bootstrap restarting
|
||||
algorithm will either use the best currently converged breakpoints or
|
||||
randomly generate new start_values, escaping the local optima in two ways
|
||||
in order to find better global optima."
|
||||
|
||||
Args:
|
||||
x (Sequence[float]): Full list of x coordinates.
|
||||
y (Sequence[float]): Full list of y coordinates.
|
||||
keep_endpoints_equal (bool, optional): Whether or not to force start
|
||||
and end to be exactly equal to the original. This will worsen the
|
||||
linear approximation at the beginning and end of path. Defaults to
|
||||
False.
|
||||
n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.
|
||||
|
||||
Returns:
|
||||
x (list[float]): Reduced list of x coordinates.
|
||||
y (list[float]): Reduced list of y coordinates.
|
||||
|
||||
Raises:
|
||||
ConvergenceError: If the fitting algorithm failed to simplify the path.
|
||||
|
||||
Reference:
|
||||
Pilgrim, C., (2021). piecewise-regression (aka segmented regression)
|
||||
in Python. Journal of Open Source Software,
|
||||
6(68), 3859, https://doi.org/10.21105/joss.03859.
|
||||
"""
|
||||
|
||||
logger.debug("Attempting piecewise regression on path.")
|
||||
|
||||
pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
|
||||
pw_res = pw_fit.get_results()
|
||||
|
||||
if pw_res is None:
|
||||
logger.warning("Piecewise regression failed to converge.")
|
||||
raise ConvergenceError("Piecewise regression failed to converge.")
|
||||
|
||||
est = pw_res['estimates']
|
||||
|
||||
# extract and sort breakpoints
|
||||
breakpoints_x = sorted(
|
||||
v['estimate'] for k, v in est.items() if k.startswith('breakpoint')
|
||||
)
|
||||
|
||||
x_points = [x[0]] + breakpoints_x + [x[-1]]
|
||||
|
||||
y_points = pw_fit.predict(x_points)
|
||||
|
||||
if keep_endpoints_equal:
|
||||
logger.debug("Forcing endpoint equality.")
|
||||
y_points[0] = y[0]
|
||||
y_points[-1] = y[-1]
|
||||
|
||||
logger.info(
|
||||
f"Piecewise regression reduced path from \
|
||||
{len(x)-1} to {len(x_points)-1} segments."
|
||||
)
|
||||
|
||||
return x_points, y_points
|
||||
|
||||
|
||||
def path_from_RT90(
|
||||
df: pd.DataFrame,
|
||||
east_col: str = "East",
|
||||
|
||||
@ -1,47 +0,0 @@
|
||||
import pathlib
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from pg_rad.dataloader import load_data
|
||||
from pg_rad.path import Path, path_from_RT90
|
||||
|
||||
@pytest.fixture
|
||||
def test_df():
|
||||
csv_path = pathlib.Path(__file__).parent / "data/coordinates.csv"
|
||||
return load_data(csv_path)
|
||||
|
||||
def test_piecewise_regression(test_df):
|
||||
"""_Verify whether the intermediate points deviate less than 0.1 SD._"""
|
||||
|
||||
p_full = path_from_RT90(
|
||||
test_df,
|
||||
east_col="East",
|
||||
north_col="North",
|
||||
simplify_path=False
|
||||
)
|
||||
|
||||
p_simpl = path_from_RT90(
|
||||
test_df,
|
||||
east_col="East",
|
||||
north_col="North",
|
||||
simplify_path=True
|
||||
)
|
||||
|
||||
x_f = np.array(p_full.x_list)
|
||||
y_f = np.array(p_full.y_list)
|
||||
|
||||
x_s = np.array(p_simpl.x_list)
|
||||
y_s = np.array(p_simpl.y_list)
|
||||
|
||||
sd = np.std(y_f)
|
||||
|
||||
for xb, yb in zip(x_s[1:-1], y_s[1:-1]):
|
||||
# find nearest original x index
|
||||
idx = np.argmin(np.abs(x_f - xb))
|
||||
deviation = abs(yb - y_f[idx])
|
||||
|
||||
assert deviation < 0.1 * sd, (
|
||||
f"Breakpoint deviation too large: {deviation:.4f} "
|
||||
f"(threshold {0.1 * sd:.4f}) at x={xb:.2f}"
|
||||
)
|
||||
Reference in New Issue
Block a user