Array memory error

Hello all

I am developing a Gaussian Process regression Python package (pyeogpr) that runs on openEO.

The package needs to retrieve biophysical trait maps, yielding two bands: the variable and its uncertainty. To infer these, the package uses Gaussian Process regression, i.e. matrix/array mathematics.

However, because the back-end chunks the satellite image into blocks of 128 pixels, the matrix multiplication becomes memory inefficient. Is there a way to solve this? For the variable band it works smoothly, but for the uncertainty band the calculation is more complex and the batch job fails.

Job id: j-240827f758f84022aa3e1771fdc335c1

The code seems to fail where I compute the “Uncertainty” variable. Please help!

udf_cloud = openeo.UDF("""

import importlib
import os
import sys
import time


dir_path = 'tmp/venv'
sys.path.insert(0, dir_path)

python_files = [f[:-3] for f in os.listdir(dir_path) if f.endswith('.py') and f !="__init__.py"]

for lib in python_files:
    globals()[lib] = __import__(lib)


from configparser import ConfigParser
from typing import Dict
from openeo.metadata import Band, CollectionMetadata
from openeo.udf import XarrayDataCube, inspect

import math
import numpy as np
import xarray as xr
from pathlib import Path
from openeo.udf.debug import inspect

chunks = 128
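# broadcast a 1-D per-band array to the (10, chunks, chunks) shape of one spatial block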
def broadcaster(array):
    return np.broadcast_to(array[:, np.newaxis, np.newaxis], (10, chunks, chunks))

init_xr = xr.DataArray()
init_xr_u = xr.DataArray()

def apply_datacube(cube: xr.DataArray, context: dict) -> xr.DataArray:

    pixel_spectra = (cube.values)

    sensor = context["sensor"]
    variable = context["biovar"]
    model = globals()[sensor + "_" + variable]

    hyp_ell_GREEN = broadcaster(model.hyp_ell_GREEN)
    mx_GREEN = broadcaster(model.mx_GREEN.ravel())
    sx_GREEN = broadcaster(model.sx_GREEN.ravel())
    XDX_pre_calc_GREEN_broadcast = np.broadcast_to(model.XDX_pre_calc_GREEN.ravel()[:,np.newaxis,np.newaxis],(model.XDX_pre_calc_GREEN.shape[0], chunks, chunks))


    im_norm_ell2D_hypell  = ((pixel_spectra - mx_GREEN) / sx_GREEN) * hyp_ell_GREEN
    im_norm_ell2D  = ((pixel_spectra - mx_GREEN) / sx_GREEN)

    PtTPt = np.einsum('ijk,ijk->ijk', im_norm_ell2D_hypell, im_norm_ell2D) * (-0.5)
    PtTDX = np.einsum('ij,jkl->ikl', model.X_train_GREEN,im_norm_ell2D_hypell)
    arg1 = np.exp(PtTPt[0]) * model.hyp_sig_GREEN

    k_star_im = np.exp(PtTDX - (XDX_pre_calc_GREEN_broadcast * (0.5)))
    mean_pred = (np.einsum('ijk,i->jk',k_star_im, model.alpha_coefficients_GREEN.ravel()) * arg1) + model.mean_model_GREEN

    k_star_uncert_im = np.exp(PtTDX - (XDX_pre_calc_GREEN_broadcast * (0.5))) * arg1
    k_star_uncert = np.expand_dims(k_star_uncert_im, axis=0)

    Vvector = np.einsum('ij,jabc->ibc', model.Linv_pre_calc_GREEN, k_star_uncert)#.reshape(-1,256,256)
    #the code seems to fail here
    Uncertainty = math.sqrt(abs(np.expand_dims(model.hyp_sig_GREEN, axis=0) - np.dot(Vvector,Vvector))) # this should be unc. Currently overreaching memory limits


    init_xr = mean_pred
    # init_xr_u = Uncertainty
    returned = xr.DataArray(init_xr)
    # returned = xr.DataArray(np.stack([init_xr, init_xr_u], axis=0),
    # dims=["band", "x", "y"],
    # coords={"band": ["Variable", "Uncertainty"]})

    return returned
""",context={"from_parameter": "context"})

sensor = "SENTINEL2_L2A"
biovar = "CNC_Cab"
context = {"sensor": sensor,
           "biovar":biovar}



S2_cloud = s2_cube.apply_dimension(process=udf_cloud,
                                   dimension="bands",
                                   context =context)#.filter_bands(bands = ["B02"])

S2_cloud_process = S2_cloud.execute_batch(
    title=f"{sensor} _ {biovar}",
    outputfile=f"{sensor} _ {biovar}.nc",
    job_options = {
        'executor-memory': '10g',
        'udf-dependency-archives': [
            'https://github.com/daviddkovacs/pyeogpr/raw/main/models/GPR_models_bulk.zip#tmp/venv'
        ]
    }
)

Hi,

It looks like in this line you create a (366, 128, 366, 128) array:

Uncertainty = math.sqrt(abs(np.expand_dims(model.hyp_sig_GREEN, axis=0) - np.dot(Vvector,Vvector))) # this should be unc. Currently overreaching memory limits

(366 * 128 * 366 * 128) * 8 bytes = 17557880832 bytes = 16.35 GB, which is over the maximum executor memory we would advise for our backend. I think the best way forward is to use the apply_neighborhood process, which allows you to define your own chunk size. It is mostly identical to the process you are already using and shouldn’t require any changes in your UDF code. That way you can hopefully reduce the required size of the Uncertainty matrix to something between 1 and 7 GB.
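For illustration, a rough sketch of what the switch could look like, reusing the s2_cube, udf_cloud and context objects from your post (the exact chunk size is up to you, and this assumes your client version supports passing context to apply_neighborhood):

S2_cloud = s2_cube.apply_neighborhood(
    process=udf_cloud,
    size=[
        {"dimension": "x", "value": 64, "unit": "px"},
        {"dimension": "y", "value": 64, "unit": "px"},
    ],
    context=context,
)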

Another solution could be to use an np.dtype other than np.float64 for your intermediate calculations:

np.float64 uses 8 bytes
np.float32 uses 4 bytes
np.float16 uses 2 bytes

As long as your result uses np.float64.
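As a minimal, hypothetical illustration (a synthetic array, not your actual UDF variables):

import numpy as np

a = np.random.rand(366, 128, 128)             # stand-in for an intermediate array
a32 = a.astype(np.float32)                    # 4 bytes per element instead of 8
result = np.einsum('ijk,ijk->jk', a32, a32)   # heavy maths in float32
result = result.astype(np.float64)            # cast the final band back to float64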

Hi Jeroen, thanks for your reply

I would still prefer not to use apply_neighborhood, as it does not automatically handle multi-temporal datacubes. It adds an extra 0th dimension to the array of cube.values, which makes the matrix algebra extremely difficult for 4-, 5- or 6-dimensional arrays. In the attached figure I show an S2 L2A datacube where the 0th dimension is the time iteration:

[attached figure: cube.values of an S2 L2A datacube, with time as the 0th dimension]

Job id:
j-240828fe043445ce90df925021cfde0d

I am now trying the float16 approach for the numpy arrays with my original apply_dimension method, but the batch job has been stuck and frozen for hours, while it still shows it only ran for a few minutes…


job id:
j-240829a9da1942898c0300cc2377c64e

Please let me know if there are any other ways to reduce the memory footprint and keep the job working properly.
Dávid

You can specify that you only want one date in the apply_neighborhood chunks. In the following example your input data cube will only have (bands, y, x) as dimensions:

segmentationband = ndviband.apply_neighborhood(
    process=openeo.UDF.from_file("udf_segmentation.py"),
    size=[
        {"dimension": "t", "value": "P1D"},
        {"dimension": "x", "value": 64, "unit": "px"},
        {"dimension": "y", "value": 64, "unit": "px"},
    ],
)

I can verify that your previous job was processing the UDF when it was cancelled, but I can’t see if it was hanging or just taking a long time. Hopefully the trick with apply_neighborhood can help you out here.

Hi Jeroen,
Thanks for your reply. Is there a way to process a multi-temporal datacube with apply_neighborhood? My code is designed to process time series data, so it would be useless for me to process only one temporal step.

If your original datacube has dimensions (18,10,1024,1024) and you use the following size in apply_neighborhood:

size=[
    {"dimension": "t", "value": "P1D"},
    {"dimension": "x", "value": 512, "unit": "px"},
    {"dimension": "y", "value": 512, "unit": "px"},
],

Then the UDF will run 18 * 2 * 2 times, where each UDF invocation receives a unique (1,10,512,512) chunk out of your original data cube: once for each date, multiplied by once for each 512x512 spatial chunk in the 1024x1024 data cube. That way you should be able to process your data cube with a time dimension. Or do you mean by multi-temporal that you are using multiple time dimensions?

Note that in the previous example the (1,10,512,512) chunks that each UDF receives are normally simplified to (10,512,512) chunks, so the time dimension is automatically removed for ease of use.
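If you want to double-check what your UDF actually receives, a small sketch (using the inspect logger you already import from openeo.udf) could log the chunk dimensions:

import xarray as xr
from openeo.udf import inspect

def apply_datacube(cube: xr.DataArray, context: dict) -> xr.DataArray:
    # with the P1D / 512 px sizes above this should log something like
    # ('bands', 'y', 'x') and (10, 512, 512)
    inspect(data=[cube.dims, cube.shape], message="UDF input chunk")
    return cube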

Hi Jeroen,

I have now tried the method you showed, and the processing has been stuck for hours. Can you please check?

The Job ID is :
j-240902f0312542b282b657aa4a8223ff

It looks like the job failed on:

File "", line 74, in apply_datacube
AttributeError: 'float' object has no attribute 'astype'

This happened around 1.9 h into the job, while the previous aggregate_temporal step was already done after about 1 minute. So it does seem that your UDF code is doing a lot of processing. How long does it take you to process one (10,512,512) chunk locally?

Hi Jeroen,

Yesterday, this process had been running for almost 8 hours when it gave me the error. Locally I run the same matrix algebra on a downloaded S2 tile in np.array format, and it finishes within 40-50 minutes. The uncertainty of the ML model requires a large array computation, but it should not take this long to process on openEO. Do you have any idea why it is taking so long? Is there a way to make the matrix algebra run faster?

Hi David,

the backend executes Python code just like it would run locally, but locally your code might be utilizing spare CPUs. On the backend we effectively parallelize the work, so multiple chunks are processed on the same machine. Hence, if your algorithm is CPU bound, it might take more time compared to your laptop.

The individual task time before the error is between 1.4 hours and 1.9 hours.
The backend automatically retries tasks before giving up, which leads to the long time you are seeing before getting an error.

It is possible to assign more CPU resources to your jobs:

job_options={
    "executor-cores": "8",
    "task-cpus": "8",
    "executor-memoryOverhead": "2g",
}

This is for instance done in this FuseTS notebook as well:

If the algorithm allows it, I would also consider further reducing the chunk size to 64x64.
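Combined with the job options from your original post, the batch job call could look roughly like this (a sketch; the exact memory values are only examples):

S2_cloud_process = S2_cloud.execute_batch(
    title=f"{sensor} _ {biovar}",
    outputfile=f"{sensor} _ {biovar}.nc",
    job_options={
        "executor-memory": "10g",
        "executor-memoryOverhead": "2g",
        "executor-cores": "8",
        "task-cpus": "8",
        "udf-dependency-archives": [
            "https://github.com/daviddkovacs/pyeogpr/raw/main/models/GPR_models_bulk.zip#tmp/venv"
        ],
    },
)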

Hi Jeroen,

Thanks for your feedback. The code you provided only works on VITO back-ends. I have been testing it for the past few days, however I keep getting this error:

OpenEO batch job failed: Exception during Spark execution: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 4 times, most recent failure: Lost task 0.3 in stage 34.0 (TID 120) (epod139.vgt.vito.be executor 32): ExecutorLostFailure (executor 32 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding physical memory limits. 5.1 GB of 4 GB physical memory used. Consider boosting spark.executor.memoryOverhead. Driver stacktrace:

Job ID:
j-2409056e483c4af7924e432ad58c2863

The job still fails after 2-3 hours. I also reduced the chunk size to 64.

Thanks
Dávid

Hi David,
the message recommends increasing the memory overhead. This is needed when working with UDFs that require more memory.
You can modify the relevant job option, for instance:
"executor-memoryOverhead": "6g"

More documentation on job options is available here:

Hi @jeroen.dries

I have been testing the process you mentioned for the past few days. However, I get similar errors as before, even with the increased memoryOverhead.
job id:
j-2409093aa7b74dae925fc2efc910920e

Moreover, even with a small ROI being processed, I ran out of credits (this runs on VITO backends, as CDSE backends don't support 8 CPU cores).

Do you know of any way to process this? I was thinking of somehow bypassing the “chunking” in order to reduce the memory allocated for an array…
Thanks for your cooperation
Dávid

Hi David,
chunking reduces the input size, so the more chunking, the better for memory and parallelization.

The underlying problem is that your Python library somehow uses a lot of memory. This is not uncommon, but it requires knowledge of the specific library to be able to optimize it.

One generic trick that you can however try, is to set an overall memory limit in Python, in your udf code:

import resource
resource.setrlimit(resource.RLIMIT_AS, (soft_limit_in_bytes, hard_limit_in_bytes))

Try setting the limit in bytes to something lower than the memory overhead you assign.
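A minimal sketch of how that could look inside the UDF (the 4 GB value is just an example; keep the hard limit unchanged, since a process may not raise it):

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_AS)
limit_bytes = 4 * 1024 ** 3   # example: cap the address space at ~4 GB
resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, hard))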

To reduce credit use, try using TERRASCOPE_S2_TOC_V2 explicitly for your testing.

Hi Jeroen,

I have been testing the inputs you suggested. However, I still encounter some issues.
Checking the limits with:

soft, hard = resource.getrlimit(resource.RLIMIT_AS)

both limits were found to be 1577058304. Thus, setting these limits, the process still gives me the error: "Unable to allocate 887. MiB for an array with shape (337, 64, 337, 64) and data type float16"

When I increase this limit by even 1 byte, I get the error: “ValueError: not allowed to raise maximum limit”

Job ID:
j-24091682d4dc43e0a2c30636e15accae

Thanks for your help
Dávid

Hi Dávid,

j-24091682d4dc43e0a2c30636e15accae looks like a job on CDSE rather than OpenEO Platform; could you try it there?

Cheers,
Jan