UDF correlation

Hi, I am developing an udf to perform the Spearman correlation between a datacube and 4 timeseries.
The udf i developed is below:

from openeo.udf import XarrayDataCube
from typing import Dict
import numpy as np
from datetime import datetime
from scipy.stats import rankdata
import xarray

def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
    dat=cube.get_array()
    C=0
    V=0
    V2=0
    W=0
    D=0
    D=np.array([datetime.fromordinal(d) for d in D])
    
    IP=np.empty([D.size,5]).astype(object)
    IP[:,0:4]=np.nan

    IP[:,0]=C
    IP[:,1]=V
    IP[:,2]=V2
    IP[:,3]=W
    IP[:,4]=D

    IP[IP==-9999]=np.nan
    
    D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d') for d in dat.t.values])
    mat0=dat.values[1,:,:,:].astype(float)

    ID=np.where(np.in1d(D, IP[:,4]))[0]
    res=np.zeros([4,mat0.shape[1],mat0.shape[2]])
    pix=IP[ID,0:4].astype(float).transpose()
    pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
    pix=np.moveaxis(pix,[0,1],[-2,-1])

    for i in range(0,4):
        mat2=mat0.copy()
        pix2=pix[i,:,:,:]
        ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
        mat2[ID]=999999
        pix2[ID]=999999
        mat2=rankdata(mat2,axis=0)
        pix2=rankdata(pix2,axis=0)
        mat2[ID]=np.nan
        pix2[ID]=np.nan

        res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))

    corrT=np.nanmax(res[0:3,:,:],axis=0)
    corrW=res[3,:]
    Mmask=np.logical_or(np.logical_and(corrT<=np.nanpercentile(corrT,10),corrW<=0.6),np.logical_and(corrW<=np.nanpercentile(corrW,10),corrT<=0.7))
    out = xarray.DataArray(data=Mmask, coords={'x': (['x'],dat.x.values,dat.x.attrs ),'y': (['y'], dat.y.values,dat.y.attrs)})
    out=XarrayDataCube(out)

    
    return out

Thanks to @jeroen.dries I was able to pass some information to the UDF (the timeseries that I extract from openeo)

spearman_udf = load_udf('spearman_pixels.py')
spearman_udf=spearman_udf.replace("C=0",'C=np.array('+str(IP[:,0].tolist())+')')
spearman_udf=spearman_udf.replace("V=0",'V=np.array('+str(IP[:,1].tolist())+')')
spearman_udf=spearman_udf.replace("V2=0",'V2=np.array('+str(IP[:,2].tolist())+')')
spearman_udf=spearman_udf.replace("W=0",'W=np.array('+str(IP[:,3].tolist())+')')
spearman_udf=spearman_udf.replace("D=0",'D=np.array('+str(IP[:,4].tolist())+')')

And everything works fine if I apply the udf locally by downloading the datacube:

datacube=xarray.open_dataset('data.nc')
dat=XarrayDataCube(datacube.to_array(dim='bands'))
dat=dat.get_array()

But when I run the algorithm in openeo an error is raised:

mask = datacube.apply_dimension(code=spearman_udf, runtime='Python')
Mmask.download("Mmask.nc", format="netcdf")

Traceback (most recent call last):

  Input In [6] in <cell line: 1>
    Mmask.download("Mmask.nc", format="netcdf")

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\datacube.py:1589 in download
    return self._connection.download(cube.flat_graph(), outputfile)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:1028 in download
    response = self.post(path="/result", json=request, expected_status=200, stream=True, timeout=timeout)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:173 in post
    return self.request("post", path=path, json=json, allow_redirects=False, **kwargs)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:121 in request
    self._raise_api_error(resp)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:152 in _raise_api_error
    raise exception

OpenEoApiError: [500] unknown: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 41, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 522, in tile_function
    result_data = run_udf_code(code=function, data=data)
  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 178, in run_udf_code
    result_cube = func(data.get_datacube_list()[0], data.user_context)
  File "<string>", line 47, in apply_datacube
IndexError: boolean index did not match indexed array along dimension 0; dimension is 1 but corresponding boolean dimension is 70

It seems to me that there are some differences between the downloaded datacube in netcdf format and the one online.
Do you have any advices on how to proceed?

It’s a bit hard to pinpoint the problem from this information, but could you add printing of some debug information, to inspect the dimensions and shape of the data you are working on.

e.g.

def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
    dat=cube.get_array()
    print(data) # Or just `print(data.shape)`

likewise, what is the configuration of your downloaded data.nc file?

Also: what is the processing (process graph or python code) you do just before you invoke the UDF?

I runned again through the code and I found an error by myself. Sadly, this was not enough to make it works, since a new issue is generated. I will copy below the text of the python code, of my udf and of the error:


# Import required packages
import matplotlib.pyplot as plt
import openeo
from openeo.processes import process, gte, is_nan, eq, not_, sd, lte, sqrt, power, mean, array_create, quantiles
from openeo.udf import execute_local_udf
import numpy as np
from netCDF4 import Dataset
import time
import csv
from datetime import datetime,timedelta
from scipy.stats import spearmanr,pearsonr,rankdata
import xarray
from openeo.udf.xarraydatacube import XarrayDataCube

def load_udf(relative_path):
    with open(relative_path, 'r+') as f:
        return f.read()

# %% Initialization
#connection = openeo.connect("https://openeo.cloud")
connection = openeo.connect("https://openeo-dev.vito.be")
connection.authenticate_oidc()


# Input variables
t0= ["2016-10-01","2017-10-01","2018-10-01","2019-10-01","2020-10-01"]
s0=["2018-09-30","2019-09-30","2020-09-30","2021-09-30","2022-09-30"]

temp_ext=[t0[0],s0[0]]
print('initialization '+t0[0])
spat_extent={"west": 11.577, "south": 44.871, "east": 11.617, "north": 44.911}
rect={
  "type":"FeatureCollection",
  "features":[
    {
      "type":"Feature",
      "geometry": {
        "type":"Polygon",
        "coordinates":[[[11.577,44.871],[11.577,44.911],[11.617,44.911],[11.617,44.871],[11.577,44.871]]]
      },
      "properties":{"name":"area1"}
    }
  ]
}
band=["B08","B04"]
maskband=["CLD","CLP"]
maskthr=50
filt= (np.ones([5,5])/25).tolist()

# S2 datacube
datacube = connection.load_collection(collection_id = "SENTINEL2_L1C_SENTINELHUB", spatial_extent = spat_extent, temporal_extent = temp_ext, bands = band[0],properties={"eo:cloud_cover": lambda x: lte(x, 70)}).linear_scale_range(0, 10000, 0, 1)

# mask datacube
mask0 = connection.load_collection(collection_id = "SENTINEL2_L2A_SENTINELHUB", spatial_extent = spat_extent, temporal_extent = temp_ext, bands = maskband,properties={"eo:cloud_cover": lambda x: lte(x, 70)})
mask0 = mask0 >= maskthr
mask0 = mask0.band(maskband[0]).logical_or(mask0.band(maskband[1]))
mask0 = mask0.resample_cube_spatial(datacube)

# water mask
wat = connection.load_collection(collection_id = "GLOBAL_SURFACE_WATER", spatial_extent = spat_extent, temporal_extent = ["1980-01-01", "2020-12-31"], bands = ['extent']).max_time()
wat = wat.apply(lambda val: not_(is_nan(x=val)))
wat = wat.apply_kernel(filt).resample_cube_spatial(datacube) == 0 #.apply(lambda val: eq(x=val,y=0))
land = wat == 0 #.apply(lambda val: eq(x=val,y=0))

# masking data

datacube=datacube.mask(mask0)

# %% C mask calculation
asd=0
while asd==0:
    try:
        connection = openeo.connect("https://openeo-dev.vito.be")
        connection.authenticate_oidc()

        print('C mask calculation')
        
        m=datacube.mean_time()
        s=datacube.apply_dimension(process='sd',dimension='t')

        cv=s/m
        cv=cv.mask(land)
        cv=cv.drop_dimension("t")
        
        val = cv.aggregate_spatial(rect, lambda pixels: array_create([quantiles(pixels,q=20)]))
        print('extracting fifth percentile')
        val=val.execute()
        val=val[0][0]
        
        Cmask=cv<float(val)
        Cmask=Cmask==0
        
        C=datacube.mask(Cmask)
        print('extracting masked dataset')
        C=C.aggregate_spatial(rect, "mean")
        print('extracting masked timeseries')
        C=C.execute()
        
        asd=1
    except Exception as e: 
        print(e)
        print('outtime. reload')
            

# %% download datacube

datacube=datacube.mask(wat)

asd=0
while asd==0:
    try:
        connection = openeo.connect("https://openeo-dev.vito.be")
        connection.authenticate_oidc()

        print('M mask calculation')
        print('full datacube download')
        datacube.download("data.nc", format="netcdf")

        asd=1
    except Exception as e: 
        print(e)
        print('outtime. reload')

# %% reading data

DC = list(C)
DC = np.array ([datetime.strptime(d[0:10],'%Y-%m-%d') for d in DC])
C = list(C.values())
ID = [len(sublist[0])!=0 for sublist in C]
DC = DC[ID]
C = np.array([item for sublist in C for item in sublist[0]]).astype(float)


# %% calculating Spearman
C[np.isnan(C)]=-9999

IP=np.empty([DC.size,2]).astype(object)
IP[:,0:1]=-9999

IP[:,0]=C
D=[d.toordinal() for d in DC]
IP[:,1]=D

# fix ex_loc = 1 to apply the algorithm on the downloaded data and ex_loc=0 otherwise
ex_loc=0

if ex_loc==1:
    cube=xarray.open_dataset('data.nc')
    cube=XarrayDataCube(cube.to_array(dim='bands'))
    dat=cube.get_array()
    
    IP[IP==-9999]=np.nan
    
    D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d').toordinal() for d in dat.t.values])
    mat0=dat.values[1,:,:,:].astype(float)

    ID=np.where(np.in1d(D, IP[:,1]))[0]
    res=np.zeros([1,mat0.shape[1],mat0.shape[2]])
    pix=IP[ID,0:1].astype(float).transpose()
    pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
    pix=np.moveaxis(pix,[0,1],[-2,-1])

    for i in range(0,1):
        mat2=mat0.copy()
        pix2=pix[i,:,:,:]
        ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
        mat2[ID]=999999
        pix2[ID]=999999
        mat2=rankdata(mat2,axis=0)
        pix2=rankdata(pix2,axis=0)
        mat2[ID]=np.nan
        pix2[ID]=np.nan

        res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))

    Mmask=res[0,:,:]

else:
    connection = openeo.connect("https://openeo-dev.vito.be")
    connection.authenticate_oidc()


    spearman_udf = load_udf('spearman_pixels_eo.py')
    spearman_udf=spearman_udf.replace("C=0",'C=np.array('+str(IP[:,0].tolist())+')')
    spearman_udf=spearman_udf.replace("D=0",'D=np.array('+str(IP[:,1].tolist())+')')

    Mmask = datacube.apply_dimension(code=spearman_udf, runtime='Python')
    Mmask.download("Mmask.nc", format="netcdf")

# -*- coding: utf-8 -*-
"""
Created on Wed May  4 11:04:57 2022

@author: Paolo
"""
from openeo.udf import XarrayDataCube
from typing import Dict
import numpy as np
from datetime import datetime
from scipy.stats import rankdata
import xarray

def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
    dat=cube.get_array()
    
    C=0
    D=0
    D=np.array([datetime.fromordinal(d) for d in D])
    
    IP=np.empty([D.size,5]).astype(object)
    IP[:,0:1]=np.nan

    IP[:,0]=C
    IP[:,1]=D

    IP[IP==-9999]=np.nan
    
    D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d').toordinal() for d in dat.t.values])
    mat0=dat.values[1,:,:,:].astype(float)

    ID=np.where(np.in1d(D, IP[:,1]))[0]
    res=np.zeros([1,mat0.shape[1],mat0.shape[2]])
    pix=IP[ID,0:1].astype(float).transpose()
    pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
    pix=np.moveaxis(pix,[0,1],[-2,-1])

    for i in range(0,1):
        mat2=mat0.copy()
        pix2=pix[i,:,:,:]
        ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
        mat2[ID]=999999
        pix2[ID]=999999
        mat2=rankdata(mat2,axis=0)
        pix2=rankdata(pix2,axis=0)
        mat2[ID]=np.nan
        pix2[ID]=np.nan

        res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))

    Mmask=res[0,:,:]
    out = xarray.DataArray(data=Mmask, coords={'x': (['x'],dat.x.values,dat.x.attrs ),'y': (['y'], dat.y.values,dat.y.attrs)})
    out=XarrayDataCube(out)

    
    return out
Traceback (most recent call last):

  Input In [5] in <cell line: 11>
    Mmask.download("Mmask.nc", format="netcdf")

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\datacube.py:1589 in download
    return self._connection.download(cube.flat_graph(), outputfile)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:1028 in download
    response = self.post(path="/result", json=request, expected_status=200, stream=True, timeout=timeout)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:173 in post
    return self.request("post", path=path, json=json, allow_redirects=False, **kwargs)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:121 in request
    self._raise_api_error(resp)

  File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:152 in _raise_api_error
    raise exception

OpenEoApiError: [500] unknown: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 41, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 522, in tile_function
    result_data = run_udf_code(code=function, data=data)
  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 178, in run_udf_code
    result_cube = func(data.get_datacube_list()[0], data.user_context)
  File "<string>", line 62, in apply_datacube
  File "/opt/venv/lib64/python3.8/site-packages/xarray/core/dataarray.py", line 403, in __init__
    coords, dims = _infer_coords_and_dims(data.shape, coords, dims)
  File "/opt/venv/lib64/python3.8/site-packages/xarray/core/dataarray.py", line 110, in _infer_coords_and_dims
    raise ValueError(
ValueError: inferring DataArray dimensions from dictionary like ``coords`` is no longer supported. Use an explicit list of ``dims`` instead.

Thanks for the extra info. The line numbers are a bit off, but I think the current problem is on this line:

    out = xarray.DataArray(data=Mmask, coords={'x': (['x'],dat.x.values,dat.x.attrs ),'y': (['y'], dat.y.values,dat.y.attrs)})

Can you try adding an argument dims=["t", "x", "y"] as well? I think that is what the exception is suggesting. (I’m not completely sure about the actual values of the dims argument though)

(note: we are currently running xarray 0.16.2 back-end side, which might be a bit older that what you are running locally)

I solved the error with that, but I still am not able to get the result I want. I am thinking that the reason could be the function that I use for apply my udf: apply_dimension. Is this the right function to work on a full datacube or should I apply a different function?

With the code as it is, I can download the results without errors, but the resulting matrix is a matrix of NaN

I was finally able to make the UDF works.
The issue was in the xarray shape: when I read the downloaded netcdf data, the xarray shape was [bands, time, x, y], while inside openeo the order is [time, bands, x, y].
I was therefore able to obtain the spearman correlation, as desired, but there is a new issue: the values I obtain from the UDF are different from those that I obtain locally:

immagine

Since the program I apply inside the udf is identical to the one applied locally, I guess that there is something wrong internally, but notwithstanding my attempts I can’t figure it out where is the issue.

Below there is the code of the comparison and the one of the udf:

# -*- coding: utf-8 -*-
"""
Created on Thu Apr 21 11:14:59 2022

@author: Paolo
"""

# Import required packages
import matplotlib.pyplot as plt
import openeo
from openeo.processes import process, gte, is_nan, eq, not_, sd, lte, sqrt, power, mean, array_create, quantiles
from openeo.udf import execute_local_udf
import numpy as np
from netCDF4 import Dataset
import time
import csv
from datetime import datetime,timedelta
from scipy.stats import spearmanr,pearsonr,rankdata
import xarray
from openeo.udf.xarraydatacube import XarrayDataCube

def load_udf(relative_path):
    with open(relative_path, 'r+') as f:
        return f.read()

# %% Initialization
#connection = openeo.connect("https://openeo.cloud")
connection = openeo.connect("https://openeo-dev.vito.be")
connection.authenticate_oidc()


# Input variables
t0= ["2016-10-01","2017-10-01","2018-10-01","2019-10-01","2020-10-01"]
s0=["2018-09-30","2019-09-30","2020-09-30","2021-09-30","2022-09-30"]

temp_ext=[t0[0],s0[0]]
print('initialization '+t0[0])
spat_extent={"west": 11.577, "south": 44.871, "east": 11.617, "north": 44.911}
rect={
  "type":"FeatureCollection",
  "features":[
    {
      "type":"Feature",
      "geometry": {
        "type":"Polygon",
        "coordinates":[[[11.577,44.871],[11.577,44.911],[11.617,44.911],[11.617,44.871],[11.577,44.871]]]
      },
      "properties":{"name":"area1"}
    }
  ]
}
band=["B08","B04"]
maskband=["CLD","CLP"]
maskthr=50
filt= (np.ones([5,5])/25).tolist()

# S2 datacube
datacube = connection.load_collection(collection_id = "SENTINEL2_L1C_SENTINELHUB", spatial_extent = spat_extent, temporal_extent = temp_ext, bands = band[0],properties={"eo:cloud_cover": lambda x: lte(x, 70)}).linear_scale_range(0, 10000, 0, 1)

# mask datacube
mask0 = connection.load_collection(collection_id = "SENTINEL2_L2A_SENTINELHUB", spatial_extent = spat_extent, temporal_extent = temp_ext, bands = maskband,properties={"eo:cloud_cover": lambda x: lte(x, 70)})
mask0 = mask0 >= maskthr
mask0 = mask0.band(maskband[0]).logical_or(mask0.band(maskband[1]))
mask0 = mask0.resample_cube_spatial(datacube)

# water mask
wat = connection.load_collection(collection_id = "GLOBAL_SURFACE_WATER", spatial_extent = spat_extent, temporal_extent = ["1980-01-01", "2020-12-31"], bands = ['extent']).max_time()
wat = wat.apply(lambda val: not_(is_nan(x=val)))
wat = wat.apply_kernel(filt).resample_cube_spatial(datacube) == 0 #.apply(lambda val: eq(x=val,y=0))
land = wat == 0 #.apply(lambda val: eq(x=val,y=0))

# masking data

datacube=datacube.mask(mask0)

# %% C mask calculation
asd=0
while asd==0:
    try:
        connection = openeo.connect("https://openeo-dev.vito.be")
        connection.authenticate_oidc()

        print('C mask calculation')
        
        m=datacube.mean_time()
        s=datacube.apply_dimension(process='sd',dimension='t')

        cv=s/m
        cv=cv.mask(land)
        cv=cv.drop_dimension("t")
        
        val = cv.aggregate_spatial(rect, lambda pixels: array_create([quantiles(pixels,q=20)]))
        print('extracting fifth percentile')
        val=val.execute()
        val=val[0][0]
        
        Cmask=cv<float(val)
        Cmask=Cmask==0
        
        C=datacube.mask(Cmask)
        print('extracting masked dataset')
        C=C.aggregate_spatial(rect, "mean")
        print('extracting masked timeseries')
        C=C.execute()
        
        asd=1
    except Exception as e: 
        print(e)
        print('outtime. reload')
            

# %% download datacube

datacube=datacube.mask(wat)

asd=0
while asd==0:
    try:
        connection = openeo.connect("https://openeo-dev.vito.be")
        connection.authenticate_oidc()

        print('M mask calculation')
        print('full datacube download')
        datacube.download("data0.nc", format="netcdf")
        

        asd=1
    except Exception as e: 
        print(e)
        print('outtime. reload')


# %% reading data


DC = list(C)
DC = np.array ([datetime.strptime(d[0:10],'%Y-%m-%d') for d in DC])
C = list(C.values())
ID = [len(sublist[0])!=0 for sublist in C]
DC = DC[ID]
C = np.array([item for sublist in C for item in sublist[0]]).astype(float)


C[np.isnan(C)]=-9999

IP=np.empty([DC.size,2]).astype(object)
IP[:,0:1]=-9999

IP[:,0]=C
D=[d.toordinal() for d in DC]
IP[:,1]=D

# %% calculating Spearman openeo

connection = openeo.connect("https://openeo-dev.vito.be")
connection.authenticate_oidc()

spearman_udf = load_udf('spearman_pixels_eo.py')
spearman_udf=spearman_udf.replace("C=0",'C=np.array('+str(IP[:,0].tolist())+')')
spearman_udf=spearman_udf.replace("D=0",'D=np.array('+str(IP[:,1].tolist())+')')

Mmask = datacube.apply_dimension(code=spearman_udf, runtime='Python')
Mmask.download("Mmask0.nc", format="netcdf")

# %% calculating Spearman locally

cube=xarray.open_dataset('data0.nc')
cube=XarrayDataCube(cube.to_array(dim='bands'))
dat=cube.get_array()
    
IP[IP==-9999]=np.nan

D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d').toordinal() for d in dat.t.values])
mat0=dat.values[1,:,:,:].astype(float)

ID=np.where(np.in1d(D, IP[:,1]))[0]
res=np.zeros([1,mat0.shape[1],mat0.shape[2]])
pix=IP[ID,0:1].astype(float).transpose()
pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
pix=np.moveaxis(pix,[0,1],[-2,-1])

for i in range(0,1):
    mat2=mat0.copy()
    pix2=pix[i,:,:,:]
    ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
    """
    mat2[ID]=999999
    pix2[ID]=999999
    mat2=rankdata(mat2,axis=0)
    pix2=rankdata(pix2,axis=0)
    """
    mat2[ID]=np.nan
    pix2[ID]=np.nan

    res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))

Mmask2=res[0,:,:]

# %% comparing results
data= Dataset("Mmask0.nc", "r")
mat=data.variables['B08'][:].data
plt.figure()
plt.plot(Mmask2.flatten(),mat.flatten(),'o')
plt.xlabel('local spearman')
plt.ylabel('openeo spearman')

# -*- coding: utf-8 -*-
"""
Created on Wed May  4 11:04:57 2022

@author: Paolo
"""
from openeo.udf import XarrayDataCube
from typing import Dict
import numpy as np
from datetime import datetime
from scipy.stats import rankdata
import xarray

def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
    dat=cube.get_array()
    
    C=0
    D=0
    
    IP=np.empty([D.size,5]).astype(float)
    IP[:,0:1]=np.nan

    IP[:,0]=C
    IP[:,1]=D

    IP[IP==-9999]=np.nan
    
    D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d').toordinal() for d in dat.t.values])
    mat0=dat.values[:,0,:,:].astype(float)

    ID=np.where(np.in1d(D, IP[:,1]))[0]
    res=np.zeros([1,mat0.shape[1],mat0.shape[2]])
    pix=IP[ID,0:1].astype(float).transpose()
    pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
    pix=np.moveaxis(pix,[0,1],[-2,-1])

    for i in range(0,1):
        mat2=mat0.copy()
        pix2=pix[i,:,:,:]
        ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
        """
        mat2[ID]=999999
        pix2[ID]=999999
        mat2=rankdata(mat2,axis=0)
        pix2=rankdata(pix2,axis=0)
        """
        mat2[ID]=np.nan
        pix2[ID]=np.nan

        res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))

    Mmask=res[0,:,:]
    out = xarray.DataArray(data=Mmask,dims=["x", "y"], coords={'x': (['x'],dat.x.values,dat.x.attrs ),'y': (['y'], dat.y.values,dat.y.attrs)})
    
    
    return XarrayDataCube(out)

1 Like

Can you also show the resulting images of both approaches? Maybe there there is axes swapping in play here, or small (sub)pixels shifts/rescaling/interpolation?


Sadly the errors seems quite distributed. Maybe it is due to an uncorrect identification of the nan values? or again, due to an issue in the function rankdata?


I tried to repeat the analysis by commenting the rankdata operation (as it was in the code I posted) to obtain pearson correlation, but the issue is not resolved, therefore the fault is not of the “rankdata” operation

Hi, a small update. I’ve been trying your code out locally on my system, and so far I could not really reproduce the weird differences between local and openeo spearman values.
I just get a clean one-on-one mapping:
Screenshot from 2022-05-18 18-18-30

One thing that might be different with an impact are library versions. The versions available in the UDF runtime are listed here: https://openeo.vito.be/openeo/1.0/udf_runtimes
e.g.:

netCDF4 1.5.8
numpy 1.18.5
pandas 1.3.5
rasterio 1.1.8
scipy 1.8.0
xarray 0.16.2
...

Could you check if the versions you have locally differ a lot from these?
(My local versions are pretty close)

This is indeed a good news, since it means that openeo works fine. I’m comparing my version with the one you sent me and there are indeed some differences:
First my python version is 4.11 (instead of 3.8.8).
There are also a few packages that I did not have installed (rasterio, geopandas and openeo_driver, which I don’t know where to download).
The others where quite similar (netCDF4 1.5.7 instead if 1.5.8; Numpy 1.21.5 instead of 1.18.5; openeo 0.10.0 instead of 0.10.1a1.dev20220516+121; pandas 1.4.1 instead of 1.3.5; pyproj 2.6.1.post1; scipy 1.7.3 instead of 1.8.0; shapely 1.7.1 instead of 1.8.2) beside xarray, since I have the version 2022.3.0 while in the udf there is the version 0.16.2.

You did not change anything of the script I sent you, am I right?
Maybe you could share the yml of you environment. In the meanwhile, I’ll try to downgrade my xarray.

Well, I did change some bits of your script to see intermediate results and better understand what is going on (some sections are quite cryptic :wink: ) .
I also changed the time window to have a smaller data set to work with and ease debugging, but apparently that caused the issue to disappear. I just reran your original code for your original time window, and I do reproduce your problem:

some minor in-between notes:

  • I noticed the CLD and CLP bands have a different range: CLP has [1, 255], while CLD has [1, 100], for the cloud masking you use the same threshold maskthr=50 which might not give you the results you expect
  • I see you try to reconnect with connection = openeo.connect( ...); authenticate_oidc() in a couple of places in your script. Note that this does not work as you intend (I think). You create datacubes starting from connection.load_collection(... and these datacube keep track of that original connection. Even if you overwrite that connection variable with a new openeo.connect() call, the original connection is still preserved in and used by the datacube objects. So it is useless to do connection = openeo.connect() if you don’t actually use that new connection variable value in some way

I’m sorry about that, I am accostumed to work by myself and I never respect the python “code of conduct” ^^

That is quite sad. I am trying to ultimate my calculation for the LPS presentation, therefore for now I will try to show both the results I obtain by using UDF and by downloading the data.

Thanks for this, since it is noted as “probability” in the description I just interpreted it as a percentage. I’ll change the value accordingly in my code!

I noticed that if I wait too much to run the code sections, the openeo connection timeout, and I tried to use this expedient just to renew it. Is there a more “correct” way?

the good news is that I have a parameter now (time window) to play with and zoom in on the problem. I hope I can find more details today, and we can resolve it before LPS

If you get a timeout, you don’t necessarily have to reconnect. Just retrying with an existing connection should work. If you really want a new connection, you should also rebuild all datacubes you created with the old one to eliminate all traces of the old connection. But again: in most cases you just need to call openeo.connect() just once in a script and retry .download() or .execute() calls if you get timeouts

small status update
I’ve been playing with the time window (temp_ext variable at the top) and the resulting scatter plots behave pretty weird as a function of that parameter (used temp_ext is in the plot title):

Screenshot from 2022-05-19 10-35-19 Screenshot from 2022-05-19 11-03-10 Screenshot from 2022-05-19 11-08-28 Screenshot from 2022-05-19 11-12-12

These ellipse-shaped structures are pretty unexpected and suspicious here

feel free to confirm this on your end

I have tried the period 2018-02-01 to 2018-04-01 and I found the same weird ellipse. There’s something fishy about it :slight_smile:

I just noticed this in the code for local calculation:
Screenshot from 2022-05-20 14-32-51

Note how the y and x dimensions are swapped. Not sure if it is very relevant here, but just posting it here.