"""
Utilities to manipulate and analyze raster data.
The raster module provides accessor utilities for xarray
data arrays and datasets accessible using the `oap` accessor.
These functions are available just by importing directly
the library using ``import ochanticipy``.
Since rioxarray already extends xarray, this
module's extensions inherit from the RasterArray and
RasterDataset extensions respectively. This ensures
cleaner code in the module as ``rio`` methods are
available immediately, but also means a couple of
design decisions are followed.
The ``xarray.DataArray`` and ``xarray.Dataset``
extensions here inherit from rioxarray base classes.
Thus, methods that are identical for both objects
are defined in a mixin class ``OapRasterMixin`` which
can be inherited by the two respective extensions.
"""
import logging
from typing import Any, Dict, List, Optional, Union
import geopandas as gpd
import numpy as np
import pandas as pd
import rioxarray # noqa: F401
import xarray as xr
from rioxarray.exceptions import DimensionError, MissingCRS, NoDataInBounds
from rioxarray.raster_array import RasterArray
from rioxarray.raster_dataset import RasterDataset
from rioxarray.rioxarray import CRS, _get_data_var_message
logger = logging.getLogger(__name__)
[docs]
class OapRasterMixin:
"""OCHA AnticiPy mixin base class."""
# setting attributes to avoid mypy error, from SO
# https://stackoverflow.com/questions/53120262/mypy-how-to-
# ignore-missing-attribute-errors-in-mixins/53228204#53228204
x_dim: str
y_dim: str
_obj: Union[xr.DataArray, xr.Dataset]
_height: int
_width: int
_crs: CRS
def __init__(self, xarray_obj):
super().__init__(xarray_obj)
# Adding lat/lon to set of default spatial dims
if "lat" in self._obj.dims and "lon" in self._obj.dims:
self._x_dim = self._obj.rio._x_dim = "lon"
self._y_dim = self._obj.rio._y_dim = "lat"
# Adding Y/X to set of default spatial dims
if "Y" in self._obj.dims and "X" in self._obj.dims:
self._x_dim = self._obj.rio._x_dim = "X"
self._y_dim = self._obj.rio._y_dim = "Y"
# Managing time coordinate default dims
self._t_dim = None
for t in ["t", "T", "time"]:
if t in self._obj.dims:
self._t_dim = t
# longitude range indicates if coordinates are
# between -180 and 180 or 0 and 360
self._longitude_range = None
# methods derived from rioxarray.rioxarray.x_dim and y_dim
@property
def t_dim(self):
"""str: The dimension for time."""
if self._t_dim is None:
raise DimensionError(
"Time dimension not found. 'oap.set_time_dim()' or "
"using 'rename()' to change the dimension name to "
f"'t' can address this.{_get_data_var_message(self._obj)}"
)
return self._t_dim
@property
def longitude_range(self):
"""str: The longitude range.
The longitude range indicates if coordinates are
between -180 and 180 (indicated by '180')
or 0 and 360 (indicated by '360').
"""
if self._longitude_range is None:
if self._x_dim is not None:
data_obj = self._get_obj_oap(inplace=False)
lon_max = data_obj.indexes[self.x_dim].max()
if lon_max > 180:
self._longitude_range = "360"
else:
self._longitude_range = "180"
else:
raise DimensionError(
"Longitude range not set due to missing 'x_dim'."
"Use 'oap.set_spatial_dims()' to set the name of "
f"the x dimension. {_get_data_var_message(self._obj)}"
)
return self._longitude_range
[docs]
def set_time_dim(
self, t_dim: str, inplace: bool = False
) -> Optional[Union[xr.DataArray, xr.Dataset]]:
"""Set the time dimension of the dataset.
Parameters
----------
t_dim: str
The name of the time dimension.
inplace: bool, optional
If True, it will modify the dataarray in place.
Otherwise it will return a modified copy.
Returns
-------
Union[xarray.DataArray, xarray.Dataset]
Data array or dataset with time dimension.
Examples
--------
>>> import xarray
>>> import numpy
>>> da = xarray.DataArray(
... numpy.arange(64).reshape(4,4,4),
... coords={"lat":numpy.array([87, 88, 89, 90]),
... "lon":numpy.array([5, 120, 199, 360]),
... "F":numpy.array([10,11,12,13])}
... )
>>> da.oap.set_time_dim(t_dim="F", inplace=True)
>>> da.oap.t_dim
'F'
"""
data_obj = self._get_obj_oap(inplace=inplace)
if t_dim not in data_obj.dims:
raise DimensionError(
"Time dimension ({t_dim}) not found."
f"{_get_data_var_message(data_obj)}"
)
data_obj.oap._t_dim = t_dim
return data_obj if not inplace else None
[docs]
def correct_calendar(
self, inplace: bool = False
) -> Optional[Union[xr.DataArray, xr.Dataset]]:
"""Correct calendar attribute for recognition by xarray.
Some datasets come with a wrong calendar attribute that isn't
recognized by xarray. This function corrects the coordinate
attribute to ensure that a ``calendar`` attribute exists
and specifies a calendar alias that is supportable by
``xarray.cftime_range`` and NetCDF in general.
Currently ensures that calendar attributes that are either
specified with ``units="months since"`` or ``calendar="360"``
explicitly have ``calendar="360_day"``. This is based on
discussions in `this GitHub issue
<https://github.com/Unidata/netcdf4-python/issues/434>`_.
If and when further issues are found with calendar
attributes, support for conversion will be added here.
Parameters
----------
inplace: bool, optional
If True, it will modify the dataarray in place.
Otherwise it will return a modified copy.
Returns
-------
Union[xarray.DataArray, xarray.Dataset]
Data array or dataset with transformed calendar
coordinate.
Examples
--------
>>> import xarray
>>> import numpy
>>> da = xarray.DataArray(
... numpy.arange(64).reshape(4,4,4),
... coords={"lat":numpy.array([87, 88, 89, 90]),
... "lon":numpy.array([5, 120, 199, 360]),
... "t":numpy.array([10,11,12,13])}
... )
>>> da["t"].attrs["units"] = "months since 1960-01-01"
>>> da_crct = da.oap.correct_calendar()
>>> da_crct["t"].attrs["calendar"]
'360_day'
"""
data_obj = self._get_obj_oap(inplace=inplace)
if (
"calendar" in data_obj[self.t_dim].attrs.keys()
and data_obj[self.t_dim].attrs["calendar"] == "360"
):
data_obj[self.t_dim].attrs["calendar"] = "360_day"
logger.info("Calendar attribute changed from '360' to '360_day'.")
elif (
"units" in data_obj[self.t_dim].attrs.keys()
and "months since" in data_obj[self.t_dim].attrs["units"]
):
data_obj[self.t_dim].attrs["calendar"] = "360_day"
logger.info(
"Calendar attribute '360_day' added, "
"equivalent of 'units' 'months since'."
)
else:
logger.info("No 'units' or 'calendar' attributes to correct.")
return data_obj if not inplace else None
[docs]
def invert_coordinates(
self, inplace: bool = False
) -> Optional[Union[xr.DataArray, xr.Dataset]]:
"""
Invert latitude and longitude in data array.
This function checks for inversion of latitude and longitude
and inverts them if needed. Datasets with inverted coordinates
can produce incorrect results in certain functions like
``rasterstats.zonal_stats()``. Correctly ordered coordinates
should be:
* latitude: Largest to smallest.
* longitude: Smallest to largest.
If data array already has correct coordinate ordering, it is
directly returned. Function largely copied from
https://github.com/perrygeo/python-rasterstats/issues/218.
Parameters
----------
inplace : bool, optional
If True, will overwrite existing data array. Default is False.
Returns
-------
Union[xarray.DataArray, xarray.Dataset]
Data array or dataset with correct coordinate ordering.
Examples
--------
>>> import xarray
>>> import numpy
>>> da = xarray.DataArray(
... numpy.arange(16).reshape(4,4),
... coords={"lat":numpy.array([87, 88, 89, 90]),
... "lon":numpy.array([70, 69, 68, 67])}
... )
>>> da.oap.invert_coordinates(inplace=True)
>>> da.get_index("lon")
Index([67, 68, 69, 70], dtype='int64', name='lon')
>>> da.get_index("lat")
Index([90, 89, 88, 87], dtype='int64', name='lat')
"""
data_obj = self._get_obj_oap(inplace=inplace)
lon_inv, lat_inv = self._check_coords_inverted()
if lon_inv:
logger.info("Longitude was inverted, reversing coordinates.")
data_obj[self.x_dim] = data_obj[self.x_dim][::-1]
if lat_inv:
logger.info("Latitude was inverted, reversing coordinates.")
data_obj[self.y_dim] = data_obj[self.y_dim][::-1]
return data_obj if not inplace else None
def _check_coords_inverted(self):
"""
Check if latitude and longitude inverted.
Examples
--------
>>> import xarray
>>> import numpy
>>> da = xarray.DataArray(
... numpy.arange(16).reshape(4,4),
... coords={"lat":numpy.array([90, 89, 88, 87]),
... "lon":numpy.array([70, 69, 68, 67])}
... )
>>> da.oap._check_coords_inverted()
(True, False)
"""
data_obj = self._get_obj_oap(inplace=False)
lat = data_obj.get_index(self.y_dim)
lon = data_obj.get_index(self.x_dim)
return lon[0] > lon[-1], lat[0] < lat[-1]
[docs]
def change_longitude_range(
self, to_180_range: bool = True, inplace: bool = False
) -> Optional[Union[xr.DataArray, xr.Dataset]]:
"""Convert longitude range between -180 to 180 and 0 to 360.
The standard longitude range is from -180 to 180, while some
applications use 0 to 360. This includes
```rasterstats.zonal_stats`` <https://pypi.org/project/rasterstats/>`_,
which assumes ranges from 0 to 360.
``change_longitude_range()`` will convert between the
two coordinate ranges based on its current state.
By default it will use the -180 to 180 range unless
`to_180_range` is False, then it will use 0-360
If coordinates lie solely between 0 and 180 then there is
no need for conversion and the input will be returned.
Parameters
----------
to_180_range : bool, default = True
If True, the returned range is -180 to 180
Else, the returned range is 0 to 360
inplace : bool, optional
If True, will overwrite existing data array. Default is False.
Returns
-------
Union[xarray.DataArray, xarray.Dataset]
Dataset with transformed longitude coordinates.
Examples
--------
>>> import xarray
>>> import numpy
>>> import pandas
>>> temp = 15 + 8 * numpy.random.randn(4, 4, 3)
>>> precip = 10 * numpy.random.rand(4, 4, 3)
>>> ds = xarray.Dataset(
... {
... "temperature": (["lat", "lon", "time"], temp),
... "precipitation": (["lat", "lon", "time"], precip)
... },
... coords={
... "lat":numpy.array([87, 88, 89, 90]),
... "lon":numpy.array([5, 120, 199, 360]),
... "time": pandas.date_range("2014-09-06", periods=3)
... }
... )
>>> ds_inv = ds.oap.change_longitude_range()
>>> ds_inv.get_index("lon")
Index([-161, 0, 5, 120], dtype='int64', name='lon')
>>> # invert coordinates back to original, in place
>>> ds_inv.oap.change_longitude_range(to_180_range=False, inplace=True)
>>> ds_inv.get_index("lon")
Index([0, 5, 120, 199], dtype='int64', name='lon')
"""
data_obj = self._get_obj_oap(inplace=inplace)
if to_180_range and self.longitude_range == "360":
logger.info("Converting longitude from 0 360 to -180 to 180.")
data_obj[self.x_dim] = np.sort(
((data_obj[self.x_dim] + 180) % 360) - 180
)
data_obj.oap._longitude_range = "180"
elif not to_180_range and self.longitude_range == "180":
logger.info("Converting longitude from -180 to 180 to 0 to 360.")
data_obj[self.x_dim] = np.sort(data_obj[self.x_dim] % 360)
data_obj.oap._longitude_range = "360"
else:
logger.info("Coordinates already in required range.")
return data_obj if not inplace else None
def _get_obj_oap(self, inplace: bool) -> Union[xr.DataArray, xr.Dataset]:
"""
Get object to modify.
Originally directly used the ``rioxarray``
_get_obj() method, however, this was silently failing
so implementing a similar function directly within
this module.
Parameters
----------
inplace : bool
If True, returns self
Returns
-------
Union[xr.DataArray, xr.Dataset]
Object to modify.
"""
if inplace:
return self._obj
obj_copy = self._obj.copy(deep=True)
# preserve attribute information
obj_copy.rio._x_dim = self._x_dim
obj_copy.rio._y_dim = self._y_dim
obj_copy.rio._width = self._width
obj_copy.rio._height = self._height
obj_copy.rio._crs = self._crs
obj_copy.oap._t_dim = self._t_dim
obj_copy.oap._longitude_range = self._longitude_range
return obj_copy
[docs]
@xr.register_dataarray_accessor("oap")
class OapRasterArray(OapRasterMixin, RasterArray):
"""OCHA AnticiPy extension for xarray.DataArray."""
def __init__(self, xarray_object):
super().__init__(xarray_object)
[docs]
def compute_raster_stats(
self,
gdf: gpd.GeoDataFrame,
feature_col: str,
stats_list: Optional[List[str]] = None,
percentile_list: Optional[List[int]] = None,
all_touched: bool = False,
) -> pd.DataFrame:
"""Compute raster statistics for polygon geometry.
``compute_raster_stats()`` is designed to
quickly compute raster statistics across a polygon
and its features.
Parameters
----------
gdf : geopandas.GeoDataFrame
GeoDataFrame with row per area for stats computation.
If ``pd.DataFrame`` is passed, geometry column must
have the name ``geometry``.
feature_col : str
Column in ``gdf`` to use as row/feature identifier.
stats_list : Optional[List[str]], optional
List of statistics to calculate, by default None.
Passed to ``get_attr()``.
percentile_list : Optional[List[int]], optional
List of percentiles to compute, by default None.
all_touched : bool, optional
If ``True`` all cells touching the region will be
included, by default False. If ``False``, only
cells with their centre in the region will be
included.
Returns
-------
pandas.DataFrame
Dataframe with computed statistics.
Examples
--------
>>> import geopandas as gpd
>>> import xarray as xr
>>> import rioxarray
>>> from shapely.geometry import Polygon
>>>
>>> # compute raster stats on simple data
>>> d = {
... "name": ["area_a", "area_b"],
... "geometry": [
... Polygon([(0, 0), (0, 2), (2, 2), (2, 0)]),
... Polygon([(2, 0), (2, 2), (3, 2), (3, 0)]),
... ],
... }
>>> gdf = gpd.GeoDataFrame(d)
>>>
>>> da = xr.DataArray(
... [[1, 2, 3], [4, 5, 6]],
... dims=("y", "x"),
... coords={"y": [1.5, 0.5], "x": [0.5, 1.5, 2.5]},
... ).rio.write_crs("EPSG:4326")
>>>
>>> da.oap.compute_raster_stats(
... gdf=gdf,
... feature_col="name"
... ) # doctest: +SKIP
mean_name std_name min_name max_name sum_name count_name name # noqa: E501
0 3.0 1.5811388300841898 1 5 12.0 4 area_a # noqa: E501
1 4.5 1.5 3 6 9.0 2 area_b # noqa: E501
"""
if self._obj.rio.crs is None:
raise MissingCRS("No CRS found, set CRS before computation.")
data_obj = self._get_obj_oap(inplace=False)
df_list = []
if stats_list is None:
stats_list = ["mean", "std", "min", "max", "sum", "count"]
for feature in gdf[feature_col].unique():
gdf_adm = gdf[gdf[feature_col] == feature]
da_clip = data_obj.rio.set_spatial_dims(
x_dim=self.x_dim, y_dim=self.y_dim
)
# clip returns error if no overlapping raster cells for geometry
# so catching and skipping rest of iteration so no stats computed
try:
da_clip = da_clip.rio.clip(
gdf_adm.geometry, all_touched=all_touched
)
except NoDataInBounds:
logger.warning(
"No overlapping raster cells for %s, skipping.", feature
)
continue
grid_stat_all = []
for stat in stats_list:
# count automatically ignores NaNs
# therefore skipna can also not be given as an argument
# implemented count cause needed for computing percentages
kwargs: Dict[str, Any] = {}
if stat != "count":
kwargs["skipna"] = True
# makes sum return NaN instead of 0 if array
# only contains NaNs
if stat == "sum":
kwargs["min_count"] = 1
grid_stat = getattr(da_clip, stat)(
dim=[self.x_dim, self.y_dim], **kwargs
).rename(stat)
grid_stat_all.append(grid_stat)
if percentile_list is not None:
grid_quant = [
da_clip.quantile(quant / 100, dim=[self.x_dim, self.y_dim])
.drop_vars("quantile")
.rename(f"{quant}quant")
for quant in percentile_list
]
grid_stat_all.extend(grid_quant)
# if dims is 0, it throws an error when merging
# and then converting to a df
# this occurs when the input da is 2D
if not grid_stat_all[0].dims:
df_adm = pd.DataFrame(
{
da_stat.name: [da_stat.values]
for da_stat in grid_stat_all
}
)
else:
zonal_stats_xr = xr.merge(grid_stat_all)
df_adm = (
zonal_stats_xr.to_dataframe()
.drop("spatial_ref", axis=1)
.reset_index()
)
df_adm[feature_col] = feature
df_list.append(df_adm)
df_zonal_stats = pd.concat(df_list).reset_index(drop=True)
return df_zonal_stats
[docs]
@xr.register_dataset_accessor("oap")
class OapRasterDataset(OapRasterMixin, RasterDataset):
"""OCHA AnticiPy extension for xarray.Dataset."""
def __init__(self, xarray_object):
super().__init__(xarray_object)
[docs]
def get_raster_array(self, var_name: str) -> xr.DataArray:
"""Get xarray.DataArray from variable and keep dimensions.
Accessing a component xarray.DataArray using the
non-coordinate variable name loses and dimensions set
through ``rio`` or ``oap``. This includes ``x_dim``,
``y_dim``, and ``t_dim`` that have to be specifically
set using ``rio.set_spatial_dims()`` or
``oap.set_time_dim()`` respectively. For any dataset
``ds``, ``ds.get_raster_array("var")`` will retrieve
the data array without losing the dimensions. Using
``ds["var"]`` will lose the dimensions.
Parameters
----------
var_name : str
Name of variable.
Returns
-------
xarray.DataArray
A data array.
Examples
--------
>>> import xarray
>>> import numpy
>>> temp = 15 + 8 * numpy.random.randn(4, 4, 3)
>>> precip = 10 * np.random.rand(4, 4, 3)
>>> ds = xarray.Dataset(
... {
... "temperature": (["lat", "lon", "F"], temp),
... "precipitation": (["lat", "lon", "F"], precip)
... },
... coords={
... "lat":numpy.array([87, 88, 89, 90]),
... "lon":numpy.array([5, 120, 199, 360]),
... "F": pd.date_range("2014-09-06", periods=3)
... }
... )
>>> ds.oap.set_time_dim("F", inplace=True)
>>> da = ds.oap.get_raster_array("temperature")
>>> da.oap.t_dim
'F'
>>> # directly accessing array loses set dimensions
>>> ds['temperature'].oap.t_dim # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
rioxarray.exceptions.DimensionError: Time dimension not found.
'oap.set_time_dim()' or using 'rename()' to change the
dimension name to 't' can address this.
Data variable: temperature
"""
obj = self._obj[var_name]
# rioxarray attributes
if self._x_dim is not None and self._y_dim is not None:
obj.rio.set_spatial_dims(
x_dim=self._x_dim, y_dim=self._y_dim, inplace=True
)
if self.crs is not None:
obj.rio.set_crs(self._crs, inplace=True)
# raster module attributes
if self._t_dim is not None:
obj.oap.set_time_dim(self._t_dim, inplace=True)
return obj
[docs]
def compute_raster_stats(
self, var_names: Union[List[str], str, None] = None, **kwargs: Any
):
"""Compute raster statistics across dataset arrays.
``compute_raster_stats()`` calculates raster statistics on
component data arrays of a dataset. By default, calculates on all
non-coordinate variables, unless a list of variable names is
passed in, which then have statistics calculated for them.
Parameters
----------
var_names : Union[List[str], str, None], optional
Dataset data array variables to calculate raster statistics on.
kwargs : Any
Keyword arguments passed to the array method
:meth:`~OapRasterArray.compute_raster_stats`
Returns
-------
List[pandas.DataFrame]
List of raster statistics data frames.
Examples
--------
>>> import geopandas as gpd
>>> import xarray as xr
>>> import rioxarray
>>> from shapely.geometry import Polygon
>>>
>>> # compute raster stats on simple data
>>> d = {
... "name": ["area_a", "area_b"],
... "geometry": [
... Polygon([(0, 0), (0, 2), (2, 2), (2, 0)]),
... Polygon([(2, 0), (2, 2), (3, 2), (3, 0)]),
... ],
... }
>>> gdf = gpd.GeoDataFrame(d)
>>>
>>> ds = xr.DataArray(
... [[1, 2, 3], [4, 5, 6]],
... dims=("y", "x"),
... coords={"y": [1.5, 0.5], "x": [0.5, 1.5, 2.5]},
... ).rio.write_crs("EPSG:4326").to_dataset(name="data")
>>>
>>> ds.oap.compute_raster_stats(
... var_names=["data"],
... gdf=gdf,
... feature_col="name"
... ) # doctest: +SKIP
[ mean std min max sum count name # noqa: E501
0 3.0 1.5811388300841898 1 5 12.0 4 area_a # noqa: E501
1 4.5 1.5 3 6 9.0 2 area_b] # noqa: E501
"""
if var_names is None:
var_names = self.vars
elif isinstance(var_names, str):
var_names = [var_names]
stats = [
self._obj[var].oap.compute_raster_stats(**kwargs)
for var in var_names
]
return stats