Source code for shoot.dyn

#!/usr/bin/env python3
"""
Ocean dynamics utilities

Functions for computing kinematic quantities from velocity fields including
vorticity, divergence, angular momentum, and geostrophic currents.
"""

import math

import numba
import numpy as np
import xarray as xr
import xoa.coords as xcoords

from . import grid as sgrid

GRAVITY = 9.81
OMEGA = 2 * np.pi / 86400


@numba.guvectorize(
    [(numba.float64[:, :], numba.float64[:, :], numba.int64, numba.float64, numba.float64[:, :])],
    "(ny,nx),(ny,nx),(),()->(ny,nx)",
)
def _get_lnam_(uu, vv, wx, dx2dy, lnam):
    ny, nx = uu.shape
    mask = np.isnan(uu) | np.isnan(vv)
    wx2 = wx // 2
    wy = (int(np.ceil(wx / dx2dy)) // 2) * 2 + 1
    wy2 = wy // 2
    lnam[:, :] = np.nan
    for j in numba.prange(wy2, ny - wy2 - 1):
        for i in range(wx2, nx - wx2 - 1):
            if mask[j - wy2 : j + wy2 + 1, i - wx2 : i + wx2 + 1].any():
                continue
            # if mask[j, i]:
            #     continue
            # xc = xx[j, i]
            # yc = yy[j, i]
            # print(j, i)
            denom1 = 0.0
            denom2 = 0.0
            # count = 0.0
            lnam[j, i] = 0.0
            for jl in range(-wy2, wy2 + 1):
                for il in range(-wx2, wx2 + 1):
                    # if mask[j + jl, i + il]:
                    #     continue
                    lnam[j, i] += il * vv[j + jl, i + il]
                    lnam[j, i] -= jl * uu[j + jl, i + il] * dx2dy
                    denom1 += il * uu[j + jl, i + il]
                    denom1 += jl * vv[j + jl, i + il] * dx2dy
                    denom2 += math.sqrt(uu[j + jl, i + il] ** 2 + vv[j + jl, i + il] ** 2) * math.sqrt(
                        il**2 + (jl * dx2dy) ** 2
                    )
                    # count += 1.0
            if (denom1 + denom2) > 1e-6:
                # print(lnam[j, i], denom1, denom2)
                lnam[j, i] /= denom1 + denom2


def _get_lnam_wrapper_(uu, vv, wx, dx2dy):
    uu = uu.astype("d")
    vv = vv.astype("d")
    wx = int(wx)
    dx2dy = float(dx2dy)
    return _get_lnam_(uu, vv, wx, dx2dy)


[docs] def get_lnam(u, v, window, dx=None, dy=None): """Compute local normalized angular momentum Parameters ---------- u : xarray.DataArray Zonal velocity component. v : xarray.DataArray Meridional velocity component. window : float Window size in kilometers. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- xarray.DataArray Local normalized angular momentum. """ dx, dy = sgrid.get_dx_dy(u, dx=dx, dy=dy) dxm = np.nanmean(dx) dym = np.nanmean(dy) wx = (int(np.ceil(window * 1e3 / dxm)) // 2) * 2 + 1 xdim = xcoords.get_xdim(u, errors="raise") ydim = xcoords.get_ydim(u, errors="raise") lnam = xr.apply_ufunc( _get_lnam_wrapper_, u, v, input_core_dims=[[ydim, xdim], [ydim, xdim]], output_core_dims=[[ydim, xdim]], dask="parallelized", kwargs={"wx": wx, "dx2dy": float(dym / dxm)}, vectorize=False, ) return lnam.transpose(*u.dims)
[docs] def get_div(u, v, dx=None, dy=None): """Compute horizontal divergence Parameters ---------- u : xarray.DataArray Zonal velocity component. v : xarray.DataArray Meridional velocity component. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- xarray.DataArray Horizontal divergence in s^-1. """ dx, dy = sgrid.get_dx_dy(u, dx=dx, dy=dy) xdim = xcoords.get_xdim(u, errors="raise") ydim = xcoords.get_ydim(u, errors="raise") input_core_dims = [[ydim, xdim], [ydim, xdim]] if np.shape(dx) == 0: input_core_dims.extend([[], []]) else: input_core_dims.extend([[ydim, xdim], [ydim, xdim]]) div = xr.apply_ufunc( _get_div_, u, v, dx, dy, join="override", input_core_dims=input_core_dims, output_core_dims=[[ydim, xdim]], dask="parallelized", ) div = div.transpose(*u.dims) return div
def _get_div_(u, v, dx, dy): sx = np.gradient(u, axis=-1) / dx sy = np.gradient(v, axis=-2) / dy div = sx + sy div[np.isnan(u) | np.isnan(v)] = np.nan return div
[docs] def get_okuboweiss(u, v, dx=None, dy=None): """Compute Okubo-Weiss parameter The Okubo-Weiss parameter distinguishes vortex-dominated (OW < 0) from strain-dominated (OW > 0) regions. Parameters ---------- u : xarray.DataArray Zonal velocity component. v : xarray.DataArray Meridional velocity component. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- xarray.DataArray Okubo-Weiss parameter in s^-2. """ dx, dy = sgrid.get_dx_dy(u, dx=dx, dy=dy) xdim = xcoords.get_xdim(u, errors="raise") ydim = xcoords.get_ydim(u, errors="raise") input_core_dims = [[ydim, xdim], [ydim, xdim]] if np.shape(dx) == 0: input_core_dims.extend([[], []]) else: input_core_dims.extend([[ydim, xdim], [ydim, xdim]]) ow = xr.apply_ufunc( _get_okuboweiss_, u, v, dx, dy, join="override", input_core_dims=input_core_dims, output_core_dims=[[ydim, xdim]], dask="allowed", # "allowed", # "parallelized", output_dtypes=[u.dtype], # dask_gufunc_kwargs={"meta": np.ones((1))}, ) ow = ow.transpose(*u.dims) return ow
def _get_okuboweiss_(u, v, dx, dy): sn = np.gradient(u, axis=-1) / dx - np.gradient(v, axis=-2) / dy ss = np.gradient(v, axis=-1) / dx + np.gradient(u, axis=-2) / dy om = np.gradient(v, axis=-1) / dx - np.gradient(u, axis=-2) / dy ow = sn**2 + ss**2 - om**2 ow[np.isnan(u) | np.isnan(v)] = np.nan return ow
[docs] def get_relvort(u, v, dx=None, dy=None): """Compute relative vorticity Parameters ---------- u : xarray.DataArray Zonal velocity component. v : xarray.DataArray Meridional velocity component. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- xarray.DataArray Relative vorticity in s^-1. Example ------- >>> import xarray as xr >>> import numpy as np >>> lon = xr.DataArray(np.linspace(0, 1, 50), dims="lon", ... attrs={"standard_name": "longitude"}) >>> lat = xr.DataArray(np.linspace(43, 44, 40), dims="lat", ... attrs={"standard_name": "latitude"}) >>> u = xr.DataArray(np.random.rand(40, 50), dims=("lat", "lon"), ... coords={"lon": lon, "lat": lat}) >>> v = xr.DataArray(np.random.rand(40, 50), dims=("lat", "lon"), ... coords={"lon": lon, "lat": lat}) >>> rv = get_relvort(u, v) # doctest: +SKIP """ dx, dy = sgrid.get_dx_dy(u, dx=dx, dy=dy) xdim = xcoords.get_xdim(u, errors="raise") ydim = xcoords.get_ydim(u, errors="raise") input_core_dims = [[ydim, xdim], [ydim, xdim]] if np.shape(dx) == 0: input_core_dims.extend([[], []]) else: input_core_dims.extend([[ydim, xdim], [ydim, xdim]]) rv = xr.apply_ufunc( _get_relvort_, u, v, dx, dy, input_core_dims=input_core_dims, output_core_dims=[[ydim, xdim]], join="inner", dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}, ) rv = rv.transpose(*u.dims) return rv
def _get_relvort_(u, v, dx, dy): rv = np.gradient(v, axis=-1) / dx rv -= np.gradient(u, axis=-2) / dy rv[np.isnan(u) | np.isnan(v)] = np.nan return rv
[docs] def get_coriolis(lat): """Compute Coriolis parameter Parameters ---------- lat : float or array-like Latitude in degrees. Returns ------- float or array-like Coriolis parameter (f = 2Ω sin(lat)) in s^-1. """ return 2 * OMEGA * np.sin(np.radians(lat))
[docs] def get_geos_old(ssh, dx=None, dy=None): """Compute geostrophic currents from SSH (deprecated) Parameters ---------- ssh : xarray.DataArray Sea surface height. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- u : xarray.DataArray Zonal geostrophic velocity. v : xarray.DataArray Meridional geostrophic velocity. """ dx, dy = sgrid.get_dx_dy(ssh, dx=dx, dy=dy) dims = list(ssh.dims) xaxis = dims.index(xcoords.get_xdim(ssh, errors="raise")) yaxis = dims.index(xcoords.get_ydim(ssh, errors="raise")) dhdx = np.gradient(ssh.values, axis=xaxis) / dx dhdy = np.gradient(ssh.values, axis=yaxis) / dy corio = get_coriolis(xcoords.get_lat(ssh)) u = xr.DataArray(-GRAVITY * dhdy, dims=ssh.dims, coords=ssh.coords) / corio v = xr.DataArray(GRAVITY * dhdx, dims=ssh.dims, coords=ssh.coords) / corio return u, v
[docs] def get_geos(ssh, dx=None, dy=None): """Compute geostrophic currents from SSH Uses geostrophic balance: f×u_g = -g∇η Parameters ---------- ssh : xarray.DataArray Sea surface height in meters. dx : xarray.DataArray, optional Grid resolution along X in meters. dy : xarray.DataArray, optional Grid resolution along Y in meters. Returns ------- u : xarray.DataArray Zonal geostrophic velocity in m/s. v : xarray.DataArray Meridional geostrophic velocity in m/s. Example ------- >>> import xarray as xr, numpy as np >>> from shoot.dyn import get_geos >>> lon = xr.DataArray(np.linspace(0, 2, 50), dims="lon", ... attrs={"standard_name": "longitude"}) >>> lat = xr.DataArray(np.linspace(43, 44, 40), dims="lat", ... attrs={"standard_name": "latitude"}) >>> ssh = xr.DataArray(np.random.rand(40, 50) * 0.1, dims=("lat", "lon"), ... coords={"lon": lon, "lat": lat}) >>> u, v = get_geos(ssh) # doctest: +SKIP """ dx, dy = sgrid.get_dx_dy(ssh, dx=dx, dy=dy) xdim = xcoords.get_xdim(ssh, errors="raise") ydim = xcoords.get_ydim(ssh, errors="raise") corio = get_coriolis(xcoords.get_lat(ssh)) input_core_dims = [[ydim, xdim]] if np.shape(dx) == 0: input_core_dims.extend([[], []]) else: input_core_dims.extend([[ydim, xdim], [ydim, xdim]]) if corio.ndim == 1: input_core_dims.append([ydim]) else: input_core_dims.append([ydim, xdim]) ugeos, vgeos = xr.apply_ufunc( _get_geos_, ssh, dx, dy, corio, input_core_dims=input_core_dims, output_core_dims=[[ydim, xdim], [ydim, xdim]], join="inner", dask="allowed", # "allowed", # "parallelized", dask_gufunc_kwargs={"allow_rechunk": True}, # output_dtypes=[ssh.dtype, ssh.dtype], ) return ugeos.transpose(*ssh.dims), vgeos.transpose(*ssh.dims)
def _get_geos_(ssh, dx, dy, corio): if corio.ndim == 1: corio = corio.reshape(corio.shape[0], 1) dhdx = np.gradient(ssh, axis=-1) / (dx * corio) dhdy = np.gradient(ssh, axis=-2) / (dy * corio) bad = np.isnan(ssh) dhdx[bad] = np.nan dhdy[bad] = np.nan return -dhdy * GRAVITY, dhdx * GRAVITY