Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 223 additions & 12 deletions improver/generate_ancillaries/generate_distance_to_feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
"""A module for generating a distance to feature ancillary cube."""

import os
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union

import numpy as np
import pyproj
from geopandas import GeoDataFrame, GeoSeries, clip
from iris.coords import AuxCoord, DimCoord
from iris.cube import Cube
from numpy import array, min, round
from shapely.geometry import Point
from shapely.geometry import Point, Polygon
from shapely.geometry.base import BaseGeometry
from shapely.ops import unary_union

from improver import BasePlugin

Expand Down Expand Up @@ -42,6 +46,7 @@ def __init__(
self,
epsg_projection: int,
new_name: Optional[str] = None,
angle_pairs: Optional[List[Tuple[float, float]]] = None,
buffer: float = 30000,
clip_geometry_flag: bool = False,
parallel: bool = False,
Expand All @@ -59,6 +64,11 @@ def __init__(
Areas projection suitable for the UK.
new_name:
The name of the output cube.
angle_pairs:
Optional list of angular sector bounds in degrees clockwise from
true north. Each element should be (start_angle, end_angle).
If provided, the shortest distance to geometry in each sector is
returned as an additional leading dimension on the output cube.
buffer:
A buffer distance in m. If the geometry is clipped, this distance will
be added onto the outermost site locations to define the domain to clip
Expand All @@ -78,11 +88,50 @@ def __init__(
"""
self.epsg_projection = epsg_projection
self.new_name = new_name
self.angle_pairs = self.validate_angle_pairs(angle_pairs)
self.buffer = buffer
self.clip_geometry_flag = clip_geometry_flag
self.parallel = parallel
self.n_parallel_jobs = n_parallel_jobs

@staticmethod
def validate_angle_pairs(
angle_pairs: Optional[List[Tuple[float, float]]],
) -> Optional[List[Tuple[float, float]]]:
"""Validate sector angle pairs.

Args:
angle_pairs:
Optional list of 2-tuples of angles in degrees.

Returns:
Validated angle pairs.

Raises:
ValueError:
If any angle pair is malformed.
"""

if angle_pairs is None:
return None

validated_pairs = []
for pair in angle_pairs:
if len(pair) != 2:
raise ValueError(
"Each item in angle_pairs must contain exactly two angles."
)
start_angle, end_angle = pair
if start_angle < 0 or start_angle > 360 or end_angle < 0 or end_angle > 360:
raise ValueError("Angles in angle_pairs must be between 0 and 360.")
if start_angle == end_angle and not (start_angle == 0 and end_angle == 360):
raise ValueError(
"Identical start and end angles are invalid unless using [0, 360]."
)
validated_pairs.append((float(start_angle), float(end_angle)))

return validated_pairs

@staticmethod
def get_clip_values(points: List[float], buffer: float) -> List[float]:
"""Get the coordinates to use when clipping the geometry. This is determined by
Expand Down Expand Up @@ -200,7 +249,64 @@ def project_geometry(
site_points = site_points.to_crs(self.epsg_projection)
return site_points, geometry_reprojection

def distance_to(self, site_points: GeoSeries, geometry: GeoDataFrame) -> List[int]:
@staticmethod
def _create_sector_geometry(
point: Point,
angle_pair: Tuple[float, float],
max_radius: float,
n_arc_points: int = 180,
) -> Optional[BaseGeometry]:
"""Create a sector polygon for the given site point.

Args:
point:
Site location in projected coordinates.
angle_pair:
(start_angle, end_angle) in degrees clockwise from true north.
max_radius:
Radius used to create the sector polygon in metres.
n_arc_points:
Number of arc points used to approximate curved boundaries.

Returns:
A Polygon representing the sector, or None for full-circle sectors.
"""

start_angle, end_angle = angle_pair
if start_angle == 0 and end_angle == 360:
return None

start_angle = start_angle % 360
end_angle = end_angle % 360

sector_parts = (
[(start_angle, end_angle)]
if end_angle > start_angle
else [(start_angle, 360.0), (0.0, end_angle)]
)

polygons = []
for part_start, part_end in sector_parts:
angles = np.linspace(part_start, part_end, n_arc_points)
arc_points = [
(
point.x + max_radius * np.sin(np.deg2rad(angle)),
point.y + max_radius * np.cos(np.deg2rad(angle)),
)
for angle in angles
]
polygons.append(
Polygon([(point.x, point.y), *arc_points, (point.x, point.y)])
)

return unary_union(polygons)

def distance_to(
self,
site_points: GeoSeries,
geometry: GeoDataFrame,
angle_pair: Optional[Tuple[float, float]] = None,
) -> List[float]:
"""Calculate the distance from each site point to the nearest feature in the
geometry.

Expand All @@ -209,40 +315,82 @@ def distance_to(self, site_points: GeoSeries, geometry: GeoDataFrame) -> List[in
A GeoSeries containing the site points in the target projection.
geometry:
A GeoDataFrame containing the geometry in the target projection.
angle_pair:
Optional angular sector bounds, clockwise from true north.
Returns:
A list of distances from each site point to the nearest feature in the
geometry rounded to the nearest metre."""

def _distance_to_nearest(point: Point, geometry: GeoDataFrame) -> float:
bounds = geometry.total_bounds
corners = np.array(
[
[bounds[0], bounds[1]],
[bounds[0], bounds[3]],
[bounds[2], bounds[1]],
[bounds[2], bounds[3]],
]
)

def _distance_to_nearest(
point: Point,
geometry: GeoDataFrame,
angle_pair: Optional[Tuple[float, float]] = None,
) -> float:
"""Calculate the distance from a point to the nearest feature in the
geometry.
Args:
point:
A shapely Point object representing the site location.
geometry:
A GeoDataFrame containing the geometry in the target projection.
angle_pair:
Optional angular sector bounds, clockwise from true north.
Returns:
The distance from the point to the nearest feature in the geometry
rounded to the nearest metre.
"""
return round(min(point.distance(geometry.geometry)))

geometry_to_search = geometry.geometry
if angle_pair is not None:
max_radius = (
np.max(np.hypot(corners[:, 0] - point.x, corners[:, 1] - point.y))
+ 1.0
)
sector_polygon = self._create_sector_geometry(
point, angle_pair, max_radius=max_radius
)

if sector_polygon is not None:
intersected = geometry_to_search.intersection(sector_polygon)
geometry_to_search = intersected[
~intersected.is_empty & intersected.notnull()
]
if geometry_to_search.empty:
return np.nan

return round(min(point.distance(geometry_to_search)))

if self.parallel:
from joblib import Parallel, delayed

parallel = Parallel(n_jobs=self.n_parallel_jobs, prefer="threads")
output_generator = parallel(
delayed(_distance_to_nearest)(point, geometry) for point in site_points
delayed(_distance_to_nearest)(point, geometry, angle_pair)
for point in site_points
)
distance_results = list(output_generator)
else:
distance_results = []
for point in site_points:
distance_results.append(_distance_to_nearest(point, geometry))
distance_results.append(
_distance_to_nearest(point, geometry, angle_pair)
)

return distance_results

def create_output_cube(self, site_cube: Cube, data: List[int]) -> Cube:
def create_output_cube(
self, site_cube: Cube, data: Union[List[float], List[List[float]]]
) -> Cube:
"""Create an output cube that will have the same metadata as the input site
cube except the units are changed to metres and, if requested, the name of the
output cube will be changed.
Expand All @@ -252,13 +400,70 @@ def create_output_cube(self, site_cube: Cube, data: List[int]) -> Cube:
The input cube containing site locations that are defined by latitude
and longitude coordinates.
data:
A list of distances from each site point to the nearest feature in the
geometry.
Distances from each site point to the nearest feature in the geometry,
optionally grouped by sector.
Returns:
A new cube containing the distances with the same metadata as input site
cube but with updated units and name."""

output_cube = site_cube.copy(data=array(data))
if self.angle_pairs is None:
output_cube = site_cube.copy(data=array(data))
else:
sector_data = array(data)
n_sectors = len(self.angle_pairs)
sector_coord = DimCoord(
np.arange(n_sectors, dtype=np.int32), long_name="sector", units="1"
)
dim_coords_and_dims = [(sector_coord, 0)]
for coord in site_cube.dim_coords:
(dim,) = site_cube.coord_dims(coord)
dim_coords_and_dims.append((coord.copy(), dim + 1))

aux_coords_and_dims = []
for coord in site_cube.aux_coords:
dims = site_cube.coord_dims(coord)
if dims:
aux_coords_and_dims.append(
(coord.copy(), tuple(dim + 1 for dim in dims))
)
else:
aux_coords_and_dims.append((coord.copy(), None))

angle_means = []
for pair in self.angle_pairs:
if pair[0] > pair[1]:
angle_means.append((((pair[0] + pair[1]) / 2) - 180) % 360)
else:
angle_means.append(np.mean(pair))

aux_coords_and_dims.append(
(
AuxCoord(
np.array(angle_means, dtype=np.float32),
bounds=np.array(self.angle_pairs, dtype=np.float32),
long_name="sector_angle_from_true_north",
units="degrees",
),
0,
)
)

output_kwargs = {
"units": site_cube.units,
"attributes": site_cube.attributes.copy(),
"cell_methods": site_cube.cell_methods,
"dim_coords_and_dims": dim_coords_and_dims,
"aux_coords_and_dims": aux_coords_and_dims,
}
if site_cube.standard_name is not None:
output_kwargs["standard_name"] = site_cube.standard_name
else:
output_kwargs["long_name"] = site_cube.long_name
if site_cube.var_name is not None:
output_kwargs["var_name"] = site_cube.var_name

output_cube = Cube(sector_data, **output_kwargs)

if self.new_name:
output_cube.rename(self.new_name)
output_cube.units = "m"
Expand Down Expand Up @@ -316,6 +521,12 @@ def process(self, site_cube: Cube, geometry: GeoDataFrame) -> Cube:
clipped_geometry = geometry_projection

# Calculate the distance to the nearest feature in the geometry
distance_to_results = self.distance_to(site_coords, clipped_geometry)
if self.angle_pairs is None:
distance_to_results = self.distance_to(site_coords, clipped_geometry)
else:
distance_to_results = [
self.distance_to(site_coords, clipped_geometry, angle_pair=angle_pair)
for angle_pair in self.angle_pairs
]

return self.create_output_cube(site_cube, distance_to_results)
56 changes: 56 additions & 0 deletions improver_tests/generate_ancillaries/test_DistanceTo.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,59 @@ def test_distance_to_with_unsuitable_projection(single_site_cube, geometry_point
)
with pytest.raises(ValueError, match=msg):
DistanceTo(3112)(single_site_cube, geometry_point_laea)


def test_distance_to_with_angle_pairs(single_site_cube):
"""Test that sector-based distances are returned with a leading sector dimension."""

single_site_cube.coord("latitude").points = 49.543481633
single_site_cube.coord("longitude").points = -1.387510304

data = [
Point(3500500, 3000800), # north, 300 m
Point(3500900, 3000500), # east, 400 m
Point(3500500, 3000400), # south, 100 m
Point(3500300, 3000500), # west, 200 m
]
geometry = GeoDataFrame(geometry=data, crs="EPSG:3035")

angle_pairs = [(315, 45), (45, 135), (135, 225), (225, 315)]
output_cube = DistanceTo(3035, angle_pairs=angle_pairs)(single_site_cube, geometry)

expected = np.array([[300], [400], [100], [200]])
np.testing.assert_allclose(output_cube.data, expected)
assert output_cube.shape == (4, 1)
np.testing.assert_allclose(output_cube.coord("sector").points, [0, 1, 2, 3])
np.testing.assert_allclose(
output_cube.coord("sector_angle_from_true_north").points,
[0, 90, 180, 270],
)
np.testing.assert_allclose(
output_cube.coord("sector_angle_from_true_north").bounds,
[[315, 45], [45, 135], [135, 225], [225, 315]],
)


def test_distance_to_with_angle_pairs_no_feature_in_sector(single_site_cube):
"""Test sectors with no geometry intersection return NaN."""

single_site_cube.coord("latitude").points = 49.543481633
single_site_cube.coord("longitude").points = -1.387510304

geometry = GeoDataFrame(geometry=[Point(3500900, 3000500)], crs="EPSG:3035")

angle_pairs = [(315, 45), (45, 135)]
output_cube = DistanceTo(3035, angle_pairs=angle_pairs)(single_site_cube, geometry)

assert np.isnan(output_cube.data[0, 0])
assert output_cube.data[1, 0] == 400


def test_distance_to_invalid_angle_pairs(single_site_cube, geometry_point_laea):
"""Test invalid sector definitions raise a ValueError."""

with pytest.raises(
ValueError,
match=r"Identical start and end angles are invalid unless using \[0, 360\].",
):
DistanceTo(3035, angle_pairs=[(90, 90)])(single_site_cube, geometry_point_laea)