Source code for shoot.eddies.eddies3d

#!/usr/bin/env python3
"""
Created on Wed Jul  3 15:39:51 2024 by jbroust
"""

import functools

import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import linear_sum_assignment

from .. import dyn as sdyn
from .. import geo as sgeo
from .. import meta as smeta
from .. import plot as splot
from . import eddies2d


[docs] class Associate: """Associate eddies between vertical levels based on spatial proximity Uses the Hungarian algorithm (linear sum assignment) to match eddies between a parent level and a new level based on distance and eddy type. Parameters ---------- parent_eddies : list Reference eddies from the parent vertical level. new_eddies : list Eddies from the new vertical level to associate. max_distance : float, default 10 Maximum distance (km) for eddy centers to be associated. Attributes ---------- cost : ndarray Cost matrix for eddy association (cached property). """
[docs] def __init__(self, parent_eddies, new_eddies, max_distance=10): """Initialize eddy association Parameters ---------- parent_eddies : list Reference eddies from the parent vertical level. new_eddies : list Eddies from the new vertical level to associate. max_distance : float, default 10 Maximum distance (km) for eddy centers to be associated. """ self.parent_eddies = parent_eddies # reference eddies self.new_eddies = new_eddies # next time eddies self._max_distance = max_distance # maximum distance for centers to be associated
[docs] def search_dist(self, eddyj, eddyi): istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5) n = 0 Ravg = 0 for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)): Ravg += self.track_eddies[eddyj.track_id].eddies[i].vmax_contour.radius n += 1 # print('Dij components', self._C*(1+self._Dt)/2, Ravg/n/1e3, eddyi.vmax_contour.radius/1e3) Dij = self._C * (1 + self._Dt) / 2 + Ravg / n + eddyi.vmax_contour.radius return Dij
@functools.cached_property def cost(self): """Cost function between each eddy pair""" M = np.zeros((len(self.new_eddies), len(self.parent_eddies))) for i in range(len(self.new_eddies)): for j in range(len(self.parent_eddies)): dlat = self.parent_eddies[j].glat - self.new_eddies[i].glat dlon = self.parent_eddies[j].glon - self.new_eddies[i].glon x = sgeo.deg2m(dlon, self.parent_eddies[j].glat) y = sgeo.deg2m(dlat) dxy = np.sqrt(x**2 + y**2) / 1000 dxy = dxy if dxy < self._max_distance else 1e6 M[i, j] = dxy if self.parent_eddies[j].eddy_type == self.new_eddies[i].eddy_type else 1e6 return M
[docs] def order(self): M = self.cost idel = [] for i in range(M.shape[0]): if (M[i] > 1e3).all(): idel.append(i) Mclean = np.delete(M, idel, axis=0) # delete impossible solutions raw, col = linear_sum_assignment(Mclean) for i, j in zip(raw, col): np.delete(self.new_eddies, idel)[i].z_id = self.parent_eddies[j].z_id
[docs] class EddiesByDepth: """3D eddy detection by associating eddies across depth levels Detects eddies at each vertical level and associates them vertically to create 3D coherent structures. Parameters ---------- u : xarray.DataArray 3D zonal velocity field. v : xarray.DataArray 3D meridional velocity field. depth : xarray.DataArray Depth coordinate. eddies3d : dict Dictionary of Eddies2D objects at each depth level. nb_eddies : int Total number of 3D eddies detected. Attributes ---------- eddies3d : dict Eddies organized by depth level. nb_eddies : int Total count of 3D eddies. """
[docs] def __init__(self, u, v, depth, eddies3d, nb_eddies): """Initialize 3D eddies by depth Parameters ---------- u : xarray.DataArray 3D zonal velocity field. v : xarray.DataArray 3D meridional velocity field. depth : xarray.DataArray Depth coordinate. eddies3d : dict Dictionary of Eddies2D objects at each depth level. nb_eddies : int Total number of 3D eddies detected. """ self.u = u self.v = v self.depth = depth # to change self.eddies3d = eddies3d # dictionary with Eddies2D at each depth self.nb_eddies = nb_eddies
[docs] @classmethod def detect_eddies_3d( cls, u, v, window_center, window_fit=None, dx=None, dy=None, min_radius=None, ssh_method='streamline', paral=False, max_distance=10, **kwargs, ): depth = u.depth eddies3d = {} eddies_tmp = None nb_eddies = 0 for z in range(len(depth) - 1, -1, -1): uz = u.isel(depth=z) vz = v.isel(depth=z) eddies_z = eddies2d.Eddies2D.detect_eddies( uz, vz, window_center, window_fit=window_fit, min_radius=min_radius, paral=paral ) if z == len(depth) - 1: for i, eddy in enumerate(eddies_z.eddies): eddy.z_id = i nb_eddies = len(eddies_z.eddies) else: Associate(eddies_tmp.eddies, eddies_z.eddies, max_distance=max_distance).order() for eddy in eddies_z.eddies: if not hasattr(eddy, 'z_id'): eddy.z_id = nb_eddies nb_eddies += 1 eddies3d[z] = eddies_z eddies_tmp = eddies_z return cls(u, v, depth, eddies3d, nb_eddies)
[docs] class RawEddy3D: """A 3D eddy composed of 2D eddies at multiple depth levels Parameters ---------- depths : ndarray Depth values where this eddy was detected. eddies : list of GriddedEddy2D 2D eddy detections at each depth level. """
[docs] def __init__(self, depths, eddies): self.depths = depths self.eddies = eddies # list of GriddedEddy2D
@property def min_depth(self): return min(np.abs(self.depths)) @property def max_depth(self): return max(np.abs(self.depths)) @property def vmax(self): "return the maximum speed of the eddy" return max([e.vmax_contour.mean_velocity for e in self.eddies]) @property def vmax_depth(self): """return the depth of the maximum speed""" ivmax = np.argmax([e.vmax_contour.mean_velocity for e in self.eddies]) return np.abs(self.depths[ivmax])
[docs] class Eddies3D: """Collection of 3D eddies detected across depth levels Associates 2D eddy detections vertically to build coherent 3D structures. Use :meth:`detect_eddies_3d` for detection. Parameters ---------- u : xarray.DataArray 3D zonal velocity field. v : xarray.DataArray 3D meridional velocity field. depths : ndarray Depth values. eddies : list of RawEddy3D 3D eddy objects. eddies_byslice : EddiesByDepth 2D detections organized by depth level. """
[docs] def __init__(self, u, v, depths, eddies, eddies_byslice): self.eddies = eddies self.eddies_byslice = eddies_byslice self.depths = depths self.u = u self.v = v
[docs] @classmethod def detect_eddies_3d( cls, u, v, window_center, window_fit=None, dx=None, dy=None, min_radius=None, ssh_method='streamline', paral=False, max_distance=10, **kwargs, ): eddies = EddiesByDepth.detect_eddies_3d( u, v, window_center, window_fit=window_fit, dx=dx, dy=dy, min_radius=min_radius, ssh_method=ssh_method, paral=paral, max_distance=max_distance, **kwargs, ) eddies_3d = [] for z_id in range(eddies.nb_eddies): e_depth = [] e = [] for i, k in enumerate(eddies.eddies3d): eddies_2d = eddies.eddies3d[k] for eddy in eddies_2d.eddies: if eddy.z_id == z_id: e.append(eddy) e_depth.append(eddies.depth[k].values) eddies_3d.append(RawEddy3D(np.array(e_depth), e)) return cls(u, v, eddies.depth.values, eddies_3d, eddies)
[docs] def plot2d(self, depth=0, boundary=False, vmax=False, quiver=False, ns=5): """ plot the 2D vorticity field superimposed with eddies detection at the nearest layer depth """ lon = smeta.get_lon(self.u) lat = smeta.get_lat(self.u) i = np.argmin(np.abs(depth - np.abs(self.depths))) fig, ax = splot.create_map( lon, lat, figsize=(8, 5), ) sdyn.get_relvort(self.u.isel(depth=i), self.v.isel(depth=i)).plot( x="lon_rho", y="lat_rho", cmap="cmo.curl", ax=ax, add_colorbar=False, transform=splot.pcarr, ) if quiver: plt.quiver( lon[::ns, ::ns].values, lat[::ns, ::ns].values, self.u.isel(depth=i)[::ns, ::ns].values, self.v.isel(depth=i)[::ns, ::ns].values, color="k", transform=splot.pcarr, ) for eddy3d in self.eddies: inearest = np.argmin(np.abs(depth - np.abs(eddy3d.depths))) eddy2d = eddy3d.eddies[inearest] eddy2d.plot(boundary=boundary, vmax=vmax, transform=splot.pcarr) cb = plt.scatter( eddy2d.lon, eddy2d.lat, c=eddy3d.max_depth, cmap="Spectral_r", vmin=np.min(np.abs(self.depths)), vmax=np.max(np.abs(self.depths)), transform=splot.pcarr, ) plt.colorbar(cb, label="maximum depth")