Source code for shoot.eddies.associate

#!/usr/bin/env python3
"""
Created on Thu Jan  9 13:24:21 2025

@author: jbroust
"""

import functools

import numpy as np
from scipy.optimize import linear_sum_assignment
from shapely.geometry import Polygon

from .. import geo as sgeo
from .. import num as snum


[docs] class Associate: """Associate eddies with reference eddies based on spatial proximity and similarity"""
[docs] def __init__(self, eddies, ref_eddies, dmax): self.ref_eddies = ref_eddies # list reference eddies self.eddies = eddies # list of eddies self.dmax = dmax * 1000 # convert to meter as all is performed in meters
@functools.cached_property def cost(self): """Cost function between each eddy pair""" M = np.zeros((len(self.eddies), len(self.ref_eddies))) for i in range(len(self.eddies)): for j in range(len(self.ref_eddies)): dlat = self.ref_eddies[j].lat - self.eddies[i].lat dlon = self.ref_eddies[j].lon - self.eddies[i].lon x = sgeo.deg2m(dlon, self.ref_eddies[j].lat) y = sgeo.deg2m(dlat) # Distance term dxy = np.sqrt(x**2 + y**2) M[i, j] = (dxy**2) / (self.dmax**2) if dxy < self.dmax else 1e6 # dynamical similarity DR = (self.ref_eddies[j].radius - self.eddies[i].radius) / ( self.ref_eddies[j].radius + self.eddies[i].radius ) DR0 = (self.ref_eddies[j].ro - self.eddies[i].ro) / ( self.ref_eddies[j].ro + self.eddies[i].ro ) # Warning: avoid coupling cyclone with anticyclone M[i, j] += DR**2 + DR0**2 if self.ref_eddies[j].eddy_type == self.eddies[i].eddy_type else 1e6 return np.sqrt(M)
[docs] def order(self): """Assign eddy IDs using linear sum assignment""" M = self.cost idel = [] for i in range(M.shape[0]): if (M[i] > 1e3).all(): idel.append(i) Mclean = np.delete(M, idel, axis=0) # delete impossible solutions raw, col = linear_sum_assignment(Mclean) for i, j in zip(raw, col): if Mclean[i, j] > 1e3: continue np.delete(self.eddies, idel)[i].id = self.ref_eddies[j].id
[docs] class BiMod: """Bipartite matching for eddy association with validation metrics Performs eddy association and computes validation metrics including match proportion, area overlap, and distance between matched eddies. Parameters ---------- eddies : Eddies2D Eddies to associate with reference. ref_eddies : Eddies2D Reference eddies for comparison. dmax : float Maximum distance (km) for association. Attributes ---------- pmatch : float Proportion of eddies that match with reference (property). parea : float Average intersection area ratio between matched eddies (property). dist : float Average distance (km) between matched eddy centers (property). """
[docs] def __init__(self, eddies, ref_eddies, dmax): """Initialize bipartite matcher Parameters ---------- eddies : Eddies2D Eddies to associate with reference. ref_eddies : Eddies2D Reference eddies for comparison. dmax : float Maximum distance (km) for association. """ self.eddies = eddies self.ref_eddies = ref_eddies self.dmax = dmax
[docs] @classmethod def compare(cls, eddies, ref_eddies, max_distance=50): """add id to ref eddies and associate eddies to this ids when possible""" ## attribute id to ref eddies for i, eddy in enumerate(ref_eddies.eddies): eddy.id = i for i, eddy in enumerate(eddies.eddies): eddy.id = None Associate(eddies.eddies, ref_eddies.eddies, max_distance).order() return cls(eddies, ref_eddies, max_distance)
@property def pmatch(self): """Proportion of eddies that match with reference eddies""" self._intersects() nb_nomatch = np.sum( [ # (eddy.id is None) | (not eddy.intersect) not eddy.intersect for eddy in self.eddies.eddies ] ) return (len(self.eddies.eddies) - nb_nomatch) / len(self.eddies.eddies) def _intersects(self): """Check if matched eddies' contours intersect Sets the `intersect` attribute for each eddy indicating whether its contour intersects with its matched reference eddy's contour. """ for eddy in self.eddies.eddies: eddy.intersect = False if eddy.id is None: eddy.intersect = None continue for reddy in self.ref_eddies.eddies: if reddy.id == eddy.id: if hasattr(eddy, "x_vmax"): # Eddy case points = np.array([eddy.x_vmax, eddy.y_vmax]).T if snum.points_in_polygon(points, np.array([reddy.x_vmax, reddy.y_vmax]).T).any(): eddy.intersect = True else: # GriddedEddy2D case points = np.array([eddy.vmax_contour.lon, eddy.vmax_contour.lat]).T if snum.points_in_polygon( points, np.array( [ reddy.vmax_contour.lon, reddy.vmax_contour.lat, ] ).T, ).any(): eddy.intersect = True break @property def parea(self): """Average intersection area ratio between matched eddies""" area = 0 nb_eddy = 0 for eddy in self.eddies.eddies: if eddy.id is None: continue for reddy in self.ref_eddies.eddies: if reddy.id == eddy.id: x_ell, y_ell = eddy.ellipse.sample x_rell, y_rell = reddy.ellipse.sample poly = Polygon([(x, y) for x, y in zip(x_ell, y_ell)]) rpoly = Polygon([(x, y) for x, y in zip(x_rell, y_rell)]) intersection = poly.intersection(rpoly) area += intersection.area / rpoly.area nb_eddy += 1 break return area / nb_eddy if nb_eddy > 0 else np.nan @property def dist(self): """Average distance between matched eddy centers""" ddist = 0 nb_eddy = 0 for eddy in self.eddies.eddies: if eddy.id is None: continue for reddy in self.ref_eddies.eddies: if reddy.id == eddy.id: dlat = eddy.lat - reddy.lat dlon = eddy.lon - reddy.lon x = sgeo.deg2m(dlon, reddy.lat) y = sgeo.deg2m(dlat) ddist += np.sqrt(x**2 + y**2) / 1000 nb_eddy += 1 break return ddist / nb_eddy if nb_eddy > 0 else np.nan