Source code for shoot.num

#!/usr/bin/env python3
"""
Numerical utilities

Optimized numeric functions for peak detection and geometric computations.
"""

import numba
import numpy as np
import xarray as xr

from . import meta as smeta


@numba.njit
def _find_signed_peaks_2d_ref(data, wx, wy):
    """Reference implementation for finding 2D peaks (serial, not parallel)

    Parameters
    ----------
    data : ndarray
        2D data array to search for peaks.
    wx : int
        Window width in x direction.
    wy : int
        Window width in y direction.

    Returns
    -------
    minima : ndarray
        Array of (i, j) indices for local minima.
    maxima : ndarray
        Array of (i, j) indices for local maxima.
    """
    ny, nx = data.shape
    mask = np.isnan(data)
    maxima = np.empty((0, 2), dtype=np.int64)
    minima = np.empty((0, 2), dtype=np.int64)
    wx2 = wx // 2
    wy2 = wy // 2
    for j in numba.prange(1, ny - 1):
        for i in range(1, nx - 1):
            if mask[j, i]:
                continue
            i0 = max(0, i - wx2)
            i1 = min(nx, i + wx2 + 1)
            j0 = max(0, j - wy2)
            j1 = min(ny, j + wy2 + 1)
            if mask[j - 1 : j + 2, i - 1 : i + 2].all():
                continue
            imax = -1
            jmax = -1
            imin = -1
            jmin = -1
            vmax = -np.inf
            vmin = np.inf
            for jl in range(j0, j1):
                for il in range(i0, i1):
                    if mask[jl, il]:
                        continue
                    if data[jl, il] > vmax:
                        imax = il
                        jmax = jl
                        vmax = data[jl, il]
                    if data[jl, il] < vmin:
                        imin = il
                        jmin = jl
                        vmin = data[jl, il]

            if imin == i and jmin == j:
                minima = np.append(minima, np.array([[i, j]]), axis=0)
            if imax == i and jmax == j:
                maxima = np.append(maxima, np.array([[i, j]]), axis=0)

    return minima, maxima


@numba.njit(parallel=True)
def _find_signed_peaks_2d_paral_save(data, wx, wy):
    """Parallel implementation for finding 2D peaks (memory-safe version)

    Parameters
    ----------
    data : ndarray
        2D data array to search for peaks.
    wx : int
        Window width in x direction.
    wy : int
        Window width in y direction.

    Returns
    -------
    minima : ndarray
        3D array of (nx, ny, 2) with peak indices, -1 for non-peaks.
    maxima : ndarray
        3D array of (nx, ny, 2) with peak indices, -1 for non-peaks.
    """
    ny, nx = data.shape
    mask = np.isnan(data)
    maxima = np.ones((nx, ny, 2), dtype=np.int64) * -1
    minima = np.ones((nx, ny, 2), dtype=np.int64) * -1
    wx2 = wx // 2
    wy2 = wy // 2

    for k in numba.prange(0, (ny - 2) * (nx - 2)):
        j = k // (nx - 2) + 1
        i = k + 1 - (j - 1) * (nx - 2)
        if mask[j, i]:
            continue
        i0 = max(0, i - wx2)
        i1 = min(nx, i + wx2 + 1)
        j0 = max(0, j - wy2)
        j1 = min(ny, j + wy2 + 1)
        if mask[j - 1 : j + 2, i - 1 : i + 2].all():
            continue
        imax = -1
        jmax = -1
        imin = -1
        jmin = -1
        vmax = -np.inf
        vmin = np.inf
        for jl in range(j0, j1):
            for il in range(i0, i1):
                if mask[jl, il]:
                    continue
                if data[jl, il] > vmax:
                    imax = il
                    jmax = jl
                    vmax = data[jl, il]
                if data[jl, il] < vmin:
                    imin = il
                    jmin = jl
                    vmin = data[jl, il]
        if imin == i and jmin == j:
            minima[i, j] = [i, j]
        if imax == i and jmax == j:
            maxima[i, j] = [i, j]
    return minima, maxima


@numba.njit(parallel=True)
def _find_signed_peaks_2d_paral(data, wx, wy):
    """Parallel implementation for finding 2D peaks

    Note: May miss extrema on the right edge of the domain.

    Parameters
    ----------
    data : ndarray
        2D data array to search for peaks.
    wx : int
        Window width in x direction.
    wy : int
        Window width in y direction.

    Returns
    -------
    minima : ndarray
        3D array of (nx, ny, 2) with peak indices, -1 for non-peaks.
    maxima : ndarray
        3D array of (nx, ny, 2) with peak indices, -1 for non-peaks.
    """
    ny, nx = data.shape
    mask = np.isnan(data)

    maxima = np.ones((nx, ny, 2), dtype=np.int64) * -1  # np.ones((nmax,2),dtype=np.int64)*-1#
    minima = np.ones((nx, ny, 2), dtype=np.int64) * -1
    wx2 = wx // 2
    wy2 = wy // 2

    for j in numba.prange(1, ny - 1):
        for i in range(1, nx - 1):
            if mask[j, i]:
                continue
            i0 = max(0, i - wx2)
            i1 = min(nx, i + wx2 + 1)
            j0 = max(0, j - wy2)
            j1 = min(ny, j + wy2 + 1)
            if mask[j - 1 : j + 2, i - 1 : i + 2].all():
                continue
            imax = -1
            jmax = -1
            imin = -1
            jmin = -1
            vmax = -np.inf
            vmin = np.inf
            for jl in range(j0, j1):
                for il in range(i0, i1):
                    if mask[jl, il]:
                        continue
                    if data[jl, il] > vmax:
                        imax = il
                        jmax = jl
                        vmax = data[jl, il]
                    if data[jl, il] < vmin:
                        imin = il
                        jmin = jl
                        vmin = data[jl, il]
            if imin == i and jmin == j:
                minima[i, j] = [i, j]
            if imax == i and jmax == j:
                maxima[i, j] = [i, j]
    return minima, maxima


[docs] def find_signed_peaks_2d(data, wx, wy, paral=False): """Find local extrema (minima and maxima) in 2D field Parameters ---------- data : ndarray 2D data array to search for peaks. wx : int Window width in x direction for local extrema detection. wy : int Window width in y direction for local extrema detection. paral : bool, default False Use parallel implementation if True, serial if False. Returns ------- minima : ndarray Array of (i, j) indices for local minima. maxima : ndarray Array of (i, j) indices for local maxima. """ if paral: minima, maxima = _find_signed_peaks_2d_paral(data, wx, wy) ny, nx = data.shape # reshape minima = minima.reshape((ny * nx, 2)) maxima = maxima.reshape((ny * nx, 2)) return ( minima[np.where(~np.all(minima == -1, axis=1))[0]], maxima[np.where(~np.all(maxima == -1, axis=1))[0]], ) else: return _find_signed_peaks_2d_ref(data, wx, wy)
# @numba.njit
[docs] def find_signed_peaks_2d_jb_old(lnam, closed_lines): """Find peaks within closed contour lines (old implementation) Parameters ---------- lnam : xarray.DataArray 2D LNAM field. closed_lines : list List of closed contour lines. Returns ------- minima : ndarray Array of (i, j) indices for local minima. maxima : ndarray Array of (i, j) indices for local maxima. """ lon = smeta.get_lon(lnam) lat = smeta.get_lat(lnam) ny, nx = lnam.shape mask = np.isnan(lnam) # find inside indexes dict_line = {nl: [] for nl in range(len(closed_lines))} for j in numba.prange(ny): for i in range(nx): if mask[j, i]: continue for nl, line in enumerate(closed_lines): if points_in_polygon([lon[i], lat[j]], line): dict_line[nl].append([i, j]) # find maximum maxima = np.empty((0, 2), dtype=np.int64) minima = np.empty((0, 2), dtype=np.int64) for nl in dict_line: ind_lat = xr.DataArray(dict_line[nl][:, 1], dims="latitude") ind_lon = xr.DataArray(dict_line[nl][:, 1], dims="longitude") imax, jmax = abs(lnam).isel(latitude=ind_lat, longitude=ind_lon).argmax() if lnam[imax, jmax] > 0: maxima.append(maxima, np.array([[imax, jmax]]), axis=0) else: minima.append(minima, np.array([[imax, jmax]]), axis=0) return minima, maxima
@numba.guvectorize( [(numba.float64[:], numba.float64[:, :], numba.boolean[:])], "(nd),(ne,nd)->()", ) def points_in_polygon(point, poly, inside): """Test if points are inside a polygon Parameters ---------- point : array-like of shape (2,) or (npts, 2) Point(s) to test, as (x, y) coordinates. poly : ndarray of shape (n, 2) Polygon vertices as (x, y) coordinates. inside : bool array Output array indicating whether each point is inside. """ n = poly.shape[0] inside[0] = False p1x, p1y = poly[0] x, y = point[:2] for i in range(n + 1): p2x, p2y = poly[i % n] if y > min(p1y, p2y): if y <= max(p1y, p2y): if x <= max(p1x, p2x): if p1y != p2y: xints = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x if p1x == p2x or x <= xints: inside[0] = not inside[0] p1x, p1y = p2x, p2y
[docs] def get_coord_name(data): """Extract longitude and latitude coordinate names from xarray object Parameters ---------- data : xarray.Dataset or xarray.DataArray Data object with coordinates. Returns ------- lon_name : str or None Name of longitude coordinate. lat_name : str or None Name of latitude coordinate. """ coords_name = [k for k in data.coords.keys()] lon_name = None lat_name = None for var in coords_name: if var[:3] in ['LON', 'Lon', 'lon', 'longitude', 'Longitude', 'lon_rho']: lon_name = var if var[:3] in ['LAT', 'Lat', 'lat', 'latitude', 'Latitude', 'lat_rho']: lat_name = var return lon_name, lat_name