#!/usr/bin/env python3
"""
Created on Thu Jan 9 13:24:21 2025
@author: jbroust
"""
import functools
import logging
import numpy as np
import xarray as xr
import xoa.geo as xgeo
from scipy.optimize import linear_sum_assignment
from .. import geo as sgeo
from . import eddies2d as seddies
logger = logging.getLogger(__name__)
[docs]
class Associate:
"""Associate eddies between time steps for tracking
Uses the Chelton et al. (2011) tracking algorithm with cost function
based on distance, Rossby number, and radius similarity.
Parameters
----------
track_eddies : list
List of Track objects containing eddy trajectories.
parent_eddies : list
Eddies from the previous time step.
new_eddies : list
Eddies from the current time step to associate.
Dt : float
Time step (days) between parent and new eddies.
Tc : float
Characteristic time scale for tracking.
C : float, default 6.5*1e3/86400
Characteristic velocity scale (m/s).
Attributes
----------
cost : ndarray
Cost matrix for eddy association (cached property).
"""
[docs]
def __init__(
self,
track_eddies,
parent_eddies,
new_eddies,
Dt,
Tc,
C=6.5 * 1e3 / 86400,
):
"""Initialize eddy tracking association
Parameters
----------
track_eddies : list
List of Track objects containing eddy trajectories.
parent_eddies : list
Eddies from the previous time step.
new_eddies : list
Eddies from the current time step to associate.
Dt : float
Time step (days) between parent and new eddies.
Tc : float
Characteristic time scale for tracking.
C : float, default 6.5*1e3/86400
Characteristic velocity scale (m/s).
"""
self.parent_eddies = parent_eddies # reference eddies
self.new_eddies = new_eddies # next time eddies
self.track_eddies = track_eddies
self._Dt = Dt # actual time step between eddies 1 and eddies 2
self._Tc = Tc
self._C = C
[docs]
def search_dist(self, eddyj, eddyi):
"""Compute search distance for eddy association"""
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
Ravg = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
Ravg += self.track_eddies[eddyj.track_id].eddies[i].vmax_contour.radius
n += 1
# print('Dij components', self._C*(1+self._Dt)/2, Ravg/n/1e3, eddyi.vmax_contour.radius/1e3)
Dij = self._C * (1 + self._Dt) / 2 + Ravg / n + eddyi.vmax_contour.radius
return Dij
[docs]
def ro_avg(self, eddyj):
"""Average Rossby number over last 5 time steps"""
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
ro = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
ro += self.track_eddies[eddyj.track_id].eddies[i].ro
n += 1
return ro / n
[docs]
def rad_avg(self, eddyj):
"""Average radius over last 5 time steps"""
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
radius = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
radius += self.track_eddies[eddyj.track_id].eddies[i].radius
n += 1
return radius / n
@functools.cached_property
def cost(self):
"""Cost function between each eddy pair"""
M = np.zeros((len(self.new_eddies), len(self.parent_eddies)))
for i in range(len(self.new_eddies)):
for j in range(len(self.parent_eddies)):
dlat = self.parent_eddies[j].lat - self.new_eddies[i].lat
dlon = self.parent_eddies[j].lon - self.new_eddies[i].lon
x = sgeo.deg2m(dlon, self.parent_eddies[j].lat)
y = sgeo.deg2m(dlat)
D_ij = self.search_dist(self.parent_eddies[j], self.new_eddies[i])
# Distance term
dxy = np.sqrt(x**2 + y**2)
M[i, j] = (dxy**2) / (D_ij**2) if dxy < D_ij else 1e6
# dynamical similarity
roj = self.ro_avg(self.parent_eddies[j])
rj = self.rad_avg(self.parent_eddies[j])
DR = (self.parent_eddies[j].radius - self.new_eddies[i].radius) / (
rj + self.new_eddies[i].radius
)
DR0 = (self.parent_eddies[j].ro - self.new_eddies[i].ro) / (roj + self.new_eddies[i].ro)
# Warning: avoid coupling cyclone with anticyclone
M[i, j] += (
DR**2 + DR0**2 if self.parent_eddies[j].eddy_type == self.new_eddies[i].eddy_type else 1e6
)
# temporal proximity
M[i, j] += (0.5 * self._Dt / self._Tc) ** 2
return np.sqrt(M)
[docs]
def order(self):
M = self.cost
idel = []
for i in range(M.shape[0]):
if (M[i] > 1e3).all():
idel.append(i)
Mclean = np.delete(M, idel, axis=0) # delete impossible solutions
raw, col = linear_sum_assignment(Mclean)
for i, j in zip(raw, col):
np.delete(self.new_eddies, idel)[i].track_id = self.parent_eddies[j].track_id
[docs]
class AssociateMulti:
"""Associate eddies with multiple backward time steps for tracking"""
[docs]
def __init__(
self,
track_eddies,
parent_eddies, ## list of backward eddies
new_eddies,
Dt, ##list of dt
Tc,
C=6.5 * 1e3 / 86400,
):
self.parent_eddies = parent_eddies # reference backward eddies
self.new_eddies = new_eddies # next time eddies
self.track_eddies = track_eddies
self._Dt = Dt # time steps
self._Tc = Tc
self._C = C
[docs]
def search_dist(self, eddyj, eddyi, dt):
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
Ravg = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
try:
Ravg += self.track_eddies[eddyj.track_id].eddies[i].vmax_contour.radius
except AttributeError:
Ravg += self.track_eddies[eddyj.track_id].eddies[i].vmax_radius
n += 1
try:
Dij = self._C * (1 + dt) / 2 + Ravg / n + eddyi.vmax_contour.radius
except AttributeError:
Dij = self._C * (1 + dt) / 2 + Ravg / n + eddyi.vmax_radius
return Dij
[docs]
def ro_avg(self, eddyj):
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
ro = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
ro += self.track_eddies[eddyj.track_id].eddies[i].ro
n += 1
return ro / n
[docs]
def rad_avg(self, eddyj):
istart = max(0, len(self.track_eddies[eddyj.track_id].eddies) - 5)
n = 0
radius = 0
for i in range(istart, len(self.track_eddies[eddyj.track_id].eddies)):
radius += self.track_eddies[eddyj.track_id].eddies[i].radius
n += 1
return radius / n
@functools.cached_property
def cost(self):
"""Cost function between each eddy pair"""
nj = np.sum([len(self.parent_eddies[k]) for k in range(len(self.parent_eddies))])
M = np.zeros((len(self.new_eddies), nj))
for i in range(len(self.new_eddies)):
cmp = 0
for k in range(len(self.parent_eddies)):
for j in range(len(self.parent_eddies[k])):
dlat = self.parent_eddies[k][j].lat - self.new_eddies[i].lat
dlon = self.parent_eddies[k][j].lon - self.new_eddies[i].lon
x = xgeo.deg2m(dlon, self.parent_eddies[k][j].lat)
y = xgeo.deg2m(dlat)
D_ij = self.search_dist(
self.parent_eddies[k][j],
self.new_eddies[i],
self._Dt[k],
)
# Distance term
dxy = np.sqrt(x**2 + y**2)
M[i, cmp] = (dxy**2) / (D_ij**2) if dxy < D_ij else 1e6
# dynamical similarity
roj = self.ro_avg(self.parent_eddies[k][j])
rj = self.rad_avg(self.parent_eddies[k][j])
DR = (self.parent_eddies[k][j].radius - self.new_eddies[i].radius) / (
rj + self.new_eddies[i].radius
)
DR0 = (self.parent_eddies[k][j].ro - self.new_eddies[i].ro) / (
roj + self.new_eddies[i].ro
)
# Warning: avoid coupling cyclone with anticyclone
M[i, cmp] += (
DR**2 + DR0**2
if self.parent_eddies[k][j].eddy_type == self.new_eddies[i].eddy_type
else 1e6
)
# temporal proximity
M[i, cmp] += (0.5 * self._Dt[k] / self._Tc) ** 2
cmp += 1
return np.sqrt(M)
[docs]
def indexmatching(self):
index = {}
cmp = 0
for k in range(len(self.parent_eddies)):
for kj in range(len(self.parent_eddies[k])):
index[cmp] = (k, kj)
cmp += 1
return index
[docs]
def order(self):
# compute global matrix
M = self.cost
index = self.indexmatching()
jdel = [] # eliminate already parent eddies that can't be assigned again
for j in range(M.shape[1]):
if self.parent_eddies[index[j][0]][index[j][1]].is_parent:
jdel.append(j)
del index[j]
Mclean = np.delete(M, jdel, axis=1)
index = {i: index[k] for i, k in enumerate(index)}
# Clean this matrix with impossible connections
idel = [] # new eddies impossible to match
for i in range(M.shape[0]):
if (Mclean[i] > 1e3).all():
idel.append(i)
Mclean = np.delete(Mclean, idel, axis=0) # delete impossible solutions
# Connect rows with columns with Hungarian algorithm
raw, col = linear_sum_assignment(Mclean)
for i, j in zip(raw, col):
k = index[j][0]
kj = index[j][1]
if Mclean[i, j] > 1000:
continue
np.delete(self.new_eddies, idel)[i].track_id = self.parent_eddies[k][kj].track_id
self.parent_eddies[k][kj].is_parent = True
[docs]
class Track:
"""Individual eddy track containing a temporal sequence of eddies
Represents a single eddy trajectory through time. Stores the sequence
of associated :class:`~shoot.eddies.eddies2d.GriddedEddy2D` objects
and their corresponding times.
Parameters
----------
eddy : GriddedEddy2D or list
Initial eddy or list of eddies.
time : numpy.datetime64 or list
Time(s) of the eddy detection(s).
number : int
Unique track identifier.
dt : float
Time step between detections in seconds.
Tc : float
Characteristic time scale for tracking in seconds.
C : float, default 6.5*1e3/86400
Characteristic velocity scale (m/s).
"""
[docs]
def __init__(
self,
eddy,
time,
number,
dt,
Tc,
C=6.5 * 1e3 / 86400, # 6.5 km.day in m/s
):
self.eddies = [eddy] if not isinstance(eddy, list) else eddy
self.number = number
self.active = True
self.times = [time] if not isinstance(time, list) else time
self._dt = dt
self._Tc = Tc
self._C = C
[docs]
@classmethod
def reconstruct(cls, eddies, times, number, dt, Tc):
return cls(eddies, times, number, dt, Tc)
[docs]
def update(self, eddy, time):
self.eddies.append(eddy)
self.times.append(time)
@property
def ds(self):
return xr.Dataset(
{
"date_first_detection": (("eddies"), [self.times[0]]),
"date_last_detection": (("eddies"), [self.times[-1]]),
"life_time": (
("eddies"),
[(self.times[-1] - self.times[0]) / np.timedelta64(1, "D")],
),
"x_start": (("eddies"), [self.eddies[0].lon]),
"y_start": (("eddies"), [self.eddies[0].lat]),
"x_end": (("eddies"), [self.eddies[-1].lon]),
"y_end": (("eddies"), [self.eddies[-1].lat]),
"track_type": (("eddies"), [self.eddies[0].eddy_type]),
},
)
[docs]
class Tracks:
"""Collection of eddy tracks over time
Performs eddy tracking on an :class:`~shoot.eddies.eddies2d.EvolEddies2D`
object using the Chelton et al. (2011) algorithm with backward association.
Parameters
----------
eddies : EvolEddies2D
Detected eddies at multiple time steps.
nback : int
Number of backward time steps for multi-step association.
C : float, default 6.5*1e3/86400
Characteristic velocity scale (m/s).
"""
[docs]
def __init__(
self,
eddies,
nback,
C=6.5 * 1e3 / 86400, # 6.5 km.day in m/s
**attrs,
):
self.eddies = eddies # EvolEddies object
self.times = [e.time for e in eddies.eddies] # corresponding time vector
self.nback = nback
self._dt = eddies.dt
self._Tc = nback * eddies.dt
self._C = C
self.nb_step = int(self._Tc / self._dt) # maximum backward timesteps
self.nb_tracks = 0
self.attrs = attrs
self.track_eddies = {} # list of tracks
[docs]
@classmethod
def reconstruct(cls, ds, nback):
"""Reconstruct tracks from a tracked xarray dataset"""
## Reconstruct eddies
eddies = seddies.EvolEddies2D.reconstruct(ds)
## Reconstruct tracks
track_eddies = {} # dictionary of tracks
for i in np.unique(ds.track_id):
tmp = ds.where(ds.track_id == i, drop=True) # select eddies belonging to track i
trace_times = list(tmp.time.values)
trace_number = i
trace_eddies = []
for j in range(len(tmp.obs)):
trace_eddies.append(seddies.Eddy.reconstruct(tmp.isel(obs=j), track=True))
track_eddies[i] = Track.reconstruct(
trace_eddies,
trace_times,
trace_number,
eddies.dt,
eddies.dt * nback,
)
my_tracks = cls(eddies, nback)
my_tracks.track_eddies = track_eddies
return my_tracks
@property
def ds(self):
ds = None
for nb_track in self.track_eddies:
track = self.track_eddies[nb_track]
if ds is None:
ds = track.ds
else:
ds = xr.concat([ds, track.ds], dim="eddies")
ds_eddies = self.eddies.ds
return xr.merge([ds_eddies, ds], compat="override")
[docs]
def to_netcdf(self, path_nc):
"Save to NetCDF format"
self.ds.to_netcdf(path_nc)
[docs]
def track_init(self):
logger.info("Initializing tracks from first time step")
for i, eddy in enumerate(self.eddies.eddies[0].eddies): # initialized with the first detected eddies
self.track_eddies[i] = Track(eddy, self.times[0], i, self._dt, self._Tc)
eddy.track_id = i # update eddy track number
self.nb_tracks += 1
[docs]
def track_steps(self):
logger.info("Processing tracking steps")
for i in range(1, len(self.times)): # compute tracking on all following time steps
t = self.times[i]
logger.debug("Tracking time step: %s", t)
new_eddies = self.eddies.eddies[i].eddies
self.update_multi(
[self.eddies.eddies[i - k].eddies for k in range(1, min(i, self.nback) + 1)],
new_eddies,
[self._dt * k for k in range(1, min(i, self.nback) + 1)],
)
for eddy in new_eddies:
if eddy.track_id is None: # Create a new track
self.track_eddies[self.nb_tracks] = Track(eddy, t, self.nb_tracks, self._dt, self._Tc)
eddy.track_id = self.nb_tracks # update eddy track number
self.nb_tracks += 1
else: ## append to existing track
self.track_eddies[eddy.track_id].update(eddy, t)
[docs]
def track_step(self):
"""
Track only for the last eddies.
Supposes the job has been done before for others.
Useful for update concerns.
"""
i = len(self.times) - 1
t = self.times[i]
new_eddies = self.eddies.eddies[i].eddies
self.update_multi(
[self.eddies.eddies[i - k].eddies for k in range(1, min(i, self.nback) + 1)],
new_eddies,
[self._dt * k for k in range(1, min(i, self.nback) + 1)],
)
for eddy in new_eddies:
if eddy.track_id is None: # Create a new track
self.track_eddies[self.nb_tracks] = Track(eddy, t, self.nb_tracks, self._dt, self._Tc)
eddy.track_id = self.nb_tracks # update eddy track number
self.nb_tracks += 1
else: ## append to existing track
self.track_eddies[eddy.track_id].update(eddy, t)
[docs]
def tracking(self):
"""compute the eddy tracking"""
self.track_init()
self.track_steps()
# return self.track_eddies
[docs]
def update(self, parent_eddies, new_eddies, Dt):
"""Update based on last detected eddies"""
Associate(self.track_eddies, parent_eddies, new_eddies, Dt, self._Tc).order()
[docs]
def update_multi(self, parent_eddies, new_eddies, Dt):
"""Update based on several preceding time eddies"""
AssociateMulti(self.track_eddies, parent_eddies, new_eddies, Dt, self._Tc).order()
[docs]
def refresh(self, new_eddies):
"""refresh a track with a new Eddies object (next time)"""
self.eddies.add(new_eddies)
self.times.append(new_eddies.time)
self.track_step()
[docs]
def track_eddies(eddies, nback):
"""Track eddies across time steps
Parameters
----------
eddies : EvolEddies2D
Detections at several time steps.
The time interval should preferably be constant.
nback : int
Number of backward time steps for eddy tracking.
Typically around 10 days for satellite altimetry
and 1 or 2 days for numerical simulations.
Returns
-------
Tracks
Tracks object with eddy trajectories.
Example
-------
>>> from shoot.eddies.track import track_eddies
>>> tracks = track_eddies(evol_eddies, nback=10) # doctest: +SKIP
>>> tracks.to_netcdf("tracked.nc") # doctest: +SKIP
"""
tracks = Tracks(eddies, nback)
tracks.tracking()
return tracks
[docs]
def update_tracks(ds, new_eddies, nback):
"""Update tracking based on new eddies detection
Parameters
----------
ds : xarray.Dataset
Already tracked period.
new_eddies : Eddies2D
Detections at the new time step.
nback : int
Number of backward time steps for eddy tracking.
Typically around 10 days for satellite altimetry
and 1 or 2 days for numerical simulations.
Returns
-------
Tracks
Updated tracks object.
"""
tracks = Tracks.reconstruct(ds, nback)
tracks.refresh(new_eddies)
return tracks