mirror of
https://github.com/pim-n/pg-rad
synced 2026-03-23 21:58:12 +01:00
improve error handling and PEP8
This commit is contained in:
@ -59,15 +59,19 @@ class ConfigParser:
|
||||
point_sources=sources,
|
||||
)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def _load_yaml(self, config_source: str) -> Dict[str, Any]:
|
||||
if os.path.exists(config_source):
|
||||
with open(config_source) as f:
|
||||
return yaml.safe_load(f)
|
||||
return yaml.safe_load(config_source)
|
||||
data = yaml.safe_load(f)
|
||||
else:
|
||||
data = yaml.safe_load(config_source)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(
|
||||
"Provided path or string is not a valid YAML representation."
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
def _parse_metadata(self) -> MetadataSpec:
|
||||
try:
|
||||
@ -77,8 +81,6 @@ class ConfigParser:
|
||||
except KeyError as e:
|
||||
raise MissingConfigKeyError("global", {"name"}) from e
|
||||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def _parse_runtime(self) -> RuntimeSpec:
|
||||
required = {"speed", "acquisition_time"}
|
||||
missing = required - self.config.keys()
|
||||
@ -90,8 +92,6 @@ class ConfigParser:
|
||||
acquisition_time=float(self.config["acquisition_time"]),
|
||||
)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def _parse_options(self) -> SimulationOptionsSpec:
|
||||
options = self.config.get("options", {})
|
||||
|
||||
@ -102,24 +102,31 @@ class ConfigParser:
|
||||
allowed=allowed,
|
||||
)
|
||||
|
||||
return SimulationOptionsSpec(
|
||||
air_density=float(options.get(
|
||||
"air_density",
|
||||
defaults.DEFAULT_AIR_DENSITY
|
||||
)),
|
||||
seed=options.get("seed"),
|
||||
)
|
||||
air_density = options.get("air_density", defaults.DEFAULT_AIR_DENSITY)
|
||||
seed = options.get("seed")
|
||||
|
||||
# ----------------------------------------------------------
|
||||
if not isinstance(air_density, float) or air_density <= 0:
|
||||
raise ValueError(
|
||||
"options.air_density must be a positive float in kg/m^3."
|
||||
)
|
||||
if not isinstance(seed, int) or seed < 0:
|
||||
raise ValueError("Seed must be a positive integer value.")
|
||||
|
||||
return SimulationOptionsSpec(
|
||||
air_density=air_density,
|
||||
seed=seed,
|
||||
)
|
||||
|
||||
def _parse_path(self) -> PathSpec:
|
||||
allowed_csv = {"file", "east_col_name", "north_col_name", "z"}
|
||||
allowed_proc = {"segments", "length", "z"}
|
||||
|
||||
if "path" not in self.config:
|
||||
path = self.config.get("path")
|
||||
if path is None:
|
||||
raise MissingConfigKeyError("global", {"path"})
|
||||
|
||||
path = self.config["path"]
|
||||
if not isinstance(path, dict):
|
||||
raise ValueError("Path must be a dictionary.")
|
||||
|
||||
if "file" in path:
|
||||
self._warn_unknown_keys(
|
||||
@ -145,13 +152,15 @@ class ConfigParser:
|
||||
|
||||
raise ValueError("Invalid path configuration.")
|
||||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def _parse_procedural_path(
|
||||
self,
|
||||
path: Dict[str, Any]
|
||||
) -> ProceduralPathSpec:
|
||||
raw_segments = path["segments"]
|
||||
raw_segments = path.get("segments")
|
||||
|
||||
if not isinstance(raw_segments, List):
|
||||
raise ValueError("path.segments must be a list of segments.")
|
||||
|
||||
raw_length = path.get("length")
|
||||
|
||||
if raw_length is None:
|
||||
@ -199,11 +208,8 @@ class ConfigParser:
|
||||
) -> List[float]:
|
||||
num_lengths = len(raw_length_list)
|
||||
|
||||
if num_lengths == num_segments:
|
||||
if num_lengths == num_segments or num_lengths == 1:
|
||||
return raw_length_list
|
||||
elif num_lengths == 1:
|
||||
length_list = raw_length_list + [None] * (num_segments - 1)
|
||||
return length_list
|
||||
else:
|
||||
raise ValueError(
|
||||
"Path length must either be a single number or a list with "
|
||||
@ -223,7 +229,7 @@ class ConfigParser:
|
||||
|
||||
if angle is not None and not self._is_turn(seg["type"]):
|
||||
raise ValueError(
|
||||
f"A {seg["type"]} segment does not support an angle."
|
||||
f"A {seg['type']} segment does not support an angle."
|
||||
)
|
||||
|
||||
resolved.append(
|
||||
@ -251,7 +257,16 @@ class ConfigParser:
|
||||
if missing:
|
||||
raise MissingConfigKeyError(name, missing)
|
||||
|
||||
position = params["position"]
|
||||
activity = params.get("activity_MBq")
|
||||
isotope = params.get("isotope")
|
||||
|
||||
if not isinstance(activity, int | float) or activity <= 0:
|
||||
raise ValueError(
|
||||
f"sources.{name}.activity_MBq must be positive value "
|
||||
"in MegaBequerels."
|
||||
)
|
||||
|
||||
position = params.get("position")
|
||||
|
||||
if isinstance(position, list):
|
||||
if len(position) != 3:
|
||||
@ -262,8 +277,8 @@ class ConfigParser:
|
||||
specs.append(
|
||||
AbsolutePointSourceSpec(
|
||||
name=name,
|
||||
activity_MBq=float(params["activity_MBq"]),
|
||||
isotope=params["isotope"],
|
||||
activity_MBq=float(activity),
|
||||
isotope=isotope,
|
||||
x=float(position[0]),
|
||||
y=float(position[1]),
|
||||
z=float(position[2]),
|
||||
@ -274,8 +289,8 @@ class ConfigParser:
|
||||
specs.append(
|
||||
RelativePointSourceSpec(
|
||||
name=name,
|
||||
activity_MBq=float(params["activity_MBq"]),
|
||||
isotope=params["isotope"],
|
||||
activity_MBq=float(activity),
|
||||
isotope=isotope,
|
||||
along_path=float(position["along_path"]),
|
||||
dist_from_path=float(position["dist_from_path"]),
|
||||
side=position["side"],
|
||||
|
||||
@ -104,7 +104,8 @@ def main():
|
||||
except (
|
||||
OutOfBoundsError,
|
||||
DimensionError,
|
||||
InvalidIsotopeError
|
||||
InvalidIsotopeError,
|
||||
ValueError
|
||||
) as e:
|
||||
logger.critical(e)
|
||||
logger.critical(
|
||||
|
||||
@ -61,7 +61,7 @@ def minimal_distance_to_path(
|
||||
to interpolated polyline path at height path_z.
|
||||
"""
|
||||
x, y, z = pos
|
||||
|
||||
|
||||
path = np.column_stack((x_list, y_list))
|
||||
point_xy = np.array([x, y])
|
||||
|
||||
@ -103,7 +103,7 @@ def abs_to_rel_source_position(
|
||||
side ("left" or "right"),
|
||||
z_rel
|
||||
"""
|
||||
|
||||
|
||||
x, y, z = pos
|
||||
|
||||
path = np.column_stack((x_list, y_list))
|
||||
@ -121,7 +121,9 @@ def abs_to_rel_source_position(
|
||||
best_fraction = None
|
||||
best_signed_dist = None
|
||||
|
||||
for i, (p0, seg, seg_len) in enumerate(zip(path[:-1], segments, segment_lengths)):
|
||||
for (i, (p0, seg, seg_len)) in enumerate(
|
||||
zip(path[:-1], segments, segment_lengths)
|
||||
):
|
||||
if seg_len == 0:
|
||||
continue
|
||||
|
||||
@ -152,7 +154,9 @@ def abs_to_rel_source_position(
|
||||
raise ValueError("Could not project point onto path.")
|
||||
|
||||
# Compute along_path
|
||||
along_path = arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
|
||||
along_path = (
|
||||
arc_length[best_s_i] + best_fraction * segment_lengths[best_s_i]
|
||||
)
|
||||
|
||||
# Side and distance
|
||||
side = "left" if best_signed_dist > 0 else "right"
|
||||
@ -161,4 +165,4 @@ def abs_to_rel_source_position(
|
||||
# z relative
|
||||
z_rel = z - path_z
|
||||
|
||||
return along_path, dist_from_path, side, z_rel
|
||||
return along_path, dist_from_path, side, z_rel
|
||||
|
||||
Reference in New Issue
Block a user