diff --git a/src/pg_rad/dataloader/dataloader.py b/src/pg_rad/dataloader/dataloader.py index 866e18f..fd0a28a 100644 --- a/src/pg_rad/dataloader/dataloader.py +++ b/src/pg_rad/dataloader/dataloader.py @@ -6,6 +6,7 @@ from pg_rad.exceptions import DataLoadError, InvalidCSVError logger = logging.getLogger(__name__) + def load_data(filename: str) -> pd.DataFrame: logger.debug(f"Attempting to load file: {filename}") @@ -25,4 +26,4 @@ def load_data(filename: str) -> pd.DataFrame: raise DataLoadError("Unexpected error while loading data") from e logger.debug(f"File loaded: {filename}") - return df \ No newline at end of file + return df diff --git a/src/pg_rad/exceptions/exceptions.py b/src/pg_rad/exceptions/exceptions.py index d67d3b2..60a1b34 100644 --- a/src/pg_rad/exceptions/exceptions.py +++ b/src/pg_rad/exceptions/exceptions.py @@ -1,8 +1,10 @@ class ConvergenceError(Exception): """Raised when an algorithm fails to converge.""" + class DataLoadError(Exception): """Base class for data loading errors.""" + class InvalidCSVError(DataLoadError): - """Raised when a file is not a valid CSV.""" \ No newline at end of file + """Raised when a file is not a valid CSV.""" diff --git a/src/pg_rad/isotopes/isotope.py b/src/pg_rad/isotopes/isotope.py index ea8be9d..ea758b8 100644 --- a/src/pg_rad/isotopes/isotope.py +++ b/src/pg_rad/isotopes/isotope.py @@ -5,7 +5,7 @@ class Isotope: name (str): Full name (e.g. Caesium-137). E (float): Energy of the primary gamma in keV. b (float): Branching ratio for the gamma at energy E. - """ + """ def __init__( self, name: str, @@ -20,4 +20,4 @@ class Isotope: self.name = name self.E = E - self.b = b \ No newline at end of file + self.b = b diff --git a/src/pg_rad/landscape/landscape.py b/src/pg_rad/landscape/landscape.py index fec1dfe..1883694 100644 --- a/src/pg_rad/landscape/landscape.py +++ b/src/pg_rad/landscape/landscape.py @@ -9,26 +9,29 @@ from pg_rad.objects import PointSource logger = logging.getLogger(__name__) + class Landscape: """A generic Landscape that can contain a Path and sources. Args: - air_density (float, optional): Air density in kg / m^3. Defaults to 1.243. - size (int | tuple[int, int, int], optional): Size of the world. Defaults to 500. - scale (str, optional): The scale of the size argument passed. Defaults to 'meters'. - """ + air_density (float, optional): Air density, kg/m^3. Defaults to 1.243. + size (int | tuple[int, int, int], optional): Size of the world. + Defaults to 500. + scale (str, optional): The scale of the size argument passed. + Defaults to 'meters'. + """ def __init__( self, air_density: float = 1.243, size: int | tuple[int, int, int] = 500, - scale = 'meters', - ): + scale: str = 'meters' + ): if isinstance(size, int): self.world = np.zeros((size, size, size)) elif isinstance(size, tuple) and len(size) == 3: self.world = np.zeros(size) else: - raise TypeError("size must be an integer or a tuple of 3 integers.") + raise TypeError("size must be integer or a tuple of 3 integers.") self.air_density = air_density self.scale = scale @@ -36,8 +39,8 @@ class Landscape: self.path: Path = None self.sources: list[PointSource] = [] logger.debug("Landscape initialized.") - - def plot(self, z = 0): + + def plot(self, z: float | int = 0): """Plot a slice of the world at a height `z`. Args: @@ -45,7 +48,7 @@ class Landscape: Returns: fig, ax: Matplotlib figure objects. - """ + """ x_lim, y_lim, _ = self.world.shape fig, ax = plt.subplots() @@ -54,9 +57,9 @@ class Landscape: ax.set_xlabel(f"X [{self.scale}]") ax.set_ylabel(f"Y [{self.scale}]") - if not self.path == None: + if self.path is not None: ax.plot(self.path.x_list, self.path.y_list, 'bo-') - + for s in self.sources: if np.isclose(s.z, z): dot = Circle( @@ -78,19 +81,19 @@ class Landscape: ) ax.add_patch(dot) - + return fig, ax def add_sources(self, *sources: PointSource): """Add one or more point sources to the world. Args: - *sources (pg_rad.sources.PointSource): One or more sources, passed as - Source1, Source2, ... + *sources (pg_rad.sources.PointSource): One or more sources, + passed as Source1, Source2, ... Raises: - ValueError: If the source is outside the boundaries of the landscape. - """ - + ValueError: If the source is outside the boundaries of the + landscape. + """ max_x, max_y, max_z = self.world.shape[:3] if any( @@ -108,24 +111,23 @@ class Landscape: Set the path in the landscape. """ self.path = path - -def create_landscape_from_path(path: Path, max_z = 500): + + +def create_landscape_from_path(path: Path, max_z: float | int = 50): """Generate a landscape from a path, using its dimensions to determine the size of the landscape. Args: path (Path): A Path object describing the trajectory. - max_z (int, optional): Height of the world. Defaults to 500 meters. + max_z (int, optional): Height of the world. Defaults to 50 meters. Returns: - landscape (pg_rad.landscape.Landscape): A landscape with dimensions based on the provided Path. - """ + landscape (pg_rad.landscape.Landscape): A landscape with dimensions + based on the provided Path. + """ max_x = np.ceil(max(path.x_list)) max_y = np.ceil(max(path.y_list)) - landscape = Landscape( - size = (max_x, max_y, max_z) - ) - + landscape = Landscape(size=(max_x, max_y, max_z)) landscape.path = path - return landscape \ No newline at end of file + return landscape diff --git a/src/pg_rad/logging/logger.py b/src/pg_rad/logging/logger.py index 0697432..a460e20 100644 --- a/src/pg_rad/logging/logger.py +++ b/src/pg_rad/logging/logger.py @@ -3,18 +3,19 @@ import pathlib import yaml + def setup_logger(log_level: str = "WARNING"): levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] - - if not log_level in levels: + + if log_level not in levels: raise ValueError(f"Log level must be one of {levels}.") - + base_dir = pathlib.Path(__file__).resolve().parent config_file = base_dir / "configs" / "logging.yml" with open(config_file) as f: config = yaml.safe_load(f) - + config["loggers"]["root"]["level"] = log_level - logging.config.dictConfig(config) \ No newline at end of file + logging.config.dictConfig(config) diff --git a/src/pg_rad/objects/objects.py b/src/pg_rad/objects/objects.py index 57ed76b..16f03b9 100644 --- a/src/pg_rad/objects/objects.py +++ b/src/pg_rad/objects/objects.py @@ -1,6 +1,7 @@ import math from typing import Self + class BaseObject: def __init__( self, @@ -16,18 +17,20 @@ class BaseObject: x (float): X coordinate. y (float): Y coordinate. z (float): Z coordinate. - name (str, optional): Name for the object. Defaults to "Unnamed object". - color (str, optional): Matplotlib compatible color string. Defaults to "red". + name (str, optional): Name for the object. + Defaults to "Unnamed object". + color (str, optional): Matplotlib compatible color string. + Defaults to "red". """ - + self.x = x self.y = y self.z = z self.name = name self.color = color - + def distance_to(self, other: Self) -> float: return math.dist( (self.x, self.y, self.z), (other.x, other.y, other.z), - ) \ No newline at end of file + ) diff --git a/src/pg_rad/objects/sources.py b/src/pg_rad/objects/sources.py index 74c5623..1bc3a2d 100644 --- a/src/pg_rad/objects/sources.py +++ b/src/pg_rad/objects/sources.py @@ -5,8 +5,10 @@ from pg_rad.isotopes import Isotope logger = logging.getLogger(__name__) + class PointSource(BaseObject): _id_counter = 1 + def __init__( self, x: float, @@ -27,9 +29,10 @@ class PointSource(BaseObject): name (str | None, optional): Can give the source a unique name. Defaults to None, making the name sequential. (Source-1, Source-2, etc.). - color (str, optional): Matplotlib compatible color string. Defaults to "red". + color (str, optional): Matplotlib compatible color string. + Defaults to "red". """ - + self.id = PointSource._id_counter PointSource._id_counter += 1 @@ -42,8 +45,13 @@ class PointSource(BaseObject): self.activity = activity self.isotope = isotope self.color = color - + logger.debug(f"Source created: {self.name}") def __repr__(self): - return f"PointSource(name={self.name}, pos={(self.x, self.y, self.z)}, isotope={self.isotope.name}, A={self.activity} MBq)" \ No newline at end of file + repr_str = (f"PointSource(name={self.name}, " + + f"pos={(self.x, self.y, self.z)}, " + + f"A={self.activity} MBq), " + + f"isotope={self.isotope.name}.") + + return repr_str diff --git a/src/pg_rad/path/path.py b/src/pg_rad/path/path.py index f605ecb..5f44860 100644 --- a/src/pg_rad/path/path.py +++ b/src/pg_rad/path/path.py @@ -11,6 +11,7 @@ from pg_rad.exceptions import ConvergenceError logger = logging.getLogger(__name__) + class PathSegment: def __init__(self, a: tuple[float, float], b: tuple[float, float]): """A straight Segment of a Path, from (x_a, y_a) to (x_b, y_b). @@ -18,18 +19,18 @@ class PathSegment: Args: a (tuple[float, float]): The starting point (x_a, y_a). b (tuple[float, float]): The final point (x_b, y_b). - """ + """ self.a = a self.b = b def get_length(self) -> float: return math.dist(self.a, self.b) - + length = property(get_length) def __str__(self) -> str: return str(f"({self.a}, {self.b})") - + def __getitem__(self, index) -> float: if index == 0: return self.a @@ -38,26 +39,30 @@ class PathSegment: else: raise IndexError + class Path: def __init__( self, coord_list: Sequence[tuple[float, float]], z: float = 0, - path_simplify = False + path_simplify: bool = False ): """Construct a path of sequences based on a list of coordinates. Args: - coord_list (Sequence[tuple[float, float]]): List of x,y coordinates. + coord_list (Sequence[tuple[float, float]]): List of x,y + coordinates. z (float, optional): Height of the path. Defaults to 0. - path_simplify (bool, optional): Whether to pg_rad.path.simplify_path(). Defaults to False. - """ - + path_simplify (bool, optional): Whether to + pg_rad.path.simplify_path(). Defaults to False. + """ + if len(coord_list) < 2: - raise ValueError("Must provide at least two coordinates as a list of tuples, e.g. [(x1, y1), (x2, y2)]") + raise ValueError("Must provide at least two coordinates as a \ + of tuples, e.g. [(x1, y1), (x2, y2)]") x, y = tuple(zip(*coord_list)) - + if path_simplify: try: x, y = simplify_path(list(x), list(y)) @@ -69,7 +74,11 @@ class Path: coord_list = list(zip(x, y)) - self.segments = [PathSegment(i, ip1) for i, ip1 in zip(coord_list, coord_list[1:])] + self.segments = [ + PathSegment(i, ip1) + for i, ip1 in + zip(coord_list, coord_list[1:]) + ] self.z = z @@ -77,7 +86,7 @@ class Path: def get_length(self) -> float: return sum([s.length for s in self.segments]) - + length = property(get_length) def __getitem__(self, index) -> PathSegment: @@ -85,40 +94,43 @@ class Path: def __str__(self) -> str: return str([str(s) for s in self.segments]) - + def plot(self, **kwargs): """ Plot the path using matplotlib. - """ + """ plt.plot(self.x_list, self.y_list, **kwargs) + def simplify_path( x: Sequence[float], y: Sequence[float], keep_endpoints_equal: bool = False, n_breakpoints: int = 3 ): - """From full resolution x and y arrays, return a piecewise linearly approximated/simplified pair of x and y arrays. + """From full resolution x and y arrays, return a piecewise linearly + approximated/simplified pair of x and y arrays. This function uses the `piecewise_regression` package. From a full set of coordinate pairs, the function fits linear sections, automatically finding the number of breakpoints and their positions. - On why the default value of n_breakpoints is 3, from the `piecewise_regression` - docs: + On why the default value of n_breakpoints is 3, from the + `piecewise_regression` docs: "If you do not have (or do not want to use) initial guesses for the number of breakpoints, you can set it to n_breakpoints=3, and the algorithm will randomly generate start_values. With a 50% chance, the bootstrap restarting algorithm will either use the best currently converged breakpoints or - randomly generate new start_values, escaping the local optima in two ways in - order to find better global optima." + randomly generate new start_values, escaping the local optima in two ways + in order to find better global optima." Args: x (Sequence[float]): Full list of x coordinates. y (Sequence[float]): Full list of y coordinates. keep_endpoints_equal (bool, optional): Whether or not to force start - and end to be exactly equal to the original. This will worsen the linear - approximation at the beginning and end of path. Defaults to False. + and end to be exactly equal to the original. This will worsen the + linear approximation at the beginning and end of path. Defaults to + False. n_breakpoints (int, optional): Number of breakpoints. Defaults to 3. Returns: @@ -129,25 +141,27 @@ def simplify_path( ConvergenceError: If the fitting algorithm failed to simplify the path. Reference: - Pilgrim, C., (2021). piecewise-regression (aka segmented regression) in Python. Journal of Open Source Software, 6(68), 3859, https://doi.org/10.21105/joss.03859. + Pilgrim, C., (2021). piecewise-regression (aka segmented regression) + in Python. Journal of Open Source Software, + 6(68), 3859, https://doi.org/10.21105/joss.03859. """ - - logger.debug(f"Attempting piecewise regression on path.") + + logger.debug("Attempting piecewise regression on path.") pw_fit = piecewise_regression.Fit(x, y, n_breakpoints=n_breakpoints) pw_res = pw_fit.get_results() - if pw_res == None: + if pw_res is None: logger.warning("Piecewise regression failed to converge.") - raise ConvergenceError("Piecewise regression failed to converge.") - + raise ConvergenceError("Piecewise regression failed to converge.") + est = pw_res['estimates'] # extract and sort breakpoints breakpoints_x = sorted( v['estimate'] for k, v in est.items() if k.startswith('breakpoint') ) - + x_points = [x[0]] + breakpoints_x + [x[-1]] y_points = pw_fit.predict(x_points) @@ -158,28 +172,33 @@ def simplify_path( y_points[-1] = y[-1] logger.info( - f"Piecewise regression reduced path from {len(x)-1} to {len(x_points)-1} segments." + f"Piecewise regression reduced path from \ + {len(x)-1} to {len(x_points)-1} segments." ) return x_points, y_points + def path_from_RT90( df: pd.DataFrame, east_col: str = "East", north_col: str = "North", **kwargs ) -> Path: - """Construct a path from East and North formatted coordinates (RT90) in a Pandas DataFrame. + """Construct a path from East and North formatted coordinates (RT90) + in a Pandas DataFrame. Args: - df (pandas.DataFrame): DataFrame containing at least the two columns noted in the cols argument. + df (pandas.DataFrame): DataFrame containing at least the two columns + noted in the cols argument. east_col (str): The column name for the East coordinates. north_col (str): The column name for the North coordinates. Returns: - Path: A Path object built from the aquisition coordinates in the DataFrame. - """ - + Path: A Path object built from the aquisition coordinates in the + DataFrame. + """ + east_arr = np.array(df[east_col]) - min(df[east_col]) north_arr = np.array(df[north_col]) - min(df[north_col]) @@ -187,4 +206,4 @@ def path_from_RT90( path = Path(coord_pairs, **kwargs) logger.debug("Loaded path from provided RT90 coordinates.") - return path \ No newline at end of file + return path