Source code for shoot.contours

#!/usr/bin/env python3
"""
Contouring utilities

Functions for extracting and analyzing closed contours from 2D fields.
"""

import contourpy as cpy
import numpy as np
import scipy.ndimage as scin
import xarray as xr

from . import geo as sgeo
from . import meta as smeta
from . import num as snum


[docs] def get_closed_contours(lon_center, lat_center, ssh, nlevels=50, robust=0.03): """Extract closed contours enclosing a center point Parameters ---------- lon_center : float Center longitude in degrees. lat_center : float Center latitude in degrees. ssh : xarray.DataArray 2D field to contour (typically SSH). nlevels : int, default 50 Maximum number of contour levels. robust : float, default 0.03 Quantile threshold to exclude extreme values. Returns ------- list of xarray.Dataset Each dataset contains a closed contour with coordinates and metadata, including lon/lat coordinates, SSH level, and center position. Example ------- >>> from shoot.contours import get_closed_contours >>> contours = get_closed_contours(5.0, 43.0, ssh_field) # doctest: +SKIP >>> len(contours) # number of nested closed contours # doctest: +SKIP """ lon = smeta.get_lon(ssh) lat = smeta.get_lat(ssh) lat2d, lon2d = xr.broadcast(lat, lon) cont_gen = cpy.contour_generator(z=ssh.values) vmin, vmax = np.nanquantile(ssh, [robust, 1 - robust]) point = np.array([lon_center, lat_center]) dss = [] if len(np.arange(vmin, vmax + 0.005, 0.005)) < nlevels: ran = np.arange(vmin, vmax + 0.005, 0.005) else: ran = np.linspace(vmin, vmax, nlevels) for level in ran: for line in cont_gen.lines(level): if (line[0] == line[-1]).all(): # check if it is closed contour xx = interp_to_line(lon2d.values, line) yy = interp_to_line(lat2d.values, line) if snum.points_in_polygon(point, np.array([xx, yy]).T): # Check if it contains the center if np.any(np.isnan(ssh)): # Chek if it contains land points inside nan_indexes = np.where(np.isnan(ssh)) nan_points = np.array( [[lon2d[i, j], lat2d[i, j]] for i, j in zip(nan_indexes[0], nan_indexes[1])] ) if np.any(snum.points_in_polygon(nan_points, np.array([xx, yy]).T)): continue dss.append( xr.Dataset( {"line": (("npts", "ncoords"), line)}, coords={ "lon": ( "npts", xx, {"long_name": "Longitude"}, ), "lat": ("npts", yy, {"long_name": "Latitude"}), }, attrs={ "ssh": level, "lon_center": lon_center, "lat_center": lat_center, }, ) ) return dss
[docs] def interp_to_line(data, line): """Interpolate 2D field values along a contour line Parameters ---------- data : ndarray 2D field to interpolate. line : ndarray Contour line coordinates of shape (n, 2). Returns ------- ndarray Interpolated values along the contour. """ coords = line.T[::-1] mask = np.isnan(data).astype("d") dataf = np.nan_to_num(data) lm = scin.map_coordinates(mask, coords) ldata = scin.map_coordinates(dataf, coords) lbad = ~np.isclose(lm + 1, 1.0) ldata[lbad] = np.nan return ldata
[docs] def add_contour_uv(ds, u, v): """Add velocity components and angular momentum to contour dataset Parameters ---------- ds : xarray.Dataset Contour dataset with 'line' variable. u : ndarray Zonal velocity field. v : ndarray Meridional velocity field. Returns ------- xarray.Dataset Input dataset with added velocity and momentum variables. """ if "u" not in ds: uc = interp_to_line(u, ds.line.values) vc = interp_to_line(v, ds.line.values) ds["u"] = ("npts", uc, {"long_name": "Velocity along X"}) ds["v"] = ("npts", vc, {"long_name": "Velocity along Y"}) xdist = sgeo.deg2m(ds.lon - ds.lon_center, ds.lat_center).values ydist = sgeo.deg2m(ds.lat - ds.lat_center).values am = xdist * vc - ydist * uc ds["am"] = ( "npts", am, {"long_name": "Angular momentum", "units": "m2.s-2"}, ) ds.attrs["mean_velocity"] = float(np.sqrt(ds.u**2 + ds.v**2).mean()) ds.attrs["mean_angular_momentum"] = float(ds.am.mean()) ds.attrs["radius"] = float(np.mean(np.sqrt(xdist**2 + ydist**2))) return ds
[docs] def add_contour_dx_dy(ds): """Add spatial metrics to contour dataset Parameters ---------- ds : xarray.Dataset Contour dataset with lon/lat coordinates. Returns ------- xarray.Dataset Input dataset with added dx, dy, and length attributes. """ if "dx" not in ds: dx = sgeo.deg2m(np.gradient(ds.lon.values), ds.lat.values.mean()) dy = sgeo.deg2m(np.gradient(ds.lat.values)) ds["dx"] = ("npts", dx, {"units": "m"}) ds["dy"] = ("npts", dy, {"units": "m"}) ds.attrs["length"] = float(np.sqrt(dx**2 + dy**2).sum()) return ds
[docs] class ContourMeanSpeedGetter: """Callable class to compute mean speed along a contour Parameters ---------- u : ndarray Zonal velocity field. v : ndarray Meridional velocity field. """
[docs] def __init__(self, u, v): """Initialize with velocity fields Parameters ---------- u : ndarray Zonal velocity field. v : ndarray Meridional velocity field. """ self.u, self.v = u, v
def __call__(self, ds): """Compute mean speed along contour Parameters ---------- ds : xarray.Dataset Contour dataset with 'line' variable. Returns ------- float Mean speed along the contour. """ add_contour_uv(ds.line.values, self.u, self.v) return float(np.sqrt(ds.u**2 + ds.v**2).mean())
[docs] def get_lnam_peaks(lnam, K=0.7): """Find local extrema of LNAM field within closed contours Parameters ---------- lnam : xarray.DataArray 2D LNAM (Local Normalized Angular Momentum) field. K : float, default 0.7 Contour level threshold for detecting closed regions. Returns ------- minima : ndarray Array of (i, j) indices for LNAM minima (anticyclones). maxima : ndarray Array of (i, j) indices for LNAM maxima (cyclones). Lines_coords : list List of contour line coordinates [lon, lat] for each closed contour. """ # compute lines lon = smeta.get_lon(lnam) lat = smeta.get_lat(lnam) lat2d, lon2d = xr.broadcast(lat, lon) lon_name, lat_name = snum.get_coord_name(lnam) cont_gen = cpy.contour_generator(z=abs(lnam)) lines = cont_gen.lines(K) maxima = np.empty((0, 2), dtype=np.int64) minima = np.empty((0, 2), dtype=np.int64) Lines_coords = [] for i, line in enumerate(lines): if not (line[0] == line[-1]).all(): # skip not closed contours continue xx = interp_to_line(lon2d.values, line) yy = interp_to_line(lat2d.values, line) Lines_coords.append([xx, yy]) # get extremum of the polygon lon_min = xx.min() lon_max = xx.max() lat_min = yy.min() lat_max = yy.max() # get nearest x,y values jmin = (abs(lat2d - lat_min) + abs(lon2d - lon_min)).argmin(lnam.dims)[lnam.dims[0]].data - 1 imin = (abs(lat2d - lat_min) + abs(lon2d - lon_min)).argmin(lnam.dims)[lnam.dims[1]].data - 1 jmax = (abs(lat2d - lat_max) + abs(lon2d - lon_max)).argmin(lnam.dims)[lnam.dims[0]].data + 1 imax = (abs(lat2d - lat_max) + abs(lon2d - lon_max)).argmin(lnam.dims)[lnam.dims[1]].data + 1 # compute max inside the polygon ijmax = ( abs(lnam) .isel( { lnam.dims[0]: slice(jmin, jmax), lnam.dims[1]: slice(imin, imax), } ) .argmax(lnam.dims) ) jmax_in = ijmax[lnam.dims[0]].data # lat imax_in = ijmax[lnam.dims[1]].data # lon lat_center = abs(lnam).isel({lnam.dims[0]: slice(jmin, jmax), lnam.dims[1]: slice(imin, imax)})[ jmax_in, imax_in ][lat_name] lon_center = abs(lnam).isel({lnam.dims[0]: slice(jmin, jmax), lnam.dims[1]: slice(imin, imax)})[ jmax_in, imax_in ][lon_name] jcenter = (abs(lat2d - lat_center) + abs(lon2d - lon_center)).argmin(lnam.dims)[lnam.dims[0]].data icenter = (abs(lat2d - lat_center) + abs(lon2d - lon_center)).argmin(lnam.dims)[lnam.dims[1]].data if lnam[jcenter, icenter] > 0: maxima = np.append(maxima, np.array([[icenter, jcenter]]), axis=0) else: minima = np.append(minima, np.array([[icenter, jcenter]]), axis=0) return minima, maxima, Lines_coords
[docs] def area(ds): """Compute area enclosed by a contour Parameters ---------- ds : xarray.Dataset Contour dataset with lon, lat coordinates and lon_center, lat_center attributes. Returns ------- float Area enclosed by the contour in m². """ xdist = sgeo.deg2m(ds.lon - ds.lon_center, ds.lat_center).values ydist = sgeo.deg2m(ds.lat - ds.lat_center).values xydist = np.sqrt(xdist**2 + ydist**2) xyavg = 0.5 * (xydist[:-1] + xydist[1:]) dx = np.sqrt( sgeo.deg2m(ds.lon[:-1] - ds.lon[1:], ds.lat_center).values ** 2 + sgeo.deg2m(ds.lat[:-1] - ds.lat[1:]).values ** 2 ) theta = np.arccos((-dx + xydist[1:] + xydist[:-1]) / (xydist[1:] + xydist[:-1])) return np.sum(xyavg * theta)