Metadata are not updated when changing spatial dimensions with a UDF

I am applying a UDF that should aggregate a binary snow cover map to a coarser resolution.

I am working with the FRACTIONAL_SNOW_COVER dataset.
To summarize:

  1. I am resampling the dataset and filtering it to a new extent that I set manually, so that the grid is exactly aligned with a specific 500 m grid. In the future I would like to use resample_cube_spatial to resample it with respect to a collection that is unfortunately not available yet, and I am not sure whether there is a nicer way to do this for now. I read the discussion Resampling a datacube on a user-defined regular grid, and my understanding so far is that there is no way to resample to a user-defined extent. Is there any update on that?
  2. I am binarizing it, as explained in my previous question Passing user defined parameters to an UDF - #10 by valentina.premier
  3. I want to aggregate it to a resolution of 500 m
# authentication
eoconn = openeo.connect("https://openeo-dev.vito.be")
eoconn.authenticate_oidc()
eoconn.describe_account()

# load the Copernicus fractional snow cover collection
scf = eoconn.load_collection(
    "FRACTIONAL_SNOW_COVER",
    spatial_extent  = {'west':10.728539,'east':11.039333,'south':46.647281,'north':46.796379, 'crs':4326},
    temporal_extent=['2023-08-02','2023-08-07'],
    bands=["FSCTOC"]
)

# Resampling and set a new extent
scf_rsmpl = scf.resample_spatial(resolution=20, projection=32632, method="near")
scf_bbox = scf_rsmpl.filter_bbox(west=631910, south=5167410, east=655890,
                                 north=5184190, crs=32632)


# Binarization
binarize = openeo.UDF.from_file('udf-binarize.py', context={"from_parameter": "context"})
scf_binary = scf_bbox.apply(process=binarize, context={"snowT": 20})

aggregation = openeo.UDF.from_file('udf-scf.py')
scf_aggregated = scf_binary.apply(process=aggregation)

The content of udf-scf.py is shown below:


from openeo.udf import XarrayDataCube
from openeo.udf.debug import inspect
from openeo.metadata import CollectionMetadata

import numpy as np
import xarray as xr



def apply_metadata(input_metadata:CollectionMetadata, 
                   context:dict) -> CollectionMetadata:

    pixel_ratio = 20
    
    xstep = input_metadata.get('x','step')
    ystep = input_metadata.get('y','step')
    new_metadata = {
          "x": {"type": "spatial", 
                "axis": "x", 
                "step": xstep*pixel_ratio, 
                "reference_system": 32632},
          "y": {"type": "spatial", 
                "axis": "y", 
                "step": ystep*pixel_ratio, 
                "reference_system": 32632},
          "t": {"type": "temporal"}
    }
    return CollectionMetadata(new_metadata)


def scf(array: np.ndarray,
        pixel_ratio: int = 20,
        scf_max: bool = False,
        scf_min: bool = False,
        codes: list = [205, 210, 254, 255],
        nv_thres: int = 40) -> np.ndarray:
    
    """
    This function calculates the snow cover fraction from a snow cover map which 
    is given as input as a np.array
    
    Parameters
    ----------
    array : np.ndarray
        representing the snow cover map
    pixel_ratio : int, optional
        number of pixels to be aggregated to obtain the new resolution 
        (e.g. 20x20)
    scf_max : bool, optional
        if True, consider all the non valid pixels as snow
    scf_min : bool, optional
        if True, consider all the non valid pixels as snow free
    codes : list, optional
        list of the codes assigned as no data value
    nv_thres : int, optional
        max number of pixels which can be no data. If this number is matched or 
        exceeded, the aggregated pixel is assigned as no data
        
    Returns
    -------
    aggregated : np.ndarray
        array with the aggregated fractional snow cover
    
    """
    #x and y dimensions
    assert array.ndim == 2
    

    # number of columns and rows of the output aggregated array
    nrows = int(np.shape(array)[0]/pixel_ratio)
    ncols = int(np.shape(array)[1]/pixel_ratio)
    
    #initialize aggregated array
    aggregated = np.ones(shape=(nrows, ncols)) * 255
    
    scf_correct = not(scf_min) and not(scf_max)
    
    # iterate over rows
    y_j = 0
    x_i = 0
    for j in range(0, nrows):                                    
        # reset column counter
        x_i = 0             
        # iterate over columns
        for i in range(0, ncols):

            # read the slice of the scene matching the current
            # estimator pixel
            data_ij = array[y_j:y_j + pixel_ratio,x_i: x_i + pixel_ratio]

            # check if the pixels are valid
            if any([erc in data_ij for erc in codes]):
                nv_sum = sum([np.sum(data_ij == erc) for erc in codes])
                
                if scf_min:
                    aggregated[j, i] = np.sum(data_ij[data_ij <= 100]) / data_ij.size 
                    
                if scf_max:
                    aggregated[j,i] = (np.sum(data_ij[data_ij <= 100]) + \
                                      nv_sum*100) / data_ij.size 
                
                
                if scf_correct and nv_sum < nv_thres:
                    # calculate snow cover fraction
                    aggregated[j, i] = np.sum(data_ij[data_ij <= 100]) / (data_ij.size - nv_sum)
            else: 
                
                aggregated[j,i] = np.sum(data_ij) / data_ij.size
            # advance the column counter by the number of high resolution pixels
            # contained in one low resolution pixel
            x_i += pixel_ratio
        
        # advance the row counter by the number of high resolution pixels
        # contained in one low resolution pixel
        y_j += pixel_ratio
        
    return aggregated
    

def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
    
    array: xr.DataArray = cube.get_array()

    # Pixel size of the original image
    init_pixel_size_x = array.coords['x'][-1] - array.coords['x'][-2]
    init_pixel_size_y = array.coords['y'][-1] - array.coords['y'][-2]


    cubearray: xr.DataArray = cube.get_array().copy()
    cubearray = cubearray[0]
     
    predicted_array = scf(cubearray.data)
    
    # NEW COORDINATES
    coord_x = np.linspace(start = array.coords['x'].min(), 
                          stop = array.coords['x'].max() + init_pixel_size_x,
                          num = predicted_array.shape[-2], 
                          endpoint=False)
    coord_y = np.linspace(start = array.coords['y'].min(), 
                          stop = array.coords['y'].max() + init_pixel_size_y,
                          num = predicted_array.shape[-1], 
                          endpoint=False)
    
    predicted_cube = xr.DataArray(predicted_array, 
                                  dims=['x', 'y'], 
                                  coords=dict(x=coord_x, y=coord_y))


    return XarrayDataCube(predicted_cube)

My main problem is that the UDF returns the correct output but with the wrong metadata, i.e. the metadata still report the original image size. However, when I run the UDF locally, it creates an array with the correct, smaller size.

Also, I do not really understand how the function deals with the temporal dimension. It returns the correct output, but does the UDF automatically "loop" over the time dimension? Or should I use apply_dimension instead of apply?

Many thanks
Valentina

Hi Valentina,

It is correct that there is currently no way to resample to a user-defined extent. That is still under investigation.

The UDF-based resampling method from your example is still an experimental feature, and it is only available in the Python-Jep runtime. You can activate it using the following code:

scf_binary.apply(process=aggregation, runtime="Python-Jep")
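
If your client version does not pick up the runtime keyword on apply(), the runtime can, as far as I know, also be attached to the UDF object itself, for example:

# alternative: set the runtime on the UDF definition instead of on apply()
aggregation = openeo.UDF.from_file("udf-scf.py", runtime="Python-Jep")
scf_aggregated = scf_binary.apply(process=aggregation)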

Note that there will still be some bugs with this feature when you use it with e.g. apply_neighborhood.

For your final question, the apply() method will split your original (t, bands, y, x) datacube into several (bands, 256, 256) datacubes and then run the UDF once for each of these chunks. At the end all results are merged back into a (t, bands, y, x) datacube. So it does automatically loop over the time dimension.
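
If you want to check this yourself, a throwaway UDF that only logs the shape of each chunk it receives is an easy way to do so; a minimal sketch:

from openeo.udf import XarrayDataCube
from openeo.udf.debug import inspect

def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
    array = cube.get_array()
    # log the dimension names and the shape of the chunk this invocation received
    inspect(data=[str(array.dims), str(array.shape)], message="UDF chunk")
    # pass the chunk through unchanged
    return cube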

apply_dimension(dimension="t") will split it into (t, bands, 256, 256) chunks.
apply_dimension(dimension="bands") will split it into (bands, 256, 256) chunks.
apply_neighborhood(size_x, size_y, size_t) will split it into (size_t, bands, size_y, size_x) chunks.
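
For illustration, this is roughly how those variants look in client code; the chunk sizes below are only examples, not values tuned to your workflow:

# alternative 1: keep the full time dimension in each chunk
scf_aggregated = scf_binary.apply_dimension(process=aggregation, dimension="t")

# alternative 2: control the spatial chunk size explicitly,
# e.g. a multiple of the 20-pixel aggregation factor
scf_aggregated = scf_binary.apply_neighborhood(
    aggregation,
    size=[{"dimension": "x", "value": 320, "unit": "px"},
          {"dimension": "y", "value": 320, "unit": "px"}],
)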

Hope that helps,
Jeroen

Thank you for your answer. I am coming back to this conversation because I have managed to run the UDF, but I recently realized that there are differences between running it locally with execute_local_udf (where I seem to get the correct output) and running it on the cloud, where the output changes. Is it possible that this is due to the default chunking that is applied when using apply?
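
For reference, this is roughly how I run it locally; it is only a sketch, and "scf_binary.nc" is a placeholder for a NetCDF export of the binarized cube that I downloaded beforehand:

from pathlib import Path

from openeo.udf.run_code import execute_local_udf

# run the aggregation UDF on a locally downloaded NetCDF cube
local_result = execute_local_udf(
    udf=Path("udf-scf.py").read_text(),
    datacube="scf_binary.nc",
    fmt="netcdf",
)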

Hi, just FYI, @jeroen.verstraelen is away this week, so it will take a bit longer to respond.

I can confirm that execute_local_udf will indeed not use the same chunks as the server, so especially for a more advanced case like this it may not really be that helpful.

If you can share more details about what you are eventually running, or perhaps a job id, Jeroen can look at it next week.