diff --git a/src/pg_rad/__init__.py b/src/pg_rad/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pg_rad/configs/logging.yml b/src/pg_rad/configs/logging.yml
new file mode 100644
index 0000000..5a8f1a5
--- /dev/null
+++ b/src/pg_rad/configs/logging.yml
@@ -0,0 +1,15 @@
+version: 1
+disable_existing_loggers: false
+formatters:
+  simple:
+    format: '%(asctime)s - %(levelname)s: %(message)s'
+handlers:
+  stdout:
+    class: logging.StreamHandler
+    formatter: simple
+    stream: ext://sys.stdout
+loggers:
+  root:
+    level: INFO
+    handlers:
+      - stdout
\ No newline at end of file
diff --git a/src/pg_rad/dataloader.py b/src/pg_rad/dataloader.py
new file mode 100644
index 0000000..029941f
--- /dev/null
+++ b/src/pg_rad/dataloader.py
@@ -0,0 +1,43 @@
+import pandas as pd
+
+from pg_rad.logger import setup_logger
+from pg_rad.exceptions import DataLoadError, InvalidCSVError
+
+logger = setup_logger(__name__)
+
+
+def load_data(filename: str) -> pd.DataFrame:
+    """Load a comma-separated file into a DataFrame.
+
+    Args:
+        filename (str): Path of the CSV file to read.
+
+    Returns:
+        pd.DataFrame: The parsed contents of the file.
+
+    Raises:
+        DataLoadError: If the file does not exist, or if any other
+            unexpected error occurs while reading it.
+        InvalidCSVError: If pandas cannot parse the file, or the file
+            contains no data.
+    """
+    logger.debug(f"Attempting to load data from {filename}")
+
+    try:
+        df = pd.read_csv(filename, delimiter=',')
+
+    except FileNotFoundError as e:
+        logger.error(f"File not found: {filename}")
+        raise DataLoadError(f"File does not exist: {filename}") from e
+
+    # EmptyDataError is not a ParserError, so it is named explicitly;
+    # otherwise an empty file would be reported as an unexpected error.
+    except (pd.errors.ParserError, pd.errors.EmptyDataError) as e:
+        logger.error(f"Invalid CSV format: {filename}")
+        raise InvalidCSVError(f"Invalid CSV file: {filename}") from e
+
+    except Exception as e:
+        logger.exception(f"Unexpected error while loading {filename}")
+        raise DataLoadError("Unexpected error while loading data") from e
+
+    return df
diff --git a/src/pg_rad/exceptions.py b/src/pg_rad/exceptions.py
new file mode 100644
index 0000000..d67d3b2
--- /dev/null
+++ b/src/pg_rad/exceptions.py
@@ -0,0 +1,8 @@
+class ConvergenceError(Exception):
+    """Raised when an algorithm fails to converge."""
+
+class DataLoadError(Exception):
+    """Base class for data loading errors."""
+
+class InvalidCSVError(DataLoadError):
+    """Raised when a file is not a valid CSV."""
\ No newline at end of file
diff --git a/src/pg_rad/logger.py b/src/pg_rad/logger.py
new file mode 100644
index 0000000..d0ba6b4
--- /dev/null
+++ b/src/pg_rad/logger.py
@@ -0,0 +1,37 @@
+import logging
+import logging.config
+import pathlib
+
+import yaml
+
+_CONFIG_FILE = (
+    pathlib.Path(__file__).resolve().parent / "configs" / "logging.yml"
+)
+
+# Set once the YAML configuration has been applied, so that importing
+# several modules does not re-read the file and rebuild the handlers.
+_configured = False
+
+
+def setup_logger(name: str) -> logging.Logger:
+    """Return the logger called ``name``, configuring logging on first use.
+
+    The configuration is read from ``configs/logging.yml`` next to this
+    module and applied with ``logging.config.dictConfig``.
+
+    Args:
+        name (str): Name of the logger, e.g. the caller's ``__name__``.
+
+    Returns:
+        logging.Logger: The logger registered under ``name``.
+    """
+    global _configured
+
+    if not _configured:
+        with open(_CONFIG_FILE, encoding="utf-8") as f:
+            config = yaml.safe_load(f)
+
+        logging.config.dictConfig(config)
+        _configured = True
+
+    return logging.getLogger(name)
diff --git a/src/pg_rad/path.py b/src/pg_rad/path.py
new file mode 100644
index 0000000..2f4f873
--- /dev/null
+++ b/src/pg_rad/path.py
@@ -0,0 +1,219 @@
+from collections.abc import Sequence
+import math
+
+from matplotlib import pyplot as plt
+import numpy as np
+import pandas as pd
+import piecewise_regression
+
+from pg_rad.exceptions import ConvergenceError
+from pg_rad.logger import setup_logger
+
+logger = setup_logger(__name__)
+
+
+class PathSegment:
+    def __init__(self, a: tuple[float, float], b: tuple[float, float]):
+        """A straight segment of a path, from (x_a, y_a) to (x_b, y_b).
+
+        Args:
+            a (tuple[float, float]): The starting point (x_a, y_a).
+            b (tuple[float, float]): The final point (x_b, y_b).
+        """
+        self.a = a
+        self.b = b
+
+    def get_length(self) -> float:
+        """Return the Euclidean distance between the two end points."""
+        return math.dist(self.a, self.b)
+
+    length = property(get_length)
+
+    def __str__(self) -> str:
+        return f"({self.a}, {self.b})"
+
+    def __getitem__(self, index) -> tuple[float, float]:
+        """Return the start point for index 0 and the end point for index 1.
+
+        Raises:
+            IndexError: If ``index`` is neither 0 nor 1.
+        """
+        if index == 0:
+            return self.a
+        elif index == 1:
+            return self.b
+        else:
+            raise IndexError
+
+
+class Path:
+    def __init__(
+        self,
+        coord_list: Sequence[tuple[float, float]],
+        z: float = 0,
+        simplify_path=False
+    ):
+        """Construct a path of segments based on a list of coordinates.
+
+        Args:
+            coord_list (Sequence[tuple[float, float]]): The (x, y) points
+                of the path, in order.
+            z (float, optional): Value stored on the path as ``z``.
+                Defaults to 0.
+            simplify_path (bool, optional): Whether to reduce the points
+                with ``piecewise_regression_on_path`` first. If that fails
+                to converge, the original points are used. Defaults to
+                False.
+
+        Raises:
+            ValueError: If fewer than two coordinates are provided.
+        """
+
+        if len(coord_list) < 2:
+            raise ValueError("Must provide at least two coordinates as a list of tuples, e.g. [(x1, y1), (x2, y2)]")
+
+        x, y = tuple(zip(*coord_list))
+
+        if simplify_path:
+            try:
+                x, y = piecewise_regression_on_path(list(x), list(y))
+            except ConvergenceError:
+                logger.warning("Continuing without simplifying path.")
+
+        self.x_list = list(x)
+        self.y_list = list(y)
+
+        coord_list = list(zip(x, y))
+
+        self.segments = [PathSegment(i, ip1) for i, ip1 in zip(coord_list, coord_list[1:])]
+
+        self.z = z
+
+    def get_length(self) -> float:
+        """Return the summed length of all segments."""
+        return sum(s.length for s in self.segments)
+
+    length = property(get_length)
+
+    def __getitem__(self, index) -> PathSegment:
+        return self.segments[index]
+
+    def __str__(self) -> str:
+        return str([str(s) for s in self.segments])
+
+    def plot(self, **kwargs):
+        """
+        Plot the path using matplotlib.
+
+        Keyword arguments are passed on to ``plt.plot``.
+        """
+        plt.plot(self.x_list, self.y_list, **kwargs)
+
+
+def piecewise_regression_on_path(
+    x: Sequence[float],
+    y: Sequence[float],
+    keep_endpoints_equal: bool = False,
+    n_breakpoints: int = 3
+):
+    """Return a piece-wise linear approximation of a path's coordinates.
+
+    This function uses the `piecewise_regression` package. From a full set of
+    coordinate pairs, the function fits linear sections and estimates the
+    positions of the breakpoints between them.
+
+    On why the default value of n_breakpoints is 3, from the
+    `piecewise_regression` docs:
+    "If you do not have (or do not want to use) initial guesses for the number
+    of breakpoints, you can set it to n_breakpoints=3, and the algorithm will
+    randomly generate start_values. With a 50% chance, the bootstrap restarting
+    algorithm will either use the best currently converged breakpoints or
+    randomly generate new start_values, escaping the local optima in two ways in
+    order to find better global optima."
+
+    Args:
+        x (Sequence[float]): Full list of x coordinates.
+        y (Sequence[float]): Full list of y coordinates.
+        keep_endpoints_equal (bool, optional): Whether or not to force start
+            and end to be exactly equal to the original. This will worsen the
+            linear approximation at the beginning and end of path. Defaults
+            to False.
+        n_breakpoints (int, optional): Number of breakpoints. Defaults to 3.
+
+    Returns:
+        x: Reduced list of x coordinates: the first x, the sorted
+            breakpoint estimates and the last x.
+        y: The fitted y values at those x coordinates, as returned by the
+            fit's ``predict``.
+
+    Raises:
+        ConvergenceError: If the fit provides no breakpoint estimates.
+
+    Reference:
+        Pilgrim, C., (2021). piecewise-regression (aka segmented regression) in Python. Journal of Open Source Software, 6(68), 3859, https://doi.org/10.21105/joss.03859.
+    """
+
+    logger.debug("Attempting piecewise regression on path.")
+
+    pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints)
+    pw_res = pw_fit.get_results()
+
+    # A failed fit may show up as a missing result or as a result whose
+    # estimates are None; both must raise ConvergenceError rather than fail
+    # with an AttributeError further down.
+    est = None if pw_res is None else pw_res.get('estimates')
+
+    if est is None:
+        logger.error("Piecewise regression failed to converge.")
+        raise ConvergenceError("Piecewise regression failed to converge.")
+
+    # extract and sort breakpoints
+    breakpoints_x = sorted(
+        v['estimate'] for k, v in est.items() if k.startswith('breakpoint')
+    )
+
+    x_points = [x[0]] + breakpoints_x + [x[-1]]
+
+    y_points = pw_fit.predict(x_points)
+
+    if keep_endpoints_equal:
+        logger.debug("Forcing endpoint equality.")
+        y_points[0] = y[0]
+        y_points[-1] = y[-1]
+
+    logger.info(
+        f"Piecewise regression reduced path from {len(x)-1} to {len(x_points)-1} segments."
+    )
+
+    return x_points, y_points
+
+
+def path_from_RT90(
+    df: pd.DataFrame,
+    east_col: str = "East",
+    north_col: str = "North",
+    **kwargs
+) -> Path:
+    """Construct a path from East and North formatted coordinates (RT90) in a Pandas DataFrame.
+
+    Both coordinate columns are shifted so that their minimum becomes zero.
+
+    Args:
+        df (pd.DataFrame): DataFrame containing at least the two columns named by east_col and north_col.
+        east_col (str): The column name for the East coordinates.
+        north_col (str): The column name for the North coordinates.
+        **kwargs: Passed on to the ``Path`` constructor.
+
+    Returns:
+        Path: A Path object built from the acquisition coordinates in the DataFrame.
+    """
+
+    east_arr = np.array(df[east_col])
+    north_arr = np.array(df[north_col])
+
+    coord_pairs = list(
+        zip(east_arr - east_arr.min(), north_arr - north_arr.min())
+    )
+
+    path = Path(coord_pairs, **kwargs)
+    return path