"""
Hydrologic functions
Anomaly detection and profile analysis for eddies.
"""
import functools
import numpy as np
import xarray as xr
from . import grid as sgrid
from . import meta as smeta
from . import num as snum
[docs]
class Anomaly:
"""Compute 3D anomalies inside and outside an eddy
Compares vertical profiles of a tracer (density, temperature, etc.)
inside and outside an eddy to compute anomalies.
Parameters
----------
eddy : RawEddy2D
Eddy object with location and contour information.
eddies : Eddies2D
Collection of all eddies (to exclude from background).
dens : xarray.DataArray
3D tracer field (e.g., density, temperature).
depth : xarray.DataArray, optional
3D depth field. Inferred from dens if not provided.
r_factor : float, default 1.2
Radius factor for selecting outside profiles.
nz : int, default 100
Number of vertical levels for interpolation.
"""
[docs]
def __init__(self, eddy, eddies, dens, depth=None, r_factor=1.2, nz=100, eddy_type=True):
self.lon = eddy.lon
self.lat = eddy.lat
self.eddy = eddy
self.eddy_type = eddy_type # True if anomaly sign based on eddy_type
if hasattr(eddy, "boundary_contour"):
self.radius = eddy.boundary_contour.radius # eddy.vmax_contour.radius # eddy.radius in meters
else:
self.radius = eddy.eff_radius # boundary contour radius in meters
self.dens = dens.squeeze()
if depth is not None:
self.depth = depth.squeeze()
else:
self.depth = smeta.get_depth(dens).squeeze()
self.xdim = smeta.get_xdim(self.dens, errors="raise")
self.ydim = smeta.get_ydim(self.dens, errors="raise")
self.zdim = smeta.get_zdim(self.dens, errors="raise")
self.nz = nz
self._jmax = len(self.dens[self.xdim])
self._imax = len(self.dens[self.ydim])
self._r = r_factor
self.eddies = eddies # The whole eddies file
if not hasattr(self.depth, self.xdim):
self.depth = self.depth.expand_dims(
{
self.xdim: self.dens.sizes[self.xdim],
self.ydim: self.dens.sizes[self.ydim],
}
).broadcast_like(self.dens)
@functools.cached_property
def _dist(self):
lon_name = smeta.get_lon(self.dens).name
lat_name = smeta.get_lat(self.dens).name
dist = np.sqrt((self.dens[lon_name] - self.lon) ** 2 + (self.dens[lat_name] - self.lat) ** 2)
# dist = dist.transpose(lat_name, lon_name)
if len(self.dens[lon_name].dims) == 1:
dim_x = self.dens[lon_name].dims[0]
dim_y = self.dens[lat_name].dims[0]
dist = dist.transpose(dim_y, dim_x)
return dist.values
@property
def _i(self):
# return np.unravel_index(np.argmin(self._dist), self.dens[lon_name].shape)[0]
return np.unravel_index(np.argmin(self._dist), self._dist.shape)[0]
@property
def _j(self):
# return np.unravel_index(np.argmin(self._dist), self.dens[lon_name].shape)[1]
return np.unravel_index(np.argmin(self._dist), self._dist.shape)[1]
@functools.cached_property
def depth_vector(self):
depth = self.depth.isel({self.xdim: self._j, self.ydim: self._i})
return xr.DataArray(
np.linspace(depth.values[0], depth.values[-1], self.nz),
dims="depth_int",
)
@functools.cached_property
def profil_inside(self):
inside = self.dens.isel({self.xdim: self._j, self.ydim: self._i})
if self.depth_vector[0] < self.depth_vector[-1]: # increasing case
return np.interp(
self.depth_vector,
self.depth.isel({self.xdim: self._j, self.ydim: self._i}),
inside,
)
else: # decreasing order bad for np.interp
return np.interp(
self.depth_vector[::-1],
self.depth.isel({self.xdim: self._j, self.ydim: self._i})[::-1],
inside[::-1],
)[::-1]
[docs]
def is_inside(self, x, y):
"""Test if grid points are inside the eddy maximum velocity contour
Parameters
----------
x : array-like
Grid indices along X.
y : array-like
Grid indices along Y.
Returns
-------
ndarray of bool
True for points inside the eddy contour.
"""
if len(smeta.get_lon(self.dens).shape) == 1:
lon = [smeta.get_lon(self.dens).isel({self.xdim: xi}) for xi in x]
else:
lon = [smeta.get_lon(self.dens).isel({self.xdim: xi, self.ydim: yi}) for xi, yi in zip(x, y)]
if len(smeta.get_lat(self.dens).shape) == 1:
lat = [smeta.get_lat(self.dens).isel({self.ydim: yi}) for yi in y]
else:
lat = [smeta.get_lat(self.dens).isel({self.xdim: xi, self.ydim: yi}) for xi, yi in zip(x, y)]
points = np.array([lon, lat]).T
if hasattr(self.eddy, "x_vmax"):
xx = self.eddy.x_vmax
yy = self.eddy.y_vmax
else:
xx = self.eddy.vmax_contour.lon
yy = self.eddy.vmax_contour.lat
result = snum.points_in_polygon(points, np.array([xx, yy]).T)
return result
def _xy(self, r):
dx, dy = sgrid.get_dx_dy(self.dens)
dxm = np.nanmean(dx)
dym = np.nanmean(dy)
nx = int(r * self.radius / dxm)
ny = int(r * self.radius / dym)
stepx = max(int(2 * nx / 10), 1)
stepy = max(int(2 * ny / 10), 1)
X = np.arange(max(self._j - nx, 0), min(self._j + nx, self._jmax - 1) + 1, stepx)
Y = np.arange(max(self._i - ny, 0), min(self._i + ny, self._imax - 1) + 1, stepy)
X, Y = np.meshgrid(X, Y)
X = X.flatten()
Y = Y.flatten()
return X, Y
@functools.cached_property
def _xy_inside(self):
X, Y = self._xy(self._r)
# test validy
valids = self.is_inside(X, Y)
X = [X[i] for i in range(len(X)) if valids[i]]
Y = [Y[i] for i in range(len(Y)) if valids[i]]
return (X, Y)
@functools.cached_property
def _profils_inside(self):
X = self._xy_inside[0]
Y = self._xy_inside[1]
return self.dens.isel(
{
self.xdim: xr.DataArray(
X,
dims="nb_profil",
),
self.ydim: xr.DataArray(
Y,
dims="nb_profil",
),
}
)
[docs]
def is_valid(self, x, y):
"""Test if grid points are outside all eddy boundary contours
Used to select background (outside) profiles that are not
contaminated by any eddy.
Parameters
----------
x : array-like
Grid indices along X.
y : array-like
Grid indices along Y.
Returns
-------
ndarray of bool
True for points outside all eddy contours.
"""
if len(smeta.get_lon(self.dens).shape) == 1:
lon = [smeta.get_lon(self.dens).isel({self.xdim: xi}) for xi in x]
else:
lon = [smeta.get_lon(self.dens).isel({self.xdim: xi, self.ydim: yi}) for xi, yi in zip(x, y)]
if len(smeta.get_lat(self.dens).shape) == 1:
lat = [smeta.get_lat(self.dens).isel({self.ydim: yi}) for yi in y]
else:
lat = [smeta.get_lat(self.dens).isel({self.xdim: xi, self.ydim: yi}) for xi, yi in zip(x, y)]
points = np.array([lon, lat]).T
result = np.ones(len(x)) * True
for eddy in self.eddies.eddies:
if hasattr(eddy, "x_eff"):
xx = eddy.x_eff
yy = eddy.y_eff
else:
xx = eddy.boundary_contour.lon
yy = eddy.boundary_contour.lat
result *= np.invert(snum.points_in_polygon(points, np.array([xx, yy]).T))
return result
@functools.cached_property
def _xy_outside(self):
test = True
r = self._r
while test: # increase r factor if no outside point found
X, Y = self._xy(r)
# test validy
valids = self.is_valid(X, Y)
X = [X[i] for i in range(len(X)) if valids[i]]
Y = [Y[i] for i in range(len(Y)) if valids[i]]
if len(X) > 0 and len(Y) > 0:
test = False
else:
r *= 2
return (X, Y)
@functools.cached_property
def _profils_outside(self):
X = self._xy_outside[0]
Y = self._xy_outside[1]
return self.dens.isel(
{
self.xdim: xr.DataArray(
X,
dims="nb_profil",
),
self.ydim: xr.DataArray(
Y,
dims="nb_profil",
),
}
)
@functools.cached_property
def _depths_inside(self):
X = self._xy_inside[0]
Y = self._xy_inside[1]
return self.depth.isel(
{
self.xdim: xr.DataArray(
X,
dims="nb_profil",
),
self.ydim: xr.DataArray(
Y,
dims="nb_profil",
),
}
)
@functools.cached_property
def _depths_outside(self):
X = self._xy_outside[0]
Y = self._xy_outside[1]
return self.depth.isel(
{
self.xdim: xr.DataArray(
X,
dims="nb_profil",
),
self.ydim: xr.DataArray(
Y,
dims="nb_profil",
),
}
)
@staticmethod
def _interp_profils(depth, var, depth_vector):
if depth_vector[0] < depth_vector[-1]:
return np.interp(
depth_vector,
depth,
var,
)
else:
return np.interp(
depth_vector[::-1],
depth[::-1],
var[::-1],
)[::-1]
@functools.cached_property
def profils_inside(self):
return xr.apply_ufunc(
self._interp_profils,
self._depths_inside.transpose("nb_profil", self.zdim),
self._profils_inside.transpose("nb_profil", self.zdim),
self.depth_vector,
input_core_dims=[
[self.zdim],
[self.zdim],
[self.depth_vector.dims[0]],
],
output_core_dims=[[self.depth_vector.dims[0]]],
dask_gufunc_kwargs={"output_sizes": {self.depth_vector.dims[0]: len(self.depth_vector)}},
vectorize=True,
dask="parallelized",
output_dtypes=[self.profil_inside.dtype],
).compute()
@functools.cached_property
def profils_outside(self):
return xr.apply_ufunc(
self._interp_profils,
self._depths_outside.transpose("nb_profil", self.zdim),
self._profils_outside.transpose("nb_profil", self.zdim),
self.depth_vector,
input_core_dims=[
[self.zdim],
[self.zdim],
[self.depth_vector.dims[0]],
],
output_core_dims=[[self.depth_vector.dims[0]]],
dask_gufunc_kwargs={"output_sizes": {self.depth_vector.dims[0]: len(self.depth_vector)}},
vectorize=True,
dask="parallelized",
output_dtypes=[self.profil_inside.dtype],
).compute()
@functools.cached_property
def mean_profil_inside(self):
return self.profils_inside.mean(dim="nb_profil")
@functools.cached_property
def std_profil_inside(self):
return self.profils_inside.std(dim="nb_profil")
@functools.cached_property
def mean_profil_outside(self):
return self.profils_outside.mean(dim="nb_profil")
@functools.cached_property
def std_profil_outside(self):
return self.profils_outside.std(dim="nb_profil")
@functools.cached_property
def anomaly(self):
return self.mean_profil_inside - self.mean_profil_outside
@functools.cached_property
def center_anomaly(self):
return self.profil_inside - self.mean_profil_outside
@functools.cached_property
def _icore_depth(self):
if np.isnan(self.anomaly).all():
return None
if self.eddy_type:
if self.eddy.eddy_type == "anticyclone":
icore = np.nanargmin(self.anomaly.values)
else:
icore = np.nanargmax(self.anomaly.values)
else:
icore = np.nanargmax(np.abs(self.anomaly.values))
return icore
@functools.cached_property
def core_depth(self):
return np.abs(self.depth_vector[self._icore_depth])
@functools.cached_property
def intensity(self):
return np.abs(self.anomaly[self._icore_depth])
@functools.cached_property
def signed_intensity(self):
return self.anomaly[self._icore_depth]
[docs]
def anomaly_at_depth(self, depth_level, signed=False):
"""Get the anomaly value at a specific depth
Parameters
----------
depth_level : float
Target depth (sign is adjusted automatically).
signed : bool, default False
If True, return signed anomaly. Otherwise, return absolute value.
Returns
-------
float
Anomaly value at the requested depth.
"""
if np.sign(depth_level) != np.sign(self.depth_vector[1]):
depth_level *= -1
iref = np.argmin(np.abs(self.depth_vector.values - depth_level))
if signed:
return self.anomaly[iref]
else:
return np.abs(self.anomaly[iref])
[docs]
def compute_anomalies(eddies, dens, nz=100, r_factor=1.2, eddy_type=True):
"""Add anomaly to detected eddies
Parameters
----------
eddies : Eddies2D
Collection of detected eddies.
dens : xarray.DataArray
3D tracer field (density, temperature, etc.).
nz : int, default 100
Number of vertical interpolation levels.
r_factor : float, default 1.2
Radius factor for background profile selection.
Notes
-----
Modifies eddies in-place by adding .anomaly attribute to each eddy.
Example
-------
>>> from shoot.hydrology import compute_anomalies
>>> compute_anomalies(eddies, ds.density) # doctest: +SKIP
>>> eddies.eddies[0].anomaly.core_depth # doctest: +SKIP
"""
for eddy in eddies.eddies:
eddy.anomaly = Anomaly(eddy, eddies, dens, r_factor=r_factor, nz=nz, eddy_type=eddy_type)