diff --git a/NOTICE b/NOTICE index 0a862f21..e038e7db 100644 --- a/NOTICE +++ b/NOTICE @@ -74,11 +74,6 @@ Copyright (c) 2012-2023 Anaconda, Inc. Website: https://numba.pydata.org/ License: BSD 2-Clause. -OpenCV (cv2): Computer vision library. -Copyright (C) 2015-2023, OpenCV Foundation, all rights reserved. -Website: https://opencv.org/ -License: Apache License 2.0 - scikit-image: Image processing in Python. Copyright (c) 2009-2022 the scikit-image team. Website: https://scikit-image.org/ diff --git a/dev-environment.yml b/dev-environment.yml index 91cbf4d5..47c3b6c1 100644 --- a/dev-environment.yml +++ b/dev-environment.yml @@ -48,12 +48,7 @@ dependencies: - numpydoc - pip: - # Optional dependency - -e ./ - - noisyopt - # "Headless" needed for opencv to install without requiring system dependencies - - opencv-contrib-python-headless - # To run CI against latest GeoUtils # - git+https://github.com/rhugonnet/geoutils.git diff --git a/doc/source/api.md b/doc/source/api.md index 085378ba..08305452 100644 --- a/doc/source/api.md +++ b/doc/source/api.md @@ -259,7 +259,9 @@ To build and pass your coregistration pipeline to {func}`~xdem.DEM.coregister_3d coreg.VerticalShift coreg.NuthKaab coreg.DhMinimize + coreg.LZD coreg.ICP + coreg.CPD ``` #### Manipulating affine transforms @@ -268,15 +270,10 @@ To build and pass your coregistration pipeline to {func}`~xdem.DEM.coregister_3d .. autosummary:: :toctree: gen_modules/ - coreg.AffineCoreg.from_matrix - coreg.AffineCoreg.to_matrix - coreg.AffineCoreg.from_translations - coreg.AffineCoreg.to_translations - coreg.AffineCoreg.from_rotations - coreg.AffineCoreg.to_rotations - coreg.apply_matrix coreg.invert_matrix + coreg.matrix_from_translations_rotations + coreg.translations_rotations_from_matrix ``` ### Bias-correction diff --git a/doc/source/citation.md b/doc/source/citation.md index 7966b760..c25603ba 100644 --- a/doc/source/citation.md +++ b/doc/source/citation.md @@ -16,7 +16,7 @@ More details are available on each feature page! ### Terrain attributes ```{list-table} - :widths: 1 1 + :widths: 1 2 :header-rows: 1 :stub-columns: 1 @@ -41,7 +41,7 @@ More details are available on each feature page! ### Coregistration ```{list-table} - :widths: 1 1 + :widths: 1 2 :header-rows: 1 :stub-columns: 1 @@ -51,8 +51,12 @@ More details are available on each feature page! - [Nuth and Kääb (2011)](https://doi.org/10.5194/tc-5-271-2011) * - Dh minimization - N/A + * - Least Z-difference + - [Rosenholm and Torlegård (1988)](https://www.asprs.org/wp-content/uploads/pers/1988journal/oct/1988_oct_1385-1389.pdf) * - Iterative closest point - - [Besl and McKay (1992)](https://doi.org/10.1117/12.57955) + - [Besl and McKay (1992)](https://doi.org/10.1117/12.57955), [Chen and Medioni (1992)](https://doi.org/10.1016/0262-8856(92)90066-C) + * - Coherent point drift + - [Myronenko and Song (2010)](https://doi.org/10.1109/TPAMI.2010.46) * - Vertical shift - N/A ``` @@ -60,7 +64,7 @@ More details are available on each feature page! ### Bias-correction ```{list-table} - :widths: 1 1 + :widths: 1 2 :header-rows: 1 :stub-columns: 1 @@ -68,16 +72,12 @@ More details are available on each feature page! - Reference * - Deramp - N/A - * - Directional bias (sum of sinuoids) + * - Directional bias (sinusoids) - [Girod et al. (2017)](https://doi.org/10.3390/rs9070704) - * - Directional bias (other) - - N/A - * - Terrain bias (maximum curvature) + * - Terrain bias (curvature) - [Gardelle et al. 
(2012)](https://doi.org/10.3189/2012JoG11J175) * - Terrain bias (elevation) - [Nuth and Kääb (2011)](https://doi.org/10.5194/tc-5-271-2011) - * - Terrain bias (other) - - N/A * - Vertical shift - N/A ``` @@ -85,7 +85,7 @@ More details are available on each feature page! ### Gap-filling ```{list-table} - :widths: 1 1 + :widths: 1 2 :header-rows: 1 :stub-columns: 1 @@ -101,14 +101,14 @@ More details are available on each feature page! ### Uncertainty analysis ```{list-table} - :widths: 2 1 + :widths: 1 1 :header-rows: 1 :stub-columns: 1 * - Method - Reference - * - R2009 (multiple correlation ranges, circular approximation) + * - R2009 (nested ranges, circular approx.) - [Rolstad et al. (2009)](http://dx.doi.org/10.3189/002214309789470950) - * - H2022 (heteroscedasticity, multiple correlation ranges, spatial propagation approximation) + * - H2022 (heterosc., nested ranges, spatial propag.) - [Hugonnet et al. (2022)](http://dx.doi.org/10.1109/JSTARS.2022.3188922) ``` diff --git a/doc/source/coregistration.md b/doc/source/coregistration.md index 218fe7f6..d6bdc9b8 100644 --- a/doc/source/coregistration.md +++ b/doc/source/coregistration.md @@ -111,14 +111,20 @@ Often, an `inlier_mask` has to be passed to {func}`~xdem.coreg.Coreg.fit` to iso - Description - Reference * - {ref}`nuthkaab` - - Horizontal and vertical translations + - Translations - [Nuth and Kääb (2011)](https://doi.org/10.5194/tc-5-271-2011) * - {ref}`dh-minimize` - - Horizontal and vertical translations + - Translations - N/A + * - {ref}`lzd` + - Translations and rotations + - [Rosenholm and Torlegård (1988)](https://www.asprs.org/wp-content/uploads/pers/1988journal/oct/1988_oct_1385-1389.pdf) * - {ref}`icp` - - Translation and rotations - - [Besl and McKay (1992)](https://doi.org/10.1117/12.57955) + - Translations and rotations + - [Besl and McKay (1992)](https://doi.org/10.1117/12.57955), [Chen and Medioni (1992)](https://doi.org/10.1016/0262-8856(92)90066-C) + * - {ref}`cpd` + - Translations and rotations + - [Myronenko and Song (2010)](https://doi.org/10.1109/TPAMI.2010.46) * - {ref}`vshift` - Vertical translation - N/A @@ -305,20 +311,17 @@ _ = ax[1].set_yticklabels([]) plt.tight_layout() ``` -(icp)= -### Iterative closest point +(lzd)= +### Least Z-difference -{class}`xdem.coreg.ICP` +{class}`xdem.coreg.LZD` - **Performs:** Rigid transform transformation (3D translation + 3D rotation). - **Does not support weights.** -- **Pros:** Efficient at estimating rotation and shifts simultaneously. -- **Cons:** Poor sub-pixel accuracy for horizontal shifts, sensitive to outliers, and runs slowly with large samples. +- **Pros:** Good at solving sub-pixel shifts by harnessing the gridded nature of the DEM. +- **Cons:** Sensitive to elevation outliers. -Iterative Closest Point (ICP) coregistration is an iterative point cloud registration method from [Besl and McKay (1992)](https://doi.org/10.1117/12.57955). It aims at iteratively minimizing the distance between closest neighbours by applying sequential rigid transformations. If DEMs are used as inputs, they are converted to point clouds. -As for Nuth and Kääb (2011), the iteration stops if it reaches the maximum number of iteration limit or if the iterative transformation amplitude falls below a specified tolerance. -ICP is currently based on [OpenCV's implementation](https://docs.opencv.org/4.x/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html) (an optional dependency), which includes outlier removal arguments.
This may improve results significantly on outlier-prone data, but care should still be taken, as the risk of landing in [local minima](https://en.wikipedia.org/wiki/Maxima_and_minima) increases. +Least Z-difference (LZD) coregistration is an iterative point-grid registration method from [Rosenholm and Torlegård (1988)](https://www.asprs.org/wp-content/uploads/pers/1988journal/oct/1988_oct_1385-1389.pdf). ```{code-cell} ipython3 :tags: [hide-cell] @@ -327,21 +330,55 @@ ICP is currently based on [OpenCV's implementation](https://docs.opencv.org/4.x/ : code_prompt_hide: "Hide the code for adding a shift and rotation" # Apply a rotation of 0.2 degrees in X, 0.1 in Y and 0 in Z -e = np.deg2rad([0.2, 0.1, 0]) +rotations = np.array([0.2, 0.1, 0]) # Add X/Y/Z shifts in meters shifts = np.array([10, 20, 5]) # Affine matrix for 3D transformation -import pytransform3d -matrix_rot = pytransform3d.rotations.matrix_from_euler(e, i=0, j=1, k=2, extrinsic=True) -matrix = np.diag(np.ones(4, dtype=float)) -matrix[:3, :3] = matrix_rot -matrix[:3, 3] = shifts +matrix = xdem.coreg.matrix_from_translations_rotations(*shifts, *rotations) -centroid = [ref_dem.bounds.left + 5000, ref_dem.bounds.top - 2000, np.median(ref_dem)] # We create misaligned elevation data +centroid = [ref_dem.bounds.left + 5000, ref_dem.bounds.top - 2000, np.median(ref_dem)] tba_dem_shifted_rotated = xdem.coreg.apply_matrix(ref_dem, matrix, centroid=centroid) ``` +```{code-cell} ipython3 +# Define a coregistration based on LZD +lzd = xdem.coreg.LZD() +# Fit to data and apply +aligned_dem = lzd.fit_and_apply(ref_dem, tba_dem_shifted_rotated) +``` + +```{code-cell} ipython3 +:tags: [hide-input] +:mystnb: +: code_prompt_show: "Show plotting code" +: code_prompt_hide: "Hide plotting code" + +# Plot before and after +f, ax = plt.subplots(1, 2) +ax[0].set_title("Before LZD") +(tba_dem_shifted_rotated - ref_dem).plot(cmap='RdYlBu', vmin=-30, vmax=30, ax=ax[0]) +ax[1].set_title("After LZD") +(aligned_dem - ref_dem).plot(cmap='RdYlBu', vmin=-30, vmax=30, ax=ax[1], cbar_title="Elevation differences (m)") +_ = ax[1].set_yticklabels([]) +plt.tight_layout() +``` + +(icp)= +### Iterative closest point + +{class}`xdem.coreg.ICP` + +- **Performs:** Rigid transform transformation (3D translation + 3D rotation). +- **Does not support weights.** +- **Pros:** Efficient at estimating rotation and shifts simultaneously. +- **Cons:** Poor sub-pixel accuracy for horizontal shifts, sensitive to outliers, and runs slowly with large samples. + +Iterative closest point (ICP) coregistration is an iterative point cloud registration method from [Besl and McKay (1992)](https://doi.org/10.1117/12.57955) (point-to-point) or [Chen and Medioni (1992)](https://doi.org/10.1016/0262-8856(92)90066-C) (point-to-plane). +It aims at iteratively minimizing the distance between closest neighbours by applying sequential rigid transformations. For point-to-point, the 3D distance is used for the minimization while, for point-to-plane, the 3D distance projected on the plane normals is used instead. If DEMs are used as inputs, they are converted to point clouds. +As for Nuth and Kääb (2011), the iteration stops if it reaches the maximum number of iteration limit or if the iterative transformation amplitude falls below a specified tolerance. 
+ + ```{code-cell} ipython3 # Define a coregistration based on ICP icp = xdem.coreg.ICP() @@ -365,6 +402,43 @@ _ = ax[1].set_yticklabels([]) plt.tight_layout() ``` +(cpd)= +### Coherent point drift + +{class}`xdem.coreg.CPD` + +- **Performs:** Rigid transform transformation (3D translation + 3D rotation). +- **Does not support weights.** +- **Pros:** Needs fewer samples to work efficiently. +- **Cons:** Has trouble solving for horizontal translations if X/Y scale differs from Z scale, as GMM variance is the same in all dimensions. + +Coherent point drift (CPD) coregistration is an iterative point cloud registration method from [Myronenko and Song (2010)](https://doi.org/10.1109/TPAMI.2010.46). +It is a probabilistic approach, iteratively fitting Gaussian mixture model (GMM) centroids for the reference point cloud on the target point cloud by maximizing the likelihood of the probability density estimation problem. + + +```{code-cell} ipython3 +# Define a coregistration based on CPD +cpd = xdem.coreg.CPD() +# Fit to data and apply +aligned_dem = cpd.fit_and_apply(ref_dem, tba_dem_shifted_rotated) +``` + +```{code-cell} ipython3 +:tags: [hide-input] +:mystnb: +: code_prompt_show: "Show plotting code" +: code_prompt_hide: "Hide plotting code" + +# Plot before and after +f, ax = plt.subplots(1, 2) +ax[0].set_title("Before CPD") +(tba_dem_shifted_rotated - ref_dem).plot(cmap='RdYlBu', vmin=-30, vmax=30, ax=ax[0]) +ax[1].set_title("After CPD") +(aligned_dem - ref_dem).plot(cmap='RdYlBu', vmin=-30, vmax=30, ax=ax[1], cbar_title="Elevation differences (m)") +_ = ax[1].set_yticklabels([]) +plt.tight_layout() +``` + ## Building coregistration pipelines ### The {class}`~xdem.coreg.CoregPipeline` object @@ -476,6 +550,8 @@ For both **inputs** and **outputs**, four consistent categories of metadata are **4. Affine metadata (common to all affine methods)**: +- An input `only_translation` to define if a coregistration should solve only for translations, +- An input `standardize` to define if the input data should be standardized to the unit sphere before coregistration, - An output `matrix` that stores the estimated affine matrix, - An output `centroid` that stores the centroid coordinates with which to apply the affine transformation, - Outputs `shift_x`, `shift_y` and `shift_z` that store the easting, northing and vertical offsets, respectively. diff --git a/examples/advanced/plot_norm_regional_hypso.py b/examples/advanced/plot_norm_regional_hypso.py index 5672acb7..e65224c5 100644 --- a/examples/advanced/plot_norm_regional_hypso.py +++ b/examples/advanced/plot_norm_regional_hypso.py @@ -58,13 +58,13 @@ # %% -# To test the method, we can generate a semi-random mask to assign nans to glacierized areas. +# To test the method, we can generate a random mask to assign nans to glacierized areas. # Let's remove 30% of the data. -random_nans = (xdem.misc.generate_random_field(dem_1990.shape, corr_size=5, random_state=42) > 0.7) & ( - glacier_index_map > 0 -) +index_nans = dem_2009.subsample(subsample=0.3, return_indices=True) +mask_nans = dem_2009.copy(new_array=np.ones(dem_2009.shape)) +mask_nans[index_nans] = 0 -random_nans.plot() +mask_nans.plot() # %% # The normalized hypsometric signal shows the tendency for elevation change as a function of elevation. @@ -74,7 +74,7 @@ # Generating it first, however, allows us to visualize and validate it. 
ddem = dem_2009 - dem_1990 -ddem_voided = np.where(random_nans.data, np.nan, ddem.data) +ddem_voided = np.where(mask_nans.data, np.nan, ddem.data) signal = xdem.volume.get_regional_hypsometric_signal( ddem=ddem_voided, @@ -105,7 +105,7 @@ # %% # We can plot the difference between the actual and the interpolated values, to validate the method. -difference = (ddem_filled - ddem)[random_nans] +difference = (ddem_filled - ddem)[mask_nans.data] median = np.ma.median(difference) nmad = xdem.spatialstats.nmad(difference) diff --git a/setup.cfg b/setup.cfg index 266f9173..3ec53479 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,7 +49,6 @@ include = [options.extras_require] opt = - opencv-contrib-python-headless pytransform3d noisyopt scikit-learn diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 8843f516..803c429b 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -3,21 +3,28 @@ from __future__ import annotations import os.path +import re import warnings import geopandas as gpd import geoutils import numpy as np import pytest -import pytransform3d import rasterio as rio +import scipy.optimize from geoutils import Raster, Vector from geoutils.raster import RasterType from geoutils.raster.geotransformations import _translate from scipy.ndimage import binary_dilation from xdem import coreg, examples -from xdem.coreg.affine import AffineCoreg, NuthKaab, _reproject_horizontal_shift_samecrs +from xdem.coreg.affine import ( + AffineCoreg, + _reproject_horizontal_shift_samecrs, + invert_matrix, + matrix_from_translations_rotations, + translations_rotations_from_matrix, +) def load_examples(crop: bool = True) -> tuple[RasterType, RasterType, Vector]: @@ -155,7 +162,7 @@ def test_raise_all_nans(self) -> None: @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore @pytest.mark.parametrize("shifts", [(20, 5, 2), (-50, 100, 2)]) # type: ignore - @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.DhMinimize, coreg.ICP]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.DhMinimize, coreg.ICP, coreg.LZD]) # type: ignore def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> None: """ Test the horizontal/vertical shift coregistrations with synthetic shifted data. 
These tests include NuthKaab, @@ -186,14 +193,15 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> elev_fit_args["to_be_aligned_elev"] = ref_shifted # Run coregistration - coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + subsample_size = 50000 if coreg_method != coreg.CPD else 500 + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=subsample_size, random_state=42) # Check all fit parameters are the opposite of those used above, within a relative 1% (10% for ICP) fit_shifts = [-horizontal_coreg.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] # ICP can be less precise than other methods rtol = 10e-2 if coreg_method == coreg.NuthKaab else 10e-1 - assert np.allclose(shifts, fit_shifts, rtol=rtol) + assert np.allclose(fit_shifts, shifts, rtol=rtol) # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): @@ -225,7 +233,8 @@ def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> [ (coreg.NuthKaab, (9.202739, 2.735573, -1.97733)), (coreg.DhMinimize, (10.0850892, 2.898172, -1.943001)), - (coreg.ICP, (8.73833, 1.584255, -1.943957)), + (coreg.LZD, (9.969819, 2.140150, -1.9257709)), + (coreg.ICP, (5.417970, 1.1282436, -2.0662609)), ], ) # type: ignore def test_coreg_translations__example( @@ -242,7 +251,8 @@ def test_coreg_translations__example( # Get the coregistration method and expected shifts from the inputs coreg_method, expected_shifts = coreg_method__shift - c = coreg_method(subsample=50000) + subsample_size = 50000 if coreg_method != coreg.CPD else 500 + c = coreg_method(subsample=subsample_size) c.fit(ref, tba, inlier_mask=inlier_mask, random_state=42) # Check the output translations match the exact values @@ -329,11 +339,13 @@ def test_coreg_vertical_translation__example( assert vshift == pytest.approx(expected_vshift) @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore - @pytest.mark.parametrize("shifts_rotations", [(20, 5, 0, 0.02, 0.05, 0.1), (-50, 100, 0, 10, 5, 4)]) # type: ignore - @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore + @pytest.mark.parametrize( + "shifts_rotations", [(20, 5, 0.1, 0.1, 0.05, 0.01), (-50, 100, 0.1, 1, 0.5, 0.01)] + ) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.ICP, coreg.LZD, coreg.CPD]) # type: ignore def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: """ - Test the rigid coregistrations with synthetic misaligned (shifted and rotatedà data. These tests include ICP. + Test the rigid coregistrations with synthetic misaligned (shifted and rotated) data. We test all combinations of inputs: raster-raster, point-raster and raster-point.
@@ -353,15 +365,7 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) ref = self.ref # Create synthetic rigid transformation (translation and rotation) from the reference DEM - sr = np.array(shifts_rotations) - shifts = sr[:3] - rotations = sr[3:6] - e = np.deg2rad(rotations) - # Derive is a 3x3 rotation matrix - rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) - matrix = np.diag(np.ones(4, float)) - matrix[:3, :3] = rot_matrix - matrix[:3, 3] = shifts + matrix = matrix_from_translations_rotations(*shifts_rotations) # Pass a centroid centroid = (ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)) @@ -375,19 +379,21 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) elev_fit_args["to_be_aligned_elev"] = ref_shifted_rotated # Run coregistration - coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + subsample_size = 50000 if coreg_method != coreg.CPD else 500 + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=subsample_size, random_state=42) - # Check that fit matrix is the invert of those used above, within a relative 10% for rotations, and within - # a large 100% margin for shifts, as ICP has relative difficulty resolving shifts with large rotations + # Check that fit matrix is the invert of those used above, within a relative % for rotations fit_matrix = horizontal_coreg.meta["outputs"]["affine"]["matrix"] - invert_fit_matrix = coreg.invert_matrix(fit_matrix) - invert_fit_shifts = invert_fit_matrix[:3, 3] - invert_fit_rotations = pytransform3d.rotations.euler_from_matrix( - invert_fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True - ) - invert_fit_rotations = np.rad2deg(invert_fit_rotations) - assert np.allclose(shifts, invert_fit_shifts, rtol=1) - assert np.allclose(rotations, invert_fit_rotations, rtol=10e-1) + invert_fit_matrix = invert_matrix(fit_matrix) + invert_fit_shifts_rotations = translations_rotations_from_matrix(invert_fit_matrix) + + # Check that shifts are not unreasonable within 100%, except for CPD that has trouble + if coreg_method != coreg.CPD: + assert np.allclose(shifts_rotations[0:3], invert_fit_shifts_rotations[:3], rtol=1) + + # Specify rotation precision: LZD is usually more precise than ICP + atol = 10e-3 if coreg_method == coreg.LZD else 2 * 10e-2 + assert np.allclose(shifts_rotations[3:], invert_fit_shifts_rotations[3:], atol=atol) # For a point cloud output, need to interpolate with the other DEM to get dh if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): @@ -411,12 +417,18 @@ def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) plt.show() # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity - # Only 95% of variance as ICP cannot always resolve the last shifts - assert np.nanvar(dh / np.nanstd(init_dh)) < 0.05 + # Checking for 95% of variance as ICP cannot always resolve the small shifts + # And only 30% of variance for CPD that can't resolve shifts at all + fac_reduc_var = 0.05 if coreg_method != coreg.CPD else 0.7 + assert np.nanvar(dh / np.nanstd(init_dh)) < fac_reduc_var @pytest.mark.parametrize( "coreg_method__shifts_rotations", - [(coreg.ICP, (8.738332, 1.584255, -1.943957, 0.0069004, -0.00703, -0.0119733))], + [ + (coreg.ICP, (5.417970, 1.128243, -2.066260, 0.0071103, -0.007524, -0.0047392)), + (coreg.LZD, (9.969819, 2.140150, -1.925771, 0.0070245, -0.00766, -0.008174)), + (coreg.CPD, 
(0.005405, 0.005163, -2.047066, 0.0070245, -0.00755, -0.0000405)), + ], ) # type: ignore def test_coreg_rigid__example( self, coreg_method__shifts_rotations: tuple[type[AffineCoreg], tuple[float, float, float]] @@ -433,24 +445,143 @@ def test_coreg_rigid__example( coreg_method, expected_shifts_rots = coreg_method__shifts_rotations # Run co-registration - c = coreg_method(subsample=50000) + subsample_size = 50000 if coreg_method != coreg.CPD else 500 + c = coreg_method(subsample=subsample_size) c.fit(ref, tba, inlier_mask=inlier_mask, random_state=42) # Check the output translations and rotations match the exact values fit_matrix = c.meta["outputs"]["affine"]["matrix"] - fit_shifts = fit_matrix[:3, 3] - fit_rotations = pytransform3d.rotations.euler_from_matrix(fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True) - fit_rotations = np.rad2deg(fit_rotations) - fit_shifts_rotations = tuple(np.concatenate((fit_shifts, fit_rotations))) - + fit_shifts_rotations = translations_rotations_from_matrix(fit_matrix) assert fit_shifts_rotations == pytest.approx(expected_shifts_rots, abs=10e-6) + @pytest.mark.parametrize( + "rigid_coreg", + [ + coreg.ICP(method="point-to-point", max_iterations=20), + coreg.ICP(method="point-to-plane"), + coreg.ICP(fit_minimizer="lsq_approx"), + coreg.ICP(fit_minimizer=scipy.optimize.least_squares), + coreg.ICP(picky=True), + coreg.ICP(picky=False), + coreg.CPD(weight=0.5), + ], + ) # type: ignore + def test_coreg_rigid__specific_args(self, rigid_coreg) -> None: + """ + Check that all specific arguments (non-fitting and binning, subsampling, iterative) of rigid coregistrations + run correctly and yield with a reasonable output by comparing back after a synthetic transformation. + """ + + # Get reference elevation + ref = self.ref + + # Add artificial shift and rotations + shifts_rotations = (300, 150, 75, 1, 0.5, 0.2) + matrix = matrix_from_translations_rotations(*shifts_rotations) + centroid = (ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)) + ref_shifted_rotated = coreg.apply_matrix(ref, matrix=matrix, centroid=centroid) + + # Coregister + subsample_size = 50000 if rigid_coreg.__class__.__name__ != "CPD" else 500 + rigid_coreg.fit(ref, ref_shifted_rotated, random_state=42, subsample=subsample_size) + + # Check that fit matrix is the invert of those used above, within a relative % for rotations + fit_matrix = rigid_coreg.meta["outputs"]["affine"]["matrix"] + invert_fit_matrix = invert_matrix(fit_matrix) + invert_fit_shifts_rotations = translations_rotations_from_matrix(invert_fit_matrix) + + # Not so precise for shifts + if rigid_coreg.__class__.__name__ != "CPD": + assert np.allclose(invert_fit_shifts_rotations[:3], shifts_rotations[:3], rtol=1) + # Precise for rotations + assert np.allclose(invert_fit_shifts_rotations[3:], shifts_rotations[3:], rtol=10e-1, atol=2 * 10e-2) + + @pytest.mark.parametrize("coreg_method", [coreg.ICP, coreg.CPD, coreg.LZD]) # type: ignore + def test_coreg_rigid__only_translation(self, coreg_method) -> None: + + # Get reference elevation + ref = self.ref + + # Add artificial shift and rotations + # (Define small rotations on purpose, so that the "translation only" coregistration is not affected) + shifts_rotations = (300, 150, 75, 0.01, 0.01, 0.01) + matrix = matrix_from_translations_rotations(*shifts_rotations) + centroid = (ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)) + ref_shifted_rotated = coreg.apply_matrix(ref, matrix=matrix, centroid=centroid) + + # Run co-registration + subsample_size = 50000 if coreg_method != coreg.CPD 
else 500 + c = coreg_method(subsample=subsample_size, only_translation=True) + c.fit(ref, ref_shifted_rotated, random_state=42) + + # Get invert of resulting matrix + fit_matrix = c.meta["outputs"]["affine"]["matrix"] + invert_fit_matrix = invert_matrix(fit_matrix) + invert_fit_shifts_translations = translations_rotations_from_matrix(invert_fit_matrix) + + # Check that rotations are not solved for + assert np.allclose(invert_fit_shifts_translations[3:], 0) + + # Check that translations are not far from expected values + if coreg_method != coreg.CPD: + assert np.allclose(invert_fit_shifts_translations[:3], shifts_rotations[:3], rtol=10e-1) + + @pytest.mark.parametrize("coreg_method", [coreg.ICP, coreg.CPD]) # type: ignore + def test_coreg_rigid__standardize(self, coreg_method) -> None: + + # Get reference elevation + ref = self.ref + + # Add artificial shift and rotations + # (Define small rotations on purpose, so that the "translation only" coregistration is not affected) + shifts_rotations = (300, 150, 75, 1, 0.5, 0.2) + matrix = matrix_from_translations_rotations(*shifts_rotations) + centroid = (ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)) + ref_shifted_rotated = coreg.apply_matrix(ref, matrix=matrix, centroid=centroid) + + # 1/ Run co-registration with standardization + subsample_size = 50000 if coreg_method != coreg.CPD else 500 + c_std = coreg_method(subsample=subsample_size, standardize=True) + c_std.fit(ref, ref_shifted_rotated, random_state=42) + + # Get invert of resulting matrix + fit_matrix_std = c_std.meta["outputs"]["affine"]["matrix"] + invert_fit_shifts_translations_std = translations_rotations_from_matrix(invert_matrix(fit_matrix_std)) + + # Check that standardized result are OK + if coreg_method != coreg.CPD: + assert np.allclose(invert_fit_shifts_translations_std[:3], shifts_rotations[:3], rtol=1) + assert np.allclose(invert_fit_shifts_translations_std[3:], shifts_rotations[3:], rtol=10e-1, atol=2 * 10e-2) + + # 2/ Run coregistration without standardization + + c_nonstd = coreg_method(subsample=subsample_size, standardize=False) + + # For CPD, without standardization, the numerics fail + if coreg_method == coreg.CPD: + with pytest.raises( + ValueError, + match=re.escape("CPD coregistration numerics during np.linalg.svd(), " "try setting standardize=True."), + ): + c_nonstd.fit(ref, ref_shifted_rotated, random_state=42) + return + # For ICP, the numerics pass + else: + c_nonstd.fit(ref, ref_shifted_rotated, random_state=42) + + fit_matrix_nonstd = c_nonstd.meta["outputs"]["affine"]["matrix"] + invert_fit_shifts_translations_nonstd = translations_rotations_from_matrix(invert_matrix(fit_matrix_nonstd)) + + # Check results are worse for non-standardized + assert np.allclose(invert_fit_shifts_translations_nonstd[:3], shifts_rotations[:3], rtol=1) + assert np.allclose(invert_fit_shifts_translations_nonstd[3:], shifts_rotations[3:], rtol=10e-1, atol=2 * 10e-2) + def test_nuthkaab_no_vertical_shift(self) -> None: ref, tba = load_examples(crop=False)[0:2] # Compare Nuth and Kaab method with and without applying vertical shift - coreg_method1 = NuthKaab(vertical_shift=True) - coreg_method2 = NuthKaab(vertical_shift=False) + coreg_method1 = coreg.NuthKaab(vertical_shift=True) + coreg_method2 = coreg.NuthKaab(vertical_shift=False) coreg_method1.fit(ref, tba, random_state=42) coreg_method2.fit(ref, tba, random_state=42) diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index 72a64ce5..4836ef1e 100644 --- a/tests/test_coreg/test_base.py +++ 
b/tests/test_coreg/test_base.py @@ -108,13 +108,10 @@ def recursive_typeddict_items(typed_dict: Mapping[str, Any]) -> Iterable[str]: # Assert all keys exist in the mapping key to str dictionary used for info list_info_keys = list(dict_key_to_str.keys()) - # TODO: Remove ICP keys here once generic optimizer is used - # Temporary exceptions: pipeline/blockwise + gradientdescending/icp + # Temporary exceptions: pipeline/blockwise list_exceptions = [ "step_meta", "pipeline", - "rejection_scale", - "num_levels", ] # Compare the two lists @@ -883,10 +880,10 @@ class TestAffineManipulation: list_matrices = [matrix_identity, matrix_vertical, matrix_translations, matrix_rotations, matrix_all] @pytest.mark.parametrize("matrix", list_matrices) # type: ignore - def test_apply_matrix__points_opencv(self, matrix: NDArrayf) -> None: + def test_apply_matrix__points_geopandas(self, matrix: NDArrayf) -> None: """ Test that apply matrix's exact transformation for points (implemented with NumPy matrix multiplication) - is exactly the same as the one of OpenCV (optional dependency). + is exactly the same as the one of GeoPandas. """ # Create random points @@ -896,14 +893,18 @@ def test_apply_matrix__points_opencv(self, matrix: NDArrayf) -> None: epc = gpd.GeoDataFrame(data={"z": points[:, 2]}, geometry=gpd.points_from_xy(x=points[:, 0], y=points[:, 1])) trans_epc = apply_matrix(epc, matrix=matrix) - # Run the same operation with openCV - import cv2 - - trans_cv2_arr = cv2.perspectiveTransform(points[:, :].reshape(1, -1, 3), matrix)[0, :, :] - - # Transform point cloud back to array - trans_numpy = np.array([trans_epc.geometry.x.values, trans_epc.geometry.y.values, trans_epc["z"].values]).T - assert np.allclose(trans_numpy, trans_cv2_arr) + # Compare to geopandas transformation + # We first need to convert the 4x4 affine matrix into a 12-parameter affine matrix + epc_3d = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x=points[:, 0], y=points[:, 1], z=points[:, 2])) + mat_12params = np.zeros(12) + mat_12params[:9] = matrix[:3, :3].flatten() + mat_12params[9:] = matrix[:3, 3] + trans_epc_gpd = epc_3d.affine_transform(matrix=mat_12params) + + # Check both transformations are equal + assert np.allclose(trans_epc.geometry.x.values, trans_epc_gpd.geometry.x.values) + assert np.allclose(trans_epc.geometry.y.values, trans_epc_gpd.geometry.y.values) + assert np.allclose(trans_epc["z"].values, trans_epc_gpd.geometry.z.values) @pytest.mark.parametrize("regrid_method", [None, "iterative", "griddata"]) # type: ignore @pytest.mark.parametrize("matrix", list_matrices) # type: ignore diff --git a/tests/test_coreg/test_biascorr.py b/tests/test_coreg/test_biascorr.py index e2660235..201e272d 100644 --- a/tests/test_coreg/test_biascorr.py +++ b/tests/test_coreg/test_biascorr.py @@ -397,7 +397,7 @@ def test_biascorr__bin_and_fit_2d(self, fit_args, fit_func, fit_optimizer, bin_s # Run with input parameter, and using only 100 subsamples for speed # Passing p0 defines the number of parameters to solve for - bcorr.fit(**elev_fit_args, subsample=100, p0=[0, 0, 0, 0], random_state=42) + bcorr.fit(**elev_fit_args, subsample=1000, p0=[0, 0, 0, 0], random_state=42) # Check that variable names are defined during fit assert bcorr.meta["inputs"]["fitorbin"]["bias_var_names"] == ["elevation", "slope"] diff --git a/tests/test_coreg/test_blockwise.py b/tests/test_coreg/test_blockwise.py index f4f85a22..d533c44f 100644 --- a/tests/test_coreg/test_blockwise.py +++ b/tests/test_coreg/test_blockwise.py @@ -8,9 +8,10 @@ import rasterio as rio from 
geoutils import Raster, Vector from geoutils.raster import RasterType +from geoutils.raster.georeferencing import _coords import xdem -from xdem import coreg, examples, misc, spatialstats +from xdem import coreg, examples, spatialstats from xdem.coreg import BlockwiseCoreg from xdem.coreg.base import Coreg @@ -237,10 +238,13 @@ def test_warp_dem() -> None: test_shift = 6 # This shift will be validated below dest_coords[4, 2] += test_shift - # Generate a semi-random DEM + # Create synthetic DEM data transform = rio.transform.from_origin(0, 500, 1, 1) shape = (500, 550) - dem = misc.generate_random_field(shape, 100) * 200 + misc.generate_random_field(shape, 10) * 50 + xx, yy = _coords(transform=transform, shape=shape, area_or_point=None) + # Make it a 2D polynomial gaussian-filtered + dem = xdem.fit.polynomial_2d((xx, yy), 1, 1, 1, 1) # type: ignore + dem = xdem.filters.gaussian_filter_scipy(dem, sigma=10) # Warp the DEM using the source-destination coordinates. transformed_dem = coreg.blockwise.warp_dem( diff --git a/tests/test_ddem.py b/tests/test_ddem.py index e41b078f..8f58965d 100644 --- a/tests/test_ddem.py +++ b/tests/test_ddem.py @@ -79,4 +79,4 @@ def test_local_hypso(self) -> None: assert ddem.filled_data is None ddem.interpolate(method="local_hypsometric", reference_elevation=self.dem_2009.data, mask=scott_1990) - assert np.abs(np.mean(self.ddem.data - ddem.filled_data)) < 1 + assert np.abs(np.nanmean(self.ddem.data - ddem.filled_data)) < 1 diff --git a/tests/test_filters.py b/tests/test_filters.py index bc034d2b..f4ebff04 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -27,15 +27,6 @@ def test_gauss(self) -> None: assert np.max(dem_array) > np.max(dem_sm) assert dem_array.shape == dem_sm.shape - # Test applying OpenCV's Gaussian filter - dem_sm2 = xdem.filters.gaussian_filter_cv(dem_array, sigma=5) - assert np.min(dem_array) < np.min(dem_sm2) - assert np.max(dem_array) > np.max(dem_sm2) - assert dem_array.shape == dem_sm2.shape - - # Assert that both implementations yield similar results - assert np.nanmax(np.abs(dem_sm - dem_sm2)) < 1e-3 - # Test that it works with NaNs too nan_count = 1000 rng = np.random.default_rng(42) @@ -48,22 +39,14 @@ def test_gauss(self) -> None: assert np.nanmin(dem_with_nans) < np.min(dem_sm) assert np.nanmax(dem_with_nans) > np.max(dem_sm) - dem_sm = xdem.filters.gaussian_filter_cv(dem_with_nans, sigma=10) - assert np.nanmin(dem_with_nans) < np.min(dem_sm) - assert np.nanmax(dem_with_nans) > np.max(dem_sm) - # Test that it works with 3D arrays array_3d = np.vstack((dem_array[np.newaxis, :], dem_array[np.newaxis, :])) dem_sm = xdem.filters.gaussian_filter_scipy(array_3d, sigma=5) assert array_3d.shape == dem_sm.shape - # 3D case not implemented with OpenCV - pytest.raises(NotImplementedError, xdem.filters.gaussian_filter_cv, array_3d, sigma=5) - # Tests that it fails with 1D arrays with appropriate error data = dem_array[:, 0] pytest.raises(ValueError, xdem.filters.gaussian_filter_scipy, data, sigma=5) - pytest.raises(ValueError, xdem.filters.gaussian_filter_cv, data, sigma=5) def test_dist_filter(self) -> None: """Test that distance_filter works""" diff --git a/tests/test_misc.py b/tests/test_misc.py index 4b974cd0..754f6427 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -120,29 +120,29 @@ def test_diff_environment_yml(self, capsys) -> None: # type: ignore # Test with synthetic environment env = {"dependencies": ["python==3.9", "numpy", "pandas"]} - devenv = {"dependencies": ["python==3.9", "numpy", "pandas", "opencv"]} + 
devenv = {"dependencies": ["python==3.9", "numpy", "pandas", "otherdep"]} # This should print the difference between the two xdem.misc.diff_environment_yml(env, devenv, input_dict=True, print_dep="conda") # Capture the stdout and check it is indeed the right diff captured = capsys.readouterr().out - assert captured == "opencv\n" + assert captured == "otherdep\n" # This should print the difference including pip xdem.misc.diff_environment_yml(env, devenv, input_dict=True, print_dep="both") captured = capsys.readouterr().out - assert captured == "opencv\nNone\n" + assert captured == "otherdep\nNone\n" env2 = {"dependencies": ["python==3.9", "numpy", "pandas"]} - devenv2 = {"dependencies": ["python==3.9", "numpy", "pandas", "opencv", {"pip": ["geoutils", "-e ./"]}]} + devenv2 = {"dependencies": ["python==3.9", "numpy", "pandas", "otherdep", {"pip": ["geoutils", "-e ./"]}]} # The diff function should not account for -e ./ that is the local install for developers xdem.misc.diff_environment_yml(env2, devenv2, input_dict=True, print_dep="both") captured = capsys.readouterr().out - assert captured == "opencv\ngeoutils\n" + assert captured == "otherdep\ngeoutils\n" # This should print only pip xdem.misc.diff_environment_yml(env2, devenv2, input_dict=True, print_dep="pip") @@ -157,12 +157,12 @@ def test_diff_environment_yml(self, capsys) -> None: # type: ignore # When the dependencies are not defined in dev-env but in env, it should raise an error # For normal dependencies env3 = {"dependencies": ["python==3.9", "numpy", "pandas", "lol"]} - devenv3 = {"dependencies": ["python==3.9", "numpy", "pandas", "opencv", {"pip": ["geoutils"]}]} + devenv3 = {"dependencies": ["python==3.9", "numpy", "pandas", "otherdep", {"pip": ["geoutils"]}]} with pytest.raises(ValueError, match="The following dependencies are listed in env but not dev-env: lol"): xdem.misc.diff_environment_yml(env3, devenv3, input_dict=True, print_dep="pip") # For pip dependencies env4 = {"dependencies": ["python==3.9", "numpy", "pandas", {"pip": ["lol"]}]} - devenv4 = {"dependencies": ["python==3.9", "numpy", "pandas", "opencv", {"pip": ["geoutils"]}]} + devenv4 = {"dependencies": ["python==3.9", "numpy", "pandas", "otherdep", {"pip": ["geoutils"]}]} with pytest.raises(ValueError, match="The following pip dependencies are listed in env but not dev-env: lol"): xdem.misc.diff_environment_yml(env4, devenv4, input_dict=True, print_dep="pip") diff --git a/xdem/coreg/__init__.py b/xdem/coreg/__init__.py index 9fa352e3..3ce481d5 100644 --- a/xdem/coreg/__init__.py +++ b/xdem/coreg/__init__.py @@ -21,13 +21,22 @@ """ from xdem.coreg.affine import ( # noqa + CPD, ICP, + LZD, AffineCoreg, DhMinimize, NuthKaab, VerticalShift, ) -from xdem.coreg.base import Coreg, CoregPipeline, apply_matrix, invert_matrix # noqa +from xdem.coreg.base import ( # noqa + Coreg, + CoregPipeline, + apply_matrix, + invert_matrix, + matrix_from_translations_rotations, + translations_rotations_from_matrix, +) from xdem.coreg.biascorr import BiasCorr, Deramp, DirectionalBias, TerrainBias # noqa from xdem.coreg.blockwise import BlockwiseCoreg # noqa from xdem.coreg.workflows import dem_coregistration # noqa diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index 4b50a5ec..d0b93c54 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -24,20 +24,15 @@ import warnings from typing import Any, Callable, Iterable, Literal, TypeVar -import xdem.coreg.base - -try: - import cv2 - - _has_cv2 = True -except ImportError: - _has_cv2 = False +import affine import geopandas 
as gpd import numpy as np +import pandas as pd import rasterio as rio import scipy.optimize +import scipy.spatial from geoutils.interface.interpolate import _interp_points -from geoutils.raster.georeferencing import _bounds, _coords, _res +from geoutils.raster.georeferencing import _coords, _res from tqdm import trange from xdem._typing import NDArrayb, NDArrayf @@ -47,10 +42,14 @@ InFitOrBinDict, InRandomDict, OutAffineDict, + _apply_matrix_pts_mat, _bin_or_and_fit_nd, _get_subsample_mask_pts_rst, _preprocess_pts_rst_subsample, _reproject_horizontal_shift_samecrs, + invert_matrix, + matrix_from_translations_rotations, + translations_rotations_from_matrix, ) from xdem.spatialstats import nmad @@ -122,7 +121,7 @@ def _iterate_method( """ Function to iterate a method (e.g. ICP, Nuth and Kääb) until it reaches a tolerance or maximum number of iterations. - :param method: Method that needs to be iterated to derive a transformation. Take argument "inputs" as its input, + :param method: Method that needs to be iterated to derive a transformation. Takes argument "inputs" as its input, and outputs three terms: a "statistic" to compare to tolerance, "updated inputs" with this transformation, and the parameters of the transformation. :param iterating_input: Iterating input to method, should be first argument. @@ -147,7 +146,7 @@ def _iterate_method( # Print final results # TODO: Allow to pass a string to _iterate_method on how to print/describe exactly the iterating input - if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: + if logging.getLogger().getEffectiveLevel() <= logging.INFO: pbar.write(f" Iteration #{i + 1:d} - Offset: {new_inputs}; Magnitude: {new_statistic}") if i > 1 and new_statistic < tolerance: @@ -282,6 +281,7 @@ def _preprocess_pts_rst_subsample_interpolator( inlier_mask=inlier_mask, transform=transform, area_or_point=area_or_point, + z_name=z_name, aux_vars=aux_vars, ) @@ -303,6 +303,41 @@ def _preprocess_pts_rst_subsample_interpolator( return sub_dh_interpolator, sub_bias_vars, subsample_final +def _standardize_epc( + ref_epc: NDArrayf, tba_epc: NDArrayf, scale_std: bool = True +) -> tuple[NDArrayf, NDArrayf, tuple[float, float, float], float]: + """ + Standardize elevation point clouds by getting centroid and standardization factor using median statistics. + + :param ref_epc: Reference point cloud. + :param tba_epc: To-be-aligned point cloud. + :param scale_std: Whether to scale all axes by a factor. + + :return: Standardized point clouds, Centroid of standardization, Scale factor of standardization. + """ + + # Get centroid + centroid = np.median(ref_epc, axis=1) + + # Subtract centroid from point clouds + ref_epc = ref_epc - centroid[:, None] + tba_epc = tba_epc - centroid[:, None] + + centroid = (centroid[0], centroid[1], centroid[2]) + + if scale_std: + # Get mean standardization factor for all axes + std_fac = np.mean([nmad(ref_epc[0, :]), nmad(ref_epc[1, :]), nmad(ref_epc[2, :])]) + + # Standardize point clouds + ref_epc = ref_epc / std_fac + tba_epc = tba_epc / std_fac + else: + std_fac = 1 + + return ref_epc, tba_epc, centroid, std_fac + + ################################ # Affine coregistrations methods # ############################## @@ -340,6 +375,8 @@ def _nuth_kaab_bin_fit( Optimize the Nuth and Kääb (2011) function based on observed values of elevation differences, slope tangent and aspect at the same locations, using either fitting or binning + fitting. + Called at each iteration step. 
+ :param dh: 1D array of elevation differences (in georeferenced unit, typically meters). :param slope_tan: 1D array of slope tangent (unitless). :param aspect: 1D array of aspect (units = radians). @@ -456,7 +493,7 @@ def _nuth_kaab_iteration_step( params_fit_bin: InFitOrBinDict, ) -> tuple[tuple[float, float, float], float]: """ - Iteration step of Nuth and Kääb (2011), passed to the iterate_method function. + Iteration step of Nuth and Kääb (2011). Returns newly incremented coordinate offsets, and new statistic to compare to tolerance to reach. @@ -466,6 +503,8 @@ def _nuth_kaab_iteration_step( :param slope_tan: Array of slope tangent. :param aspect: Array of aspect. :param res: Resolution of DEM. + + :return X/Y/Z offsets, Horizontal tolerance. """ # Calculate the elevation difference with offsets @@ -525,6 +564,10 @@ def nuth_kaab( """ Nuth and Kääb (2011) iterative coregistration. + This function subsamples input data, then runs Nuth and Kääb iteration steps to optimize its fit function until + convergence or a maximum of iterations is reached. + + :return: Final estimated offset: east, north, vertical (in georeferenced units). """ logging.info("Running Nuth and Kääb (2011) coregistration") @@ -558,7 +601,6 @@ def nuth_kaab( z_name=z_name, ) - logging.info("Iteratively estimating horizontal shift:") # Initialise east, north and vertical offset variables (these will be incremented up and down) initial_offset = (0.0, 0.0, 0.0) # Resolution @@ -713,7 +755,7 @@ def vertical_shift( logging.info("Running vertical shift coregistration") # Pre-process point-raster inputs to the same subsampled points - sub_ref, sub_tba, _ = _preprocess_pts_rst_subsample( + sub_ref, sub_tba, _, _ = _preprocess_pts_rst_subsample( params_random=params_random, ref_elev=ref_elev, tba_elev=tba_elev, @@ -740,282 +782,1289 @@ def vertical_shift( return vshift, subsample_final -################################## -# Affine coregistration subclasses -################################## +############################ +# 4/ Iterative closest point +############################ -AffineCoregType = TypeVar("AffineCoregType", bound="AffineCoreg") +def _icp_fit_func( + inputs: tuple[NDArrayf, NDArrayf, NDArrayf | None], + t1: float, + t2: float, + t3: float, + alpha1: float, + alpha2: float, + alpha3: float, + method: Literal["point-to-point", "point-to-plane"], +) -> NDArrayf: + """ + Fit function of ICP, a rigid transformation with 6 parameters (3 translations and 3 rotations) between closest + points (that are fixed for the optimization, and update at each iterative step). + + :param inputs: Constant input for the fit function: three arrays of size 3xN, for the reference point cloud, the + to-be-aligned point cloud and the plane normals. + :param t1: Translation in X. + :param t2: Translation in Y. + :param t3: Translation in Z. + :param alpha1: Rotation around X. + :param alpha2: Rotation around Y. + :param alpha3: Rotation around Z. + :param method: Method of iterative closest point registration, either "point-to-point" of Besl and McKay (1992) + that minimizes 3D distances, or "point-to-plane" of Chen and Medioni (1992) that minimizes 3D distances + projected on normals. + + :return Array of distances between closest points. 
+ """ + + # Get inputs + ref, tba, norm = inputs -class AffineCoreg(Coreg): + # Build an affine matrix for 3D translations and rotations + matrix = matrix_from_translations_rotations(t1, t2, t3, alpha1, alpha2, alpha3, use_degrees=False) + + # Apply affine transformation + trans_tba = _apply_matrix_pts_mat(mat=tba, matrix=matrix) + + # Define residuals depending on type of ICP method + # Point-to-point is simply the difference, from Besl and McKay (1992), https://doi.org/10.1117/12.57955 + if method == "point-to-point": + diffs = (trans_tba - ref) ** 2 + # Point-to-plane used the normals, from Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C + # A priori, this method is faster based on Rusinkiewicz and Levoy (2001), https://doi.org/10.1109/IM.2001.924423 + elif method == "point-to-plane": + assert norm is not None + diffs = (trans_tba - ref) * norm + else: + raise ValueError("ICP method must be 'point-to-point' or 'point-to-plane'.") + + # Distance residuals summed for all 3 dimensions + res = np.sum(diffs, axis=0) + + # For point-to-point, take the squareroot of the sum + if method == "point-to-point": + res = np.sqrt(res) + + return res + + +def _icp_fit_approx_lsq( + ref: NDArrayf, + tba: NDArrayf, + norms: NDArrayf | None, + weights: NDArrayf | None = None, + method: Literal["point-to-point", "point-to-plane"] = "point-to-point", +) -> NDArrayf: """ - Generic affine coregistration class. + Linear approximation of the rigid transformation least-square optimization. - Builds additional common affine methods on top of the generic Coreg class. - Made to be subclassed. + See Low (2004), https://www.comp.nus.edu.sg/~lowkl/publications/lowk_point-to-plane_icp_techrep.pdf for the + "point-to-plane" approximation. + + :param ref: Reference point cloud as Nx3 array. + :param tba: To-be-aligned point cloud as Nx3 array. + :param norms: Plane normals as Nx3 array. + :param weights: Weights as Nx3 array. + :param method: Method of iterative closest point registration, either "point-to-point" of Besl and McKay (1992) + that minimizes 3D distances, or "point-to-plane" of Chen and Medioni (1992) that minimizes 3D distances + projected on normals. + + :return Affine transform matrix. """ - _fit_called: bool = False # Flag to check if the .fit() method has been called. 
- _is_affine: bool | None = None - _is_translation: bool | None = None + # Linear approximation of ICP least-squares - def __init__( - self, - subsample: float | int = 1.0, - matrix: NDArrayf | None = None, - meta: dict[str, Any] | None = None, - ) -> None: - """Instantiate a generic AffineCoreg method.""" + # Point-to-plane + if method == "point-to-plane": - if meta is None: - meta = {} - # Define subsample size - meta.update({"subsample": subsample}) - super().__init__(meta=meta) + assert norms is not None - if matrix is not None: - with warnings.catch_warnings(): - # This error is fixed in the upcoming 1.8 - warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`") - valid_matrix = pytransform3d.transformations.check_transform(matrix) - self._meta["outputs"]["affine"] = {"matrix": valid_matrix} + # Define A and B as in Low (2004) + B = np.expand_dims(np.sum(ref * norms, axis=1) - np.sum(tba * norms, axis=1), axis=1) + A = np.hstack((np.cross(tba, norms), norms)) - self._is_affine = True + # Choose if using weights or not + if weights is not None: + x = np.linalg.inv(A.T @ weights @ A) @ A.T @ weights @ B + else: + x = np.linalg.inv(A.T @ A) @ A.T @ B + x = x.squeeze() - def to_matrix(self) -> NDArrayf: - """Convert the transform to a 4x4 transformation matrix.""" - return self._to_matrix_func() + # Convert back to affine matrix + matrix = matrix_from_translations_rotations( + alpha1=x[0], alpha2=x[1], alpha3=x[2], t1=x[3], t2=x[4], t3=x[5], use_degrees=False + ) - def to_translations(self) -> tuple[float, float, float]: - """ - Extract X/Y/Z translations from the affine transformation matrix. + else: + raise ValueError("Fit optimizer 'lst_approx' of ICP is only available for point-to-plane method.") - :return: Easting, northing and vertical translations (in georeferenced unit). - """ + return matrix - matrix = self.to_matrix() - shift_x = matrix[0, 3] - shift_y = matrix[1, 3] - shift_z = matrix[2, 3] - return shift_x, shift_y, shift_z +def _icp_fit( + ref: NDArrayf, + tba: NDArrayf, + norms: NDArrayf | None, + method: Literal["point-to-point", "point-to-plane"], + params_fit_or_bin: InFitOrBinDict, + only_translation: bool, + **kwargs: Any, +) -> NDArrayf: + """ + Optimization of ICP fit function, using either any optimizer or specific linearized approximations for ICP. - def to_rotations(self) -> tuple[float, float, float]: - """ - Extract X/Y/Z euler rotations (extrinsic convention) from the affine transformation matrix. + Returns affine transform optimized for this iteration. - Warning: This function only works for a rigid transformation (rotation and translation). + :param ref: Reference point cloud as 3xN array. + :param tba: To-be-aligned point cloud as 3xN array. + :param norms: Plane normals as 3xN array. + :param method: Method of iterative closest point registration, either "point-to-point" of Besl and McKay (1992) + that minimizes 3D distances, or "point-to-plane" of Chen and Medioni (1992) that minimizes 3D distances + projected on normals. + :param params_fit_or_bin: Dictionary of parameters for fitting or binning. + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + :param **kwargs: Keyword arguments passed to fit optimizer. - :return: Extrinsinc Euler rotations along easting, northing and vertical directions (degrees). - """ + :return: Affine transform matrix. 
+ """ - matrix = self.to_matrix() - rots = pytransform3d.rotations.euler_from_matrix(matrix, i=0, j=1, k=2, extrinsic=True, strict_check=True) - rots = np.rad2deg(np.array(rots)) - return rots[0], rots[1], rots[2] + # Group inputs into a single array + inputs = (ref, tba, norms) - def centroid(self) -> tuple[float, float, float] | None: - """Get the centroid of the coregistration, if defined.""" - meta_centroid = self._meta["outputs"]["affine"].get("centroid") + # For this type of method, the procedure can only be fit + # if params_fit_or_bin["fit_or_bin"] not in ["fit"]: + # raise ValueError("ICP method only supports 'fit'.") + # + # params_fit_or_bin["fit_func"] = _icp_fit_func - if meta_centroid is None: - return None + # If we use the linear approximation + if isinstance(params_fit_or_bin["fit_minimizer"], str) and params_fit_or_bin["fit_minimizer"] == "lsq_approx": - # Unpack the centroid in case it is in an unexpected format (an array, list or something else). - return meta_centroid[0], meta_centroid[1], meta_centroid[2] + assert norms is not None - def _preprocess_rst_pts_subsample_interpolator( - self, - ref_elev: NDArrayf | gpd.GeoDataFrame, - tba_elev: NDArrayf | gpd.GeoDataFrame, - inlier_mask: NDArrayb, - aux_vars: dict[str, NDArrayf] | None = None, - weights: NDArrayf | None = None, - transform: rio.transform.Affine | None = None, - crs: rio.crs.CRS | None = None, - area_or_point: Literal["Area", "Point"] | None = None, - z_name: str = "z", - ) -> tuple[Callable[[float, float], NDArrayf], None | dict[str, NDArrayf]]: - """ - Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points - (and interpolated in the case of point-raster input). + matrix = _icp_fit_approx_lsq(ref.T, tba.T, norms.T, method=method) - Return 1D arrays of reference elevation, to-be-aligned elevation and dictionary of 1D arrays of auxiliary - variables at subsampled points. 
- """ + # Or we solve using any optimizer and loss function + else: + # Define loss function + loss_func = params_fit_or_bin["fit_loss_func"] + + # With rotation + if not only_translation: + + def fit_func(offsets: tuple[float, float, float, float, float, float]) -> NDArrayf: + return _icp_fit_func( + inputs=inputs, + t1=offsets[0], + t2=offsets[1], + t3=offsets[2], + alpha1=offsets[3], + alpha2=offsets[4], + alpha3=offsets[5], + method=method, + ) - # Get random parameters - params_random = self._meta["inputs"]["random"] + # Initial offset near zero + init_offsets = np.zeros(6) - # Get subsample mask (a 2D array for raster-raster, a 1D array of length the point data for point-raster) - sub_mask = _get_subsample_mask_pts_rst( - params_random=params_random, - ref_elev=ref_elev, - tba_elev=tba_elev, - inlier_mask=inlier_mask, - transform=transform, - area_or_point=area_or_point, - aux_vars=aux_vars, - ) + # Without rotation + else: - # Return interpolator of elevation differences and subsampled auxiliary variables - sub_dh_interpolator, sub_bias_vars = _subsample_on_mask_interpolator( - ref_elev=ref_elev, - tba_elev=tba_elev, - aux_vars=aux_vars, - sub_mask=sub_mask, - transform=transform, - area_or_point=area_or_point, - z_name=z_name, - ) + def fit_func(offsets: tuple[float, float, float, float, float, float]) -> NDArrayf: + return _icp_fit_func( + inputs=inputs, + t1=offsets[0], + t2=offsets[1], + t3=offsets[2], + alpha1=0.0, + alpha2=0.0, + alpha3=0.0, + method=method, + ) - # Write final subsample to class - self._meta["outputs"]["random"] = {"subsample_final": int(np.count_nonzero(sub_mask))} + # Initial offset near zero + init_offsets = np.zeros(3) - # Return 1D arrays of subsampled points at the same location - return sub_dh_interpolator, sub_bias_vars + results = params_fit_or_bin["fit_minimizer"](fit_func, init_offsets, **kwargs, loss=loss_func) - @classmethod - def from_matrix(cls, matrix: NDArrayf) -> AffineCoreg: - """ - Instantiate a generic Coreg class from a transformation matrix. + # Mypy: having results as "None" is impossible, but not understood through overloading of _bin_or_and_fit_nd... + assert results is not None + # Build matrix out of optimized parameters + matrix = matrix_from_translations_rotations(*results.x, use_degrees=False) # type: ignore - :param matrix: A 4x4 transformation matrix. Shape must be (4,4). + return matrix - :raises ValueError: If the matrix is incorrectly formatted. - :returns: The instantiated generic Coreg class. - """ - if np.any(~np.isfinite(matrix)): - raise ValueError(f"Matrix has non-finite values:\n{matrix}") - with warnings.catch_warnings(): - # This error is fixed in the upcoming 1.8 - warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`") - valid_matrix = pytransform3d.transformations.check_transform(matrix) - return cls(matrix=valid_matrix) +def _icp_iteration_step( + matrix: NDArrayf, + ref_epc: NDArrayf, + tba_epc: NDArrayf, + norms: NDArrayf, + ref_epc_nearest_tree: scipy.spatial.KDTree, + params_fit_or_bin: InFitOrBinDict, + method: Literal["point-to-point", "point-to-plane"], + picky: bool, + only_translation: bool, +) -> tuple[NDArrayf, float]: + """ + Iteration step of Iterative Closest Point coregistration. - @classmethod - def from_translations(cls, x_off: float = 0.0, y_off: float = 0.0, z_off: float = 0.0) -> AffineCoreg: - """ - Instantiate a generic Coreg class from a X/Y/Z translation. + Returns affine transform optimized for this iteration, and tolerance parameters. 
- :param x_off: The offset to apply in the X (west-east) direction. - :param y_off: The offset to apply in the Y (south-north) direction. - :param z_off: The offset to apply in the Z (vertical) direction. + :param matrix: Affine transform matrix. + :param ref_epc: Reference point cloud as 3xN array. + :param tba_epc: To-be-aligned point cloud as 3xN array. + :param norms: Plane normals as 3xN array. + :param ref_epc_nearest_tree: Nearest neighbour mapping to reference point cloud as scipy.KDTree. + :param params_fit_or_bin: Dictionary of parameters for fitting or binning. + :param method: Method of iterative closest point registration, either "point-to-point" of Besl and McKay (1992) + that minimizes 3D distances, or "point-to-plane" of Chen and Medioni (1992) that minimizes 3D distances + projected on normals. + :param picky: Whether to use the duplicate removal for pairs of closest points of Zinsser et al. (2003). + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. - :raises ValueError: If the given translation contained invalid values. + :return: Affine transform matrix, Tolerance. + """ - :returns: An instantiated generic Coreg class. - """ - # Initialize a diagonal matrix - matrix = np.diag(np.ones(4, dtype=float)) - # Add the three translations (which are in the last column) - matrix[0, 3] = x_off - matrix[1, 3] = y_off - matrix[2, 3] = z_off + # Apply transform matrix from previous steps + trans_tba_epc = _apply_matrix_pts_mat(tba_epc, matrix=matrix) - return cls.from_matrix(matrix) + # Create nearest neighbour tree from reference elevations, and query for transformed point cloud + dists, ind = ref_epc_nearest_tree.query(trans_tba_epc.T, k=1) - @classmethod - def from_rotations(cls, x_rot: float = 0.0, y_rot: float = 0.0, z_rot: float = 0.0) -> AffineCoreg: - """ - Instantiate a generic Coreg class from a X/Y/Z rotation. + # Picky ICP: Remove duplicates of transformed points with the same closest reference points + # Keep only the one with minimum distance + if picky: + init_len = len(ind) + df = pd.DataFrame(data={"ind": ind, "dists": dists}) + ind_tba = df.groupby(["ind"]).idxmin()["dists"].values + logging.info(f"Picky ICP duplicate removal: Reducing from {init_len} to {len(ind)} point pairs.") + # In case the number of points is different for ref and tba (can't happen with current subsampling) + else: + ind_tba = ind < ref_epc.shape[1] - :param x_rot: The rotation (degrees) to apply around the X (west-east) direction. - :param y_rot: The rotation (degrees) to apply around the Y (south-north) direction. - :param z_rot: The rotation (degrees) to apply around the Z (vertical) direction. + # Reference index is the original indexed by the other + ind_ref = ind[ind_tba] - :raises ValueError: If the given rotation contained invalid values. + # Index points to get nearest + step_ref = ref_epc[:, ind_ref] + step_trans_tba = trans_tba_epc[:, ind_tba] - :returns: An instantiated generic Coreg class. 
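The "picky" selection used in the iteration step above keeps, for each reference point matched by several transformed points, only the pair with the smallest distance; a toy illustration of that pandas selection (hypothetical index/distance arrays):

```python
import numpy as np
import pandas as pd

ind = np.array([0, 2, 2, 5, 5, 5])                # hypothetical nearest reference indices
dists = np.array([0.3, 0.1, 0.2, 0.5, 0.4, 0.9])  # corresponding closest-pair distances

df = pd.DataFrame(data={"ind": ind, "dists": dists})
ind_tba = df.groupby(["ind"]).idxmin()["dists"].values
print(ind_tba)  # [0 1 4]: one pair kept per reference point, the one with minimum distance
```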
- """ + if method == "point-to-plane": + step_normals = norms[:, ind_ref] + else: + step_normals = None - # Initialize a diagonal matrix - matrix = np.diag(np.ones(4, dtype=float)) - # Convert rotations to radians - e = np.deg2rad(np.array([x_rot, y_rot, z_rot])) - # Derive 3x3 rotation matrix, and insert in 4x4 affine matrix - rot_matrix = pytransform3d.rotations.matrix_from_euler(e, i=0, j=1, k=2, extrinsic=True) - matrix[0:3, 0:3] = rot_matrix + # Fit to get new step transform + step_matrix = _icp_fit( + ref=step_ref, + tba=step_trans_tba, + norms=step_normals, + params_fit_or_bin=params_fit_or_bin, + method=method, + only_translation=only_translation, + ) - return cls.from_matrix(matrix) + # Increment transformation matrix by step + new_matrix = step_matrix @ matrix - def _to_matrix_func(self) -> NDArrayf: - # FOR DEVELOPERS: This function needs to be implemented if the `self._meta['matrix']` keyword is not None. + # Compute statistic on offset to know if it reached tolerance + translations = step_matrix[:3, 3] - # Try to see if a matrix exists. - meta_matrix = self._meta["outputs"]["affine"].get("matrix") - if meta_matrix is not None: - assert meta_matrix.shape == (4, 4), f"Invalid _meta matrix shape. Expected: (4, 4), got {meta_matrix.shape}" - return meta_matrix + tolerance_translation = np.sqrt(np.sum(translations) ** 2) + # TODO: If we allow multiple tolerances in the future, here's the rotation tolerance + # rotations = step_matrix[:3, :3] + # tolerance_rotation = np.rad2deg(np.arccos(np.clip((np.trace(rotations) - 1) / 2, -1, 1))) - raise NotImplementedError("This should be implemented by subclassing") + return new_matrix, tolerance_translation -class VerticalShift(AffineCoreg): +def _icp_norms(dem: NDArrayf, transform: affine.Affine) -> tuple[NDArrayf, NDArrayf, NDArrayf]: """ - Vertical translation alignment. + Derive normals from the DEM for "point-to-plane" method. - Estimates the mean vertical offset between two elevation datasets based on a reductor function (median, mean, or - any custom reductor function). + :param dem: Array of DEM. + :param transform: Transform of DEM. - The estimated vertical shift is stored in the `self.meta["outputs"]["affine"]` key "shift_z" (in unit of the - elevation dataset inputs, typically meters). + :return: Arrays of 3D normals: east, north and upward. """ - def __init__( - self, vshift_reduc_func: Callable[[NDArrayf], np.floating[Any]] = np.median, subsample: float | int = 1.0 - ) -> None: # pylint: - # disable=super-init-not-called - """ - Instantiate a vertical shift alignment object. - - :param vshift_reduc_func: Reductor function to estimate the central tendency of the vertical shift. - Defaults to the median. - :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. - """ - self._meta: CoregDict = {} # All __init__ functions should instantiate an empty dict. 
+ # Get DEM resolution + resolution = _res(transform) - super().__init__(meta={"vshift_reduc_func": vshift_reduc_func}, subsample=subsample) + # Generate the X, Y and Z normals + gradient_x, gradient_y = np.gradient(dem) + normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 + normal_north = np.sin(np.arctan(gradient_x / resolution[0])) + normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0) - def _fit_rst_rst( - self, - ref_elev: NDArrayf, - tba_elev: NDArrayf, - inlier_mask: NDArrayb, - transform: rio.transform.Affine, - crs: rio.crs.CRS, - area_or_point: Literal["Area", "Point"] | None, - z_name: str, - weights: NDArrayf | None = None, - bias_vars: dict[str, NDArrayf] | None = None, - **kwargs: Any, - ) -> None: - """Estimate the vertical shift using the vshift_func.""" + return normal_east, normal_north, normal_up - # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts - self._fit_rst_pts( - ref_elev=ref_elev, - tba_elev=tba_elev, - inlier_mask=inlier_mask, - transform=transform, - crs=crs, - area_or_point=area_or_point, - z_name=z_name, - weights=weights, - **kwargs, - ) - def _fit_rst_pts( - self, - ref_elev: NDArrayf | gpd.GeoDataFrame, - tba_elev: NDArrayf | gpd.GeoDataFrame, - inlier_mask: NDArrayb, - transform: rio.transform.Affine, - crs: rio.crs.CRS, - area_or_point: Literal["Area", "Point"] | None, - z_name: str, - weights: NDArrayf | None = None, +def icp( + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + max_iterations: int, + tolerance: float, + params_random: InRandomDict, + params_fit_or_bin: InFitOrBinDict, + method: Literal["point-to-point", "point-to-plane"] = "point-to-plane", + picky: bool = False, + only_translation: bool = False, + standardize: bool = True, +) -> tuple[NDArrayf, tuple[float, float, float], int]: + """ + Main function for Iterative Closest Point coregistration. + + This function subsamples input data, then runs ICP iteration steps to optimize its fit function until + convergence or a maximum of iterations is reached. + + Based on Besl and McKay (1992), https://doi.org/10.1117/12.57955 for "point-to-point" and on + Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C for "point-to-plane". + + The function assumes we have a two DEMs, or DEM and an elevation point cloud, in the same CRS. + The normals for "point-to-plane" are computed on the DEM for speed. + + :return: Affine transform matrix, Centroid, Subsample size. 
+ """ + + # Derive normals if method is point-to-plane, otherwise not + if method == "point-to-plane": + # We use the DEM to derive the normals + if isinstance(ref_elev, np.ndarray): + dem = ref_elev + else: + dem = tba_elev + nx, ny, nz = _icp_norms(dem, transform) + aux_vars = {"nx": nx, "ny": ny, "nz": nz} + else: + aux_vars = None + + # Pre-process point-raster inputs to the same subsampled points + sub_ref, sub_tba, sub_aux_vars, sub_coords = _preprocess_pts_rst_subsample( + params_random=params_random, + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + aux_vars=aux_vars, + return_coords=True, + ) + + # Convert point clouds to Nx3 arrays for efficient calculations below + ref_epc = np.vstack((sub_coords[0], sub_coords[1], sub_ref)) + tba_epc = np.vstack((sub_coords[0], sub_coords[1], sub_tba)) + if sub_aux_vars is not None: + norms = np.vstack((sub_aux_vars["nx"], sub_aux_vars["ny"], sub_aux_vars["nz"])) + else: + norms = None + + # Remove centroid and standardize to facilitate numerical convergence + ref_epc, tba_epc, centroid, std_fac = _standardize_epc(ref_epc, tba_epc, scale_std=standardize) + tolerance /= std_fac + + # Define search tree outside of loop for performance + ref_epc_nearest_tree = scipy.spatial.KDTree(ref_epc.T) + + # Iterate through method until tolerance or max number of iterations is reached + init_matrix = np.eye(4) # Initial matrix is the identity transform + constant_inputs = ( + ref_epc, + tba_epc, + norms, + ref_epc_nearest_tree, + params_fit_or_bin, + method, + picky, + only_translation, + ) + final_matrix = _iterate_method( + method=_icp_iteration_step, + iterating_input=init_matrix, + constant_inputs=constant_inputs, + tolerance=tolerance, + max_iterations=max_iterations, + ) + # De-standardize + final_matrix[:3, 3] *= std_fac + + # Get subsample size + subsample_final = len(sub_ref) + + return final_matrix, centroid, subsample_final + + +######################### +# 5/ Coherent Point Drift +######################### + + +def _cpd_fit( + ref_epc: NDArrayf, + tba_epc: NDArrayf, + trans_tba_epc: NDArrayf, + weight_cpd: float, + sigma2: float, + sigma2_min: float, + scale: bool = False, + only_translation: bool = False, +) -> tuple[NDArrayf, float, float]: + """ + Fit step of Coherent Point Drift by expectation-minimization, with variance updating. + + See Fig. 2 of Myronenko and Song (2010), https://arxiv.org/pdf/0905.2635.pdf for equations below. + + Inspired from pycpd implementation: https://github.com/siavashk/pycpd/blob/master/pycpd/rigid_registration.py. 
+ """ + + X, Y, TY = (ref_epc, tba_epc, trans_tba_epc) + + # Get shape of inputs + N, D = X.shape + M, _ = Y.shape + + # 0/ Initialize variance if not defined + diff2 = (X[None, :, :] - TY[:, None, :]) ** 2 + if sigma2 is None: + sigma2 = np.sum(diff2) / (D * N * M) + + # 1/ Expectation step + + # Sum only over D axis for numerator + P = np.sum(diff2, axis=2) + P = np.exp(-P / (2 * sigma2)) + + # Re-sum over M axis for denominator + Pden = np.sum(P, axis=0, keepdims=True) + c = (2 * np.pi * sigma2) ** (D / 2) * weight_cpd / (1.0 - weight_cpd) * M / N + Pden = np.clip(Pden, np.finfo(X.dtype).eps, None) + c + + P = np.divide(P, Pden) + + # Extract P subterms useful for next steps + Pt1 = np.sum(P, axis=0) + P1 = np.sum(P, axis=1) + Np = np.sum(P1) + PX = np.matmul(P, X) + + # 2/ Minimization step + + # Get centroid of each point cloud + muX = np.divide(np.sum(PX, axis=0), Np) + muY = np.divide(np.sum(np.dot(np.transpose(P), Y), axis=0), Np) + + # Subtract centroid from each point cloud + X_hat = X - np.tile(muX, (N, 1)) + Y_hat = Y - np.tile(muY, (M, 1)) + YPY = np.dot(np.transpose(P1), np.sum(np.multiply(Y_hat, Y_hat), axis=1)) + + # Derive A as in Fig. 2 + A = np.dot(np.transpose(X_hat), np.transpose(P)) + A = np.dot(A, Y_hat) + + # Singular value decomposition as per lemma 1 + if not only_translation: + try: + U, _, V = np.linalg.svd(A, full_matrices=True) + except np.linalg.LinAlgError: + raise ValueError("CPD coregistration numerics during np.linalg.svd(), try setting standardize=True.") + C = np.ones((D,)) + C[D - 1] = np.linalg.det(np.dot(U, V)) + + # Calculate the rotation matrix using Eq. 9 + R = np.transpose(np.dot(np.dot(U, np.diag(C)), V)) + else: + R = np.eye(3) + + # Update scale and translation using Fig. 2 + if scale is True: + s = np.trace(np.dot(np.transpose(A), np.transpose(R))) / YPY + else: + s = 1 + t = np.transpose(muX) - s * np.dot(np.transpose(R), np.transpose(muY)) + + # Store in affine matrix + matrix = np.eye(4) + matrix[:3, :3] = R + matrix[:3, 3] = -t # Translation is inverted here + + # 3/ Update variance and objective function + + # Update objective function using Eq. 7 + trAR = np.trace(np.dot(A, R)) + xPx = np.dot(np.transpose(Pt1), np.sum(np.multiply(X_hat, X_hat), axis=1)) + q = (xPx - 2 * s * trAR + s * s * YPY) / (2 * sigma2) + D * Np / 2 * np.log(sigma2) + + # Update variance using Fig. 2 + sigma2 = (xPx - s * trAR) / (Np * D) + + # If sigma2 gets negative, we use a minimal sigma value instead + if sigma2 <= 0: + sigma2 = sigma2_min + + return matrix, sigma2, q + + +def _cpd_iteration_step( + iterating_input: tuple[NDArrayf, float, float], + ref_epc: NDArrayf, + tba_epc: NDArrayf, + weight_cpd: float, + sigma2_min: float, + only_translation: bool, +) -> tuple[tuple[NDArrayf, float, float], float]: + """ + Iteration step for Coherent Point Drift algorithm. + + Returns the updated iterating input (affine matrix, variance and objective function). 
+ """ + + matrix, sigma2, q = iterating_input + + # Apply transform matrix from previous step + trans_tba_epc = _apply_matrix_pts_mat(tba_epc, matrix=matrix, invert=True) + + # Fit to get new step transform + # (Note: the CPD algorithm re-computes the full transform from the original target point cloud, + # so there is no need to combine a step transform within the iteration as in ICP/LZD) + new_matrix, new_sigma2, new_q = _cpd_fit( + ref_epc=ref_epc.T, + tba_epc=tba_epc.T, + trans_tba_epc=trans_tba_epc.T, + sigma2=sigma2, + weight_cpd=weight_cpd, + sigma2_min=sigma2_min, + only_translation=only_translation, + ) + + # Compute statistic on offset to know if it reached tolerance + tolerance_q = np.abs(q - new_q) + + # TODO: If we allow multiple tolerances in the future, here are the translation and rotation tolerances + # translations = new_matrix[:3, 3] + # rotations = new_matrix[:3, :3] + # tolerance_translation = np.sqrt(np.sum(translations) ** 2) + # tolerance_rotation = np.rad2deg(np.arccos(np.clip((np.trace(rotations) - 1) / 2, -1, 1))) + + return (new_matrix, new_sigma2, new_q), tolerance_q + + +def cpd( + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weight_cpd: float, + params_random: InRandomDict, + max_iterations: int, + tolerance: float, + only_translation: bool = False, + standardize: bool = True, +) -> tuple[NDArrayf, tuple[float, float, float], int]: + """ + Main function for Coherent Point Drift coregistration. + See Myronenko and Song (2010), http://dx.doi.org/10.1109/TPAMI.2010.46. + + This function subsamples input data, then runs CPD iteration steps to optimize its expectation-minimization until + convergence or a maximum of iterations is reached. + + The function assumes we have a two DEMs, or DEM and an elevation point cloud, in the same CRS. 
+ """ + + # Pre-process point-raster inputs to the same subsampled points + sub_ref, sub_tba, _, sub_coords = _preprocess_pts_rst_subsample( + params_random=params_random, + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + return_coords=True, + ) + + # Convert point clouds to Nx3 arrays for efficient calculations below + ref_epc = np.vstack((sub_coords[0], sub_coords[1], sub_ref)) + tba_epc = np.vstack((sub_coords[0], sub_coords[1], sub_tba)) + + # Remove centroid and standardize to facilitate numerical convergence + ref_epc, tba_epc, centroid, std_fac = _standardize_epc(ref_epc=ref_epc, tba_epc=tba_epc, scale_std=standardize) + tolerance /= std_fac + + # Run rigid CPD registration + # Iterate through method until tolerance or max number of iterations is reached + init_matrix = np.eye(4) # Initial matrix is the identity transform + init_q = np.inf + init_sigma2 = None + iterating_input = (init_matrix, init_sigma2, init_q) + sigma2_min = tolerance / 10 + constant_inputs = (ref_epc, tba_epc, weight_cpd, sigma2_min, only_translation) + final_matrix, _, _ = _iterate_method( + method=_cpd_iteration_step, + iterating_input=iterating_input, + constant_inputs=constant_inputs, + tolerance=tolerance, + max_iterations=max_iterations, + ) + final_matrix = invert_matrix(final_matrix) + + # De-standardize + final_matrix[:3, 3] *= std_fac + + # Get subsample size + subsample_final = len(sub_ref) + + return final_matrix, centroid, subsample_final + + +####################### +# 6/ Least Z-difference +####################### + + +def _lzd_aux_vars( + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + transform: affine.Affine, +) -> tuple[NDArrayf, NDArrayf]: + """ + Deriving gradient in X/Y expected by the Least Z-difference coregistration. + + :return: Gradient in X/Y, scaled based on the DEM resolution. + """ + + # If inputs are both point clouds, raise an error + if isinstance(ref_elev, gpd.GeoDataFrame) and isinstance(tba_elev, gpd.GeoDataFrame): + + raise TypeError( + "The Nuth and Kääb (2011) coregistration does not support two point clouds, one elevation " + "dataset in the pair must be a DEM." + ) + + # If inputs are both rasters, derive terrain attributes from ref and get 2D dh interpolator + elif isinstance(ref_elev, np.ndarray) and isinstance(tba_elev, np.ndarray): + + # Derive slope and aspect from the reference as default + gradient_y, gradient_x = np.gradient(ref_elev) + + # If inputs are one raster and one point cloud, derive terrain attribute from raster and get 1D dh interpolator + else: + + if isinstance(ref_elev, gpd.GeoDataFrame): + rst_elev = tba_elev + else: + rst_elev = ref_elev + + # Derive slope and aspect from the raster dataset + gradient_y, gradient_x = np.gradient(rst_elev) + + # Convert to unitary gradient depending on resolution + res = _res(transform) + gradient_x = gradient_x / res[0] + gradient_y = -gradient_y / res[1] # Because raster Y axis is inverted, need to add a minus + + return gradient_x, gradient_y + + +def _lzd_fit_func( + inputs: tuple[NDArrayf, NDArrayf, NDArrayf, NDArrayf, NDArrayf, NDArrayf], + t1: float, + t2: float, + t3: float, + alpha1: float, + alpha2: float, + alpha3: float, + scale: float = 0.0, +) -> NDArrayf: + """ + Fit function of Least Z-difference coregistration, Rosenholm and Torlegård (1988). 
+ + Linearizes a rigid transformation for small rotations and utilizes dZ as a differential function of the plane + coordinates (Equation 6). + + Will solve for the 7 parameters of a scaled rigid transform. + + :param inputs: Inputs not optimized by fit function: 1D arrays of X, Y, Z, as well as elevation change and elevation + gradient along X and Y evaluated at X/Y coordinates. + :param t1: Translation in X. + :param t2: Translation in Y. + :param t3: Translation in Z. + :param alpha1: Rotation around X. + :param alpha2: Rotation around Y. + :param alpha3: Rotation around Z. + :param scale: Scaling factor. + + :return: 1D array of residuals between elevation change and approximate rigid transformation in X/Y. + """ + + # Get constant inputs + x, y, z, dh, gradx, grady = inputs + + # We compute lambda from estimated parameters (Equation 6) + lda = ( + t3 + - x * alpha2 + + y * alpha1 + + z * scale + - gradx * (t1 + x * scale - y * alpha3 + z * alpha2) + - grady * (t2 + x * alpha3 + y * scale - z * alpha1) + ) + + # Get residuals with elevation change + res = lda - dh + + return res + + +def _lzd_fit( + x: NDArrayf, + y: NDArrayf, + z: NDArrayf, + dh: NDArrayf, + gradx: NDArrayf, + grady: NDArrayf, + params_fit_or_bin: InFitOrBinDict, + only_translation: bool, + **kwargs: Any, +) -> NDArrayf: + """ + Optimization of fit function for Least Z-difference coregistration. + + :param x: X coordinate as 1D array. + :param y: Y coordinate as 1D array. + :param z: Z coordinate as 1D array. + :param dh: Elevation change with other elevation dataset at X/Y coordinates as 1D array. + :param gradx: DEM gradient along X axis at X/Y coordinates as 1D array. + :param grady: DEM gradient along Y axis at X/Y coordinates as 1D array. + :param params_fit_or_bin: Dictionary of fitting and binning parameters. + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + :param: **kwargs: Keyword arguments passed to fit optimizer. + + :return: Optimized affine matrix. + """ + + # Inputs that are not parameters to optimize + inputs = (x, y, z, dh, gradx, grady) + + # Define loss function + loss_func = params_fit_or_bin["fit_loss_func"] + + # For translation + rotation + if not only_translation: + + def fit_func(offsets: tuple[float, float, float, float, float, float]) -> NDArrayf: + return _lzd_fit_func( + inputs=inputs, + t1=offsets[0], + t2=offsets[1], + t3=offsets[2], + alpha1=offsets[3], + alpha2=offsets[4], + alpha3=offsets[5], + ) + + # Initial offset near zero + init_offsets = np.zeros(6) + + # For only translation + else: + + def fit_func(offsets: tuple[float, float, float, float, float, float]) -> NDArrayf: + return _lzd_fit_func( + inputs=inputs, + t1=offsets[0], + t2=offsets[1], + t3=offsets[2], + alpha1=0.0, + alpha2=0.0, + alpha3=0.0, + ) + + # Initial offset near zero + init_offsets = np.zeros(3) + + # Run optimizer on function + results = params_fit_or_bin["fit_minimizer"](fit_func, init_offsets, loss=loss_func, **kwargs) + + # Mypy: having results as "None" is impossible, but not understood through overloading of _bin_or_and_fit_nd... 
+ assert results is not None + # Build matrix out of optimized parameters + matrix = matrix_from_translations_rotations(*results.x, use_degrees=False) # type: ignore + + return matrix + + +def _lzd_iteration_step( + matrix: NDArrayf, + sub_rst: Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf], + sub_pts: NDArrayf, + sub_coords: tuple[NDArrayf, NDArrayf], + centroid: tuple[float, float, float], + sub_gradx: Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf], + sub_grady: Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf], + params_fit_or_bin: InFitOrBinDict, + only_translation: bool, +) -> tuple[NDArrayf, float]: + """ + Iteration step of Least Z-difference coregistration from Rosenholm and Torlegård (1988). + + The function uses 2D array interpolators of the DEM input and its gradient, computed only once outside iteration + loops, to optimize computing time. + + Returns optimized affine matrix and tolerance for this iteration step. + + :param matrix: Affine transform matrix. + :param sub_rst: Interpolator for 2D array of DEM. + :param sub_pts: Subsampled elevation for other elevation data (DEM or point) as 1D array. + :param sub_coords: Subsampled X/Y coordinates arrays for point or second DEM input as 1D arrays. + :param centroid: Centroid removed from point or second DEM input (to facilitate numerical convergence). + :param sub_gradx: Interpolator for 2D array of DEM gradient along X axis. + :param sub_grady: Interpolator for 2D array of DEM gradient along Y axis. + :param params_fit_or_bin: Dictionary of fitting and binning parameters. + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + + :return Affine matrix, Tolerance. + """ + + # Apply transform matrix from previous steps + pts_epc = np.vstack((sub_coords[0], sub_coords[1], sub_pts)) + trans_pts_epc = _apply_matrix_pts_mat(pts_epc, matrix=matrix, centroid=centroid) + + # Evaluate dh and gradients at new X/Y coordinates + x = trans_pts_epc[0, :] + y = trans_pts_epc[1, :] + z = trans_pts_epc[2, :] + dh = sub_rst((y, x)) - z + gradx = sub_gradx((y, x)) + grady = sub_grady((y, x)) + + # Remove centroid before fit for better convergence + x -= centroid[0] + y -= centroid[1] + z -= centroid[2] + + # Remove invalid values sampled by interpolators + valids = np.logical_and.reduce((np.isfinite(dh), np.isfinite(z), np.isfinite(gradx), np.isfinite(grady))) + if np.count_nonzero(valids) == 0: + raise ValueError( + "The subsample contains no more valid values. This can happen if the affine transformation to " + "correct is larger than the data extent, or if the algorithm diverged. To ensure all possible points can " + "be used at any iteration step, use subsample=1." 
+ ) + x = x[valids] + y = y[valids] + z = z[valids] + dh = dh[valids] + gradx = gradx[valids] + grady = grady[valids] + + # Fit to get new step transform + step_matrix = _lzd_fit( + x=x, + y=y, + z=z, + dh=dh, + gradx=gradx, + grady=grady, + params_fit_or_bin=params_fit_or_bin, + only_translation=only_translation, + ) + + # Increment transformation matrix by step + new_matrix = step_matrix @ matrix + + # Compute statistic on offset to know if it reached tolerance + translations = step_matrix[:3, 3] + + tolerance_translation = np.sqrt(np.sum(translations) ** 2) + # TODO: If we allow multiple tolerances in the future, here's the rotation tolerance + # rotations = step_matrix[:3, :3] + # tolerance_rotation = np.rad2deg(np.arccos(np.clip((np.trace(rotations) - 1) / 2, -1, 1))) + + return new_matrix, tolerance_translation + + +def lzd( + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + max_iterations: int, + tolerance: float, + params_random: InRandomDict, + params_fit_or_bin: InFitOrBinDict, + only_translation: bool, +) -> tuple[NDArrayf, tuple[float, float, float], int]: + """ + Least Z-differences coregistration. + See Rosenholm and Torlegård (1988), + https://www.asprs.org/wp-content/uploads/pers/1988journal/oct/1988_oct_1385-1389.pdf. + + This function subsamples input data, then runs LZD iteration steps to optimize its fit function until + convergence or a maximum of iterations is reached. + + The function assumes we have two DEMs, or a DEM and an elevation point cloud, in the same CRS. + """ + + logging.info("Running LZD coregistration") + + # Check that DEM CRS is projected, otherwise slope is not correctly calculated + if not crs.is_projected: + raise NotImplementedError( + f"LZD coregistration only works with a projected CRS, current CRS is {crs}. Reproject " + f"your DEMs with DEM.reproject() in a local projected CRS such as UTM, that you can find " + f"using DEM.get_metric_crs()." 
+ ) + + # First, derive auxiliary variables of Nuth and Kääb (slope tangent, and aspect) for any point-raster input + gradx, grady = _lzd_aux_vars(ref_elev=ref_elev, tba_elev=tba_elev, transform=transform) + + # Then, perform preprocessing: subsampling and interpolation of inputs and auxiliary vars at same points + sub_ref, sub_tba, _, sub_coords = _preprocess_pts_rst_subsample( + params_random=params_random, + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + return_coords=True, + ) + # Define inputs of methods, depending on if they are point or raster data + if not isinstance(ref_elev, gpd.GeoDataFrame): + ref = "rst" + sub_pts = sub_tba + sub_rst = _reproject_horizontal_shift_samecrs(ref_elev, src_transform=transform, return_interpolator=True) + else: + ref = "pts" + sub_pts = sub_ref + sub_rst = _reproject_horizontal_shift_samecrs(tba_elev, src_transform=transform, return_interpolator=True) + # We use interpolators of gradx and grady in any case + sub_gradx = _reproject_horizontal_shift_samecrs(gradx, src_transform=transform, return_interpolator=True) + sub_grady = _reproject_horizontal_shift_samecrs(grady, src_transform=transform, return_interpolator=True) + + # Estimate centroid to use + centroid_x = float(np.nanmean(sub_coords[0])) + centroid_y = float(np.nanmean(sub_coords[1])) + centroid_z = float(np.nanmean(sub_pts)) + centroid = (centroid_x, centroid_y, centroid_z) + + logging.info("Iteratively estimating rigid transformation:") + # Iterate through method until tolerance or max number of iterations is reached + init_matrix = np.eye(4) # Initial matrix is the identity transform + constant_inputs = ( + sub_rst, + sub_pts, + sub_coords, + centroid, + sub_gradx, + sub_grady, + params_fit_or_bin, + only_translation, + ) + final_matrix = _iterate_method( + method=_lzd_iteration_step, + iterating_input=init_matrix, + constant_inputs=constant_inputs, + tolerance=tolerance, + max_iterations=max_iterations, + ) + + # Invert matrix if reference was the point data + if ref == "pts": + final_matrix = invert_matrix(final_matrix) + + subsample_final = len(sub_pts) + + return final_matrix, centroid, subsample_final + + +################################## +# Affine coregistration subclasses +################################## + +AffineCoregType = TypeVar("AffineCoregType", bound="AffineCoreg") + + +class AffineCoreg(Coreg): + """ + Generic affine coregistration class. + + Builds additional common affine methods on top of the generic Coreg class. + Made to be subclassed. + """ + + _fit_called: bool = False # Flag to check if the .fit() method has been called. 
+ _is_affine: bool | None = None + _is_translation: bool | None = None + + def __init__( + self, + subsample: float | int = 1.0, + matrix: NDArrayf | None = None, + meta: dict[str, Any] | None = None, + ) -> None: + """Instantiate a generic AffineCoreg method.""" + + if meta is None: + meta = {} + # Define subsample size + meta.update({"subsample": subsample}) + super().__init__(meta=meta) + + if matrix is not None: + with warnings.catch_warnings(): + # This error is fixed in the upcoming 1.8 + warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`") + valid_matrix = pytransform3d.transformations.check_transform(matrix) + self._meta["outputs"]["affine"] = {"matrix": valid_matrix} + + self._is_affine = True + + def to_matrix(self) -> NDArrayf: + """Convert the transform to a 4x4 transformation matrix.""" + return self._to_matrix_func() + + def to_translations(self) -> tuple[float, float, float]: + """ + Extract X/Y/Z translations from the affine transformation matrix. + + :return: Easting, northing and vertical translations (in georeferenced unit). + """ + + matrix = self.to_matrix() + shift_x, shift_y, shift_z = translations_rotations_from_matrix(matrix)[:3] + + return shift_x, shift_y, shift_z + + def to_rotations(self, return_degrees: bool = True) -> tuple[float, float, float]: + """ + Extract X/Y/Z euler rotations (extrinsic convention) from the affine transformation matrix. + + Warning: This function only works for a rigid transformation (rotation and translation). + + :param return_degrees: Whether to return degrees, otherwise radians. + + :return: Extrinsinc Euler rotations along easting, northing and vertical directions (degrees). + """ + + matrix = self.to_matrix() + alpha1, alpha2, alpha3 = translations_rotations_from_matrix(matrix, return_degrees=return_degrees)[3:] + + return alpha1, alpha2, alpha3 + + def centroid(self) -> tuple[float, float, float] | None: + """Get the centroid of the coregistration, if defined.""" + meta_centroid = self._meta["outputs"]["affine"].get("centroid") + + if meta_centroid is None: + return None + + # Unpack the centroid in case it is in an unexpected format (an array, list or something else). + return meta_centroid[0], meta_centroid[1], meta_centroid[2] + + def _preprocess_rst_pts_subsample_interpolator( + self, + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + aux_vars: dict[str, NDArrayf] | None = None, + weights: NDArrayf | None = None, + transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, + area_or_point: Literal["Area", "Point"] | None = None, + z_name: str = "z", + ) -> tuple[Callable[[float, float], NDArrayf], None | dict[str, NDArrayf]]: + """ + Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points + (and interpolated in the case of point-raster input). + + Return 1D arrays of reference elevation, to-be-aligned elevation and dictionary of 1D arrays of auxiliary + variables at subsampled points. 
+ """ + + # Get random parameters + params_random = self._meta["inputs"]["random"] + + # Get subsample mask (a 2D array for raster-raster, a 1D array of length the point data for point-raster) + sub_mask = _get_subsample_mask_pts_rst( + params_random=params_random, + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + area_or_point=area_or_point, + z_name=z_name, + aux_vars=aux_vars, + ) + + # Return interpolator of elevation differences and subsampled auxiliary variables + sub_dh_interpolator, sub_bias_vars = _subsample_on_mask_interpolator( + ref_elev=ref_elev, + tba_elev=tba_elev, + aux_vars=aux_vars, + sub_mask=sub_mask, + transform=transform, + area_or_point=area_or_point, + z_name=z_name, + ) + + # Write final subsample to class + self._meta["outputs"]["random"] = {"subsample_final": int(np.count_nonzero(sub_mask))} + + # Return 1D arrays of subsampled points at the same location + return sub_dh_interpolator, sub_bias_vars + + @classmethod + def from_matrix(cls, matrix: NDArrayf) -> AffineCoreg: + """ + Instantiate a generic Coreg class from a transformation matrix. + + :param matrix: A 4x4 transformation matrix. Shape must be (4,4). + + :raises ValueError: If the matrix is incorrectly formatted. + + :returns: The instantiated generic Coreg class. + """ + if np.any(~np.isfinite(matrix)): + raise ValueError(f"Matrix has non-finite values:\n{matrix}") + with warnings.catch_warnings(): + # This error is fixed in the upcoming 1.8 + warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`") + valid_matrix = pytransform3d.transformations.check_transform(matrix) + return cls(matrix=valid_matrix) + + @classmethod + def from_translations(cls, x_off: float = 0.0, y_off: float = 0.0, z_off: float = 0.0) -> AffineCoreg: + """ + Instantiate a generic Coreg class from a X/Y/Z translation. + + :param x_off: The offset to apply in the X (west-east) direction. + :param y_off: The offset to apply in the Y (south-north) direction. + :param z_off: The offset to apply in the Z (vertical) direction. + + :raises ValueError: If the given translation contained invalid values. + + :returns: An instantiated generic Coreg class. + """ + matrix = matrix_from_translations_rotations(t1=x_off, t2=y_off, t3=z_off, alpha1=0.0, alpha2=0.0, alpha3=0.0) + + return cls.from_matrix(matrix) + + @classmethod + def from_rotations( + cls, x_rot: float = 0.0, y_rot: float = 0.0, z_rot: float = 0.0, use_degrees: bool = True + ) -> AffineCoreg: + """ + Instantiate a generic Coreg class from a X/Y/Z rotation. + + :param x_rot: The rotation to apply around the X (west-east) direction. + :param y_rot: The rotation to apply around the Y (south-north) direction. + :param z_rot: The rotation to apply around the Z (vertical) direction. + :param use_degrees: Whether to use degrees for input angles, otherwise radians. + + :raises ValueError: If the given rotation contained invalid values. + + :returns: An instantiated generic Coreg class. + """ + + matrix = matrix_from_translations_rotations( + t1=0.0, t2=0.0, t3=0.0, alpha1=x_rot, alpha2=y_rot, alpha3=z_rot, use_degrees=use_degrees + ) + + return cls.from_matrix(matrix) + + def _to_matrix_func(self) -> NDArrayf: + # FOR DEVELOPERS: This function needs to be implemented if the `self._meta['matrix']` keyword is not None. + + # Try to see if a matrix exists. 
+ meta_matrix = self._meta["outputs"]["affine"].get("matrix") + if meta_matrix is not None: + assert meta_matrix.shape == (4, 4), f"Invalid _meta matrix shape. Expected: (4, 4), got {meta_matrix.shape}" + return meta_matrix + + raise NotImplementedError("This should be implemented by subclassing") + + +class VerticalShift(AffineCoreg): + """ + Vertical translation alignment. + + Estimates the mean vertical offset between two elevation datasets based on a reductor function (median, mean, or + any custom reductor function). + + The estimated vertical shift is stored in the `self.meta["outputs"]["affine"]` key "shift_z" (in unit of the + elevation dataset inputs, typically meters). + """ + + def __init__( + self, vshift_reduc_func: Callable[[NDArrayf], np.floating[Any]] = np.median, subsample: float | int = 1.0 + ) -> None: # pylint: + # disable=super-init-not-called + """ + Instantiate a vertical shift alignment object. + + :param vshift_reduc_func: Reductor function to estimate the central tendency of the vertical shift. + Defaults to the median. + :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. + """ + self._meta: CoregDict = {} # All __init__ functions should instantiate an empty dict. + + super().__init__(meta={"vshift_reduc_func": vshift_reduc_func}, subsample=subsample) + + def _fit_rst_rst( + self, + ref_elev: NDArrayf, + tba_elev: NDArrayf, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, + ) -> None: + """Estimate the vertical shift using the vshift_func.""" + + # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts + self._fit_rst_pts( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + weights=weights, + **kwargs, + ) + + def _fit_rst_pts( + self, + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, bias_vars: dict[str, NDArrayf] | None = None, **kwargs: Any, ) -> None: @@ -1052,44 +2101,73 @@ def _to_matrix_func(self) -> NDArrayf: class ICP(AffineCoreg): """ - Iterative closest point registration, based on Besl and McKay (1992), https://doi.org/10.1117/12.57955. + Iterative closest point registration. Estimates a rigid transform (rotation + translation) between two elevation datasets. + The ICP method can be: + + - Point-to-point of Besl and McKay (1992), https://doi.org/10.1117/12.57955, where the loss function is computed + on the 3D distances of closest points (original method), + - Point-to-plane of Chen and Medioni (1992), https://doi.org/10.1016/0262-8856(92)90066-C where the loss function + is computed on the 3D distances of closest points projected on the plane normals (generally faster and more + accurate, particularly for point clouds representing contiguous surfaces). + + + Other ICP options are: + + - Linearized approximation of the point-to-plane least-square optimization of Low (2004), + https://www.cs.unc.edu/techreports/04-004.pdf (faster, only for rotations + below 30° degrees), + - Picky ICP of Zinsser et al. 
(2003), https://doi.org/10.1109/ICIP.2003.1246775, where closest point pairs matched + to the same point in the reference point cloud are removed, keeping only the pair with the minimum distance + (useful for datasets with different extents that won't exactly overlap). + The estimated transform is stored in the `self.meta["outputs"]["affine"]` key "matrix", with rotation centered on the coordinates in the key "centroid". The translation parameters are also stored individually in the keys "shift_x", "shift_y" and "shift_z" (in georeferenced units for horizontal shifts, and unit of the elevation dataset inputs for the vertical shift). - - Requires 'opencv'. See opencv doc for more info: - https://docs.opencv.org/master/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html """ def __init__( self, - max_iterations: int = 100, - tolerance: float = 0.05, - rejection_scale: float = 2.5, - num_levels: int = 6, + method: Literal["point-to-point", "point-to-plane"] = "point-to-plane", + picky: bool = True, + only_translation: bool = False, + fit_minimizer: Callable[..., tuple[NDArrayf, Any]] | Literal["lsq_approx"] = scipy.optimize.least_squares, + fit_loss_func: Callable[[NDArrayf], np.floating[Any]] | str = "linear", + max_iterations: int = 20, + tolerance: float = 0.01, + standardize: bool = True, subsample: float | int = 5e5, ) -> None: """ Instantiate an ICP coregistration object. - :param max_iterations: The maximum allowed iterations before stopping. - :param tolerance: The residual change threshold after which to stop the iterations. - :param rejection_scale: The threshold (std * rejection_scale) to consider points as outliers. - :param num_levels: Number of octree levels to consider. A higher number is faster but may be more inaccurate. + :param method: Method of iterative closest point registration, either "point-to-point" of Besl and McKay (1992) + that minimizes 3D distances, or "point-to-plane" of Chen and Medioni (1992) that minimizes 3D distances + projected on normals. + :param picky: Whether to use the duplicate removal for pairs of closest points of Zinsser et al. (2003). + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + :param fit_minimizer: Minimizer for the coregistration function. Use "lsq_approx" for the linearized + least-square approximation of Low (2004) (only available for "point-to-plane"). + :param fit_loss_func: Loss function for the minimization of residuals (if minimizer is not "lsq_approx"). + :param max_iterations: Maximum allowed iterations before stopping. + :param tolerance: Residual change threshold after which to stop the iterations. + :param standardize: Whether to standardize input point clouds to the unit sphere for numerical convergence + (tolerance is also standardized by the same factor). :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. """ - if not _has_cv2: - raise ValueError("Optional dependency needed. 
Install 'opencv'") meta = { + "icp_method": method, + "icp_picky": picky, + "fit_minimizer": fit_minimizer, + "fit_loss_func": fit_loss_func, "max_iterations": max_iterations, "tolerance": tolerance, - "rejection_scale": rejection_scale, - "num_levels": num_levels, + "only_translation": only_translation, + "standardize": standardize, } super().__init__(subsample=subsample, meta=meta) @@ -1108,42 +2186,18 @@ def _fit_rst_rst( ) -> None: """Estimate the rigid transform from tba_dem to ref_dem.""" - if weights is not None: - warnings.warn("ICP was given weights, but does not support it.") - - resolution = _res(transform) - - # Generate the x and y coordinates for the reference_dem - x_coords, y_coords = _coords(transform, ref_elev.shape, area_or_point=area_or_point) - gradient_x, gradient_y = np.gradient(ref_elev) - - normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 - normal_north = np.sin(np.arctan(gradient_x / resolution[0])) - normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0) - - valid_mask = np.logical_and.reduce( - (inlier_mask, np.isfinite(ref_elev), np.isfinite(normal_east), np.isfinite(normal_north)) - ) - subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask) - - ref_pts = gpd.GeoDataFrame( - geometry=gpd.points_from_xy(x=x_coords[subsample_mask], y=y_coords[subsample_mask], crs=None), - data={ - "z": ref_elev[subsample_mask], - "nx": normal_east[subsample_mask], - "ny": normal_north[subsample_mask], - "nz": normal_up[subsample_mask], - }, - ) - + # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts self._fit_rst_pts( - ref_elev=ref_pts, + ref_elev=ref_elev, tba_elev=tba_elev, inlier_mask=inlier_mask, transform=transform, crs=crs, area_or_point=area_or_point, - z_name="z", + z_name=z_name, + weights=weights, + bias_vars=bias_vars, + **kwargs, ) def _fit_rst_pts( @@ -1160,102 +2214,150 @@ def _fit_rst_pts( **kwargs: Any, ) -> None: - # Check which one is reference - if isinstance(ref_elev, gpd.GeoDataFrame): - point_elev = ref_elev - rst_elev = tba_elev - ref = "point" - else: - point_elev = tba_elev - rst_elev = ref_elev - ref = "raster" - - # Pre-process point data - point_elev = point_elev.dropna(how="any", subset=[z_name]) - - bounds = _bounds(transform=transform, shape=rst_elev.shape) - resolution = _res(transform) - - # Generate the x and y coordinates for the TBA DEM - x_coords, y_coords = _coords(transform, rst_elev.shape, area_or_point=area_or_point) - centroid = (float(np.mean([bounds.left, bounds.right])), float(np.mean([bounds.bottom, bounds.top])), 0.0) - # Subtract by the bounding coordinates to avoid float32 rounding errors. - x_coords -= centroid[0] - y_coords -= centroid[1] - - gradient_x, gradient_y = np.gradient(rst_elev) - - # This CRS is temporary and doesn't affect the result. It's just needed for Raster instantiation. 
- normal_east = np.sin(np.arctan(gradient_y / resolution[1])) * -1 - normal_north = np.sin(np.arctan(gradient_x / resolution[0])) - normal_up = 1 - np.linalg.norm([normal_east.data, normal_north.data], axis=0) - - valid_mask = ~np.isnan(rst_elev) & ~np.isnan(normal_east.data) & ~np.isnan(normal_north.data) - - points: dict[str, NDArrayf] = {} - points["raster"] = np.dstack( - [ - x_coords[valid_mask], - y_coords[valid_mask], - rst_elev[valid_mask], - normal_east[valid_mask], - normal_north[valid_mask], - normal_up[valid_mask], - ] - ).squeeze() - - # TODO: Should be a way to not duplicate this column and just feed it directly - point_elev["E"] = point_elev.geometry.x.values - point_elev["N"] = point_elev.geometry.y.values - - if any(col not in point_elev for col in ["nx", "ny", "nz"]): - for key, arr in [("nx", normal_east), ("ny", normal_north), ("nz", normal_up)]: - point_elev[key] = _interp_points( - arr, - transform=transform, - area_or_point=area_or_point, - points=(point_elev["E"].values, point_elev["N"].values), - ) + # Get parameters stored in class + params_random = self._meta["inputs"]["random"] + params_fit_or_bin = self._meta["inputs"]["fitorbin"] + + # Call method + matrix, centroid, subsample_final = icp( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + params_random=params_random, + params_fit_or_bin=params_fit_or_bin, + max_iterations=self._meta["inputs"]["iterative"]["max_iterations"], + tolerance=self._meta["inputs"]["iterative"]["tolerance"], + method=self._meta["inputs"]["specific"]["icp_method"], + picky=self._meta["inputs"]["specific"]["icp_picky"], + only_translation=self._meta["inputs"]["affine"]["only_translation"], + standardize=self._meta["inputs"]["affine"]["standardize"], + ) - point_elev["E"] -= centroid[0] - point_elev["N"] -= centroid[1] + # Write output to class + # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) + output_affine = OutAffineDict( + centroid=centroid, + matrix=matrix, + shift_x=matrix[0, 3], + shift_y=matrix[1, 3], + shift_z=matrix[2, 3], + ) + self._meta["outputs"]["affine"] = output_affine + self._meta["outputs"]["random"] = {"subsample_final": subsample_final} - points["point"] = point_elev[["E", "N", z_name, "nx", "ny", "nz"]].values - for key in points: - points[key] = points[key][~np.any(np.isnan(points[key]), axis=1)].astype("float32") - points[key][:, 0] -= resolution[0] / 2 - points[key][:, 1] -= resolution[1] / 2 +class CPD(AffineCoreg): + """ + Coherent Point Drift coregistration, based on Myronenko and Song (2010), https://doi.org/10.1109/TPAMI.2010.46. - # Extract parameters and pass them to method - max_it = self._meta["inputs"]["iterative"]["max_iterations"] - tol = self._meta["inputs"]["iterative"]["tolerance"] - rej = self._meta["inputs"]["specific"]["rejection_scale"] - num_lv = self._meta["inputs"]["specific"]["num_levels"] - icp = cv2.ppf_match_3d_ICP(max_it, tol, rej, num_lv) - logging.info("Running ICP...") - try: - # Use points as reference - _, residual, matrix = icp.registerModelToScene(points["raster"], points["point"]) - except cv2.error as exception: - if "(expected: 'n > 0'), where" not in str(exception): - raise exception - - raise ValueError( - "Not enough valid points in input data." - f"'reference_dem' had {points['ref'].size} valid points." - f"'dem_to_be_aligned' had {points['tba'].size} valid points." - ) + Estimate translations and rotations. 
+ + The estimated transform is stored in the `self.meta["outputs"]["affine"]` key "matrix", with rotation centered + on the coordinates in the key "centroid". The translation parameters are also stored individually in the + keys "shift_x", "shift_y" and "shift_z" (in georeferenced units for horizontal shifts, and unit of the + elevation dataset inputs for the vertical shift). + """ + + def __init__( + self, + weight: float = 0, + only_translation: bool = False, + max_iterations: int = 100, + tolerance: float = 0.01, + standardize: bool = True, + subsample: int | float = 5e3, + ): + """ + Instantiate a CPD coregistration object. + + :param weight: Weight contribution of the uniform distribution to account for outliers, from 0 (inclusive) to + 1 (exclusive). + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + :param max_iterations: Maximum allowed iterations before stopping. + :param tolerance: Residual change threshold after which to stop the iterations. + :param standardize: Whether to standardize input point clouds to the unit sphere for numerical convergence + (tolerance is also standardized by the same factor). + :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. + """ + + meta_cpd = { + "max_iterations": max_iterations, + "tolerance": tolerance, + "cpd_weight": weight, + "only_translation": only_translation, + "standardize": standardize, + } + + super().__init__(subsample=subsample, meta=meta_cpd) # type: ignore + + def _fit_rst_rst( + self, + ref_elev: NDArrayf, + tba_elev: NDArrayf, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, + ) -> None: + """Estimate the rigid transform from tba_dem to ref_dem.""" + + # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts + self._fit_rst_pts( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + weights=weights, + bias_vars=bias_vars, + **kwargs, + ) - # If raster was reference, invert the matrix - if ref == "raster": - matrix = xdem.coreg.base.invert_matrix(matrix) + def _fit_rst_pts( + self, + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, + ) -> None: - logging.info("ICP finished") + # Get parameters stored in class + params_random = self._meta["inputs"]["random"] - assert residual < 1000, f"ICP coregistration failed: residual={residual}, threshold: 1000" + # Call method + matrix, centroid, subsample_final = cpd( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + params_random=params_random, + weight_cpd=self._meta["inputs"]["specific"]["cpd_weight"], + max_iterations=self._meta["inputs"]["iterative"]["max_iterations"], + tolerance=self._meta["inputs"]["iterative"]["tolerance"], + only_translation=self._meta["inputs"]["affine"]["only_translation"], + standardize=self._meta["inputs"]["affine"]["standardize"], + ) - # Save 
outputs + # Write output to class # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) output_affine = OutAffineDict( centroid=centroid, @@ -1265,6 +2367,7 @@ def _fit_rst_pts( shift_z=matrix[2, 3], ) self._meta["outputs"]["affine"] = output_affine + self._meta["outputs"]["random"] = {"subsample_final": subsample_final} class NuthKaab(AffineCoreg): @@ -1292,8 +2395,8 @@ def __init__( """ Instantiate a new Nuth and Kääb (2011) coregistration object. - :param max_iterations: The maximum allowed iterations before stopping. - :param offset_threshold: The residual offset threshold after which to stop the iterations (in pixels). + :param max_iterations: Maximum allowed iterations before stopping. + :param offset_threshold: Residual offset threshold after which to stop the iterations (in pixels). :param bin_before_fit: Whether to bin data before fitting the coregistration function. For the Nuth and Kääb (2011) algorithm, this corresponds to bins of aspect to compute statistics on dh/tan(slope). :param fit_optimizer: Optimizer to minimize the coregistration function. @@ -1420,6 +2523,125 @@ def _to_matrix_func(self) -> NDArrayf: return matrix +class LZD(AffineCoreg): + """ + Least Z-difference coregistration. + + See Rosenholm and Torlegård (1988), + https://www.asprs.org/wp-content/uploads/pers/1988journal/oct/1988_oct_1385-1389.pdf. + + Estimates a rigid transform (rotation + translation) between two elevation datasets. + + The estimated transform is stored in the `self.meta["outputs"]["affine"]` key "matrix", with rotation centered + on the coordinates in the key "centroid". The translation parameters are also stored individually in the + keys "shift_x", "shift_y" and "shift_z" (in georeferenced units for horizontal shifts, and unit of the + elevation dataset inputs for the vertical shift). + """ + + def __init__( + self, + only_translation: bool = False, + fit_minimizer: Callable[..., tuple[NDArrayf, Any]] = scipy.optimize.least_squares, + fit_loss_func: Callable[[NDArrayf], np.floating[Any]] | str = "linear", + max_iterations: int = 200, + tolerance: float = 0.01, + subsample: float | int = 5e5, + ): + """ + Instantiate an LZD coregistration object. + + :param only_translation: Whether to coregister only a translation, otherwise both translation and rotation. + :param fit_minimizer: Minimizer for the coregistration function. + :param fit_loss_func: Loss function for the minimization of residuals. + :param max_iterations: Maximum allowed iterations before stopping. + :param tolerance: Residual change threshold after which to stop the iterations. + :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count. 
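As a sanity check of the first-order relation the LZD fit linearizes (Equation 6 referenced in `_lzd_fit_func` above), a pure shift of a planar surface changes elevations by t3 - gx*t1 - gy*t2, where (gx, gy) are the elevation gradients; a toy verification under that assumption:

```python
import numpy as np

gx, gy = 0.05, -0.02                             # plane gradients along X and Y
t1, t2, t3 = 4.0, -3.0, 1.5                      # horizontal and vertical shifts
x, y = np.meshgrid(np.arange(100.0), np.arange(100.0))
z = gx * x + gy * y
dh = (gx * (x - t1) + gy * (y - t2) + t3) - z    # same plane sampled after the shift, minus original
print(np.allclose(dh, t3 - gx * t1 - gy * t2))   # True: matches the linearized prediction
```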
+ """ + meta = { + "fit_minimizer": fit_minimizer, + "fit_loss_func": fit_loss_func, + "max_iterations": max_iterations, + "tolerance": tolerance, + "only_translation": only_translation, + } + super().__init__(subsample=subsample, meta=meta) + + def _fit_rst_rst( + self, + ref_elev: NDArrayf, + tba_elev: NDArrayf, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, + ) -> None: + """Estimate the rigid transform from tba_dem to ref_dem.""" + + # Method is the same for 2D or 1D elevation differences, so we can simply re-direct to fit_rst_pts + self._fit_rst_pts( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + weights=weights, + bias_vars=bias_vars, + **kwargs, + ) + + def _fit_rst_pts( + self, + ref_elev: NDArrayf | gpd.GeoDataFrame, + tba_elev: NDArrayf | gpd.GeoDataFrame, + inlier_mask: NDArrayb, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + area_or_point: Literal["Area", "Point"] | None, + z_name: str, + weights: NDArrayf | None = None, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, + ) -> None: + + # Get parameters stored in class + params_random = self._meta["inputs"]["random"] + params_fit_or_bin = self._meta["inputs"]["fitorbin"] + + # Call method + matrix, centroid, subsample_final = lzd( + ref_elev=ref_elev, + tba_elev=tba_elev, + inlier_mask=inlier_mask, + transform=transform, + crs=crs, + area_or_point=area_or_point, + z_name=z_name, + params_random=params_random, + params_fit_or_bin=params_fit_or_bin, + max_iterations=self._meta["inputs"]["iterative"]["max_iterations"], + tolerance=self._meta["inputs"]["iterative"]["tolerance"], + only_translation=self._meta["inputs"]["affine"]["only_translation"], + ) + + # Write output to class + # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) + output_affine = OutAffineDict( + centroid=centroid, + matrix=matrix, + shift_x=matrix[0, 3], + shift_y=matrix[1, 3], + shift_z=matrix[2, 3], + ) + self._meta["outputs"]["affine"] = output_affine + self._meta["outputs"]["random"] = {"subsample_final": subsample_final} + + class DhMinimize(AffineCoreg): """ Elevation difference minimization coregistration. 
diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py
index 41bc9040..82b7f7f5 100644
--- a/xdem/coreg/base.py
+++ b/xdem/coreg/base.py
@@ -114,10 +114,12 @@
     "shift_y": "Northward shift estimated (georeferenced unit)",
     "shift_z": "Vertical shift estimated (elevation unit)",
     "matrix": "Affine transformation matrix estimated",
-    "rejection_scale": "Rejection scale",
-    "num_levels": "Number of levels",
+    "only_translation": "Only translations are considered",
+    "standardize": "Input data was standardized",
+    "icp_method": "Type of ICP method",
+    "icp_picky": "Picky closest pair selection",
+    "cpd_weight": "Weight of CPD outlier removal",
 }
-
 #####################################
 # Generic functions for preprocessing
 ###########################################
@@ -612,6 +614,7 @@ def _get_subsample_mask_pts_rst(
     tba_elev: NDArrayf | gpd.GeoDataFrame,
     inlier_mask: NDArrayb,
     transform: rio.transform.Affine,  # Never None thanks to Coreg.fit() pre-process
+    z_name: str,
     area_or_point: Literal["Area", "Point"] | None,
     aux_vars: None | dict[str, NDArrayf] = None,
 ) -> NDArrayb:
@@ -659,10 +662,13 @@ def _get_subsample_mask_pts_rst(
         pts_elev: gpd.GeoDataFrame = ref_elev if isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
         rst_elev: NDArrayf = ref_elev if not isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev

+        # Remove non-finite values from point dataset
+        pts_elev = pts_elev[np.isfinite(pts_elev[z_name].values)]
+
         # Get coordinates
         pts = (pts_elev.geometry.x.values, pts_elev.geometry.y.values)

-        # Get valid mask ahead of subsampling to have the exact number of requested subsamples by user
+        # Get valid mask ahead of subsampling to have the exact number of requested subsamples
         if aux_vars is not None:
             valid_mask = np.logical_and.reduce(
                 (inlier_mask, np.isfinite(rst_elev), *(np.isfinite(var) for var in aux_vars.values()))
             )
@@ -673,16 +679,17 @@
         # Convert inlier mask to points to be able to determine subsample later
         # The location needs to be surrounded by inliers, use floor to get 0 for at least one outlier
         # Interpolates boolean mask as integers
-        # TODO: Pass area_or_point all the way to here
-        valid_mask = np.floor(
+        # TODO: Create a function in GeoUtils that can compute the valid boolean mask of an interpolation without
+        # having to convert data to float32
+        valid_mask = valid_mask.astype(np.float32)
+        valid_mask[valid_mask == 0] = np.nan
+        valid_mask = np.isfinite(
             _interp_points(array=valid_mask, transform=transform, points=pts, area_or_point=area_or_point)
-        ).astype(bool)
+        )

     # If there is a subsample, it needs to be done now on the point dataset to reduce later calculations
     sub_mask = _get_subsample_on_valid_mask(params_random=params_random, valid_mask=valid_mask)

-    # TODO: Move check to Coreg.fit()?
-
     return sub_mask


@@ -694,13 +701,14 @@ def _subsample_on_mask(
     transform: rio.transform.Affine,
     area_or_point: Literal["Area", "Point"] | None,
     z_name: str,
-) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf]]:
+    return_coords: bool = False,
+) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]:
     """
     Perform subsampling on mask for raster-raster or point-raster datasets on valid points of all inputs
     (including potential auxiliary variables).

     Returns 1D arrays of subsampled inputs: reference elevation, to-be-aligned elevation and auxiliary variables
-    (in dictionary).
+    (in dictionary), and (optionally) tuple of X/Y coordinates.
     """

     # For two rasters
@@ -716,6 +724,13 @@ def _subsample_on_mask(
         else:
             sub_bias_vars = None

+        # Return coordinates if required
+        if return_coords:
+            coords = _coords(transform=transform, shape=ref_elev.shape, area_or_point=area_or_point)
+            sub_coords = (coords[0][sub_mask], coords[1][sub_mask])
+        else:
+            sub_coords = None
+
     # For one raster and one point cloud
     else:
@@ -723,6 +738,9 @@
         pts_elev: gpd.GeoDataFrame = ref_elev if isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
         rst_elev: NDArrayf = ref_elev if not isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev

+        # Remove invalid points
+        pts_elev = pts_elev[np.isfinite(pts_elev[z_name].values)]
+
         # Subsample point coordinates
         pts = (pts_elev.geometry.x.values, pts_elev.geometry.y.values)
         pts = (pts[0][sub_mask], pts[1][sub_mask])
@@ -750,7 +768,45 @@
         else:
             sub_bias_vars = None

-    return sub_ref, sub_tba, sub_bias_vars
+        # Return coordinates if required
+        if return_coords:
+            sub_coords = pts
+        else:
+            sub_coords = None
+
+    return sub_ref, sub_tba, sub_bias_vars, sub_coords
+
+
+@overload
+def _preprocess_pts_rst_subsample(
+    params_random: InRandomDict,
+    ref_elev: NDArrayf | gpd.GeoDataFrame,
+    tba_elev: NDArrayf | gpd.GeoDataFrame,
+    inlier_mask: NDArrayb,
+    transform: rio.transform.Affine,
+    crs: rio.crs.CRS,
+    area_or_point: Literal["Area", "Point"] | None,
+    z_name: str,
+    aux_vars: None | dict[str, NDArrayf] = None,
+    *,
+    return_coords: Literal[False] = False,
+) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None]: ...
+
+
+@overload
+def _preprocess_pts_rst_subsample(
+    params_random: InRandomDict,
+    ref_elev: NDArrayf | gpd.GeoDataFrame,
+    tba_elev: NDArrayf | gpd.GeoDataFrame,
+    inlier_mask: NDArrayb,
+    transform: rio.transform.Affine,
+    crs: rio.crs.CRS,
+    area_or_point: Literal["Area", "Point"] | None,
+    z_name: str,
+    aux_vars: None | dict[str, NDArrayf] = None,
+    *,
+    return_coords: Literal[True],
+) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], tuple[NDArrayf, NDArrayf]]: ...


 def _preprocess_pts_rst_subsample(
@@ -763,7 +819,8 @@
     area_or_point: Literal["Area", "Point"] | None,
     z_name: str,
     aux_vars: None | dict[str, NDArrayf] = None,
-) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf]]:
+    return_coords: bool = False,
+) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]:
     """
     Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points
     (and interpolated in the case of point-raster input).
@@ -780,11 +837,12 @@ def _preprocess_pts_rst_subsample(
         inlier_mask=inlier_mask,
         transform=transform,
         area_or_point=area_or_point,
+        z_name=z_name,
         aux_vars=aux_vars,
     )

     # Perform subsampling on mask for all inputs
-    sub_ref, sub_tba, sub_bias_vars = _subsample_on_mask(
+    sub_ref, sub_tba, sub_bias_vars, sub_coords = _subsample_on_mask(
         ref_elev=ref_elev,
         tba_elev=tba_elev,
         aux_vars=aux_vars,
@@ -792,10 +850,11 @@
         transform=transform,
         area_or_point=area_or_point,
         z_name=z_name,
+        return_coords=return_coords,
     )

     # Return 1D arrays of subsampled points at the same location
-    return sub_ref, sub_tba, sub_bias_vars
+    return sub_ref, sub_tba, sub_bias_vars, sub_coords


 @overload
@@ -981,17 +1040,124 @@ def _bin_or_and_fit_nd(
 ###############################################


+def matrix_from_translations_rotations(
+    t1: float = 0.0,
+    t2: float = 0.0,
+    t3: float = 0.0,
+    alpha1: float = 0.0,
+    alpha2: float = 0.0,
+    alpha3: float = 0.0,
+    use_degrees: bool = True,
+) -> NDArrayf:
+    """
+    Build rigid affine matrix based on 3 translations (unit of coordinates) and 3 rotations (degrees or radians).
+
+    The Euler rotations use the extrinsic convention.
+
+    :param t1: Translation in the X (west-east) direction (unit of coordinates).
+    :param t2: Translation in the Y (south-north) direction (unit of coordinates).
+    :param t3: Translation in the Z (vertical) direction (unit of DEM).
+    :param alpha1: Rotation around the X (west-east) direction.
+    :param alpha2: Rotation around the Y (south-north) direction.
+    :param alpha3: Rotation around the Z (vertical) direction.
+    :param use_degrees: Whether to use degrees for input rotations, otherwise radians.
+
+    :raises ValueError: If the given translations or rotations contain invalid values.
+
+    :return: Rigid affine matrix of transformation.
+    """
+
+    # Initialize identity matrix
+    matrix = np.eye(4)
+
+    # Convert Euler angles to rotation matrix
+    e = np.array([alpha1, alpha2, alpha3])
+    # If angles were given in degrees
+    if use_degrees:
+        e = np.deg2rad(e)
+    rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True)
+
+    # Add rotation matrix and translations
+    matrix[0:3, 0:3] = rot_matrix
+    matrix[:3, 3] = [t1, t2, t3]
+
+    return matrix
+
+
+def translations_rotations_from_matrix(
+    matrix: NDArrayf, return_degrees: bool = True
+) -> tuple[float, float, float, float, float, float]:
+    """
+    Extract 3 translations (unit of coordinates) and 3 rotations (degrees or radians) from rigid affine matrix.
+
+    The extracted Euler rotations use the extrinsic convention.
+
+    :param matrix: Rigid affine matrix of transformation.
+    :param return_degrees: Whether to return rotations in degrees, otherwise radians.
+
+    :return: Translations in the X, Y and Z direction and rotations around the X, Y and Z directions.
+    """
+
+    # Extract translations
+    t1, t2, t3 = matrix[:3, 3]
+
+    # Get rotations from affine matrix
+    rots = pytransform3d.rotations.euler_from_matrix(matrix[:3, :3], i=0, j=1, k=2, extrinsic=True, strict_check=True)
+    if return_degrees:
+        rots = np.rad2deg(np.array(rots))
+
+    # Extract rotations
+    alpha1, alpha2, alpha3 = rots
+
+    return t1, t2, t3, alpha1, alpha2, alpha3
+
+
 def invert_matrix(matrix: NDArrayf) -> NDArrayf:
-    """Invert a transformation matrix."""
+    """
+    Invert a transformation matrix.
+
+    :param matrix: Affine transformation matrix.
+
+    :return: Inverted transformation matrix.
+    """
     with warnings.catch_warnings():
         # Deprecation warning from pytransform3d. Let's hope that is fixed in the near future.
         warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
+        # return np.linalg.inv(matrix)
+        checked_matrix = pytransform3d.transformations.check_transform(matrix)
         # Invert the transform if wanted.
         return pytransform3d.transformations.invert_transform(checked_matrix)


+def _apply_matrix_pts_mat(
+    mat: NDArrayf,
+    matrix: NDArrayf,
+    centroid: tuple[float, float, float] | None = None,
+    invert: bool = False,
+) -> NDArrayf:
+
+    # Invert matrix if required
+    if invert:
+        matrix = invert_matrix(matrix)
+
+    # First, get 4xN array, adding a column of ones for translations during matrix multiplication
+    points = np.concatenate((mat, np.ones((1, mat.shape[1]))))
+
+    # Temporarily subtract centroid coordinates
+    if centroid is not None:
+        points[:3, :] -= np.array(centroid)[:, None]
+
+    # Transform using matrix multiplication, and get only the first three columns
+    transformed_points = (matrix @ points)[:3, :]
+
+    # Add back centroid coordinates
+    if centroid is not None:
+        transformed_points += np.array(centroid)[:, None]
+
+    return transformed_points
+
+
 def _apply_matrix_pts_arr(
     x: NDArrayf,
     y: NDArrayf,
@@ -1195,21 +1361,6 @@ def _iterate_affine_regrid_small_rotations(
     return transformed_dem.data.filled(np.nan), transform


-def _get_rotations_from_matrix(matrix: NDArrayf) -> tuple[float, float, float]:
-    """
-    Get rotation angles along each axis from the 4x4 affine matrix, derived as Euler extrinsic angles in degrees.
-
-    :param matrix: Affine matrix.
-
-    :return: Euler extrinsic rotation angles along X, Y and Z (degrees).
-    """
-
-    # The rotation matrix is composed of the first 3 rows/columns
-    rot_matrix = matrix[0:3, 0:3]
-    angles = pytransform3d.rotations.euler_from_matrix(R=rot_matrix, i=0, j=1, k=2, extrinsic=True)
-    return np.rad2deg(angles)
-
-
 def _apply_matrix_rst(
     dem: NDArrayf,
     transform: rio.transform.Affine,
@@ -1261,7 +1412,7 @@ def _apply_matrix_rst(
         return dem + matrix[2, 3], new_transform

     # 3/ If matrix contains only small rotations (less than 20 degrees), use the fast iterative reprojection
-    rotations = _get_rotations_from_matrix(matrix)
+    rotations = translations_rotations_from_matrix(matrix)[3:]
     if all(np.abs(rot) < 20 for rot in rotations) and force_regrid_method is None or force_regrid_method == "iterative":
         new_dem, transform = _iterate_affine_regrid_small_rotations(
             dem=dem, transform=transform, matrix=matrix, centroid=centroid, resampling=resampling
@@ -1551,10 +1702,13 @@ class InSpecificDict(TypedDict, total=False):
     angle: float
     # (Using Deramp) Polynomial order selected for deramping
     poly_order: int
+    # (Using ICP) Method type to compute 3D distances
+    icp_method: Literal["point-to-point", "point-to-plane"]
+    # (Using ICP) Picky selection of closest pairs
+    icp_picky: bool

-    # (Using ICP)
-    rejection_scale: float
-    num_levels: int
+    # (Using CPD) Weight for outlier removal
+    cpd_weight: float


 class OutSpecificDict(TypedDict, total=False):
@@ -1573,6 +1727,10 @@ class InAffineDict(TypedDict, total=False):
     vshift_reduc_func: Callable[[NDArrayf], np.floating[Any]]
     # Vertical shift activated
     apply_vshift: bool
+    # Apply coregistration method only for translations
+    only_translation: bool
+    # Standardize input data for numerics
+    standardize: bool


 class OutAffineDict(TypedDict, total=False):
@@ -1903,11 +2061,12 @@ def _preprocess_rst_pts_subsample(
         inlier_mask=inlier_mask,
         transform=transform,
         area_or_point=area_or_point,
+        z_name=z_name,
         aux_vars=aux_vars,
     )

     # Perform subsampling on mask for all inputs
-    sub_ref, sub_tba, sub_bias_vars = _subsample_on_mask(
+    sub_ref, sub_tba, sub_bias_vars, _ = _subsample_on_mask(
         ref_elev=ref_elev,
         tba_elev=tba_elev,
         aux_vars=aux_vars,
diff --git a/xdem/filters.py b/xdem/filters.py
index c80b87eb..88438271 100644
--- a/xdem/filters.py
+++ b/xdem/filters.py
@@ -21,12 +21,6 @@
 import warnings

-try:
-    import cv2
-
-    _has_cv2 = True
-except ImportError:
-    _has_cv2 = False
 import numpy as np
 import scipy

@@ -76,70 +70,6 @@ def gaussian_filter_scipy(array: NDArrayf, sigma: float) -> NDArrayf:
     return gauss


-def gaussian_filter_cv(array: NDArrayf, sigma: float) -> NDArrayf:
-    """
-    Apply a Gaussian filter to a raster that may contain NaNs, using OpenCV's implementation.
-    Arguments are for now hard-coded to be identical to scipy.
-
-    N.B: kernel_size is set automatically based on sigma
-
-    :param array: the input array to be filtered.
-    :param sigma: the sigma of the Gaussian kernel
-
-    :returns: the filtered array (same shape as input)
-    """
-    if not _has_cv2:
-        raise ValueError("Optional dependency needed. Install 'opencv'.")
-
-    # Check that array dimension is 2, or can be squeezed to 2D
-    orig_shape = array.shape
-    if len(orig_shape) == 2:
-        pass
-    elif len(orig_shape) == 3:
-        if orig_shape[0] == 1:
-            array = array.squeeze()
-        else:
-            raise NotImplementedError("Case of array of dimension 3 not implemented.")
-    else:
-        raise ValueError(f"Invalid array shape given: {orig_shape}. Expected 2D or 3D array.")
-
-    # In case array does not contain NaNs, use OpenCV's gaussian filter directly
-    # With kernel size (0, 0), i.e. set to default, and borderType=BORDER_REFLECT, the output is equivalent to scipy
-    if np.count_nonzero(np.isnan(array)) == 0:
-        gauss = cv2.GaussianBlur(array, (0, 0), sigmaX=sigma, borderType=cv2.BORDER_REFLECT)
-
-    # If array contain NaNs, need a more sophisticated approach
-    # Inspired by https://stackoverflow.com/a/36307291
-    else:
-
-        # Run filter on a copy with NaNs set to 0
-        array_no_nan = array.copy()
-        array_no_nan[np.isnan(array)] = 0
-        gauss_no_nan = cv2.GaussianBlur(array_no_nan, (0, 0), sigmaX=sigma, borderType=cv2.BORDER_REFLECT)
-        del array_no_nan
-
-        # Mask of NaN values
-        nan_mask = 0 * array.copy() + 1
-        nan_mask[np.isnan(array)] = 0
-        gauss_mask = cv2.GaussianBlur(nan_mask, (0, 0), sigmaX=sigma, borderType=cv2.BORDER_REFLECT)
-        del nan_mask
-
-        with warnings.catch_warnings():
-            warnings.filterwarnings("ignore", message="invalid value encountered")
-            gauss = gauss_no_nan / gauss_mask
-
-    return gauss.reshape(orig_shape)
-
-
-# Median filters
-
-# To be added
-
-# Min/max filters
-
-# To be added
-
-
 def distance_filter(array: NDArrayf, radius: float, outlier_threshold: float) -> NDArrayf:
     """
     Filter out pixels whose value is distant more than a set threshold from the average value of all neighbor \
@@ -155,7 +85,7 @@ def distance_filter(array: NDArrayf, radius: float, outlier_threshold: float) ->
     :returns: the filtered array (same shape as input)
     """
     # Calculate the average value within the radius
-    smooth = gaussian_filter_cv(array, sigma=radius)
+    smooth = gaussian_filter_scipy(array, sigma=radius)

     # Filter outliers
     outliers = (np.abs(array - smooth)) > outlier_threshold
diff --git a/xdem/misc.py b/xdem/misc.py
index d6ad04a3..bf8fc3e6 100644
--- a/xdem/misc.py
+++ b/xdem/misc.py
@@ -33,62 +33,7 @@
 except ImportError:
     _has_yaml = False

-try:
-    import cv2
-
-    _has_cv2 = True
-except ImportError:
-    _has_cv2 = False
-
-import numpy as np
-
 import xdem
-from xdem._typing import NDArrayf
-
-
-def generate_random_field(
-    shape: tuple[int, int], corr_size: int, random_state: int | np.random.Generator | None = None
-) -> NDArrayf:
-    """
-    Generate a semi-random gaussian field (to simulate a DEM or DEM error)
-
-    :param shape: The output shape of the field.
-    :param corr_size: The correlation size of the field.
-    :param random_state: Seed for random number generator.
-
-    :examples:
-    >>> generate_random_field((4, 5), corr_size=2, random_state=1).round(2)
-    array([[0.74, 0.74, 0.75, 0.75, 0.75],
-           [0.69, 0.69, 0.7 , 0.71, 0.71],
-           [0.51, 0.51, 0.54, 0.57, 0.58],
-           [0.45, 0.47, 0.5 , 0.53, 0.54]])
-
-    :returns: A numpy array of semi-random values from 0 to 1
-    """
-
-    rng = np.random.default_rng(random_state)
-
-    if not _has_cv2:
-        raise ValueError("Optional dependency needed. Install 'opencv'.")
-
-    field = cv2.resize(
-        cv2.GaussianBlur(
-            np.repeat(
-                np.repeat(
-                    rng.integers(0, 255, (shape[0] // corr_size, shape[1] // corr_size), dtype="uint8"),
-                    corr_size,
-                    axis=0,
-                ),
-                corr_size,
-                axis=1,
-            ),
-            ksize=(2 * corr_size + 1, 2 * corr_size + 1),
-            sigmaX=corr_size,
-        )
-        / 255,
-        dsize=(shape[1], shape[0]),
-    )
-    return field


 def deprecate(removal_version: Version = None, details: str = None) -> Callable[[Any], Any]:
diff --git a/xdem/volume.py b/xdem/volume.py
index 1ae5aeb5..30bfe856 100644
--- a/xdem/volume.py
+++ b/xdem/volume.py
@@ -36,13 +36,6 @@
 )
 from tqdm import tqdm

-try:
-    import cv2
-
-    _has_cv2 = True
-except ImportError:
-    _has_cv2 = False
-
 import xdem
 from xdem._typing import MArrayf, NDArrayf

@@ -321,8 +314,6 @@ def idw_interpolation(
     :returns: A filled array with no NaNs
     """
-    if not _has_cv2:
-        raise ValueError("Optional dependency needed. Install 'opencv'.")

     # Create a mask for where nans exist
     nan_mask = get_mask_from_array(array)
@@ -334,8 +325,8 @@ def idw_interpolation(
     # Remove extrapolated values: gaps up to the size of max_search_distance are kept,
     # but surfaces that artificially grow on the edges are removed
     if not extrapolate:
-        interp_mask = cv2.morphologyEx(
-            (~nan_mask).squeeze().astype("uint8"), cv2.MORPH_CLOSE, kernel=np.ones((max_search_distance - 1,) * 2)
+        interp_mask = scipy.ndimage.binary_closing(
+            (~nan_mask).squeeze().astype("uint8"), structure=np.ones((max_search_distance - 1,) * 2)
         ).astype("bool")
     if np.ndim(array) == 3:
         interpolated_array[:, ~interp_mask] = np.nan
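Aside (not part of the patch): in xdem/volume.py above, scipy.ndimage.binary_closing stands in for the removed cv2.morphologyEx(..., cv2.MORPH_CLOSE, ...) call, closing small holes in the valid-data mask so that only gaps up to the search distance are kept. A small synthetic check of that behaviour, not taken from the test suite, could look like:

```python
# Minimal sketch of the closing operation now used in idw_interpolation (synthetic data).
import numpy as np
import scipy.ndimage

# Valid-data mask with a 2x2 hole in the middle
valid = np.ones((7, 7), dtype=bool)
valid[3:5, 3:5] = False

# Closing with a structuring element larger than the hole fills it back in,
# mirroring the np.ones((max_search_distance - 1,) * 2) structure used in the diff
closed = scipy.ndimage.binary_closing(valid, structure=np.ones((3, 3)))
assert closed[3:5, 3:5].all()  # the hole is closed; border handling may differ slightly from OpenCV's
```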
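Looking back at the two public helpers added in xdem/coreg/base.py above, a quick round-trip sketch is given below. It assumes the functions are re-exported at the xdem.coreg level; otherwise they can be imported from xdem.coreg.base, where the diff defines them.

```python
# Round-trip sketch for the new matrix helpers (export location is an assumption).
import numpy as np
from xdem.coreg import matrix_from_translations_rotations, translations_rotations_from_matrix

# 4x4 rigid matrix from shifts (georeferenced units) and extrinsic Euler rotations (degrees by default)
matrix = matrix_from_translations_rotations(t1=5.0, t2=-3.0, t3=2.0, alpha1=1.0, alpha2=0.5, alpha3=10.0)
assert matrix.shape == (4, 4)

# Decomposing the matrix recovers the same parameters, up to floating-point error
t1, t2, t3, a1, a2, a3 = translations_rotations_from_matrix(matrix)
assert np.allclose([t1, t2, t3, a1, a2, a3], [5.0, -3.0, 2.0, 1.0, 0.5, 10.0])
```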