Source code for rforge.library.containers.layer

import os
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import rasterio
from rforge.library.tools.exceptions import Errors
from rforge.library.tools.rescale_dataset import rescale_dataset

ERROR_MESSAGES = {
    "no_file": "Error: The file {file_path} does not exist.",
    "array": "ERROR: 'array' argument is {array_type}, but it must be a NumPy array of numeric type.",
    "bounds_type": (
        "ERROR: 'bounds' argument is {bounds_type}, but it must be a dictionary."
    ),
    "bounds_values": "ERROR: All values in 'bounds' must be numeric.",
    "bounds_keys": "ERROR: 'bounds' argument has keys {bounds_keys}, but must contain the keys {{'left', 'bottom', 'right', 'top'}}.",
    "crs": "ERROR: 'crs' argument is {crs_type}, but it must be a string.",
    "driver": "ERROR: 'driver' argument is {driver_type}, but it must be a string.",
    "no_data": "ERROR: 'no_data' argument is {no_data_type}, but it must be an integer or float.",
    "transform": "ERROR: 'transform' argument is {transform_type}, but it must be a tuple of six floats.",
    "units": "ERROR: 'units' argument is {units_type}, but it must be a string.",
}


[docs] class Layer: """Represents a data layer in a geospatial dataset. Attributes: _array (Optional[np.ndarray[np.int32]]): The array data of the layer. _bounds (Optional[Dict[str, float]]): The spatial bounds of the layer. _crs (Optional[str]): The coordinate reference system (CRS) of the layer. _driver (Optional[str]): The driver used for data storage. _no_data (Optional[Union[int, float]]): The value representing no data in the layer. _transform (Optional[Tuple[float, float, float, float, float, float]]): Affine transformation parameters. _units (Optional[str]): The units of the layer data. Methods: __init__: Initializes a Layer instance. __eq__: Checks equality between two Layer instances or a Layer and a numpy array. __str__: Returns a string representation of the layer attributes. import_layer: Imports layer data from a file. array: Getter and setter for the layer array data. bounds: Getter and setter for the spatial bounds. crs: Getter and setter for the CRS. driver: Getter and setter for the driver. no_data: Getter and setter for the no_data value. transform: Getter and setter for the affine transformation parameters. units: Getter and setter for the units. resolution: Computes the resolution of the layer. width: Computes the width of the layer. height: Computes the height of the layer. count: Computes the number of bands in the layer. mean: Computes the mean value(s) of the layer data. median: Computes the median value(s) of the layer data. min: Computes the minimum value(s) of the layer data. max: Computes the maximum value(s) of the layer data. std_dev: Computes the standard deviation value(s) of the layer data. """ _array: Optional[np.ndarray[Union[np.uint8, np.int32]]] = None _bounds: Optional[Dict[str, float]] = None _crs: Optional[str] = None _driver: Optional[str] = None _no_data: Optional[Union[int, float]] = None _transform: Optional[Tuple[float, float, float, float, float, float]] = None _units: Optional[str] = None
[docs] def __init__( self, array: Optional[np.ndarray[np.int32]] = None, bounds: Optional[Dict[str, Union[float, int]]] = None, crs: Optional[str] = None, driver: Optional[str] = None, no_data: Optional[Union[int, float]] = None, transform: Optional[Tuple[float, float, float, float, float, float]] = None, units: Optional[str] = None, ): if array is not None and not ( isinstance(array, np.ndarray) and np.issubdtype(array.dtype, np.number) ): raise TypeError( Errors.bad_input( name="array", provided_type=type(array), expected_type="a numeric array", ) ) if bounds is not None: if not isinstance(bounds, dict): raise TypeError( Errors.bad_input( name="bounds", provided_type=type(bounds), expected_type="a dictionary", ) ) if not all(isinstance(value, (int, float)) for value in bounds.values()): raise TypeError(ERROR_MESSAGES["bounds_values"]) if not (set(bounds.keys()) == {"left", "bottom", "right", "top"}): raise TypeError( ERROR_MESSAGES["bounds_keys"].format(bounds_keys=set(bounds.keys())) ) if crs is not None and not isinstance(crs, str): raise TypeError(ERROR_MESSAGES["crs"].format(crs_type=type(crs))) if driver is not None and not isinstance(driver, str): raise TypeError(ERROR_MESSAGES["driver"].format(driver_type=type(driver))) if no_data is not None and not isinstance(no_data, (int, float)): raise TypeError( ERROR_MESSAGES["no_data"].format(no_data_type=type(no_data)) ) if transform is not None and not ( isinstance(transform, tuple) and len(transform) == 6 and all((isinstance(value, (int, float)) for value in transform)) ): raise TypeError( ERROR_MESSAGES["transform"].format(transform_type=type(transform)) ) if units is not None and not isinstance(units, str): raise TypeError(ERROR_MESSAGES["units"].format(units_type=type(units))) self._array = array self._bounds = bounds self._crs = crs self._driver = driver self._no_data = no_data self._transform = transform self._units = units
[docs] def __eq__(self, other): if isinstance(other, Layer): return ( np.allclose(self._array, other.array, atol=0.01) and self._bounds == other.bounds and self._crs == other.crs and self._driver == other.driver and self._no_data == other.no_data and self._transform == other.transform and self._units == other.units ) elif isinstance(other, np.ndarray): return ( np.allclose(self._array, other, atol=0.01) and self.width == other.shape[1] and self.height == other.shape[0] and self.count == (other.shape[2] if len(other.shape) == 3 else 1) ) else: return False
[docs] def __str__(self) -> str: return str( { "crs": self.crs, "driver": self.driver, "bounds": self.bounds, "no_data": self.no_data, "transform": self.transform, "units": self.units, "resolution": self.resolution, "width": self.width, "height": self.height, "count": self.count, "mean": self.mean, "median": self.median, "minimum": self.min, "maximum": self.max, "standard_deviation": self.std_dev, } )
[docs] def import_layer(self, path: str, id: int = 1, scale: Optional[int] = None): if not os.path.exists(path): raise FileNotFoundError(ERROR_MESSAGES["no_file"].format(file_path=path)) with rasterio.open(path) as dataset: if scale is not None: dataset = rescale_dataset(dataset, scale) array = dataset.read(id) bounds = { "left": dataset.bounds[0], "bottom": dataset.bounds[1], "right": dataset.bounds[2], "top": dataset.bounds[3], } crs = ( str(dataset.crs.to_epsg()) if dataset.crs.to_epsg() is not None else "4326" ) driver = dataset.meta["driver"].upper() no_data = dataset.nodata transform = ( dataset.transform.c, dataset.transform.a, dataset.transform.b, dataset.transform.f, dataset.transform.d, dataset.transform.e, ) units = dataset.units[id - 1] self.array = array self.bounds = bounds self.crs = crs self.driver = driver self.no_data = no_data self.transform = transform self.units = units
@property def array(self) -> Optional[np.ndarray[np.int32]]: return self._array @array.setter def array(self, value: np.ndarray[np.int32]): if value is not None and not ( isinstance(value, np.ndarray) and np.issubdtype(value.dtype, np.number) ): raise TypeError(ERROR_MESSAGES["array"].format(array_type=type(value))) self._array = value @property def bounds(self) -> Optional[Dict[str, float]]: return self._bounds @bounds.setter def bounds(self, value: Dict[str, float]): if value is not None: if not isinstance(value, dict): raise TypeError( ERROR_MESSAGES["bounds_type"].format(bounds_type=type(value)) ) if not all(isinstance(value, (int, float)) for value in value.values()): raise TypeError(ERROR_MESSAGES["bounds_values"]) if not (set(value.keys()) == {"left", "bottom", "right", "top"}): raise TypeError( ERROR_MESSAGES["bounds_keys"].format(bounds_keys=set(value.keys())) ) self._bounds = value @property def crs(self) -> Optional[str]: return self._crs @crs.setter def crs(self, value: str): if value is not None and not isinstance(value, str): raise TypeError(ERROR_MESSAGES["crs"].format(crs_type=type(value))) self._crs = value @property def driver(self) -> Optional[str]: return self._driver @driver.setter def driver(self, value: str): if value is not None and not isinstance(value, str): raise TypeError(ERROR_MESSAGES["driver"].format(driver_type=type(value))) self._driver = value @property def no_data(self) -> Optional[Union[int, float]]: return self._no_data @no_data.setter def no_data(self, value: Union[int, float]): if value is not None and not isinstance(value, (int, float)): raise TypeError(ERROR_MESSAGES["no_data"].format(no_data_type=type(value))) self._no_data = value @property def transform(self) -> Optional[Tuple[float, float, float, float, float, float]]: return self._transform @transform.setter def transform(self, value: Tuple[float, float, float, float, float, float]): if value is not None and not ( isinstance(value, tuple) and len(value) == 6 and all((isinstance(v, (int, float)) for v in value)) ): raise TypeError( ERROR_MESSAGES["transform"].format(transform_type=type(value)) ) self._transform = value @property def units(self) -> Optional[str]: return self._units @units.setter def units(self, value: str): if value is not None and not isinstance(value, str): raise TypeError(ERROR_MESSAGES["units"].format(units_type=type(value))) self._units = value @property def resolution(self) -> float: if self._array is not None and self._transform is not None: return self._transform[1] else: return 0 @property def width(self) -> int: if self._array is not None: return self._array.shape[1] else: return 0 @property def height(self) -> int: if self._array is not None: return self._array.shape[0] else: return 0 @property def count(self) -> int: if self._array is not None: return self._array.shape[2] if len(self._array.shape) == 3 else 1 else: return 0 @property def mean(self) -> Optional[Union[float, int, list[Union[int, float]]]]: if self._array is not None: return ( float(np.mean(self._array)) if len(self._array.shape) <= 2 else [ float(np.mean(self._array[:, :, i])) for i in range(self._array.shape[2]) ] ) else: return None @property def median(self) -> Optional[Union[float, int, list[Union[int, float]]]]: if self._array is not None: return ( float(np.median(self._array)) if len(self._array.shape) <= 2 else [ float(np.median(self._array[:, :, i])) for i in range(self._array.shape[2]) ] ) else: return None @property def min(self) -> Optional[Union[float, int, list[Union[int, float]]]]: if self._array is not None: return ( float(np.min(self._array)) if len(self._array.shape) <= 2 else [ float(np.min(self._array[:, :, i])) for i in range(self._array.shape[2]) ] ) else: return None @property def max(self) -> Optional[Union[float, int, list[Union[int, float]]]]: if self._array is not None: return ( float(np.max(self._array)) if len(self._array.shape) <= 2 else [ float(np.max(self._array[:, :, i])) for i in range(self._array.shape[2]) ] ) else: return None @property def std_dev(self) -> Optional[Union[float, int, list[Union[int, float]]]]: if self._array is not None: return ( float(np.std(self._array)) if len(self._array.shape) <= 2 else [ float(np.std(self._array[:, :, i])) for i in range(self._array.shape[2]) ] ) else: return None