# pylint: disable=too-many-lines
import copy
import os
import typing
import warnings
import deprecation
import geopandas as gpd
import harmonica as hm
import numpy as np
import pandas as pd
import pygmt
import verde as vd
import xarray as xr
import xrft
from numpy.typing import NDArray
from pyproj import Transformer
import polartoolkit
from polartoolkit import logger, maps, regions
[docs]
def default_hemisphere(hemisphere: str | None) -> str:
"""
Returns the default hemisphere set in the users environment variables or raises a
error.
Parameters
----------
hemisphere : str | None
hemisphere to use, either "north" or "south", or None to use the default set in
the users environment variables.
Returns
-------
str
hemisphere to use, either "north" or "south"
"""
if hemisphere is None:
try:
hemisphere = os.environ["POLARTOOLKIT_HEMISPHERE"]
if hemisphere not in ["north", "south"]:
msg = f"hemisphere must be either 'north' or 'south', not {hemisphere}"
raise ValueError(msg)
return hemisphere
except KeyError as e:
msg = (
"hemisphere not set, either set it as a temp environment variable in "
"python (os.environ['POLARTOOLKIT_HEMISPHERE']='north'), set it as a "
"permanent operating system environment variable (i.e. for Unix, add "
"'export POLARTOOLKIT_HEMISPHERE=south' to the end of your .bashrc "
"file) or pass it as an argument (hemisphere='north')"
)
raise KeyError(msg) from e
return hemisphere
[docs]
def rmse(data: typing.Any, as_median: bool = False) -> float:
"""
function to give the root mean/median squared error (RMSE) of data
Parameters
----------
data : numpy.ndarray[typing.Any, typing.Any]
input data
as_median : bool, optional
choose to give root median squared error instead, by default False
Returns
-------
float
RMSE value
"""
if as_median:
value: float = np.sqrt(np.nanmedian(data**2).item())
else:
value = np.sqrt(np.nanmean(data**2).item())
return value
[docs]
def get_grid_region(
grid: str | xr.DataArray,
) -> tuple[float, ...]:
"""
Returns the region of the specified grid.
Parameters
----------
grid : str or xarray.DataArray
Input grid to get region from. Filename string or loaded grid.
Returns
-------
tuple
array with the region boundary in the format (xmin, xmax, ymin, ymax)
"""
if isinstance(grid, xr.DataArray) and len(grid.dims) > 2:
grid = grid.squeeze()
try:
region: tuple[float, ...] = tuple(
float(pygmt.grdinfo(grid, per_column="n", o=i)[:-1]) for i in range(4)
)
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid region can't be extracted")
raise e
return region
[docs]
def get_grid_spacing(
grid: str | xr.DataArray,
) -> float | None:
"""
Returns the spacing of the specified grid.
Parameters
----------
grid : str or xarray.DataArray
Input grid to get spacing from. Filename string or loaded grid.
Returns
-------
float | None
Spacing of the grid or None if it can't be extracted.
"""
if isinstance(grid, xr.DataArray) and len(grid.dims) > 2:
grid = grid.squeeze()
try:
spacing: float | None = float(pygmt.grdinfo(grid, per_column="n", o=7)[:-1])
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid spacing can't be extracted")
spacing = None
return spacing
[docs]
def get_grid_registration(
grid: str | xr.DataArray,
) -> str | None:
"""
Returns the registration of the specified grid.
Parameters
----------
grid : str or xarray.DataArray
Input grid to get registration from. Filename string or loaded grid.
Returns
-------
str | None
"g" for gridline or "p" for pixel registration.
"""
if isinstance(grid, xr.DataArray) and len(grid.dims) > 2:
grid = grid.squeeze()
try:
reg = grid.gmt.registration # type: ignore[union-attr]
registration: str | None = "g" if reg == 0 else "p"
except AttributeError:
logger.warning(
"grid registration not extracted, re-trying with file loaded as xarray grid"
)
# grid = xr.load_dataarray(grid)
with xr.open_dataarray(grid) as da:
try:
reg = da.gmt.registration
registration = "g" if reg == 0 else "p"
except AttributeError:
logger.warning("grid registration can't be extracted, setting to 'g'.")
registration = "g"
except Exception as e: # pylint: disable=broad-exception-caught
logger.exception(e)
logger.warning("grid registration can't be extracted")
registration = None
return registration
[docs]
def get_grid_info(
grid: str | xr.DataArray,
print_info: bool = False,
) -> tuple[
float | None,
tuple[float, float, float, float] | None,
float | None,
float | None,
str | None,
]:
"""
Returns information of the specified grid.
Parameters
----------
grid : str or xarray.DataArray
Input grid to get info from. Filename string or loaded grid.
print_info : bool, optional
If true, prints out the grid info, by default False
Returns
-------
tuple
(string of grid spacing,
array with the region boundary,
data min,
data max,
grid registration)
"""
# if isinstance(grid, str):
# grid = xr.load_dataarray(grid)
# try:
# grid = xr.load_dataarray(grid).squeeze()
# except ValueError:
# print("loading grid as dataarray didn't work")
# raise
# pass
# grid = xr.open_rasterio(grid)
# grid = rioxarray.open_rasterio(grid)
if isinstance(grid, xr.DataArray) and len(grid.dims) > 2:
grid = grid.squeeze()
try:
spacing: float | None = float(pygmt.grdinfo(grid, per_column="n", o=7)[:-1])
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid spacing can't be extracted")
spacing = None
try:
region: typing.Any = tuple(
float(pygmt.grdinfo(grid, per_column="n", o=i)[:-1]) for i in range(4)
)
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid region can't be extracted")
region = None
try:
zmin: float | None = float(pygmt.grdinfo(grid, per_column="n", o=4)[:-1])
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid zmin can't be extracted")
zmin = None
try:
zmax = float(pygmt.grdinfo(grid, per_column="n", o=5)[:-1])
except Exception as e: # pylint: disable=broad-exception-caught
# pygmt.exceptions.GMTInvalidInput:
logger.exception(e)
logger.warning("grid zmax can't be extracted")
zmax = None
try:
reg = grid.gmt.registration # type: ignore[union-attr]
registration: str | None = "g" if reg == 0 else "p"
except AttributeError:
logger.warning(
"grid registration not extracted, re-trying with file loaded as xarray grid"
)
# grid = xr.load_dataarray(grid)
with xr.open_dataarray(grid) as da:
try:
reg = da.gmt.registration
registration = "g" if reg == 0 else "p"
except AttributeError:
logger.warning("grid registration can't be extracted, setting to 'g'.")
registration = "g"
except Exception as e: # pylint: disable=broad-exception-caught
logger.exception(e)
logger.warning("grid registration can't be extracted")
registration = None
if print_info:
info = (
f"grid spacing: {spacing} m\n"
f"grid region: {region}\n"
f"grid zmin: {zmin}\n"
f"grid zmax: {zmax}\n"
f"grid registration: {registration}"
)
print(info) # noqa: T201
return spacing, region, zmin, zmax, registration
[docs]
def dd2dms(dd: float) -> str:
"""
Convert decimal degrees to minutes, seconds. Modified from
https://stackoverflow.com/a/10286690/18686384
Parameters
----------
dd : float
input decimal degrees
Returns
-------
str
degrees in the format "DD:MM:SS"
"""
is_positive = dd >= 0
dd = abs(dd)
minutes, seconds = divmod(dd * 3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees if is_positive else -degrees
return f"{int(degrees)}:{int(minutes)}:{seconds}"
[docs]
def region_to_df(
region: tuple[typing.Any, typing.Any, typing.Any, typing.Any] | pd.DataFrame,
coord_names: tuple[str, str] = ("easting", "northing"),
reverse: bool = False,
) -> tuple[typing.Any, typing.Any, typing.Any, typing.Any] | pd.DataFrame:
"""
Convert region bounds in format [xmin, xmax, ymin, ymax] to pandas dataframe with
coordinates of region corners, or reverse this if `reverse` is True.
Parameters
----------
region : tuple[typing.Any, typing.Any, typing.Any, typing.Any] | pandas.DataFrame
bounding region in format [xmin, xmax, ymin, ymax] or, if `reverse` is True,
a DataFrame with coordinate columns with names set by `cood_names`
coord_names : tuple[str, str], optional
names of input or output coordinate columns, by default ("easting", "northing")
reverse : bool, optional
If True, convert from df to region tuple, else, convert from region tuple to df,
by default False
Returns
-------
tuple[typing.Any, typing.Any, typing.Any, typing.Any] | pandas.DataFrame
Dataframe with easting and northing columns, and a row for each corner of the
region, or, if `reverse` is True, an array in the format
[xmin, xmax, ymin, ymax].
"""
if reverse:
return (
region[coord_names[0]][0], # type: ignore[call-overload]
region[coord_names[0]][1], # type: ignore[call-overload]
region[coord_names[1]][0], # type: ignore[call-overload]
region[coord_names[1]][2], # type: ignore[call-overload]
)
bl = (region[0], region[2])
br = (region[1], region[2])
tl = (region[0], region[3])
tr = (region[1], region[3])
return pd.DataFrame(data=[bl, br, tl, tr], columns=(coord_names[0], coord_names[1]))
[docs]
def region_xy_to_ll(
region: tuple[typing.Any, typing.Any, typing.Any, typing.Any],
hemisphere: str | None = None,
dms: bool = False,
) -> tuple[typing.Any, typing.Any, typing.Any, typing.Any]:
"""
Convert region in format [xmin, xmax, ymin, ymax] in projected meters to lat / lon
Parameters
----------
hemisphere : str, optional,
choose between the "north" or "south" hemispheres
region : tuple[typing.Any, typing.Any, typing.Any, typing.Any]
region boundaries in format [xmin, xmax, ymin, ymax] in meters
dms: bool
if True, will return results as deg:min:sec instead of decimal degrees, by
default False
Returns
-------
tuple[typing.Any, typing.Any, typing.Any, typing.Any]
region boundaries in format [lon_min, lon_max, lat_min, lat_max]
"""
hemisphere = default_hemisphere(hemisphere)
df = region_to_df(region)
if hemisphere == "north":
df_proj = epsg3413_to_latlon(df, reg=True)
elif hemisphere == "south":
df_proj = epsg3031_to_latlon(df, reg=True)
else:
msg = "hemisphere must be 'north' or 'south'"
raise ValueError(msg)
return tuple([dd2dms(x) for x in df_proj] if dms is True else df_proj)
[docs]
def region_ll_to_xy(
region: tuple[float, float, float, float],
hemisphere: str | None = None,
) -> tuple[float, float, float, float]:
"""
Convert region in format [lon_min, lon_max, lat_min, lat_max] to projected meters in
the north or south polar stereographic projections.
Parameters
----------
hemisphere : str, optional,
choose between the "north" or "south" hemispheres
region : tuple[float, float, float, float]
region boundaries in format [xmin, xmax, ymin, ymax] in decimal degrees
Returns
-------
tuple[float, float, float, float]
region boundaries in format [x_min, x_max, y_min, y_max]
"""
hemisphere = default_hemisphere(hemisphere)
df = region_to_df(region, coord_names=("lon", "lat"))
if hemisphere == "north":
df_proj: tuple[float, float, float, float] = latlon_to_epsg3413(df, reg=True)
elif hemisphere == "south":
df_proj = latlon_to_epsg3031(df, reg=True)
else:
msg = "hemisphere must be 'north' or 'south'"
raise ValueError(msg)
return df_proj
[docs]
def region_to_bounding_box(
region: tuple[typing.Any, typing.Any, typing.Any, typing.Any],
) -> tuple[typing.Any, typing.Any, typing.Any, typing.Any]:
"""
Convert region in format [xmin, xmax, ymin, ymax] to bounding box format used for
icepyx:
[
lower left longitude,
lower left latitude,
upper right longitude,
upper right latitude
]
Same format as [xmin, ymin, xmax, ymax], used for `bbox` parameter of
geopandas.read_file
Parameters
----------
region : tuple[typing.Any, typing.Any, typing.Any, typing.Any]
region boundaries in format [xmin, xmax, ymin, ymax] in meters or degrees.
Returns
-------
tuple[typing.Any, typing.Any, typing.Any, typing.Any]
region boundaries in bounding box format.
"""
return (region[0], region[2], region[1], region[3])
[docs]
def epsg3031_to_latlon(
df: pd.DataFrame | tuple[typing.Any],
reg: bool = False,
input_coord_names: tuple[str, str] | None = None,
output_coord_names: tuple[str, str] = ("lon", "lat"),
) -> pd.DataFrame | tuple[typing.Any]:
"""
Convert coordinates from EPSG:3031 Antarctic Polar Stereographic in meters to
EPSG:4326 WGS84 in decimal degrees.
Parameters
----------
df : pandas.DataFrame or tuple[typing.Any]
input dataframe with easting and northing columns, or tuple [x,y]
reg : bool, optional
if true, returns a GMT formatted region string, by default False
input_coord_names : tuple | None, optional
set names for input coordinate columns, by default ("x", "y") or
("easting", "northing")
output_coord_names : tuple | None, optional
set names for output coordinate columns, by default ("lon", "lat")
Returns
-------
pandas.DataFrame or tuple[typing.Any]
Updated dataframe with new latitude and longitude columns, numpy.ndarray in
format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon]
"""
return reproject(
df,
"epsg:3031",
"epsg:4326",
reg=reg,
input_coord_names=input_coord_names,
output_coord_names=output_coord_names,
)
[docs]
def epsg3413_to_latlon(
df: pd.DataFrame | tuple[typing.Any],
reg: bool = False,
input_coord_names: tuple[str, str] | None = None,
output_coord_names: tuple[str, str] = ("lon", "lat"),
) -> pd.DataFrame | tuple[typing.Any]:
"""
Convert coordinates from EPSG:3413 North Polar Stereographic in meters to
EPSG:4326 WGS84 in decimal degrees.
Parameters
----------
df : pandas.DataFrame or tuple[typing.Any]
input dataframe with easting and northing columns, or tuple [x,y]
reg : bool, optional
if true, returns a GMT formatted region string, by default False
input_coord_names : tuple | None, optional
set names for input coordinate columns, by default ("x", "y") or
("easting", "northing")
output_coord_names : tuple | None, optional
set names for output coordinate columns, by default ("lon", "lat")
Returns
-------
pandas.DataFrame or tuple[typing.Any]
Updated dataframe with new latitude and longitude columns, numpy.ndarray in
format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon]
"""
return reproject(
df,
"epsg:3413",
"epsg:4326",
reg=reg,
input_coord_names=input_coord_names,
output_coord_names=output_coord_names,
)
[docs]
def latlon_to_epsg3031(
df: pd.DataFrame | NDArray[typing.Any],
reg: bool = False,
input_coord_names: tuple[str, str] | None = None,
output_coord_names: tuple[str, str] = ("easting", "northing"),
) -> pd.DataFrame | NDArray[typing.Any]:
"""
Convert coordinates from EPSG:4326 WGS84 in decimal degrees to EPSG:3031 Antarctic
Polar Stereographic in meters.
Parameters
----------
df : pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any]
input dataframe with latitude and longitude columns
reg : bool, optional
if true, returns a GMT formatted region string, by default False
input_coord_names : tuple | None, optional
set names for input coordinate columns, by default ("lon", "lat")
output_coord_names : tuple | None, optional
set names for output coordinate columns, by default ("easting", "northing")
Returns
-------
pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any]
Updated dataframe with new easting and northing columns or numpy.ndarray in
format [xmin, xmax, ymin, ymax]
"""
return reproject(
df,
"epsg:4326",
"epsg:3031",
reg=reg,
input_coord_names=input_coord_names,
output_coord_names=output_coord_names,
)
[docs]
def latlon_to_epsg3413(
df: pd.DataFrame | NDArray[typing.Any],
reg: bool = False,
input_coord_names: tuple[str, str] | None = None,
output_coord_names: tuple[str, str] = ("easting", "northing"),
) -> pd.DataFrame | NDArray[typing.Any]:
"""
Convert coordinates from EPSG:4326 WGS84 in decimal degrees to EPSG:3413 North Polar
Stereographic in meters.
Parameters
----------
df : pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any]
input dataframe with latitude and longitude columns
reg : bool, optional
if true, returns a GMT formatted region string, by default False
input_coord_names : tuple | None, optional
set names for input coordinate columns, by default ("lon", "lat")
output_coord_names : tuple | None, optional
set names for output coordinate columns, by default ("easting", "northing")
Returns
-------
pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any]
Updated dataframe with new easting and northing columns or numpy.ndarray in
format [xmin, xmax, ymin, ymax]
"""
return reproject(
df,
"epsg:4326",
"epsg:3413",
reg=reg,
input_coord_names=input_coord_names,
output_coord_names=output_coord_names,
)
[docs]
def reproject(
df: pd.DataFrame | tuple[typing.Any],
input_crs: str,
output_crs: str,
input_coord_names: tuple[str, str] | None = None,
output_coord_names: tuple[str, str] | None = None,
reg: bool = False,
) -> pd.DataFrame | tuple[typing.Any]:
"""
Convert coordinates from input CRS to output CRS. Coordinates can be supplied as a
dataframe with coordinate columns set by input_coord_names, or as a tuple of a list
of x coordinates and a list of y coordinates.
Parameters
----------
df : pandas.DataFrame or tuple[typing.Any]
input dataframe with easting/longitude and northing/latitude columns, or tuple
[x,y]
input_crs : str
input CRS in EPSG format, e.g. "epsg:4326"
output_crs : str
output CRS in EPSG format, e.g. "epsg:3413"
reg : bool, optional
if true, returns a GMT formatted region string, by default False
input_coord_names : tuple, optional
set names for input coordinate columns, by default "x"/"y" or
"easting"/"northing" if input_crs is "epsg:3413" or "epsg:3031", or if input_crs
is "epsg_4326", "lon"/"lat"
output_coord_names : tuple, optional
set names for output coordinate columns, by default "x"/"y" if output_crs is
"epsg:3413" or "epsg:3031", or if output_crs is "epsg_4326", "lon"/"lat".
Returns
-------
pandas.DataFrame or tuple[typing.Any]
Updated dataframe with new latitude and longitude columns, numpy.ndarray in
format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon]
"""
# make crs lowercase
input_crs = input_crs.lower()
output_crs = output_crs.lower()
transformer = Transformer.from_crs(
input_crs,
output_crs,
always_xy=True,
)
if isinstance(df, pd.DataFrame):
df = df.copy()
# use sensible default coord names
if input_crs == "epsg:4326":
if input_coord_names is None:
input_coord_names = ("lon", "lat")
elif input_coord_names is None:
# check for coord column names
if ("x" in df.columns) and ("y" in df.columns):
input_coord_names = ("x", "y")
elif ("easting" in df.columns) and ("northing" in df.columns):
input_coord_names = ("easting", "northing")
if output_crs == "epsg:4326":
if output_coord_names is None:
output_coord_names = ("lon", "lat")
else:
if output_coord_names is None:
# check for coord column names
if ("x" in df.columns) and ("y" in df.columns):
output_coord_names = ("x", "y")
elif ("easting" in df.columns) and ("northing" in df.columns):
output_coord_names = ("easting", "northing")
if output_coord_names is None:
output_coord_names = ("easting", "northing")
( # pylint: disable=unpacking-non-sequence
df[output_coord_names[0]],
df[output_coord_names[1]],
) = transformer.transform(
df[input_coord_names[0]].tolist(), # type: ignore[index]
df[input_coord_names[1]].tolist(), # type: ignore[index]
)
if reg is True:
df = (
df[output_coord_names[0]].min(),
df[output_coord_names[0]].max(),
df[output_coord_names[1]].min(),
df[output_coord_names[1]].max(),
)
else:
df = tuple(transformer.transform(df[0], df[1])) # type: ignore[misc]
return df
[docs]
def points_inside_region(
df: pd.DataFrame,
region: tuple[float, float, float, float],
names: tuple[str, str] = ("x", "y"),
reverse: bool = False,
) -> pd.DataFrame:
"""
return a subset of a dataframe which is within a region
Parameters
----------
df : pandas.DataFrame
dataframe with coordinate columns to use for defining if within region
region : tuple[float, float, float, float]
bounding region in format [xmin, xmax, ymin, ymax] for bounds of new subset
dataframe
names : tuple[str, str], optional
column names to use for x and y coordinates, by default ("x", "y") or
("easting", "northing")
reverse : bool, optional
if True, will return points outside the region, by default False
Returns
-------
pandas.DataFrame
returns a subset dataframe
"""
# make a copy of the dataframe
df1 = df.copy()
# check for coord column names
if ("x" in df1.columns) and ("y" in df1.columns):
pass
elif ("easting" in df1.columns) and ("northing" in df1.columns):
names = ("easting", "northing")
# make column of booleans for whether row is within the region
df1["inside_tmp"] = vd.inside(
coordinates=(df1[names[0]], df1[names[1]]), region=region
)
if reverse is True:
# subset if False
df_result = df1.loc[df1.inside_tmp == False].copy() # noqa: E712 # pylint: disable=singleton-comparison
else:
# subset if True
df_result = df1.loc[df1.inside_tmp == True].copy() # noqa: E712 # pylint: disable=singleton-comparison
# drop the column 'inside'
return df_result.drop(columns="inside_tmp")
[docs]
def block_reduce(
df: pd.DataFrame,
reduction: typing.Callable[..., float | int],
input_coord_names: tuple[str, str] = ("x", "y"),
input_data_names: typing.Any | None = None,
**kwargs: typing.Any,
) -> pd.DataFrame:
"""
perform a block reduction of a dataframe.
Parameters
----------
df : pandas.DataFrame
data to block reduce
reduction : typing.Callable
function to use in reduction, e.g. np.mean
input_coord_names : tuple[str, str], optional
strings of coordinate column names, by default ("x", "y") or
("easting", "northing")
input_data_names : typing.Any | None, optional
strings of data column names, by default None
Returns
-------
pandas.DataFrame
a block-reduced dataframe
"""
# define verde reducer function
reducer = vd.BlockReduce(reduction, **kwargs)
# check for coord column names
if ("x" in df.columns) and ("y" in df.columns):
pass
elif ("easting" in df.columns) and ("northing" in df.columns):
input_coord_names = ("easting", "northing")
# if no data names provided, use all columns
if input_data_names is None:
input_data_names = tuple(df.columns.drop(list(input_coord_names)))
# get tuples of pd.Series
input_coords = tuple([df[col] for col in input_coord_names]) # pylint: disable=consider-using-generator
input_data = tuple([df[col] for col in input_data_names]) # pylint: disable=consider-using-generator
# apply reduction
coordinates, data = reducer.filter(
coordinates=input_coords,
data=input_data,
)
# add reduced coordinates to a dictionary
coord_cols = dict(zip(input_coord_names, coordinates, strict=False))
# add reduced data to a dictionary
if len(input_data_names) < 2:
data_cols = {input_data_names[0]: data}
else:
data_cols = dict(zip(input_data_names, data, strict=False))
# merge dicts and create dataframe
return pd.DataFrame(data=coord_cols | data_cols)
[docs]
def nearest_grid_fill(
grid: xr.DataArray,
method: str = "verde",
crs: str | None = None,
) -> xr.DataArray:
"""
fill missing values in a grid with the nearest value.
Parameters
----------
grid : xarray.DataArray
grid with missing values
method : str, optional
choose method of filling, by default "verde"
crs : str | None, optional
if method is 'rioxarray', provide the crs of the grid, in format 'epsg:xxxx',
by default None
Returns
-------
xarray.DataArray
filled grid
"""
# get coordinate names
original_dims = tuple(grid.sizes.keys())
# get original grid name
original_name = grid.name
if method == "rioxarray":
filled: xr.DataArray = (
grid.rio.write_crs(crs)
.rio.set_spatial_dims(original_dims[1], original_dims[0])
.rio.write_nodata(np.nan)
.rio.interpolate_na(method="nearest")
.rename(original_name)
)
elif method == "verde":
df = vd.grid_to_table(grid)
df_dropped = df[df[grid.name].notna()]
coords = (df_dropped[grid.dims[1]], df_dropped[grid.dims[0]])
region = vd.get_region((df[grid.dims[1]], df[grid.dims[0]]))
filled = (
vd.KNeighbors()
.fit(coords, df_dropped[grid.name])
.grid(region=region, shape=grid.shape, data_names=original_name)[
original_name
]
)
# elif method == "pygmt":
# filled = pygmt.grdfill(grid, mode="n", verbose="quiet).rename(original_name)
else:
msg = "method must be 'rioxarray', or 'verde'"
raise ValueError(msg)
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
return filled.rename(
{
next(iter(filled.dims)): original_dims[0],
list(filled.dims)[1]: original_dims[1],
}
)
[docs]
def filter_grid(
grid: xr.DataArray,
filter_width: float | None = None,
filt_type: str = "lowpass",
pad_width_factor: int = 3,
pad_mode: str = "linear_ramp",
pad_constant: float | None = None,
pad_end_values: float | None = None,
) -> xr.DataArray:
"""
Apply a spatial filter to a grid.
Parameters
----------
grid : xarray.DataArray
grid to filter the values of
filter_width : float | None, optional
width of the filter in meters for high and low pass filtering, by default None
filt_type : str, optional
type of filter to use from 'lowpass', 'highpass' 'up_deriv', 'easting_deriv',
'northing_deriv', or 'total_gradient' by default "lowpass"
pad_width_factor : int, optional
factor of grid width to pad the grid by, by default 3, which equates to a pad
with a width of 1/3 of the grid width.
pad_mode : str, optional
mode of padding, can be "linear", by default "linear_ramp"
pad_constant : float | None, optional
constant value to use for padding, by default None
pad_end_values : float | None, optional
value to use for end of padding if pad_mode is "linear_ramp", by default None
Returns
-------
xarray.DataArray
a filtered grid
"""
# get coordinate names
original_dims = tuple(grid.sizes.keys())
# get original grid name
original_name = grid.name
# if there are nan's, fill them with nearest neighbor
if grid.isna().any():
filled = nearest_grid_fill(grid, method="verde")
else:
filled = grid.copy()
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
filled = filled.rename(
{
next(iter(filled.dims)): original_dims[0],
list(filled.dims)[1]: original_dims[1],
}
)
# define width of padding in each direction
pad_width = {
original_dims[1]: grid[original_dims[1]].size // pad_width_factor,
original_dims[0]: grid[original_dims[0]].size // pad_width_factor,
}
if pad_mode == "constant":
if pad_constant is None:
pad_constant = filled.median()
pad_end_values = None
if (pad_mode == "linear_ramp") and (pad_end_values is None):
pad_end_values = filled.median()
if pad_mode != "constant":
pad_constant = (
None # needed until https://github.com/xgcm/xrft/issues/211 is fixed
)
# apply padding
pad_kwargs = {
**pad_width,
"mode": pad_mode,
"constant_values": pad_constant,
"end_values": pad_end_values,
}
padded = xrft.pad(
filled,
**pad_kwargs,
)
if filt_type == "lowpass":
filt = hm.gaussian_lowpass(padded, wavelength=filter_width).rename("filt")
elif filt_type == "highpass":
filt = hm.gaussian_highpass(padded, wavelength=filter_width).rename("filt")
elif filt_type == "up_deriv":
filt = hm.derivative_upward(padded).rename("filt")
elif filt_type == "easting_deriv":
filt = hm.derivative_easting(padded).rename("filt")
elif filt_type == "northing_deriv":
filt = hm.derivative_northing(padded).rename("filt")
elif filt_type == "total_gradient":
filt = hm.total_gradient_amplitude(padded).rename("filt")
else:
msg = (
"filt_type must be 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', "
"'northing_deriv', or 'total_gradient'"
)
raise ValueError(msg)
unpadded = xrft.unpad(filt, pad_width)
# reset coordinate values to original (avoid rounding errors)
unpadded = unpadded.assign_coords(
{
original_dims[0]: grid[original_dims[0]].to_numpy(),
original_dims[1]: grid[original_dims[1]].to_numpy(),
}
)
if grid.isna().any():
result: xr.DataArray = xr.where(grid.notna(), unpadded, grid)
else:
result = unpadded.copy()
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
result = result.rename(
{
next(iter(result.dims)): original_dims[0],
# list(result.dims)[0]: original_dims[0],
list(result.dims)[1]: original_dims[1],
}
)
return result.rename(original_name)
[docs]
def points_inside_shp(
points: pd.DataFrame | gpd.GeoDataFrame,
shapefile: gpd.GeoDataFrame,
crs: str | None = None,
coord_names: tuple[str, str] | None = None,
hemisphere: str | None = None,
) -> pd.DataFrame | gpd.GeoDataFrame:
"""
Add a column to a dataframe indicating whether each point is inside a shapefile.
Parameters
----------
points : pandas.DataFrame | geopandas.GeoDataFrame
dataframe with coordinate columns specified by coord_names to use for defining
if within shapefile
shapefile : geopandas.GeoDataFrame
shapefile to use for defining if point are within it or not
crs : str | None, optional
if points is not a geodataframe, crs to use to convert into a geodataframe, by
default None
coord_names : tuple[str, str] | None, optional
names of coordinate columns, by default 'x' and 'y' or 'easting' and 'northing'
hemisphere : str | None, optional
hemisphere to use for automatically detecting crs, by default None
Returns
-------
pandas.DataFrame | geopandas.GeoDataFrame
Dataframe with a new column 'inside' which is True if the point is inside the
shapefile
"""
points = points.copy()
if isinstance(points, pd.DataFrame):
if crs is None:
hemisphere = default_hemisphere(hemisphere)
if hemisphere == "north":
crs = "epsg:3413"
elif hemisphere == "south":
crs = "epsg:3031"
else:
msg = "provide 'crs' or set hemisphere to 'north' or 'south'"
raise ValueError(msg)
if coord_names is None:
# check for coord column names
if ("x" in points.columns) and ("y" in points.columns):
coord_names = ("x", "y")
elif ("easting" in points.columns) and ("northing" in points.columns):
coord_names = ("easting", "northing")
points = gpd.GeoDataFrame(
points,
geometry=gpd.points_from_xy(
x=points[coord_names[0]], # type: ignore[index]
y=points[coord_names[1]], # type: ignore[index]
),
crs=crs,
)
points["inside"] = points.within(shapefile.geometry[0])
return points
[docs]
def mask_from_shp(
shapefile: str | gpd.GeoDataFrame,
hemisphere: str | None = None,
invert: bool = True,
grid: xr.DataArray | str | None = None,
xr_grid: xr.DataArray | None = None,
grid_file: str | None = None,
region: str | tuple[float, float, float, float] | None = None,
spacing: float | None = None,
masked: bool = False,
pixel_register: bool = True,
input_coord_names: tuple[str, str] = ("easting", "northign"),
) -> xr.DataArray:
"""
Create a mask or a masked grid from area inside or outside of a closed shapefile.
Parameters
----------
shapefile : str or geopandas.GeoDataFrame
either path to .shp filename, must by in same directory as accompanying files :
.shx, .prj, .dbf, should be a closed polygon file, or shapefile which as already
been loaded into a geodataframe.
hemisphere : str, optional
choose "north" for EPSG:3413 or "south" for EPSG:3031
invert : bool, optional
choose whether to mask data outside the shape (False) or inside the shape
(True), by default True (masks inside of shape)
grid : xarray.DataArray or str, optional
_xarray.DataArray or path to a .nc file; to use to define region, or to mask, by
default None
xr_grid : xarray.DataArray, optional
deprecated, use `grid` instead, by default None
grid_file : str, optional
deprecated, use `grid` instead, by default None
region : str or tuple[float, float, float, float], optional
bounding region in format [xmin, xmax, ymin, ymax] in meters to create a dummy
grid if none are supplied, by default None
spacing : str or int, optional
grid spacing in meters to create a dummy grid if none are supplied, by default
None
masked : bool, optional
choose whether to return the masked grid (True) or the mask itself (False), by
default False
pixel_register : bool, optional
choose whether the grid is pixel registered (True) or grid registered (False),
by default True
input_coord_names : tuple[str, str], optional
set names for input coordinate columns, by default ("easting", "northing")
Returns
-------
xarray.DataArray
Returns either a masked grid, or the mask grid itself.
"""
hemisphere = default_hemisphere(hemisphere)
shp = (
gpd.read_file(shapefile, engine="pyogrio")
if isinstance(shapefile, str)
else shapefile
)
if hemisphere == "north":
crs = "epsg:3413"
elif hemisphere == "south":
crs = "epsg:3031"
else:
msg = "hemisphere must be 'north' or 'south'"
raise ValueError(msg)
if xr_grid is not None:
grid = xr_grid
msg = "`xr_grid` parameter has changed, use 'grid' instead."
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
if grid_file is not None:
grid = grid_file
msg = "`grid_file` parameter has changed, use 'grid' instead."
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
if grid is None:
coords = vd.grid_coordinates(
region=region,
spacing=spacing,
pixel_register=pixel_register,
)
ds = vd.make_xarray_grid(
coords,
np.ones_like(coords[0]),
dims=input_coord_names[::-1],
data_names="z",
)
xds = ds.z.rio.write_crs(crs)
elif isinstance(grid, xr.DataArray):
# get coordinate names
original_dims = tuple(grid.sizes.keys())
xds = grid.rio.write_crs(crs).rio.set_spatial_dims(
original_dims[1], original_dims[0]
)
elif isinstance(grid, str):
xds = xr.load_dataarray(grid)
# get coordinate names
original_dims = tuple(xds.sizes.keys())
xds = xds.rio.write_crs(crs).rio.set_spatial_dims(
original_dims[1], original_dims[0]
)
# if single geometry, convert to list
try:
iter(shp.geometry)
except TypeError:
geom = [shp.geometry]
else:
geom = shp.geometry
masked_grd = xds.rio.clip(
geom,
xds.rio.crs,
drop=False,
invert=invert,
)
mask_grd = np.isfinite(masked_grd)
if masked is True:
output = masked_grd
elif masked is False:
output = mask_grd
try:
output = output.drop_vars("spatial_ref") # pylint: disable=used-before-assignment
except ValueError as e:
logger.exception(e)
return typing.cast(xr.DataArray, output)
[docs]
@deprecation.deprecated(
deprecated_in="0.4.0",
removed_in="2.0.0",
current_version=polartoolkit.__version__,
details="alter_region has been moved to the regions module, use that instead",
)
def alter_region(
starting_region: tuple[float, float, float, float],
**kwargs: typing.Any,
) -> tuple[float, float, float, float]:
"""deprecated function, use regions.alter_region instead"""
return regions.alter_region(
starting_region,
**kwargs,
)
[docs]
def set_proj(
region: tuple[float, float, float, float],
hemisphere: str | None = None,
fig_height: float | None = None,
fig_width: float | None = None,
) -> tuple[str, str | None, float, float]:
"""
Gives GMT format projection string from region and figure height or width.
Inspired from https://github.com/mrsiegfried/Venturelli2020-GRL.
Parameters
----------
region : tuple[float, float, float, float]
region boundaries in format [xmin, xmax, ymin, ymax] in projected meters
hemisphere : str, optional
set whether to lat lon projection is for "north" hemisphere (EPSG:3413) or
"south" hemisphere (EPSG:3031)
fig_height : float | None
desired figure height in cm, by default is None
fig_width : float | None
instead of using figure height, set the projection based on figure width in cm,
by default is None
Returns
-------
tuple
returns a tuple of the following variables: proj, proj_latlon, fig_width,
fig_height
"""
try:
hemisphere = default_hemisphere(hemisphere)
except KeyError:
hemisphere = None
xmin, xmax, ymin, ymax = region
if fig_height is None and fig_width is None:
msg = "either fig_height or fig_width must be set"
raise ValueError(msg)
if fig_width is not None:
fig_height = fig_width * (ymax - ymin) / (xmax - xmin)
ratio = (xmax - xmin) / (fig_width / 100)
else:
fig_height = typing.cast(float, fig_height)
fig_width = fig_height * (xmax - xmin) / (ymax - ymin)
ratio = (ymax - ymin) / (fig_height / 100)
proj = f"x1:{ratio}"
if hemisphere == "north":
proj_latlon = f"s-45/90/70/1:{ratio}"
elif hemisphere == "south":
proj_latlon = f"s0/-90/-71/1:{ratio}"
else:
proj_latlon = None
return proj, proj_latlon, fig_width, fig_height
[docs]
def grd_trend(
da: xr.DataArray,
coords: tuple[str, str, str] = ("x", "y", "z"),
deg: int = 1,
plot: bool = False,
**kwargs: typing.Any,
) -> tuple[xr.DataArray, xr.DataArray]:
"""
Fit an arbitrary order trend to a grid and use it to detrend.
Parameters
----------
da : xarray.DataArray
input grid
coords : tuple[str, str, str], optional
coordinate names of the supplied grid, by default ['x', 'y', 'z']
deg : int, optional
trend order to use, by default 1
plot : bool, optional
plot the results, by default False
Returns
-------
tuple[xarray.DataArray, xarray.DataArray]
returns xarray.DataArrays of the fitted surface, and the detrended grid.
"""
# convert grid to a dataframe
df0 = vd.grid_to_table(da).astype("float64")
df = df0.dropna().copy()
# define a trend
trend = vd.Trend(degree=deg).fit((df[coords[0]], df[coords[1]]), df[coords[2]])
# fit a trend to the grid of degree: deg
df["fit"] = trend.predict((df[coords[0]], df[coords[1]]))
# remove the trend from the data
df["detrend"] = df[coords[2]] - df.fit
info = get_grid_info(da)
spacing = info[0]
region = info[1]
registration = info[4]
fit = pygmt.xyz2grd(
data=df[[coords[0], coords[1], "fit"]],
region=region,
spacing=spacing,
registration=registration,
)
detrend = pygmt.xyz2grd(
data=df[[coords[0], coords[1], "detrend"]],
region=region,
spacing=spacing,
registration=registration,
)
if plot is True:
cmap: typing.Any = kwargs.get("cmap", "plasma")
coast: typing.Any = kwargs.get("coast", True)
inset: typing.Any = kwargs.get("inset", True)
inset_position: typing.Any = kwargs.get("inset_position", "jTL+jTL+o0/0")
inset_pos: typing.Any = kwargs.get("inset_pos")
origin_shift: typing.Any = kwargs.get("origin_shift", "y")
fit_label: typing.Any = kwargs.get("fit_label", f"fitted trend (order {deg})")
input_label: typing.Any = kwargs.get("input_label", "input grid")
title: typing.Any = kwargs.get("title", "Detrending a grid")
detrended_label: typing.Any = kwargs.get("detrended_label", "detrended")
fig = maps.plot_grd(
da,
cbar_label=input_label,
title=title,
cmap=cmap,
# grd2cpt=True,
inset=inset,
inset_position=inset_position,
inset_pos=inset_pos,
coast=coast,
hist=True,
robust=True,
**kwargs,
)
fig = maps.plot_grd(
fit,
fig=fig,
cmap=cmap,
# grd2cpt=True,
cbar_label=fit_label,
origin_shift=origin_shift,
coast=coast,
hist=True,
robust=True,
**kwargs,
)
fig = maps.plot_grd(
detrend,
fig=fig,
cmap=cmap,
# grd2cpt=True,
cbar_label=detrended_label,
origin_shift=origin_shift,
coast=coast,
hist=True,
robust=True,
**kwargs,
)
fig.show()
return fit, detrend
[docs]
def get_combined_min_max(
values: tuple[typing.Any, ...],
shapefile: str | gpd.GeoDataFrame | None = None,
robust: bool = False,
region: tuple[float, float, float, float] | None = None,
hemisphere: str | None = None,
absolute: bool = False,
robust_percentiles: tuple[float, float] = (0.02, 0.98),
) -> tuple[float, float]:
"""
Get a grids max and min values.
Parameters
----------
values : tuple[xarray.DataArray | pandas.Series | numpy.ndarray]
values to get min and max for
shapefile : Union[str or geopandas.GeoDataFrame], optional
path or loaded shapefile to use for a mask, by default None
robust: bool, optional
choose whether to return the 2nd and 98th percentile values, instead of the
min/max
region : tuple[float, float, float, float], optional
give a subset region to get min and max values from, in format
[xmin, xmax, ymin, ymax], by default None
hemisphere : str, optional
set whether to lat lon projection is for "north" hemisphere (EPSG:3413) or
"south" hemisphere (EPSG:3031)
absolute : bool, optional
choose whether to return the absolute min and max values, by default False
robust_percentiles : tuple[float, float], optional
percentiles to use for robust min and max values, by default (0.02, 0.98)
Returns
-------
tuple[float, float]
returns the min and max values.
"""
try:
hemisphere = default_hemisphere(hemisphere)
except KeyError:
hemisphere = None
# get min max of each grid
limits = []
for v in values:
limits.append(
get_min_max(
v,
robust=robust,
region=region,
shapefile=shapefile,
hemisphere=hemisphere,
absolute=absolute,
robust_percentiles=robust_percentiles,
)
)
# get min of all mins and max of all maxes
ar = np.array(limits)
return np.min(ar[:, 0]), np.max(ar[:, 1])
[docs]
def grd_compare(
da1: xr.DataArray | str,
da2: xr.DataArray | str,
plot: bool = True,
plot_type: typing.Any | None = None,
robust: bool = False,
**kwargs: typing.Any,
) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
"""
Find the difference between 2 grids and plot the results, if necessary resample and
cut grids to match
Parameters
----------
da1 : xarray.DataArray or str
first grid, loaded grid of filename
da2 : xarray.DataArray or str
second grid, loaded grid of filename
plot : bool, optional
plot the results, by default True
plot_type : typing.Any or None, optional
this argument has been deprecated and will default to 'pygmt'
robust : bool, optional
use xarray robust color lims instead of min and max, by default is False.
Keyword Args
------------
shp_mask : str
shapefile filename to use to mask the grids for setting the color range.
robust : bool
use xarray robust color lims instead of min and max, by default is False.
region : tuple[float, float, float, float]
choose a specific region to compare, in format [xmin, xmax, ymin, ymax].
rmse_in_title: bool
add the RMSE to the title, by default is True.
cpt_lims : tuple[float, float]
set the colorbar limits for the two grids.
diff_lims : tuple[float, float]
set the colorbar limits for the difference grid.
Returns
-------
tuple[xarray.DataArray, xarray.DataArray, xarray.DataArray]
three xarray.DataArrays: (diff, resampled grid1, resampled grid2)
"""
kwargs = copy.deepcopy(kwargs)
if plot_type is not None:
warnings.warn(
"plot_type has been deprecated and will default to 'pygmt'",
DeprecationWarning,
stacklevel=2,
)
shp_mask = kwargs.pop("shp_mask", None)
region = kwargs.pop("region", None)
verbose = kwargs.pop("verbose", "error")
if isinstance(da1, str):
da1 = xr.load_dataarray(da1)
if isinstance(da2, str):
da2 = xr.load_dataarray(da2)
# get coordinate names
da1_original_dims = tuple(da1.sizes.keys())
da2_original_dims = tuple(da2.sizes.keys())
if region is not None:
# cut grids to supplied region
da1 = subset_grid(
da1,
region=region,
)
da2 = subset_grid(
da2,
region=region,
)
# extract grid info of both grids
da1_info = get_grid_info(da1)
da2_info = get_grid_info(da2)
# extract spacing of both grids
assert da1_info[0] is not None
assert da2_info[0] is not None
da1_spacing: float = da1_info[0]
da2_spacing: float = da2_info[0]
# extract regions of both grids
da1_reg = da1_info[1]
da1_reg = typing.cast(tuple[float, float, float, float], da1_reg)
da2_reg = da2_info[1]
da2_reg = typing.cast(tuple[float, float, float, float], da2_reg)
# extract registrations of both grids
da1_registration = da1_info[-1]
da2_registration = da2_info[-1]
logger.debug("grid 1 spacing: %s, grid 2 spacing: %s", da1_spacing, da2_spacing)
logger.debug("grid 1 region: %s, grid 2 region: %s", da1_reg, da2_reg)
logger.debug(
"grid 1 registration: %s, grid 2 registration: %s",
da1_registration,
da2_registration,
)
# if spacing, region and registration match, no resampling
if (
(da1_spacing == da2_spacing)
and (da1_reg == da2_reg)
and (da1_registration == da2_registration)
):
grid1 = da1
grid2 = da2
else:
# get minimum grid spacing of both grids
if da1_spacing != da2_spacing:
spacing = min(da1_spacing, da2_spacing)
logger.info(
"grid spacings don't match, using smaller spacing (%s m).",
spacing,
)
else:
spacing = da1_spacing
# get inside region of both grids
if da1_reg != da2_reg:
xmin = max(da1_reg[0], da2_reg[0])
xmax = min(da1_reg[1], da2_reg[1])
ymin = max(da1_reg[2], da2_reg[2])
ymax = min(da1_reg[3], da2_reg[3])
region = (xmin, xmax, ymin, ymax)
logger.info("grid regions dont match, using inner region %s", region)
else:
region = da1_reg
# get registration, if different use gridline
registration = "g" if da1_registration != da2_registration else da1_registration
logger.info(
"resampling both grids with a spacing of %s, region of %s"
"and a registration of %s",
spacing,
region,
registration,
)
# resample grids
grid1 = pygmt.grdsample(
da1,
spacing=spacing,
region=region,
registration=registration,
verbose=verbose,
)
grid2 = pygmt.grdsample(
da2,
spacing=spacing,
region=region,
registration=registration,
verbose=verbose,
)
reg = grid1.gmt.registration
grid_registration: str = "g" if reg == 0 else "p"
if grid_registration != registration:
warnings.warn(
"registration hasn't been correctly changed",
stacklevel=2,
)
grid1 = change_reg(grid1)
reg = grid2.gmt.registration
grid_registration = "g" if reg == 0 else "p"
if grid_registration != registration:
warnings.warn(
"registration hasn't been correctly changed",
stacklevel=2,
)
grid2 = change_reg(grid2)
grid1 = typing.cast(xr.DataArray, grid1)
grid2 = typing.cast(xr.DataArray, grid2)
dif = grid1 - grid2
if plot is True:
cpt_lims = kwargs.pop("cpt_lims", None)
# define color limits for the 2 input grids
if cpt_lims is not None:
vmin, vmax = cpt_lims
else:
vmin, vmax = get_combined_min_max(
(grid1, grid2),
shapefile=shp_mask,
robust=robust,
region=region,
absolute=kwargs.get("maxabs", False),
robust_percentiles=kwargs.get("robust_percentiles", (0.02, 0.98)),
)
# define color limits for the difference grid
diff_lims = kwargs.pop("diff_lims", None)
if diff_lims is None:
diff_lims = get_min_max(
dif,
shapefile=shp_mask,
robust=robust,
region=region,
absolute=kwargs.get("diff_maxabs", True),
robust_percentiles=kwargs.get("robust_percentiles", (0.02, 0.98)),
)
title = kwargs.pop("title", "Comparing Grids")
if kwargs.get("rmse_in_title", True) is True:
title += f", RMSE: {round(rmse(dif), kwargs.get('RMSE_decimals', 2))}"
coast = kwargs.pop("coast", False)
origin_shift = kwargs.get("origin_shift", "x")
cmap = kwargs.get("cmap", "viridis")
subplot_labels = kwargs.get("subplot_labels", False)
inset = kwargs.pop("inset", True)
new_kwargs = {
key: value
for key, value in kwargs.items()
if key
not in [
"cmap",
"shp_mask",
]
}
diff_kwargs = {
key: value
for key, value in new_kwargs.items()
if key not in ["reverse_cpt", "cbar_label", "shp_mask"]
}
fig = maps.plot_grd(
grid1,
cmap=cmap,
region=region,
coast=coast,
title=kwargs.get("grid1_name", "grid 1"),
cpt_lims=(vmin, vmax),
**new_kwargs,
)
if subplot_labels is True:
fig.text(
position="TL",
justify="BL",
text="a)",
font=kwargs.get("label_font", "18p,Helvetica,black"),
offset=kwargs.get("label_offset", "j0/.3"),
no_clip=True,
)
fig = maps.plot_grd(
dif,
cmap=kwargs.get("diff_cmap", "balance+h0"),
region=region,
coast=coast,
origin_shift=origin_shift,
cbar_label="difference",
cpt_lims=diff_lims,
fig=fig,
title=title,
inset=inset,
inset_position=kwargs.get("inset_position", "jTL+jTL+o0/0"),
inset_pos=kwargs.get("inset_pos"),
**diff_kwargs,
)
if subplot_labels is True:
fig.text(
position="TL",
justify="BL",
text="b)",
font=kwargs.get("label_font", "20p,Helvetica,black"),
offset=kwargs.get("label_offset", "j0/.3"),
no_clip=True,
)
fig = maps.plot_grd(
grid2,
cmap=cmap,
region=region,
coast=coast,
origin_shift=origin_shift,
fig=fig,
title=kwargs.get("grid2_name", "grid 2"),
cpt_lims=(vmin, vmax),
**new_kwargs,
)
if subplot_labels is True:
fig.text(
position="TL",
justify="BL",
text="c)",
font=kwargs.get("label_font", "20p,Helvetica,black"),
offset=kwargs.get("label_offset", "j0/.3"),
no_clip=True,
)
fig.show()
grid1 = grid1.rename(
{
next(iter(grid1.dims)): da1_original_dims[0],
list(grid1.dims)[1]: da1_original_dims[1],
}
)
grid2 = grid2.rename(
{
next(iter(grid2.dims)): da2_original_dims[0],
list(grid2.dims)[1]: da2_original_dims[1],
}
)
return (dif, grid1, grid2)
[docs]
def make_grid(
region: tuple[float, float, float, float],
spacing: float,
value: float,
name: str,
) -> xr.DataArray:
"""
Create a grid with 1 variable by defining a region, spacing, name and constant value
Parameters
----------
region : tuple[float, float, float, float]
bounding region in format [xmin, xmax, ymin, ymax]
spacing : float
spacing for grid
value : float
constant value to use for variable
name : str
name for variable
Returns
-------
xarray.DataArray
Returns a xarray.DataArray with 1 variable of constant value.
"""
coords = vd.grid_coordinates(region=region, spacing=spacing, pixel_register=True)
data = np.ones_like(coords[0]) * value
return typing.cast(
xr.DataArray,
vd.make_xarray_grid(coords, data, dims=["y", "x"], data_names=name),
)
[docs]
def square_subplots(n: int) -> tuple[int, int]:
"""
From https://github.com/matplotlib/grid-strategy/blob/master/src/grid_strategy/strategies.py
Calculate the number of rows and columns based on the total number of items (n) to
make an arrangement as close to square as looks good.
Parameters
----------
n : int
The number of total plots in the subplot
Returns
-------
tuple[int, int]
Returns a tuple in the format (number of rows, number of columns), so for
example a 3 x 2 grid would be represented as ``(3, 3)``, because there are 2
rows of length 3.
"""
special_cases = {
1: (1, 1),
2: (1, 2),
3: (1, 3),
4: (2, 2),
5: (2, 3),
6: (2, 3),
7: (2, 4),
8: (2, 4),
9: (3, 3),
10: (3, 4),
11: (3, 4),
12: (3, 4),
}
if n in special_cases:
return special_cases[n]
# May not work for very large n
n_sqrtf = np.sqrt(n)
n_sqrt = int(np.ceil(n_sqrtf))
if n_sqrtf == n_sqrt:
# Perfect square, we're done
x, y = n_sqrt, n_sqrt
elif n <= n_sqrt * (n_sqrt - 1):
# An n_sqrt x n_sqrt - 1 grid is close enough to look pretty
# square, so if n is less than that value, will use that rather
# than jumping all the way to a square grid.
x, y = n_sqrt, n_sqrt - 1
elif not (n_sqrt % 2) and n % 2:
# If the square root is even and the number of axes is odd, in
# order to keep the arrangement horizontally symmetrical, using a
# grid of size (n_sqrt + 1 x n_sqrt - 1) looks best and guarantees
# symmetry.
x, y = (n_sqrt + 1, n_sqrt - 1)
else:
# It's not a perfect square, but a square grid is best
x, y = n_sqrt, n_sqrt
if n == x * y:
# There are no deficient rows, so we can just return from here
return x, y # tuple(x for i in range(y))
# If exactly one of these is odd, make it the rows
if (x % 2) != (y % 2) and (x % 2):
x, y = y, x
return x, y
[docs]
def random_color() -> str:
"""
generate a random color in format R/G/B
Returns
-------
str
returns a random color string, for example '95/226/100'
"""
rng = np.random.default_rng()
numbers = rng.integers(low=0, high=256, size=3)
return f"{numbers[0]}/{numbers[1]}/{numbers[2]}"
[docs]
def subset_grid(
grid: xr.DataArray,
region: tuple[float, float, float, float],
) -> xr.DataArray:
"""
Return a subset of a grid based on a region
Parameters
----------
grid : xarray.DataArray
grid to be clipped
region : tuple[float, float, float, float]
region to clip to, in format [xmin, xmax, ymin, ymax]
Returns
-------
xarray.DataArray
clipped grid
"""
try:
da = pygmt.grdcut(
grid,
region=region,
verbose="quiet",
)
# get coordinate names
original_dims = tuple(grid.sizes.keys())
return da.rename(
{
next(iter(da.dims)): original_dims[0],
list(da.dims)[1]: original_dims[1],
}
)
except IndexError:
ew = [region[0], region[1]]
ns = [region[2], region[3]]
return grid.sel(
{
list(grid.sizes.keys())[1]: slice(min(ew), max(ew)),
list(grid.sizes.keys())[0]: slice(min(ns), max(ns)), # noqa: RUF015
}
)
[docs]
def get_min_max(
values: xr.DataArray | pd.Series | NDArray,
shapefile: str | gpd.GeoDataFrame | None = None,
robust: bool = False,
region: tuple[float, float, float, float] | None = None,
hemisphere: str | None = None,
absolute: bool = False,
robust_percentiles: tuple[float, float] = (0.02, 0.98),
) -> tuple[float, float]:
"""
Get a grids max and min values.
Parameters
----------
values : xarray.DataArray or pandas.Series or numpy.ndarray
values to find min or max for
shapefile : Union[str or geopandas.GeoDataFrame], optional
path or loaded shapefile to use for a mask, by default None
robust: bool, optional
choose whether to return the 2nd and 98th percentile values, instead of the
min/max
region : tuple[float, float, float, float], optional
give a subset region to get min and max values from, in format
[xmin, xmax, ymin, ymax], by default None
hemisphere : str, optional
set whether to lat lon projection is for "north" hemisphere (EPSG:3413) or
"south" hemisphere (EPSG:3031)
absolute : bool, optional
return the absolute min and max values, by default False
robust_percentiles : tuple[float, float], optional
decimal percentiles to use for robust min and max, by default (0.02, 0.98)
Returns
-------
tuple[float, float]
returns the min and max values.
"""
try:
hemisphere = default_hemisphere(hemisphere)
except KeyError:
hemisphere = None
if region is not None:
values = subset_grid(values, region)
if shapefile is None:
if robust:
v_min, v_max = np.nanquantile(values, robust_percentiles)
else:
v_min, v_max = np.nanmin(values), np.nanmax(values)
elif shapefile is not None:
if isinstance(values, xr.DataArray):
masked = mask_from_shp(
shapefile,
hemisphere=hemisphere,
grid=values,
masked=True,
invert=False,
)
else:
msg = "values must be an xarray.DataArray to use shapefile masking"
raise ValueError(msg)
if robust is True:
v_min, v_max = np.nanquantile(masked, robust_percentiles)
elif robust is False:
v_min, v_max = np.nanmin(masked), np.nanmax(masked)
if absolute is True:
v_min, v_max = -vd.maxabs([v_min, v_max]), vd.maxabs([v_min, v_max]) # pylint: disable=used-before-assignment
assert v_min <= v_max, "min value should be less than or equal to max value" # pylint: disable=possibly-used-before-assignment
return (v_min, v_max)
[docs]
def shapes_to_df(
shapes: list[float],
hemisphere: str | None = None,
) -> pd.DataFrame:
"""
convert the output of `regions.draw_region` and `profiles.draw_lines` to a dataframe
of easting and northing points
Parameters
----------
hemisphere : str, optional
choose between the "north" or "south" hemispheres
shapes : list
list of vertices
Returns
-------
pandas.DataFrame
Dataframe with easting, northing, and shape_num.
"""
hemisphere = default_hemisphere(hemisphere)
df = pd.DataFrame()
for i, j in enumerate(shapes):
lon = [coord[0] for coord in j] # type: ignore[attr-defined]
lat = [coord[1] for coord in j] # type: ignore[attr-defined]
shape = pd.DataFrame({"lon": lon, "lat": lat, "shape_num": i})
df = pd.concat((df, shape))
if hemisphere == "north":
df = latlon_to_epsg3413(df)
elif hemisphere == "south":
df = latlon_to_epsg3031(df)
else:
msg = "hemisphere must be 'north' or 'south'"
raise ValueError(msg)
return df
[docs]
def polygon_to_region(
polygon: list[float],
hemisphere: str | None = None,
) -> tuple[float, float, float, float]:
"""
convert the output of `regions.draw_region` to bounding region in EPSG:3031 for the
south hemisphere and EPSG:3413 for the north hemisphere.
Parameters
----------
polyon : list
list of polygon vertices
hemisphere : str, optional
choose between the "north" or "south" hemispheres
Returns
-------
tuple[float, float, float, float]
region in format in format [xmin, xmax, ymin, ymax]
"""
hemisphere = default_hemisphere(hemisphere)
df = shapes_to_df(shapes=polygon, hemisphere=hemisphere)
if df.shape_num.max() > 0:
logger.info(
"supplied dataframe has multiple polygons, only using the first one."
)
df = df[df.shape_num == 0]
reg: tuple[float, float, float, float] = vd.get_region((df.easting, df.northing))
return reg
[docs]
def mask_from_polygon(
polygon: list[float],
hemisphere: str | None = None,
invert: bool = False,
drop_nans: bool = False,
grid: str | xr.DataArray | None = None,
region: tuple[float, float, float, float] | None = None,
spacing: int | None = None,
**kwargs: typing.Any,
) -> xr.DataArray:
"""
convert the output of `regions.draw_region` to a mask or use it to mask a grid
Parameters
----------
polygon : list
list of polygon vertices
hemisphere : str, optional
choose between the "north" or "south" hemispheres
invert : bool, optional
reverse the sense of masking, by default False
drop_nans : bool, optional
drop nans after masking, by default False
grid : Union[str, xarray.DataArray], optional
grid to mask, by default None
region : tuple[float, float, float, float], optional
region to create a grid if none is supplied, in format [xmin, xmax, ymin, ymax],
by default None
spacing : int, optional
spacing to create a grid if none is supplied, by default None
Returns
-------
xarray.DataArray
masked grid or mask grid with 1's inside the mask.
"""
hemisphere = default_hemisphere(hemisphere)
# convert drawn polygon into dataframe
df = shapes_to_df(polygon, hemisphere=hemisphere)
data_coords = (df.easting, df.northing)
# remove additional polygons
if df.shape_num.max() > 0:
logger.info(
"supplied dataframe has multiple polygons, only using the first one."
)
df = df[df.shape_num == 0]
# if grid given as filename, load it
if isinstance(grid, str):
grid = xr.load_dataarray(grid)
grid = typing.cast(xr.DataArray, grid)
ds = grid.to_dataset(name="z") # type: ignore[union-attr]
elif isinstance(grid, xr.DataArray):
ds = grid.to_dataset()
# if no grid given, make a dummy one with supplied region and spacing
elif grid is None:
if region is None:
msg = "region must be supplied if grid is None"
raise ValueError(msg)
if spacing is None:
msg = "spacing must be supplied if grid is None"
raise ValueError(msg)
coords = vd.grid_coordinates(
region=region,
spacing=spacing,
pixel_register=kwargs.get("pixel_register", False),
)
ds = vd.make_xarray_grid(
coords, np.ones_like(coords[0]), dims=("y", "x"), data_names="z"
)
else:
msg = "grid must be a xarray.DataArray, a filename, or None"
raise ValueError(msg)
masked = vd.convexhull_mask(
data_coords,
grid=ds,
).z
# reverse the mask
if invert is True:
inverse = masked.isna()
inverse = inverse.where(inverse != 0)
masked = inverse * ds.z
# drop nans
if drop_nans is True:
masked = masked.where(masked.notna() == 1, drop=True)
return typing.cast(xr.DataArray, masked)
[docs]
def change_reg(grid: xr.DataArray) -> xr.DataArray:
"""
Use GMT grdedit to change the registration type in the metadata.
Parameters
----------
grid : xarray.DataArray
input grid to change the reg for.
Returns
-------
xarray.DataArray
returns a xarray.DataArray with switched reg type.
"""
with pygmt.clib.Session() as ses: # noqa: SIM117
# store the input grid in a virtual file so GMT can read it from a dataarray
# and write the output to a virtual file after changing the registration
with (
ses.virtualfile_in(data=grid) as vingrd,
ses.virtualfile_out(kind="grid") as voutgrd,
):
args = f"{vingrd} -T -G{voutgrd}"
ses.call_module("grdedit", args)
f_out: xr.DataArray = ses.virtualfile_to_raster(vfname=voutgrd, kind="grid")
# check if the registration has been changed
if grid.gmt.registration == f_out.gmt.registration:
msg = "issue in changing registration"
raise ValueError(msg)
return f_out
[docs]
def grd_blend(
grid1: xr.DataArray,
grid2: xr.DataArray,
) -> xr.DataArray:
"""
Use GMT grdblend to blend 2 grids into 1.
Parameters
----------
grid1 : xarray.DataArray
input grid to change the reg for.
grid2 : xarray.DataArray
input grid to change the reg for.
Returns
-------
xarray.DataArray
returns a blended grid.
"""
with pygmt.clib.Session() as ses: # noqa: SIM117
# store the input grid in a virtual file so GMT can read it from a dataarray
# and write the output to a virtual file after changing the registration
with (
ses.virtualfile_in(data=grid1) as vingrd1,
ses.virtualfile_in(data=grid2) as vingrd2,
ses.virtualfile_out(kind="grid") as voutgrd,
):
args = f"{vingrd1} {vingrd2} -Cf -G{voutgrd}"
ses.call_module("grdblend", args=args)
f_out: xr.DataArray = ses.virtualfile_to_raster(vfname=voutgrd, kind="grid")
return f_out
[docs]
def get_fig_width() -> float:
"""
Get the width of the current PyGMT figure instance.
Returns
-------
float
width of the figure
"""
with pygmt.clib.Session() as session: # noqa: SIM117
with pygmt.helpers.GMTTempFile() as tmpfile:
session.call_module("mapproject", f"-Ww ->{tmpfile.name}")
map_width = tmpfile.read().strip()
return float(map_width)
[docs]
def get_fig_height() -> float:
"""
Get the height of the current PyGMT figure instance.
Returns
-------
float
height of the figure
"""
with pygmt.clib.Session() as session: # noqa: SIM117
with pygmt.helpers.GMTTempFile() as tmpfile:
session.call_module("mapproject", f"-Wh ->{tmpfile.name}")
map_height = tmpfile.read().strip()
return float(map_height)
[docs]
def gmt_str_to_list(region: tuple[float, float, float, float]) -> str:
"""
convert a tuple of floats representing the boundaries of a region into a GMT-style
region string
Parameters
----------
region : tuple[float, float, float, float]
bounding region in format [xmin, xmax, ymin, ymax]
Returns
-------
str
a GMT style region string
"""
return "".join([str(x) + "/" for x in region])[:-1]