Source code for polartoolkit.utils

# pylint: disable=too-many-lines
import copy
import os
import typing
import warnings

import deprecation
import geopandas as gpd
import harmonica as hm
import numpy as np
import pandas as pd
import pygmt
import pyproj
import shapely
import verde as vd
import xarray as xr
import xrft
from numpy.typing import NDArray
from pyproj import Transformer

import polartoolkit
from polartoolkit import logger, maps, regions


[docs] def default_epsg(epsg: str | None, hemisphere: str | None) -> str: """ Returns the provided EPSG code through 1 of 4 methods in the following preference. 1) the parameter `epsg` which is an EPSG code as a string ("3031") 2) the parameter `hemisphere` which is either the strings "north" or "south" (which correspond to EPSG codes 3413 and 3031 respectively), 3) the user-set environment variable POLARTOOLKIT_EPSG which should be a EPSG code as a string ("3031") or 4) the user-set environment variable POLARTOOLKIT_HEMISPHERE which should be set to "north" or "south" (which correspond to EPSG codes 3413 and 3031 respectively). If none of these are provided, an error is raised informing the user how to set the values. Parameters ---------- epsg : str | None an EPSG code as a string ("3031"), by default None. hemisphere : str | None hemisphere to use, either "north" or "south", corresponding to EPSG codes 3413 and 3031 respectively, by default None. Returns ------- str EPSG code to use """ # Raise error that both are provided if epsg is not None and hemisphere is not None: msg = "only provide `epsg` or `hemisphere`, not both" raise ValueError(msg) # checks for correct types / value if not isinstance(epsg, (str, type(None))): msg = f"`epsg` must be a string or None, not {type(epsg)}" # type: ignore[unreachable] raise TypeError(msg) if isinstance(epsg, str) and not epsg.isdigit(): msg = f"`epsg` must be a string of digits or None, not {epsg}" raise TypeError(msg) if hemisphere not in ["north", "south", None]: msg = f"`hemisphere` must be 'north', 'south', or None, not {hemisphere}" raise TypeError(msg) # 1st priority: if epsg provided, use that if epsg is not None: if epsg == "3431": msg = ( "Are you sure you didn't mean EPSG 3413 (North Polar Stereographic) " "instead of 3431 (For Nevada USA)?" ) warnings.warn(msg, UserWarning, stacklevel=2) return epsg # 2nd priority: if hemisphere provided, use that if hemisphere is not None: if hemisphere == "north": return "3413" if hemisphere == "south": return "3031" # 3rd priority: look for POLARTOOLKIT_EPSG environment variable try: epsg = os.environ["POLARTOOLKIT_EPSG"] # 4th priority: look for POLARTOOLKIT_HEMISPHERE environment variable except KeyError as e: try: hemisphere = os.environ["POLARTOOLKIT_HEMISPHERE"] if hemisphere == "north": return "3413" if hemisphere == "south": return "3031" msg = f"Environment variable POLARTOOLKIT_HEMISPHERE must be either 'north' or 'south', not {hemisphere}" raise ValueError(msg) # pylint: disable=raise-missing-from # if neither found, raise error except KeyError: msg = ( "You must either 1) supply `epsg` as a string of an EPSG code, 2) " "supply `hemisphere` as one of the strings 'north' or 'south', 3) set " "the environment variable POLARTOOLKIT_EPSG as an EPSG code string, or " "4) set the environment variable 'POLARTOOLKIT_HEMISPHERE' as either " "'north' or 'south'. These can be either set as a temporary environment" " variable in python (os.environ['POLARTOOLKIT_EPSG']='3031') or " "set as a permanent operating system environment variable (i.e. for" "Unix, add 'export POLARTOOLKIT_EPSG=3031' to the end of your " ".bashrc file)." ) raise KeyError(msg) from e else: # check epsg is string of digits assert epsg.isdigit(), ( f"Environment variable POLARTOOLKIT_EPSG must be a string of numbers, not {epsg}" ) return epsg
[docs] def rmse(data: typing.Any, as_median: bool = False) -> float: """ function to give the root mean/median squared error (RMSE) of data Parameters ---------- data : numpy.ndarray[typing.Any, typing.Any] input data as_median : bool, optional choose to give root median squared error instead, by default False Returns ------- float RMSE value """ if as_median: value: float = np.sqrt(np.nanmedian(data**2).item()) else: value = np.sqrt(np.nanmean(data**2).item()) return value
[docs] def get_grid_region( grid: str | xr.DataArray, ) -> tuple[float, ...]: """ Returns the region of the specified grid. Parameters ---------- grid : str or xarray.DataArray Input grid to get region from. Filename string or loaded grid. Returns ------- tuple array with the region boundary in the format (xmin, xmax, ymin, ymax) """ if isinstance(grid, xr.DataArray) and len(grid.dims) > 2: grid = grid.squeeze() try: region: tuple[float, ...] = tuple( float(pygmt.grdinfo(grid, per_column="n", o=i)[:-1]) for i in range(4) ) except Exception as e: # pylint: disable=broad-exception-caught # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid region can't be extracted") raise return region
[docs] def get_grid_spacing( grid: str | xr.DataArray, ) -> float | None: """ Returns the spacing of the specified grid. Parameters ---------- grid : str or xarray.DataArray Input grid to get spacing from. Filename string or loaded grid. Returns ------- float | None Spacing of the grid or None if it can't be extracted. """ if isinstance(grid, xr.DataArray) and len(grid.dims) > 2: grid = grid.squeeze() try: spacing: float | None = float(pygmt.grdinfo(grid, per_column="n", o=7)[:-1]) except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid spacing can't be extracted") spacing = None return spacing
[docs] def get_grid_registration( grid: str | xr.DataArray, ) -> str | None: """ Returns the registration of the specified grid. Parameters ---------- grid : str or xarray.DataArray Input grid to get registration from. Filename string or loaded grid. Returns ------- str | None "g" for gridline or "p" for pixel registration. """ if isinstance(grid, xr.DataArray) and len(grid.dims) > 2: grid = grid.squeeze() try: reg = grid.gmt.registration # type: ignore[union-attr] registration: str | None = "g" if reg == 0 else "p" except AttributeError: logger.warning( "grid registration not extracted, re-trying with file loaded as xarray grid" ) # grid = xr.load_dataarray(grid) with xr.open_dataarray(grid) as da: try: reg = da.gmt.registration registration = "g" if reg == 0 else "p" except AttributeError: logger.warning("grid registration can't be extracted, setting to 'g'.") registration = "g" except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid registration can't be extracted") registration = None return registration
[docs] def get_grid_info( grid: str | xr.DataArray, print_info: bool = False, ) -> tuple[ float | None, tuple[float, float, float, float] | None, float | None, float | None, str | None, ]: """ Returns information of the specified grid. Parameters ---------- grid : str or xarray.DataArray Input grid to get info from. Filename string or loaded grid. print_info : bool, optional If true, prints out the grid info, by default False Returns ------- tuple (string of grid spacing, array with the region boundary, data min, data max, grid registration) """ # if isinstance(grid, str): # grid = xr.load_dataarray(grid) # try: # grid = xr.load_dataarray(grid).squeeze() # except ValueError: # print("loading grid as dataarray didn't work") # raise # pass # grid = xr.open_rasterio(grid) # grid = rioxarray.open_rasterio(grid) if isinstance(grid, xr.DataArray) and len(grid.dims) > 2: grid = grid.squeeze() try: spacing: float | None = float(pygmt.grdinfo(grid, per_column="n", o=7)[:-1]) except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid spacing can't be extracted") spacing = None try: region: typing.Any = tuple( float(pygmt.grdinfo(grid, per_column="n", o=i)[:-1]) for i in range(4) ) except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid region can't be extracted") region = None try: zmin: float | None = float(pygmt.grdinfo(grid, per_column="n", o=4)[:-1]) except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid zmin can't be extracted") zmin = None try: zmax = float(pygmt.grdinfo(grid, per_column="n", o=5)[:-1]) except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 # pygmt.exceptions.GMTInvalidInput: # pygmt.exceptions.GMTInvalidInput: logger.exception(e) logger.warning("grid zmax can't be extracted") zmax = None try: reg = grid.gmt.registration # type: ignore[union-attr] registration: str | None = "g" if reg == 0 else "p" except AttributeError: logger.warning( "grid registration not extracted, re-trying with file loaded as xarray grid" ) # grid = xr.load_dataarray(grid) with xr.open_dataarray(grid) as da: try: reg = da.gmt.registration registration = "g" if reg == 0 else "p" except AttributeError: logger.warning("grid registration can't be extracted, setting to 'g'.") registration = "g" except Exception as e: # pylint: disable=broad-exception-caught # noqa: BLE001 logger.exception(e) logger.warning("grid registration can't be extracted") registration = None if print_info: info = ( f"grid spacing: {spacing} m\n" f"grid region: {region}\n" f"grid zmin: {zmin}\n" f"grid zmax: {zmax}\n" f"grid registration: {registration}" ) print(info) # noqa: T201 return spacing, region, zmin, zmax, registration
[docs] def dd2dms(dd: float) -> str: """ Convert decimal degrees to minutes, seconds. Modified from https://stackoverflow.com/a/10286690/18686384 Parameters ---------- dd : float input decimal degrees Returns ------- str degrees in the format "DD:MM:SS" """ is_positive = dd >= 0 dd = abs(dd) minutes, seconds = divmod(dd * 3600, 60) degrees, minutes = divmod(minutes, 60) degrees = degrees if is_positive else -degrees return f"{int(degrees)}:{int(minutes)}:{seconds}"
[docs] def region_to_df( region: tuple[float, float, float, float] | pd.DataFrame, coord_names: tuple[str, str] = ("easting", "northing"), reverse: bool = False, ) -> tuple[float, float, float, float] | pd.DataFrame: """ Convert region bounds in format [xmin, xmax, ymin, ymax] to pandas dataframe with coordinates of region corners, or reverse this if `reverse` is True. Parameters ---------- region : tuple[float, float, float, float] | pandas.DataFrame bounding region in format [xmin, xmax, ymin, ymax] or, if `reverse` is True, a DataFrame with coordinate columns with names set by `cood_names` coord_names : tuple[str, str], optional names of input or output coordinate columns, by default ("easting", "northing") reverse : bool, optional If True, convert from df to region tuple, else, convert from region tuple to df, by default False Returns ------- tuple[typing.Any, typing.Any, typing.Any, typing.Any] | pandas.DataFrame Dataframe with easting and northing columns, and a row for each corner of the region, or, if `reverse` is True, an array in the format [xmin, xmax, ymin, ymax]. """ if reverse: return ( region[coord_names[0]][0], # type: ignore[call-overload,index] region[coord_names[0]][1], # type: ignore[call-overload,index] region[coord_names[1]][0], # type: ignore[call-overload,index] region[coord_names[1]][2], # type: ignore[call-overload,index] ) bl = (region[0], region[2]) br = (region[1], region[2]) tl = (region[0], region[3]) tr = (region[1], region[3]) return pd.DataFrame(data=[bl, br, tl, tr], columns=(coord_names[0], coord_names[1]))
[docs] def region_xy_to_ll( region: tuple[typing.Any, typing.Any, typing.Any, typing.Any], hemisphere: str | None = None, epsg: str | None = None, dms: bool = False, as_corners: bool = False, ) -> tuple[typing.Any, typing.Any, typing.Any, typing.Any] | str: """ Convert region in format [xmin, xmax, ymin, ymax] in projected meters to lat / lon Parameters ---------- region : tuple[float, float, float, float] region boundaries in format [xmin, xmax, ymin, ymax] in meters hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None dms: bool if True, will return results as deg:min:sec instead of decimal degrees, by default False as_corners: bool = False, if True, will return the region string in the format (lower left longitude / lower left latitude / upper right longitude / upper right latitude +r) Returns ------- tuple[float, float, float, float] | str region boundaries in format [lon_min, lon_max, lat_min, lat_max] or as GMT region string in the format (lower left longitude / lower left latitude / upper right longitude / upper right latitude +r). """ epsg = default_epsg(epsg, hemisphere) if as_corners: ll_xy = (region[0], region[2]) ur_xy = (region[1], region[3]) ll_lonlat = reproject( ll_xy, input_crs=f"epsg:{epsg}", output_crs="EPSG:4326", ) ur_lonlat = reproject( ur_xy, input_crs=f"epsg:{epsg}", output_crs="EPSG:4326", ) assert not any(np.isinf(ll_lonlat)), ( "Failed to reproject lower lect corner of inset region to lat/lon coordinates. The inset region was likely too large." ) assert not any(np.isinf(ur_lonlat)), ( "Failed to reproject upper right corner of inset region to lat/lon coordinates. The inset region was likely too large." ) if epsg == "3857": # for web mercator, latitudes need to be between -85 and 85 degrees assert abs(ll_lonlat[1]) < 85, ( # type: ignore[misc] "Failed to reproject inset region to lat/lon coordinates. Web Mercator projection only supports latitudes between -85 and 85 degrees. The inset region was likely too large." ) assert abs(ur_lonlat[1]) < 85, ( # type: ignore[misc] "Failed to reproject inset region to lat/lon coordinates. Web Mercator projection only supports latitudes between -85 and 85 degrees. The inset region was likely too large." ) return f"{ll_lonlat[0]}/{ll_lonlat[1]}/{ur_lonlat[0]}/{ur_lonlat[1]}+r" # type: ignore[misc] df = region_to_df(region) df_proj = reproject( df, f"epsg:{epsg}", "epsg:4326", reg=True, input_coord_names=("easting", "northing"), output_coord_names=("lon", "lat"), ) return tuple([dd2dms(x) for x in df_proj] if dms is True else df_proj)
[docs] def region_ll_to_xy( region: tuple[float, float, float, float], hemisphere: str | None = None, epsg: str | None = None, ) -> tuple[float, float, float, float]: """ Convert region in format [lon_min, lon_max, lat_min, lat_max] to projected meters in the north or south polar stereographic projections. Parameters ---------- region : tuple[float, float, float, float] region boundaries in format [xmin, xmax, ymin, ymax] in decimal degrees hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None Returns ------- tuple[float, float, float, float] region boundaries in format [x_min, x_max, y_min, y_max] """ epsg = default_epsg(epsg, hemisphere) df = region_to_df(region, coord_names=("lon", "lat")) return tuple( reproject( df, "epsg:4326", f"epsg:{epsg}", reg=True, input_coord_names=("lon", "lat"), output_coord_names=("easting", "northing"), ) )
[docs] def region_to_bounding_box( region: tuple[typing.Any, typing.Any, typing.Any, typing.Any], ) -> tuple[typing.Any, typing.Any, typing.Any, typing.Any]: """ Convert region in format [xmin, xmax, ymin, ymax] to bounding box format used for icepyx: [ lower left longitude, lower left latitude, upper right longitude, upper right latitude ] Same format as [xmin, ymin, xmax, ymax], used for `bbox` parameter of geopandas.read_file Parameters ---------- region : tuple[typing.Any, typing.Any, typing.Any, typing.Any] region boundaries in format [xmin, xmax, ymin, ymax] in meters or degrees. Returns ------- tuple[typing.Any, typing.Any, typing.Any, typing.Any] region boundaries in bounding box format. """ return (region[0], region[2], region[1], region[3])
[docs] def epsg3031_to_latlon( df: pd.DataFrame | tuple[typing.Any], reg: bool = False, input_coord_names: tuple[str, str] | None = None, output_coord_names: tuple[str, str] = ("lon", "lat"), ) -> pd.DataFrame | tuple[typing.Any]: """ Convert coordinates from EPSG:3031 Antarctic Polar Stereographic in meters to EPSG:4326 WGS84 in decimal degrees. Parameters ---------- df : pandas.DataFrame or tuple[typing.Any] input dataframe with easting and northing columns, or tuple [x,y] reg : bool, optional if true, returns a GMT formatted region string, by default False input_coord_names : tuple | None, optional set names for input coordinate columns, by default ("x", "y") or ("easting", "northing") output_coord_names : tuple | None, optional set names for output coordinate columns, by default ("lon", "lat") Returns ------- pandas.DataFrame or tuple[typing.Any] Updated dataframe with new latitude and longitude columns, numpy.ndarray in format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon] """ return reproject( df, "epsg:3031", "epsg:4326", reg=reg, input_coord_names=input_coord_names, output_coord_names=output_coord_names, )
[docs] def epsg3413_to_latlon( df: pd.DataFrame | tuple[typing.Any], reg: bool = False, input_coord_names: tuple[str, str] | None = None, output_coord_names: tuple[str, str] = ("lon", "lat"), ) -> pd.DataFrame | tuple[typing.Any]: """ Convert coordinates from EPSG:3413 North Polar Stereographic in meters to EPSG:4326 WGS84 in decimal degrees. Parameters ---------- df : pandas.DataFrame or tuple[typing.Any] input dataframe with easting and northing columns, or tuple [x,y] reg : bool, optional if true, returns a GMT formatted region string, by default False input_coord_names : tuple | None, optional set names for input coordinate columns, by default ("x", "y") or ("easting", "northing") output_coord_names : tuple | None, optional set names for output coordinate columns, by default ("lon", "lat") Returns ------- pandas.DataFrame or tuple[typing.Any] Updated dataframe with new latitude and longitude columns, numpy.ndarray in format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon] """ return reproject( df, "epsg:3413", "epsg:4326", reg=reg, input_coord_names=input_coord_names, output_coord_names=output_coord_names, )
[docs] def latlon_to_epsg3031( df: pd.DataFrame | NDArray[typing.Any], reg: bool = False, input_coord_names: tuple[str, str] | None = None, output_coord_names: tuple[str, str] = ("easting", "northing"), ) -> pd.DataFrame | NDArray[typing.Any]: """ Convert coordinates from EPSG:4326 WGS84 in decimal degrees to EPSG:3031 Antarctic Polar Stereographic in meters. Parameters ---------- df : pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any] input dataframe with latitude and longitude columns reg : bool, optional if true, returns a GMT formatted region string, by default False input_coord_names : tuple | None, optional set names for input coordinate columns, by default ("lon", "lat") output_coord_names : tuple | None, optional set names for output coordinate columns, by default ("easting", "northing") Returns ------- pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any] Updated dataframe with new easting and northing columns or numpy.ndarray in format [xmin, xmax, ymin, ymax] """ return reproject( df, "epsg:4326", "epsg:3031", reg=reg, input_coord_names=input_coord_names, output_coord_names=output_coord_names, )
[docs] def latlon_to_epsg3413( df: pd.DataFrame | NDArray[typing.Any], reg: bool = False, input_coord_names: tuple[str, str] | None = None, output_coord_names: tuple[str, str] = ("easting", "northing"), ) -> pd.DataFrame | NDArray[typing.Any]: """ Convert coordinates from EPSG:4326 WGS84 in decimal degrees to EPSG:3413 North Polar Stereographic in meters. Parameters ---------- df : pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any] input dataframe with latitude and longitude columns reg : bool, optional if true, returns a GMT formatted region string, by default False input_coord_names : tuple | None, optional set names for input coordinate columns, by default ("lon", "lat") output_coord_names : tuple | None, optional set names for output coordinate columns, by default ("easting", "northing") Returns ------- pandas.DataFrame or numpy.ndarray[typing.Any, typing.Any] Updated dataframe with new easting and northing columns or numpy.ndarray in format [xmin, xmax, ymin, ymax] """ return reproject( df, "epsg:4326", "epsg:3413", reg=reg, input_coord_names=input_coord_names, output_coord_names=output_coord_names, )
[docs] def reproject( df: pd.DataFrame | tuple[typing.Any], input_crs: str, output_crs: str, input_coord_names: tuple[str, str] | None = None, output_coord_names: tuple[str, str] | None = None, reg: bool = False, ) -> pd.DataFrame | tuple[typing.Any]: """ Convert coordinates from input CRS to output CRS. Coordinates can be supplied as a dataframe with coordinate columns set by input_coord_names, or as a tuple of a list of x coordinates and a list of y coordinates. Parameters ---------- df : pandas.DataFrame or tuple[typing.Any] input dataframe with easting/longitude and northing/latitude columns, or tuple [x,y] input_crs : str input CRS in EPSG format, e.g. "epsg:4326" output_crs : str output CRS in EPSG format, e.g. "epsg:3413" reg : bool, optional if true, returns a GMT formatted region string, by default False input_coord_names : tuple, optional set names for input coordinate columns, by default "x"/"y" or "easting"/"northing" if input_crs is "epsg:3413" or "epsg:3031", or if input_crs is "epsg:4326", "lon"/"lat" output_coord_names : tuple, optional set names for output coordinate columns, by default "x"/"y" if output_crs is "epsg:3413" or "epsg:3031", or if output_crs is "epsg:4326", "lon"/"lat". Returns ------- pandas.DataFrame or tuple[typing.Any] Updated dataframe with new latitude and longitude columns, numpy.ndarray in format [xmin, xmax, ymin, ymax], or tuple in format [lat, lon] """ # make crs lowercase input_crs = input_crs.lower() output_crs = output_crs.lower() transformer = Transformer.from_crs( input_crs, output_crs, always_xy=True, ) if isinstance(df, pd.DataFrame): df = df.copy() # use sensible default coord names if input_crs == "epsg:4326": if input_coord_names is None: input_coord_names = ("lon", "lat") elif input_coord_names is None: # check for coord column names if ("x" in df.columns) and ("y" in df.columns): input_coord_names = ("x", "y") elif ("easting" in df.columns) and ("northing" in df.columns): input_coord_names = ("easting", "northing") if output_crs == "epsg:4326": if output_coord_names is None: output_coord_names = ("lon", "lat") else: if output_coord_names is None: # check for coord column names if ("x" in df.columns) and ("y" in df.columns): output_coord_names = ("x", "y") elif ("easting" in df.columns) and ("northing" in df.columns): output_coord_names = ("easting", "northing") if output_coord_names is None: output_coord_names = ("easting", "northing") ( # pylint: disable=unpacking-non-sequence df[output_coord_names[0]], df[output_coord_names[1]], ) = transformer.transform( df[input_coord_names[0]].tolist(), # type: ignore[index] df[input_coord_names[1]].tolist(), # type: ignore[index] ) if reg is True: df = ( df[output_coord_names[0]].min(), df[output_coord_names[0]].max(), df[output_coord_names[1]].min(), df[output_coord_names[1]].max(), ) else: df = tuple(transformer.transform(df[0], df[1])) # type: ignore[misc] return df
[docs] def epsg_central_coordinates( epsg: str, ) -> tuple[float, float]: """ Returns the central coordinates for the EPSG code in degrees. Parameters ---------- epsg : str EPSG code to use as a string ("3031"). Returns ------- tuple central coordinates (lat, lon) in degrees. """ crs = pyproj.CRS(f"epsg:{epsg}") return ( crs.coordinate_operation.params[0].value, crs.coordinate_operation.params[1].value, )
[docs] def points_inside_region( df: pd.DataFrame, region: tuple[float, float, float, float], names: tuple[str, str] = ("x", "y"), reverse: bool = False, ) -> pd.DataFrame: """ return a subset of a dataframe which is within a region Parameters ---------- df : pandas.DataFrame dataframe with coordinate columns to use for defining if within region region : tuple[float, float, float, float] bounding region in format [xmin, xmax, ymin, ymax] for bounds of new subset dataframe names : tuple[str, str], optional column names to use for x and y coordinates, by default ("x", "y") or ("easting", "northing") reverse : bool, optional if True, will return points outside the region, by default False Returns ------- pandas.DataFrame returns a subset dataframe """ # make a copy of the dataframe df1 = df.copy() # check for coord column names if ("x" in df1.columns) and ("y" in df1.columns): pass elif ("easting" in df1.columns) and ("northing" in df1.columns): names = ("easting", "northing") # make column of booleans for whether row is within the region df1["inside_tmp"] = vd.inside( coordinates=(df1[names[0]], df1[names[1]]), region=region ) if reverse is True: # subset if False df_result = df1.loc[df1.inside_tmp == False].copy() # noqa: E712 # pylint: disable=singleton-comparison else: # subset if True df_result = df1.loc[df1.inside_tmp == True].copy() # noqa: E712 # pylint: disable=singleton-comparison # drop the column 'inside' return df_result.drop(columns="inside_tmp")
[docs] def block_reduce( df: pd.DataFrame, reduction: typing.Callable[..., float | int], input_coord_names: tuple[str, str] = ("x", "y"), input_data_names: typing.Any | None = None, **kwargs: typing.Any, ) -> pd.DataFrame: """ perform a block reduction of a dataframe. Parameters ---------- df : pandas.DataFrame data to block reduce reduction : typing.Callable function to use in reduction, e.g. np.mean input_coord_names : tuple[str, str], optional strings of coordinate column names, by default ("x", "y") or ("easting", "northing") input_data_names : typing.Any | None, optional strings of data column names, by default None Returns ------- pandas.DataFrame a block-reduced dataframe """ # define verde reducer function reducer = vd.BlockReduce(reduction, **kwargs) # check for coord column names if ("x" in df.columns) and ("y" in df.columns): pass elif ("easting" in df.columns) and ("northing" in df.columns): input_coord_names = ("easting", "northing") # if no data names provided, use all columns if input_data_names is None: input_data_names = tuple(df.columns.drop(list(input_coord_names))) # get tuples of pd.Series input_coords = tuple([df[col] for col in input_coord_names]) # pylint: disable=consider-using-generator input_data = tuple([df[col] for col in input_data_names]) # pylint: disable=consider-using-generator # apply reduction coordinates, data = reducer.filter( coordinates=input_coords, data=input_data, ) # add reduced coordinates to a dictionary coord_cols = dict(zip(input_coord_names, coordinates, strict=False)) # add reduced data to a dictionary if len(input_data_names) < 2: data_cols = {input_data_names[0]: data} else: data_cols = dict(zip(input_data_names, data, strict=False)) # merge dicts and create dataframe return pd.DataFrame(data=coord_cols | data_cols)
[docs] def nearest_grid_fill( grid: xr.DataArray, method: str = "verde", crs: str | None = None, ) -> xr.DataArray: """ fill missing values in a grid with the nearest value. Parameters ---------- grid : xarray.DataArray grid with missing values method : str, optional choose method of filling, by default "verde" crs : str | None, optional if method is 'rioxarray', provide the crs of the grid, in format 'epsg:xxxx', by default None Returns ------- xarray.DataArray filled grid """ # get coordinate names original_dims = tuple(grid.sizes.keys()) # get original grid name original_name = grid.name if method == "rioxarray": filled: xr.DataArray = ( grid.rio.write_crs(crs) .rio.set_spatial_dims(original_dims[1], original_dims[0]) .rio.write_nodata(np.nan) .rio.interpolate_na(method="nearest") .rename(original_name) ) elif method == "verde": df = vd.grid_to_table(grid) df_dropped = df[df[grid.name].notna()] coords = (df_dropped[grid.dims[1]], df_dropped[grid.dims[0]]) region = vd.get_region((df[grid.dims[1]], df[grid.dims[0]])) filled = ( vd.KNeighbors() .fit(coords, df_dropped[grid.name]) .grid(region=region, shape=grid.shape, data_names=original_name)[ original_name ] ) # elif method == "pygmt": # filled = pygmt.grdfill(grid, mode="n", verbose="quiet).rename(original_name) else: msg = "method must be 'rioxarray', or 'verde'" raise ValueError(msg) # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") return filled.rename( { next(iter(filled.dims)): original_dims[0], list(filled.dims)[1]: original_dims[1], } )
[docs] def filter_grid( grid: xr.DataArray, filter_width: float | None = None, filter_type: str = "lowpass", filt_type: str = "lowpass", pad_width_factor: int = 3, pad_mode: str = "linear_ramp", pad_constant: float | None = None, pad_end_values: float | None = None, ) -> xr.DataArray: """ Apply a spatial filter to a grid. Parameters ---------- grid : xarray.DataArray grid to filter the values of filter_width : float | None, optional width of the filter in meters for high and low pass filtering, by default None filter_type : str, optional type of filter to use from 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', 'northing_deriv', or 'total_gradient' by default "lowpass" filt_type : str, optional deprecated, use filter_type instead, by default "lowpass" pad_width_factor : int, optional factor of grid width to pad the grid by, by default 3, which equates to a pad with a width of 1/3 of the grid width. pad_mode : str, optional mode of padding, can be "linear", by default "linear_ramp" pad_constant : float | None, optional constant value to use for padding, by default None pad_end_values : float | None, optional value to use for end of padding if pad_mode is "linear_ramp", by default None Returns ------- xarray.DataArray a filtered grid """ if filt_type != "lowpass": warnings.warn( "'filt_type' is deprecated, use 'filter_type' instead", UserWarning, stacklevel=2, ) filter_type = filt_type # get coordinate names original_dims = tuple(grid.sizes.keys()) # get original grid name original_name = grid.name # if there are nan's, fill them with nearest neighbor if grid.isna().any(): filled = nearest_grid_fill(grid, method="verde") else: filled = grid.copy() # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") filled = filled.rename( { next(iter(filled.dims)): original_dims[0], list(filled.dims)[1]: original_dims[1], } ) # define width of padding in each direction pad_width = { original_dims[1]: grid[original_dims[1]].size // pad_width_factor, original_dims[0]: grid[original_dims[0]].size // pad_width_factor, } if pad_mode == "constant": if pad_constant is None: pad_constant = filled.median() pad_end_values = None if (pad_mode == "linear_ramp") and (pad_end_values is None): pad_end_values = filled.median() if pad_mode != "constant": pad_constant = ( None # needed until https://github.com/xgcm/xrft/issues/211 is fixed ) # apply padding pad_kwargs = { **pad_width, "mode": pad_mode, "constant_values": pad_constant, "end_values": pad_end_values, } padded = xrft.pad( filled, **pad_kwargs, ) if filter_type == "lowpass": filt = hm.gaussian_lowpass(padded, wavelength=filter_width).rename("filt") elif filter_type == "highpass": filt = hm.gaussian_highpass(padded, wavelength=filter_width).rename("filt") elif filter_type == "up_deriv": filt = hm.derivative_upward(padded).rename("filt") elif filter_type == "easting_deriv": filt = hm.derivative_easting(padded).rename("filt") elif filter_type == "northing_deriv": filt = hm.derivative_northing(padded).rename("filt") elif filter_type == "total_gradient": filt = hm.total_gradient_amplitude(padded).rename("filt") else: msg = ( "filter_type must be 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', " "'northing_deriv', or 'total_gradient'" ) raise ValueError(msg) unpadded = xrft.unpad(filt, pad_width) # reset coordinate values to original (avoid rounding errors) unpadded = unpadded.assign_coords( { original_dims[0]: grid[original_dims[0]].to_numpy(), original_dims[1]: grid[original_dims[1]].to_numpy(), } ) if grid.isna().any(): result: xr.DataArray = xr.where(grid.notna(), unpadded, grid) else: result = unpadded.copy() # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") result = result.rename( { next(iter(result.dims)): original_dims[0], # list(result.dims)[0]: original_dims[0], list(result.dims)[1]: original_dims[1], } ) return result.rename(original_name)
@deprecation.deprecated( deprecated_in="1.4.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `points_inside_shapefile()` instead. Note the new function returns only the subset of data instead of the dataframe with a new column 'inside'", ) def points_inside_shp( points: pd.DataFrame | gpd.GeoDataFrame, shapefile: gpd.GeoDataFrame, crs: str | None = None, coord_names: tuple[str, str] | None = None, hemisphere: str | None = None, epsg: str | None = None, ) -> pd.DataFrame | gpd.GeoDataFrame: """ Deprecated, use `points_inside_shapefile` instead. Note the new function returns only the subset of data instead of the dataframe with a new column 'inside' """ msg = "`points_inside_shp` is deprecated, use `points_inside_shapefile` instead. Note the new function returns only the subset of data instead of the dataframe with a new column 'inside'" warnings.warn( msg, UserWarning, stacklevel=2, ) df = points_inside_shapefile( points=points, shapefile=shapefile, crs=crs, coord_names=coord_names, hemisphere=hemisphere, epsg=epsg, ) df2 = points.copy() df2["inside"] = False df2.loc[df.index.to_numpy(), "inside"] = True return df2
[docs] def points_inside_shapefile( points: pd.DataFrame | gpd.GeoDataFrame, shapefile: gpd.GeoDataFrame, crs: str | None = None, coord_names: tuple[str, str] | None = None, hemisphere: str | None = None, epsg: str | None = None, ) -> pd.DataFrame | gpd.GeoDataFrame: """ return a subset of a dataframe which is located inside a shapefile. Parameters ---------- points : pandas.DataFrame | geopandas.GeoDataFrame dataframe with coordinate columns specified by coord_names to use for defining if within shapefile shapefile : geopandas.GeoDataFrame shapefile to use for defining if point are within it or not crs : str | None, optional if points is not a geodataframe, crs to use to convert into a geodataframe, by default None coord_names : tuple[str, str] | None, optional names of coordinate columns, by default 'x' and 'y' or 'easting' and 'northing' hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None Returns ------- pandas.DataFrame | geopandas.GeoDataFrame returns a subset dataframe """ points = points.copy() if isinstance(points, pd.DataFrame): if crs is None: epsg = default_epsg(epsg, hemisphere) if coord_names is None: # check for coord column names if ("x" in points.columns) and ("y" in points.columns): coord_names = ("x", "y") elif ("easting" in points.columns) and ("northing" in points.columns): coord_names = ("easting", "northing") points = gpd.GeoDataFrame( points, geometry=gpd.points_from_xy( x=points[coord_names[0]], # type: ignore[index] y=points[coord_names[1]], # type: ignore[index] ), crs=f"epsg:{epsg}", ) points["inside_tmp"] = points.within(shapefile.geometry.iloc[0]) points = points[points.inside_tmp] return points.drop(columns="inside_tmp")
@deprecation.deprecated( deprecated_in="1.4.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `mask_from_shapefile()` instead", ) def mask_from_shp( shapefile: str | gpd.GeoDataFrame, hemisphere: str | None = None, epsg: str | None = None, invert: bool = True, grid: xr.DataArray | str | None = None, xr_grid: xr.DataArray | None = None, grid_file: str | None = None, region: str | tuple[float, float, float, float] | None = None, spacing: float | None = None, masked: bool = False, pixel_register: bool = True, input_coord_names: tuple[str, str] = ("easting", "northing"), ) -> xr.DataArray: """ Deprecated, use `mask_from_shapefile` instead. """ msg = "`mask_from_shp` is deprecated, use `mask_from_shapefile` instead." warnings.warn( msg, UserWarning, stacklevel=2, ) return mask_from_shapefile( shapefile=shapefile, hemisphere=hemisphere, epsg=epsg, invert=invert, grid=grid, xr_grid=xr_grid, grid_file=grid_file, region=region, spacing=spacing, masked=masked, pixel_register=pixel_register, input_coord_names=input_coord_names, )
[docs] def mask_from_shapefile( shapefile: str | gpd.GeoDataFrame, hemisphere: str | None = None, epsg: str | None = None, invert: bool = True, grid: xr.DataArray | str | None = None, xr_grid: xr.DataArray | None = None, grid_file: str | None = None, region: str | tuple[float, float, float, float] | None = None, spacing: float | None = None, masked: bool = False, pixel_register: bool = True, input_coord_names: tuple[str, str] = ("easting", "northing"), ) -> xr.DataArray: """ Create a mask or a masked grid from area inside or outside of a closed shapefile. Parameters ---------- shapefile : str or geopandas.GeoDataFrame either path to .shp filename, must by in same directory as accompanying files : .shx, .prj, .dbf, should be a closed polygon file, or shapefile which as already been loaded into a geodataframe. hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None invert : bool, optional choose whether to mask data outside the shape (False) or inside the shape (True), by default True (masks inside of shape) grid : xarray.DataArray or str, optional _xarray.DataArray or path to a .nc file; to use to define region, or to mask, by default None xr_grid : xarray.DataArray, optional deprecated, use `grid` instead, by default None grid_file : str, optional deprecated, use `grid` instead, by default None region : str or tuple[float, float, float, float], optional bounding region in format [xmin, xmax, ymin, ymax] in meters to create a dummy grid if none are supplied, by default None spacing : str or int, optional grid spacing in meters to create a dummy grid if none are supplied, by default None masked : bool, optional choose whether to return the masked grid (True) or the mask itself (False), by default False pixel_register : bool, optional choose whether the grid is pixel registered (True) or grid registered (False), by default True input_coord_names : tuple[str, str], optional set names for input coordinate columns, by default ("easting", "northing") Returns ------- xarray.DataArray Returns either a masked grid, or the mask grid itself. """ epsg = default_epsg(epsg, hemisphere) crs = f"epsg:{epsg}" shp = ( gpd.read_file(shapefile, engine="pyogrio") if isinstance(shapefile, str) else shapefile ) if xr_grid is not None: grid = xr_grid msg = "`xr_grid` parameter has changed, use 'grid' instead." warnings.warn( msg, UserWarning, stacklevel=2, ) if grid_file is not None: grid = grid_file msg = "`grid_file` parameter has changed, use 'grid' instead." warnings.warn( msg, UserWarning, stacklevel=2, ) if grid is None: coords = vd.grid_coordinates( region=region, spacing=spacing, pixel_register=pixel_register, ) ds = vd.make_xarray_grid( coords, np.ones_like(coords[0]), dims=input_coord_names[::-1], data_names="z", ) xds = ds.z.rio.write_crs(crs) elif isinstance(grid, xr.DataArray): # get coordinate names original_dims = tuple(grid.sizes.keys()) xds = grid.rio.write_crs(crs).rio.set_spatial_dims( original_dims[1], original_dims[0] ) elif isinstance(grid, str): xds = xr.load_dataarray(grid) # get coordinate names original_dims = tuple(xds.sizes.keys()) xds = xds.rio.write_crs(crs).rio.set_spatial_dims( original_dims[1], original_dims[0] ) # if single geometry, convert to list try: iter(shp.geometry) except TypeError: geom = [shp.geometry] else: geom = shp.geometry masked_grd = xds.rio.clip( geom, xds.rio.crs, drop=False, invert=invert, ) mask_grd = np.isfinite(masked_grd) if masked is True: output = masked_grd elif masked is False: output = mask_grd try: output = output.drop_vars("spatial_ref") # pylint: disable=used-before-assignment except ValueError as e: logger.exception(e) return typing.cast("xr.DataArray", output)
@deprecation.deprecated( deprecated_in="0.4.0", removed_in="2.0.0", current_version=polartoolkit.__version__, details="alter_region has been moved to the regions module, use that instead", ) def alter_region( starting_region: tuple[float, float, float, float], **kwargs: typing.Any, ) -> tuple[float, float, float, float]: """deprecated function, use regions.alter_region instead""" return regions.alter_region( starting_region, **kwargs, )
[docs] def set_proj( region: tuple[float, float, float, float], hemisphere: str | None = None, epsg: str | None = None, fig_height: float | None = None, fig_width: float | None = None, ) -> tuple[str, str | None, float, float]: """ Gives GMT projection strings in project and geographic units, from region and figure height or width. Inspired from https://github.com/mrsiegfried/Venturelli2020-GRL. Parameters ---------- region : tuple[float, float, float, float] region boundaries in format [xmin, xmax, ymin, ymax] in projected meters hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None fig_height : float | None desired figure height in cm, by default is None fig_width : float | None instead of using figure height, set the projection based on figure width in cm, by default is None Returns ------- tuple returns a tuple of the following variables: proj, proj_latlon, fig_width, fig_height """ epsg = default_epsg(epsg, hemisphere) if epsg == "3857": msg = ( "EPSG:3857 (Web Mercator) is not recommended, especially for high latitudes" " as it can cause significant distortion. Consider using a different " "projection." ) warnings.warn( msg, UserWarning, stacklevel=2, ) xmin, xmax, ymin, ymax = region if fig_height is None and fig_width is None: msg = "either fig_height or fig_width must be set" raise ValueError(msg) if fig_height is not None and fig_width is not None: msg = "only one of fig_height or fig_width can be set" raise ValueError(msg) # fig_width takes priority over fig_height if both are set. if fig_width is not None: fig_height = fig_width * (ymax - ymin) / (xmax - xmin) ratio = (xmax - xmin) / (fig_width / 100) else: fig_height = typing.cast("float", fig_height) fig_width = fig_height * (xmax - xmin) / (ymax - ymin) ratio = (ymax - ymin) / (fig_height / 100) ratio = f"1:{ratio}" # type: ignore[assignment] # define projection string in projected units (EPSG) # x denotes ratio is in plot-units/data-units proj = f"x{ratio}" # define projection string in lat/lon using the same ratio if epsg == "3413": # s denotes stereographic projection # -45/90 denotes the lon and lat at the center of the projection (north pole) # -45 is central meridian # 90 is the central latitude # 70 is the latitude of true scale # 70/ratio denotes the true scale is located at -71 deg lat proj_latlon = f"s-45/90/70/{ratio}" elif epsg == "3031": # s denotes stereographic projection # 0/-90 denotes the lon and lat at the center of the projection (south pole) # 0 is the central meridian # -90 is the central latitude # -71 is the latitude of true scale # -71/ratio denotes the true scale is located at -71 deg lat proj_latlon = f"s0/-90/-71/{ratio}" else: # just use the EPSG codes directly proj_latlon = f"EPSG:{epsg}/{ratio}" # instead, we can get PROJ code from EPSG and give that to GMT # https://docs.generic-mapping-tools.org/latest/gmt.html#j # get PROJ string of EPSG code # crs = pyproj.CRS(f"EPSG:{epsg}") # proj_str = crs.to_proj4().replace(" ", "") # proj_latlon = f'{proj_str}/{ratio}' return proj, proj_latlon, fig_width, fig_height
@deprecation.deprecated( deprecated_in="1.3.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `grid_trend()` instead", ) def grd_trend( da: xr.DataArray, coords: tuple[str, str, str] = ("x", "y", "z"), deg: int = 1, plot: bool = False, **kwargs: typing.Any, ) -> tuple[xr.DataArray, xr.DataArray]: """ Deprecated, use grid_trend instead. """ msg = "grd_trend is deprecated, use grid_trend instead" warnings.warn( msg, UserWarning, stacklevel=2, ) return grid_trend( da=da, coords=coords, deg=deg, plot=plot, **kwargs, )
[docs] def grid_trend( da: xr.DataArray, coords: tuple[str, str, str] = ("x", "y", "z"), deg: int = 1, plot: bool = False, **kwargs: typing.Any, ) -> tuple[xr.DataArray, xr.DataArray]: """ Fit an arbitrary order trend to a grid and use it to detrend. Parameters ---------- da : xarray.DataArray input grid coords : tuple[str, str, str], optional coordinate names of the supplied grid, by default ['x', 'y', 'z'] deg : int, optional trend order to use, by default 1 plot : bool, optional plot the results, by default False Returns ------- tuple[xarray.DataArray, xarray.DataArray] returns xarray.DataArrays of the fitted surface, and the detrended grid. """ # convert grid to a dataframe df0 = vd.grid_to_table(da).astype("float64") df = df0.dropna().copy() # define a trend trend = vd.Trend(degree=deg).fit((df[coords[0]], df[coords[1]]), df[coords[2]]) # fit a trend to the grid of degree: deg df["fit"] = trend.predict((df[coords[0]], df[coords[1]])) # remove the trend from the data df["detrend"] = df[coords[2]] - df.fit info = get_grid_info(da) spacing = info[0] region = info[1] registration = info[4] fit = pygmt.xyz2grd( data=df[[coords[0], coords[1], "fit"]], region=region, spacing=spacing, registration=registration, ) detrend = pygmt.xyz2grd( data=df[[coords[0], coords[1], "detrend"]], region=region, spacing=spacing, registration=registration, ) if plot is True: cmap: typing.Any = kwargs.get("cmap", "plasma") coast: typing.Any = kwargs.get("coast", True) inset: typing.Any = kwargs.get("inset", True) inset_position: typing.Any = kwargs.get("inset_position", "jTL+jTL+o0/0") inset_pos: typing.Any = kwargs.get("inset_pos") origin_shift: typing.Any = kwargs.get("origin_shift", "y") fit_label: typing.Any = kwargs.get("fit_label", f"fitted trend (order {deg})") input_label: typing.Any = kwargs.get("input_label", "input grid") title: typing.Any = kwargs.get("title", "Detrending a grid") detrended_label: typing.Any = kwargs.get("detrended_label", "detrended") fig = maps.plot_grid( da, cbar_label=input_label, title=title, cmap=cmap, # grd2cpt=True, inset=inset, inset_position=inset_position, inset_pos=inset_pos, coast=coast, hist=True, robust=True, **kwargs, ) fig = maps.plot_grid( fit, fig=fig, cmap=cmap, # grd2cpt=True, cbar_label=fit_label, origin_shift=origin_shift, coast=coast, hist=True, robust=True, **kwargs, ) fig = maps.plot_grid( detrend, fig=fig, cmap=cmap, # grd2cpt=True, cbar_label=detrended_label, origin_shift=origin_shift, coast=coast, hist=True, robust=True, **kwargs, ) fig.show() return fit, detrend
[docs] def get_combined_min_max( values: tuple[typing.Any, ...], shapefile: str | gpd.GeoDataFrame | None = None, robust: bool = False, region: tuple[float, float, float, float] | None = None, hemisphere: str | None = None, epsg: str | None = None, absolute: bool = False, robust_percentiles: tuple[float, float] = (0.02, 0.98), ) -> tuple[float, float]: """ Get a grids max and min values. Parameters ---------- values : tuple[xarray.DataArray | pandas.Series | numpy.ndarray] values to get min and max for shapefile : Union[str or geopandas.GeoDataFrame], optional path or loaded shapefile to use for a mask, by default None robust: bool, optional choose whether to return the 2nd and 98th percentile values, instead of the min/max region : tuple[float, float, float, float], optional give a subset region to get min and max values from, in format [xmin, xmax, ymin, ymax], by default None hemisphere : str, optional, if using a shapefile to subset the data, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional if using a shapefile to subset the data, set projection from EPSG code string ("3031"), by default None absolute : bool, optional choose whether to return the absolute min and max values, by default False robust_percentiles : tuple[float, float], optional percentiles to use for robust min and max values, by default (0.02, 0.98) Returns ------- tuple[float, float] returns the min and max values. """ try: epsg = default_epsg(epsg, hemisphere) except KeyError: epsg = None # get min max of each grid limits = [ get_min_max( v, robust=robust, region=region, shapefile=shapefile, epsg=epsg, absolute=absolute, robust_percentiles=robust_percentiles, ) for v in values ] # get min of all mins and max of all maxes ar = np.array(limits) return np.min(ar[:, 0]), np.max(ar[:, 1])
@deprecation.deprecated( deprecated_in="1.3.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `grid_compare()` instead", ) def grd_compare( da1: xr.DataArray | str, da2: xr.DataArray | str, plot: bool = True, plot_type: typing.Any | None = None, robust: bool = False, **kwargs: typing.Any, ) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]: """ Deprecated, use grid_compare instead. """ msg = "grd_compare is deprecated, use grid_compare instead" warnings.warn( msg, UserWarning, stacklevel=2, ) return grid_compare( da1=da1, da2=da2, plot=plot, plot_type=plot_type, robust=robust, **kwargs, )
[docs] def grid_compare( da1: xr.DataArray | str, da2: xr.DataArray | str, plot: bool = True, plot_type: typing.Any | None = None, robust: bool = False, **kwargs: typing.Any, ) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]: """ Find the difference between 2 grids and plot the results, if necessary resample and cut grids to match Parameters ---------- da1 : xarray.DataArray or str first grid, loaded grid of filename da2 : xarray.DataArray or str second grid, loaded grid of filename plot : bool, optional plot the results, by default True plot_type : typing.Any or None, optional this argument has been deprecated and will default to 'pygmt' robust : bool, optional use xarray robust color lims instead of min and max, by default is False. Keyword Args ------------ shp_mask : str deprecated, use `shapefile` instead, shapefile : str or geopandas.GeoDataFrame shapefile or filename to use to mask the grids for setting the color range. robust : bool use xarray robust color lims instead of min and max, by default is False. region : tuple[float, float, float, float] choose a specific region to compare, in format [xmin, xmax, ymin, ymax]. rmse_in_title: bool add the RMSE to the title, by default is True. cpt_lims : tuple[float, float] set the colorbar limits for the two grids. diff_lims : tuple[float, float] set the colorbar limits for the difference grid. Returns ------- tuple[xarray.DataArray, xarray.DataArray, xarray.DataArray] three xarray.DataArrays: (diff, resampled grid1, resampled grid2) """ kwargs = copy.deepcopy(kwargs) if plot_type is not None: warnings.warn( "plot_type has been deprecated and will default to 'pygmt'", UserWarning, stacklevel=2, ) shp_mask = kwargs.pop("shp_mask", None) shapefile = kwargs.pop("shapefile", None) if shp_mask is not None: msg = "'shp_mask' kwarg is deprecated, use 'shapefile' kwarg instead" warnings.warn(msg, UserWarning, stacklevel=2) shapefile = shp_mask region = kwargs.pop("region", None) verbose = kwargs.pop("verbose", "error") if isinstance(da1, str): da1 = xr.load_dataarray(da1) if isinstance(da2, str): da2 = xr.load_dataarray(da2) # get coordinate names da1_original_dims = tuple(da1.sizes.keys()) da2_original_dims = tuple(da2.sizes.keys()) if region is not None: # cut grids to supplied region da1 = subset_grid( da1, region=region, ) da2 = subset_grid( da2, region=region, ) # extract grid info of both grids da1_info = get_grid_info(da1) da2_info = get_grid_info(da2) # extract spacing of both grids assert da1_info[0] is not None assert da2_info[0] is not None da1_spacing: float = da1_info[0] da2_spacing: float = da2_info[0] # extract regions of both grids da1_reg = da1_info[1] da1_reg = typing.cast("tuple[float, float, float, float]", da1_reg) da2_reg = da2_info[1] da2_reg = typing.cast("tuple[float, float, float, float]", da2_reg) # extract registrations of both grids da1_registration = da1_info[-1] da2_registration = da2_info[-1] logger.debug("grid 1 spacing: %s, grid 2 spacing: %s", da1_spacing, da2_spacing) logger.debug("grid 1 region: %s, grid 2 region: %s", da1_reg, da2_reg) logger.debug( "grid 1 registration: %s, grid 2 registration: %s", da1_registration, da2_registration, ) # if spacing, region and registration match, no resampling if ( (da1_spacing == da2_spacing) and (da1_reg == da2_reg) and (da1_registration == da2_registration) ): grid1 = da1 grid2 = da2 else: # get minimum grid spacing of both grids if da1_spacing != da2_spacing: spacing = min(da1_spacing, da2_spacing) logger.info( "grid spacings don't match, using smaller spacing (%s m).", spacing, ) else: spacing = da1_spacing # get inside region of both grids if da1_reg != da2_reg: xmin = max(da1_reg[0], da2_reg[0]) xmax = min(da1_reg[1], da2_reg[1]) ymin = max(da1_reg[2], da2_reg[2]) ymax = min(da1_reg[3], da2_reg[3]) region = (xmin, xmax, ymin, ymax) logger.info("grid regions dont match, using inner region %s", region) else: region = da1_reg # get registration, if different use gridline registration = "g" if da1_registration != da2_registration else da1_registration logger.info( "resampling both grids with a spacing of %s, region of %s" "and a registration of %s", spacing, region, registration, ) # resample grids grid1 = pygmt.grdsample( da1, spacing=spacing, region=region, registration=registration, verbose=verbose, ) grid2 = pygmt.grdsample( da2, spacing=spacing, region=region, registration=registration, verbose=verbose, ) reg = grid1.gmt.registration grid_registration: str = "g" if reg == 0 else "p" if grid_registration != registration: msg = "registration hasn't been correctly changed" warnings.warn( msg, UserWarning, stacklevel=2, ) grid1 = change_registration(grid1) reg = grid2.gmt.registration grid_registration = "g" if reg == 0 else "p" if grid_registration != registration: msg = "registration hasn't been correctly changed" warnings.warn( msg, UserWarning, stacklevel=2, ) grid2 = change_registration(grid2) grid1 = typing.cast("xr.DataArray", grid1) grid2 = typing.cast("xr.DataArray", grid2) dif = grid1 - grid2 if plot is True: cpt_lims = kwargs.pop("cpt_lims", None) # define color limits for the 2 input grids if cpt_lims is not None: vmin, vmax = cpt_lims else: vmin, vmax = get_combined_min_max( (grid1, grid2), shapefile=shapefile, robust=robust, region=region, absolute=kwargs.get("maxabs", False), robust_percentiles=kwargs.get("robust_percentiles", (0.02, 0.98)), ) # define color limits for the difference grid diff_lims = kwargs.pop("diff_lims", None) if diff_lims is None: diff_lims = get_min_max( dif, shapefile=shapefile, robust=robust, region=region, absolute=kwargs.get("diff_maxabs", True), robust_percentiles=kwargs.get("robust_percentiles", (0.02, 0.98)), ) title = kwargs.pop("title", "Comparing Grids") if kwargs.get("rmse_in_title", True) is True: title += f", RMSE: {round(rmse(dif), kwargs.get('RMSE_decimals', 2))}" coast = kwargs.pop("coast", False) origin_shift = kwargs.get("origin_shift", "x") cmap = kwargs.get("cmap", "viridis") subplot_labels = kwargs.get("subplot_labels", False) inset = kwargs.pop("inset", True) new_kwargs = { key: value for key, value in kwargs.items() if key not in [ "cmap", "shapefile", ] } diff_kwargs = { key: value for key, value in new_kwargs.items() if key not in ["reverse_cpt", "cbar_label", "shapefile"] } fig = maps.plot_grid( grid1, cmap=cmap, region=region, coast=coast, title=kwargs.get("grid1_name", "grid 1"), cpt_lims=(vmin, vmax), **new_kwargs, ) if subplot_labels is True: fig.text( position="TL", justify="BL", text="a)", font=kwargs.get("label_font", "18p,Helvetica,black"), offset=kwargs.get("label_offset", "j0/.3"), no_clip=True, ) fig = maps.plot_grid( dif, cmap=kwargs.get("diff_cmap", "balance+h0"), region=region, coast=coast, origin_shift=origin_shift, cbar_label="difference", cpt_lims=diff_lims, fig=fig, title=title, inset=inset, inset_position=kwargs.get("inset_position", "jTL+jTL+o0/0"), inset_pos=kwargs.get("inset_pos"), **diff_kwargs, ) if subplot_labels is True: fig.text( position="TL", justify="BL", text="b)", font=kwargs.get("label_font", "20p,Helvetica,black"), offset=kwargs.get("label_offset", "j0/.3"), no_clip=True, ) fig = maps.plot_grid( grid2, cmap=cmap, region=region, coast=coast, origin_shift=origin_shift, fig=fig, title=kwargs.get("grid2_name", "grid 2"), cpt_lims=(vmin, vmax), **new_kwargs, ) if subplot_labels is True: fig.text( position="TL", justify="BL", text="c)", font=kwargs.get("label_font", "20p,Helvetica,black"), offset=kwargs.get("label_offset", "j0/.3"), no_clip=True, ) fig.show() grid1 = grid1.rename( { next(iter(grid1.dims)): da1_original_dims[0], list(grid1.dims)[1]: da1_original_dims[1], } ) grid2 = grid2.rename( { next(iter(grid2.dims)): da2_original_dims[0], list(grid2.dims)[1]: da2_original_dims[1], } ) return (dif, grid1, grid2)
[docs] def make_grid( region: tuple[float, float, float, float], spacing: float, value: float, name: str, ) -> xr.DataArray: """ Create a grid with 1 variable by defining a region, spacing, name and constant value Parameters ---------- region : tuple[float, float, float, float] bounding region in format [xmin, xmax, ymin, ymax] spacing : float spacing for grid value : float constant value to use for variable name : str name for variable Returns ------- xarray.DataArray Returns a xarray.DataArray with 1 variable of constant value. """ coords = vd.grid_coordinates(region=region, spacing=spacing, pixel_register=True) data = np.ones_like(coords[0]) * value return typing.cast( "xr.DataArray", vd.make_xarray_grid(coords, data, dims=["y", "x"], data_names=name), )
[docs] def square_subplots(n: int) -> tuple[int, int]: """ From https://github.com/matplotlib/grid-strategy/blob/master/src/grid_strategy/strategies.py Calculate the number of rows and columns based on the total number of items (n) to make an arrangement as close to square as looks good. Parameters ---------- n : int The number of total plots in the subplot Returns ------- tuple[int, int] Returns a tuple in the format (number of rows, number of columns), so for example a 3 x 2 grid would be represented as ``(3, 3)``, because there are 2 rows of length 3. """ special_cases = { 1: (1, 1), 2: (1, 2), 3: (1, 3), 4: (2, 2), 5: (2, 3), 6: (2, 3), 7: (2, 4), 8: (2, 4), 9: (3, 3), 10: (3, 4), 11: (3, 4), 12: (3, 4), } if n in special_cases: return special_cases[n] # May not work for very large n n_sqrtf = np.sqrt(n) n_sqrt = int(np.ceil(n_sqrtf)) if n_sqrtf == n_sqrt: # Perfect square, we're done x, y = n_sqrt, n_sqrt elif n <= n_sqrt * (n_sqrt - 1): # An n_sqrt x n_sqrt - 1 grid is close enough to look pretty # square, so if n is less than that value, will use that rather # than jumping all the way to a square grid. x, y = n_sqrt, n_sqrt - 1 elif not (n_sqrt % 2) and n % 2: # If the square root is even and the number of axes is odd, in # order to keep the arrangement horizontally symmetrical, using a # grid of size (n_sqrt + 1 x n_sqrt - 1) looks best and guarantees # symmetry. x, y = (n_sqrt + 1, n_sqrt - 1) else: # It's not a perfect square, but a square grid is best x, y = n_sqrt, n_sqrt if n == x * y: # There are no deficient rows, so we can just return from here return x, y # tuple(x for i in range(y)) # If exactly one of these is odd, make it the rows if (x % 2) != (y % 2) and (x % 2): x, y = y, x return x, y
[docs] def random_color() -> str: """ generate a random color in format R/G/B Returns ------- str returns a random color string, for example '95/226/100' """ rng = np.random.default_rng() numbers = rng.integers(low=0, high=256, size=3) return f"{numbers[0]}/{numbers[1]}/{numbers[2]}"
[docs] def subset_grid( grid: xr.DataArray, region: tuple[float, float, float, float], ) -> xr.DataArray: """ Return a subset of a grid based on a region Parameters ---------- grid : xarray.DataArray grid to be clipped region : tuple[float, float, float, float] region to clip to, in format [xmin, xmax, ymin, ymax] Returns ------- xarray.DataArray clipped grid """ try: da = pygmt.grdcut( grid, region=region, verbose="quiet", ) # get coordinate names original_dims = tuple(grid.sizes.keys()) return da.rename( { next(iter(da.dims)): original_dims[0], list(da.dims)[1]: original_dims[1], } ) except IndexError: ew = [region[0], region[1]] ns = [region[2], region[3]] return grid.sel( { list(grid.sizes.keys())[1]: slice(min(ew), max(ew)), list(grid.sizes.keys())[0]: slice(min(ns), max(ns)), # noqa: RUF015 } )
[docs] def get_min_max( values: xr.DataArray | pd.Series | NDArray, shapefile: str | gpd.GeoDataFrame | None = None, robust: bool = False, region: tuple[float, float, float, float] | None = None, hemisphere: str | None = None, epsg: str | None = None, absolute: bool = False, robust_percentiles: tuple[float, float] = (0.02, 0.98), ) -> tuple[float, float]: """ Get a grids max and min values. Parameters ---------- values : xarray.DataArray or pandas.Series or numpy.ndarray values to find min or max for shapefile : Union[str or geopandas.GeoDataFrame], optional path or loaded shapefile to use for a mask, by default None robust: bool, optional choose whether to return the 2nd and 98th percentile values, instead of the min/max region : tuple[float, float, float, float], optional give a subset region to get min and max values from, in format [xmin, xmax, ymin, ymax], by default None hemisphere : str, optional, if using a shapefile to subset the data, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional if using a shapefile to subset the data, set projection from EPSG code string ("3031"), by default None absolute : bool, optional return the absolute min and max values, by default False robust_percentiles : tuple[float, float], optional decimal percentiles to use for robust min and max, by default (0.02, 0.98) Returns ------- tuple[float, float] returns the min and max values. """ try: epsg = default_epsg(epsg, hemisphere) except KeyError: epsg = None if region is not None: values = subset_grid(values, region) if shapefile is None: if robust: v_min, v_max = np.nanquantile(values, robust_percentiles) else: v_min, v_max = np.nanmin(values), np.nanmax(values) elif shapefile is not None: if isinstance(values, xr.DataArray): masked = mask_from_shapefile( shapefile, epsg=epsg, grid=values, masked=True, invert=False, ) else: msg = "values must be an xarray.DataArray to use shapefile masking" raise ValueError(msg) if robust is True: v_min, v_max = np.nanquantile(masked, robust_percentiles) elif robust is False: v_min, v_max = np.nanmin(masked), np.nanmax(masked) if absolute is True: v_min, v_max = -vd.maxabs([v_min, v_max]), vd.maxabs([v_min, v_max]) # pylint: disable=used-before-assignment assert v_min <= v_max, "min value should be less than or equal to max value" # pylint: disable=possibly-used-before-assignment return (v_min, v_max)
[docs] def shapes_to_df( shapes: list[float], hemisphere: str | None = None, epsg: str | None = None, ) -> pd.DataFrame: """ convert the output of `ptk.draw_region` and `ptk.draw_lines` to a dataframe of easting and northing points Parameters ---------- shapes : list list of vertices hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None Returns ------- pandas.DataFrame Dataframe with easting, northing, and shape_num. """ epsg = default_epsg(epsg, hemisphere) df = pd.DataFrame() for i, j in enumerate(shapes): lon = [coord[0] for coord in j] # type: ignore[attr-defined] lat = [coord[1] for coord in j] # type: ignore[attr-defined] shape = pd.DataFrame({"lon": lon, "lat": lat, "shape_num": i}) df = pd.concat((df, shape)) return reproject( df, "epsg:4326", f"epsg:{epsg}", reg=False, input_coord_names=("lon", "lat"), output_coord_names=("easting", "northing"), )
[docs] def polygon_to_region( polygon: list[float], hemisphere: str | None = None, epsg: str | None = None, ) -> tuple[float, float, float, float]: """ convert the output of `ptk.draw_region` to bounding region in the supplied projected units. Parameters ---------- polyon : list list of polygon vertices hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None Returns ------- tuple[float, float, float, float] region in format in format [xmin, xmax, ymin, ymax] """ epsg = default_epsg(epsg, hemisphere) df = shapes_to_df(shapes=polygon, epsg=epsg) if df.shape_num.max() > 0: logger.info( "supplied dataframe has multiple polygons, only using the first one." ) df = df[df.shape_num == 0] reg: tuple[float, float, float, float] = vd.get_region((df.easting, df.northing)) return reg
[docs] def polygon_to_shapefile( polygon: list[float], hemisphere: str | None = None, epsg: str | None = None, ) -> gpd.GeoDataFrame: """ Convert a polygon drawn with `ptk.draw_region` to a geopandas GeoDataFrame Parameters ---------- polygon : list list of polygon vertices hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None Returns ------- geopandas.GeoDataFrame returns a geopandas GeoDataFrame of the polygon """ epsg = default_epsg(epsg, hemisphere) # convert drawn polygon into dataframe df = shapes_to_df(polygon, epsg=epsg) # remove additional polygons if df.shape_num.max() > 0: logger.info( "supplied dataframe has multiple polygons, only using the first one." ) df = df[df.shape_num == 0] coords = zip(df.easting, df.northing, strict=True) poly = shapely.geometry.Polygon(coords) return gpd.GeoDataFrame(index=[0], crs=f"EPSG:{epsg}", geometry=[poly])
[docs] def mask_from_polygon( polygon: list[float], hemisphere: str | None = None, epsg: str | None = None, invert: bool = False, drop_nans: bool = False, grid: str | xr.DataArray | None = None, region: tuple[float, float, float, float] | None = None, spacing: int | None = None, **kwargs: typing.Any, ) -> xr.DataArray: """ convert the output of `ptk.draw_region` to a mask or use it to mask a grid Parameters ---------- polygon : list list of polygon vertices hemisphere : str, optional, set projection based on "north" or "south" hemispheres, by default None epsg : str | None, optional set projection from EPSG code string ("3031"), by default None invert : bool, optional reverse the sense of masking, by default False drop_nans : bool, optional drop nans after masking, by default False grid : Union[str, xarray.DataArray], optional grid to mask, by default None region : tuple[float, float, float, float], optional region to create a grid if none is supplied, in format [xmin, xmax, ymin, ymax], by default None spacing : int, optional spacing to create a grid if none is supplied, by default None Returns ------- xarray.DataArray masked grid or mask grid with 1's inside the mask. """ epsg = default_epsg(epsg, hemisphere) # convert drawn polygon into dataframe df = shapes_to_df(polygon, epsg=epsg) data_coords = (df.easting, df.northing) # remove additional polygons if df.shape_num.max() > 0: logger.info( "supplied dataframe has multiple polygons, only using the first one." ) df = df[df.shape_num == 0] # if grid given as filename, load it if isinstance(grid, str): grid = xr.load_dataarray(grid) grid = typing.cast("xr.DataArray", grid) ds = grid.to_dataset(name="z") # type: ignore[union-attr] elif isinstance(grid, xr.DataArray): ds = grid.to_dataset() # if no grid given, make a dummy one with supplied region and spacing elif grid is None: if region is None: msg = "region must be supplied if grid is None" raise ValueError(msg) if spacing is None: msg = "spacing must be supplied if grid is None" raise ValueError(msg) coords = vd.grid_coordinates( region=region, spacing=spacing, pixel_register=kwargs.get("pixel_register", False), ) ds = vd.make_xarray_grid( coords, np.ones_like(coords[0]), dims=("y", "x"), data_names="z" ) else: msg = "grid must be a xarray.DataArray, a filename, or None" raise ValueError(msg) masked = vd.convexhull_mask( data_coords, grid=ds, ).z # reverse the mask if invert is True: inverse = masked.isnull() # noqa: PD003 inverse = inverse.where(inverse != 0) masked = inverse * ds.z # drop nans if drop_nans is True: masked = masked.where(masked.notna() == 1, drop=True) return typing.cast("xr.DataArray", masked)
@deprecation.deprecated( deprecated_in="1.3.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `change_registration` instead", ) def change_reg(grid: xr.DataArray) -> xr.DataArray: """Deprecated, use change_registration instead.""" msg = "change_reg is deprecated, use change_registration instead" warnings.warn( msg, UserWarning, stacklevel=2, ) return change_registration(grid)
[docs] def change_registration(grid: xr.DataArray) -> xr.DataArray: """ Use GMT grdedit to change the registration type in the metadata. Parameters ---------- grid : xarray.DataArray input grid to change the reg for. Returns ------- xarray.DataArray returns a xarray.DataArray with switched reg type. """ with pygmt.clib.Session() as ses: # noqa: SIM117 # store the input grid in a virtual file so GMT can read it from a dataarray # and write the output to a virtual file after changing the registration with ( ses.virtualfile_in(data=grid) as vingrd, ses.virtualfile_out(kind="grid") as voutgrd, ): args = f"{vingrd} -T -G{voutgrd}" ses.call_module("grdedit", args) f_out: xr.DataArray = ses.virtualfile_to_raster(vfname=voutgrd, kind="grid") # check if the registration has been changed if grid.gmt.registration == f_out.gmt.registration: msg = "issue in changing registration" raise ValueError(msg) return f_out
@deprecation.deprecated( deprecated_in="1.3.1", removed_in="2.0.0", current_version=polartoolkit.__version__, details="Use the new function `grid_blend` instead", ) def grd_blend( grid1: xr.DataArray, grid2: xr.DataArray, ) -> xr.DataArray: """Deprecated, use `grid_blend` instead.""" msg = "grd_blend is deprecated, use grid_blend instead" warnings.warn( msg, UserWarning, stacklevel=2, ) return grid_blend(grid1, grid2)
[docs] def grid_blend( grid1: xr.DataArray, grid2: xr.DataArray, ) -> xr.DataArray: """ Use GMT grdblend to blend 2 grids into 1. Parameters ---------- grid1 : xarray.DataArray input grid to change the reg for. grid2 : xarray.DataArray input grid to change the reg for. Returns ------- xarray.DataArray returns a blended grid. """ with pygmt.clib.Session() as ses: # noqa: SIM117 # store the input grid in a virtual file so GMT can read it from a dataarray # and write the output to a virtual file after changing the registration with ( ses.virtualfile_in(data=grid1) as vingrd1, ses.virtualfile_in(data=grid2) as vingrd2, ses.virtualfile_out(kind="grid") as voutgrd, ): args = f"{vingrd1} {vingrd2} -Cf -G{voutgrd}" ses.call_module("grdblend", args=args) f_out: xr.DataArray = ses.virtualfile_to_raster(vfname=voutgrd, kind="grid") return f_out
[docs] def get_fig_width() -> float: """ Get the width of the current PyGMT figure instance. Returns ------- float width of the figure """ with pygmt.clib.Session() as session: # noqa: SIM117 with pygmt.helpers.GMTTempFile() as tmpfile: session.call_module("mapproject", f"-Ww ->{tmpfile.name}") map_width = tmpfile.read().strip() return float(map_width)
[docs] def get_fig_height() -> float: """ Get the height of the current PyGMT figure instance. Returns ------- float height of the figure """ with pygmt.clib.Session() as session: # noqa: SIM117 with pygmt.helpers.GMTTempFile() as tmpfile: session.call_module("mapproject", f"-Wh ->{tmpfile.name}") map_height = tmpfile.read().strip() return float(map_height)
[docs] def gmt_str_to_list(region: tuple[float, float, float, float]) -> str: """ convert a tuple of floats representing the boundaries of a region into a GMT-style region string Parameters ---------- region : tuple[float, float, float, float] bounding region in format [xmin, xmax, ymin, ymax] Returns ------- str a GMT style region string """ return "".join([str(x) + "/" for x in region])[:-1]
[docs] def gmt_projection_from_epsg(epsg: str) -> None: """ Get a GMT projection string from an EPSG code. This just prints it out to stdout, but doesn't capture the actual value. Parameters ---------- epsg : str EPSG code to get the projection string for, for example "3031" """ data = [0, 0] with pygmt.clib.Session() as session: # noqa: SIM117 with session.virtualfile_in(data=data) as vin: session.call_module( "mapproject", f"-J{epsg} -I -V {vin}", )
[docs] def square_around_region( region: tuple[float, float, float, float], factor: float = 1, ) -> tuple[float, float, float, float]: """ Get a square region around a supplied region, by expanding the smaller dimension to match the larger one and optionally scaling it by a multiple of the larger dimension. For example, a 50 x 100 km region would expanded to be 100 x 100 km, and with a factor of 5 would be expanded to a 500 x 500 km region, both centered on the same point as the original region. Parameters ---------- region : tuple[float, float, float, float] bounding region in format [xmin, xmax, ymin, ymax] factor : float, optional factor to expand the encompassing square region by, by default 1 Returns ------- tuple[float, float, float, float] square bounding region in format [xmin, xmax, ymin, ymax] """ x_diff = region[1] - region[0] y_diff = region[3] - region[2] assert x_diff > 0, "xmax should be greater than xmin" assert y_diff > 0, "ymax should be greater than ymin" x_center = np.mean([region[0], region[1]]) y_center = np.mean([region[2], region[3]]) width = max(x_diff, y_diff) * factor assert width > 0, "width should be greater than 0" new_region = ( x_center - width / 2, x_center + width / 2, y_center - width / 2, y_center + width / 2, ) assert new_region[1] - new_region[0] == new_region[3] - new_region[2], ( "new region should be square" ) return new_region