Reducing masks in openEO

Hi all!

I want to stack filter, e.g. using openeo.processes.product to reduce multiple DataCubes with (I think) boolean values, as I used some logical operators as openeo.processes.gt to create the masks.

Currently, reducing using product is not support (I see that in the error message). Is there another way to do this? Or is this a type error that I am having?

Error:

OpenEoApiError: [500] Internal: Failed to process synchronously on backend vito: OpenEoApiError("[501] FeatureUnsupported: Reducer 'product' not supported")

Code:

from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from openeo.processes import count, product, gt
from typing import Dict, Union, List

import pathlib

vito_url: str = "https://openeo.vito.be/openeo/1.0"
con: Connection = connect("openeo.cloud")
con.authenticate_oidc(provider_id="egi")

out_dir = pathlib.Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

denia_harbour_bbox: Dict[str, Union[float, str]] = {"west": 0.10594089795383788, "east": 0.12937267590793944, "south": 38.83464299556706, "north": 38.85035302841166, "crs": "EPSG:4326"}
temporal_extent: List[str] = ["2021-01-01", "2021-04-01"]

collection = ("TERRASCOPE_S2_TOC_V2", ["B06", "B05", "B03"])
band_names: str = ["swir1", "nir", "green"]

dc: DataCube = con.load_collection(
        collection_id=collection[0],
        spatial_extent=denia_harbour_bbox,
        temporal_extent=temporal_extent,
        bands=collection[1]
    ).add_dimension(name="source_name", label=collection[0], type="other") \
    .rename_labels(dimension="bands", source=collection[1], target=band_names)

# Create bucketed DC based on percentile of images
t_bucketed_dc: DataCube = dc \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: quantiles(data, probabilities=[percentile]),
        labels=[t_int[0] for t_int in t_intervals]
    )

dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], freq=f"2MS")
t_intervals = [[str(d), str(dr[i+1])] for i, d in enumerate(dr[:-1])]

count_dc: DataCube = dc.band("green") \
    .multiply(0).add(1) \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: sum(data),
        labels=[t_int[0] for t_int in t_intervals]
    )

mask: DataCube = count_dc.apply(lambda val: lt(x=val, y=3))
# Try to reduce over time to filter for missing images at any timestep
mask_no_t = mask.reduce_dimension(dimension="t", reducer=product)

dc = t_bucketed_dc.mask(mask_no_t)

dc.download(out_dir / "test2.nc", format="netcdf")

Product seems like a perfectly valid reducer, so lets try to add support for it:

It will appear on our dev instances first, will need to check if there’s already an openeo.cloud url for that.

Also a good reminder to raise priority on refactoring that bit of code entirely, to be more generic about supporting reduce over time.

2 Likes

Support for the product reducer should be available in production. Please let us know if you still see issues!

1 Like

Could it be that product as a reducer works very slowly?
I am not even executing the request to the backend, but still my editor seems to hang indefinitely

import pathlib
from typing import Dict, Union, List

from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from openeo.processes import count, product, gt, quantiles
import pandas as pd

con: Connection = connect("openeo.cloud")
con.authenticate_oidc(provider_id="egi")

out_dir = pathlib.Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

denia_harbour_bbox: Dict[str, Union[float, str]] = {"west": 0.10594089795383788, "east": 0.12937267590793944, "south": 38.83464299556706, "north": 38.85035302841166, "crs": "EPSG:4326"}
temporal_extent: List[str] = ["2021-01-01", "2021-04-01"]

collection = ("TERRASCOPE_S2_TOC_V2", ["B06", "B05", "B03"])
band_names = ["swir1", "nir", "green"]
percentile = 0.2

dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], freq="MS")
t_intervals = [[str(d), str(dr[i+1])] for i, d in enumerate(dr[:-1])]

dc: DataCube = con.load_collection(
        collection_id=collection[0],
        spatial_extent=denia_harbour_bbox,
        temporal_extent=temporal_extent,
        bands=collection[1]
    ).add_dimension(name="source_name", label=collection[0], type="other") \
    .rename_labels(dimension="bands", source=collection[1], target=band_names)

# Create bucketed DC based on percentile of images
t_bucketed_dc: DataCube = dc \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: quantiles(data, probabilities=[percentile]),
        labels=[t_int[0] for t_int in t_intervals]
    )

print(len(t_intervals))

# count_dc: DataCube = dc.band("green") \
count_dc: DataCube = dc \
    .multiply(0).add(1) \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: sum(data),
        labels=[t_int[0] for t_int in t_intervals]
    )

mask: DataCube = count_dc.apply(lambda val: lt(x=val, y=6))
# Try to reduce over time to filter for missing images at any timestep
mask_no_t = mask.reduce_dimension(dimension="t", reducer=product)

Hi Jaap,

thanks for the report. I just gave it a shot, too. It seems the issue is not product itself as it also hangs for sum, mean, and other reducers. So I assume there’s something fishy going on beforehand and there are some things that look suspicious to me:

  1. dc.multiply(0).add(1) - I assume you want to multiply and add to all pixels in the data cube? Then you’d need to do that in apply.
  2. You are using sum and lt without importing them, so I assume it hangs here because it tries to use Python’s internal sum function. I must admit this is not very obvious and could be a common pitfall for users, so we may want to check that in the Python client and reject passing Python’s internal functions. @stefaan.lippensOverloading pythons internal functions can easily lead to confusion through hanging processes · Issue #262 · Open-EO/openeo-python-client · GitHub

I haven’t fixed the first issue in my code and instead just removed it for now, but if I simply import sum and lt, then it doesn’t hang for me any longer.

Best,
Matthias

The following code works for me (i.e. the JSON process graph looks good to me):

import pathlib
from typing import Dict, Union, List

from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from openeo.processes import count, product, quantiles, sum, lt, add
import pandas as pd

con: Connection = connect("openeo.cloud")
con.authenticate_oidc(provider_id="egi")

out_dir = pathlib.Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

denia_harbour_bbox: Dict[str, Union[float, str]] = {"west": 0.10594089795383788, "east": 0.12937267590793944, "south": 38.83464299556706, "north": 38.85035302841166, "crs": "EPSG:4326"}
temporal_extent: List[str] = ["2021-01-01", "2021-04-01"]

collection = ("TERRASCOPE_S2_TOC_V2", ["B06", "B05", "B03"])
band_names = ["swir1", "nir", "green"]
percentile = 0.2

dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], freq="MS")
t_intervals = [[str(d), str(dr[i+1])] for i, d in enumerate(dr[:-1])]

dc: DataCube = con.load_collection(
        collection_id=collection[0],
        spatial_extent=denia_harbour_bbox,
        temporal_extent=temporal_extent,
        bands=collection[1]
    ).add_dimension(name="source_name", label=collection[0], type="other") \
    .rename_labels(dimension="bands", source=collection[1], target=band_names)

# Create bucketed DC based on percentile of images
t_bucketed_dc: DataCube = dc \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: quantiles(data, probabilities=[percentile]),
        labels=[t_int[0] for t_int in t_intervals]
    )

print(len(t_intervals))

# count_dc: DataCube = dc.band("green") \
count_dc: DataCube = dc \
    .apply(process = lambda x: add(x = multiply(x = x, y = 0), y = 1)) \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: sum(data),
        labels=[t_int[0] for t_int in t_intervals]
    )

mask: DataCube = count_dc.apply(process = lambda x: lt(x=x, y=6))
# Try to reduce over time to filter for missing images at any timestep
mask_no_t = mask.reduce_dimension(dimension="t", reducer=product)

Please verify that my changes actually reflect your intention.

1 Like

Thank you so much @m.mohr !

I messed up my imports, so there was my mistake. Any reason why you use .apply() over .product() and .add()?

I’ve used apply because I’m more used to the openEO processes used internally rather than to the Python Client implementation of it. I’ve just looked up the Python Client implementation and it seems they implemented what you used as a shortcut for apply so the result should be the same and you can use product and add as you have them in your original example.

Hi sorry for late response, I’m just back from holidays.

I’m still looking into the sum issue, but some notes about .multiply(0).add(1) vs .apply():

Both will/should work indeed: .multiply(0) will be converted automaticaly by the client to an apply call:

>>> dc.multiply(0).flat_graph()
...
  'apply1': {
    'process_id': 'apply',
    'arguments': {
      'process': {'process_graph': {
        'multiply1': {
          'process_id': 'multiply', 
          'arguments': {'x': {'from_parameter': 'x'}, 'y': 0},

However, if you do dc.multiply(0).add(1), two separate apply calls will be generated:

>>> dc.multiply(0).add(1).flat_graph()
...
 'apply1': {'process_id': 'apply',
  'arguments': {'data': {'from_node': 'renamelabels1'},
   'process': {'process_graph': {'multiply1': {'process_id': 'multiply',
   ...
 'apply2': {'process_id': 'apply',
  'arguments': {'data': {'from_node': 'apply1'},
   'process': {'process_graph': {'add1': {'process_id': 'add',

So if you want to apply multiple operators, it’s recommended for performance reasons to do it with a single explicit .apply() instead of autogenerating multiple with .multiply(), .add(), …

Moreover, this suggestion of @m.mohr can be made more readable:

dc.apply(process = lambda x: add(x = multiply(x = x, y = 0), y = 1)) \

The lambda/callback handling in the client supports regular python operators, so this will be equivalent:

dc.apply(process = lambda x: x * 0 + 1)
2 Likes

Ok I found the problem with builtin sum. Apparently sum(data) causes the construction of an (infinite) process graph that does infinite summation

array_element(data, 0) + array_element(data, 1) + array_element(data, 2)
 + array_element(data, 3) + ...

See Overloading pythons internal functions can easily lead to confusion through hanging processes · Issue #262 · Open-EO/openeo-python-client · GitHub for more info on detecting this and comparable issues

1 Like