Hi, I am developing an udf to perform the Spearman correlation between a datacube and 4 timeseries.
The udf i developed is below:
from openeo.udf import XarrayDataCube
from typing import Dict
import numpy as np
from datetime import datetime
from scipy.stats import rankdata
import xarray
def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
dat=cube.get_array()
C=0
V=0
V2=0
W=0
D=0
D=np.array([datetime.fromordinal(d) for d in D])
IP=np.empty([D.size,5]).astype(object)
IP[:,0:4]=np.nan
IP[:,0]=C
IP[:,1]=V
IP[:,2]=V2
IP[:,3]=W
IP[:,4]=D
IP[IP==-9999]=np.nan
D = np.array ([datetime.strptime(str(d)[0:10],'%Y-%m-%d') for d in dat.t.values])
mat0=dat.values[1,:,:,:].astype(float)
ID=np.where(np.in1d(D, IP[:,4]))[0]
res=np.zeros([4,mat0.shape[1],mat0.shape[2]])
pix=IP[ID,0:4].astype(float).transpose()
pix=np.tile(pix,[mat0.shape[1],mat0.shape[2],1,1])
pix=np.moveaxis(pix,[0,1],[-2,-1])
for i in range(0,4):
mat2=mat0.copy()
pix2=pix[i,:,:,:]
ID=np.logical_or(np.isnan(pix2),np.isnan(mat2))
mat2[ID]=999999
pix2[ID]=999999
mat2=rankdata(mat2,axis=0)
pix2=rankdata(pix2,axis=0)
mat2[ID]=np.nan
pix2[ID]=np.nan
res[i,:,:]=(np.nansum((mat2-np.array([np.nanmean(mat2,0)]))*(pix2-np.array([np.nanmean(pix2,0)])),axis=0))/((np.nansum((mat2-np.array([np.nanmean(mat2,0)]))**2,axis=0)**0.5)*(np.nansum((pix2-np.array([np.nanmean(pix2,0)]))**2,axis=0)**0.5))
corrT=np.nanmax(res[0:3,:,:],axis=0)
corrW=res[3,:]
Mmask=np.logical_or(np.logical_and(corrT<=np.nanpercentile(corrT,10),corrW<=0.6),np.logical_and(corrW<=np.nanpercentile(corrW,10),corrT<=0.7))
out = xarray.DataArray(data=Mmask, coords={'x': (['x'],dat.x.values,dat.x.attrs ),'y': (['y'], dat.y.values,dat.y.attrs)})
out=XarrayDataCube(out)
return out
Thanks to @jeroen.dries I was able to pass some information to the UDF (the timeseries that I extract from openeo)
spearman_udf = load_udf('spearman_pixels.py')
spearman_udf=spearman_udf.replace("C=0",'C=np.array('+str(IP[:,0].tolist())+')')
spearman_udf=spearman_udf.replace("V=0",'V=np.array('+str(IP[:,1].tolist())+')')
spearman_udf=spearman_udf.replace("V2=0",'V2=np.array('+str(IP[:,2].tolist())+')')
spearman_udf=spearman_udf.replace("W=0",'W=np.array('+str(IP[:,3].tolist())+')')
spearman_udf=spearman_udf.replace("D=0",'D=np.array('+str(IP[:,4].tolist())+')')
And everything works fine if I apply the udf locally by downloading the datacube:
datacube=xarray.open_dataset('data.nc')
dat=XarrayDataCube(datacube.to_array(dim='bands'))
dat=dat.get_array()
But when I run the algorithm in openeo an error is raised:
mask = datacube.apply_dimension(code=spearman_udf, runtime='Python')
Mmask.download("Mmask.nc", format="netcdf")
Traceback (most recent call last):
Input In [6] in <cell line: 1>
Mmask.download("Mmask.nc", format="netcdf")
File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\datacube.py:1589 in download
return self._connection.download(cube.flat_graph(), outputfile)
File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:1028 in download
response = self.post(path="/result", json=request, expected_status=200, stream=True, timeout=timeout)
File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:173 in post
return self.request("post", path=path, json=json, allow_redirects=False, **kwargs)
File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:121 in request
self._raise_api_error(resp)
File ~\miniconda3\envs\openeo\lib\site-packages\openeo\rest\connection.py:152 in _raise_api_error
raise exception
OpenEoApiError: [500] unknown: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
return f(*args, **kwargs)
File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 41, in memory_logging_wrapper
return function(*args, **kwargs)
File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 522, in tile_function
result_data = run_udf_code(code=function, data=data)
File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 178, in run_udf_code
result_cube = func(data.get_datacube_list()[0], data.user_context)
File "<string>", line 47, in apply_datacube
IndexError: boolean index did not match indexed array along dimension 0; dimension is 1 but corresponding boolean dimension is 70
It seems to me that there are some differences between the downloaded datacube in netcdf format and the one online.
Do you have any advices on how to proceed?