UDF on vector cube

We are trying to run a UDF with a vector cube as input, but everything we try results in an error.

First, we tried to use the function apply_vectorcube:

{
  "process_graph": {
    "1": {
      "process_id": "load_url",
      "arguments": {
        "format": "PARQUET",
        "url": "https://zenodo.org/records/14513586/files/airquality.no2.o3.so2.pm10.pm2p5_4.annual_pnt_20150101_20231231_eu_epsg.3035_v20240718.parquet?download=1"
      }
    },
    "run2": {
      "process_id": "run_udf",
      "arguments": {
        "runtime": "Python",
        "context": {
          "time": [
            "2020-01-01T00:00:00Z",
            "2020-12-31T23:59:59Z"
          ],
          "pollutant": "O3",
          "station-type": "rural-background"
        },
        "data": {
          "from_node": "1"
        },
        "udf": "# Filter station geoParquet file\nimport geopandas\nimport logging\nfrom openeo.udf import XarrayDataCube\n\n# Configure logging\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(message)s'  # Simple format showing only the message\n)\nlogger = logging.getLogger(__name__)\n\ndef lnp(message):\n    \"\"\"Log and print the message\"\"\"\n    logger.info(message)\n    #print(message)\n\ndef apply_vectorcube(geometries, cube, context: dict):\n    lnp(geometries.head(10))\n    lnp(f\"Input cube shape: {cube.shape}\")\n    lnp(f\"Input cube dimensions: {cube.dims}\")\n    lnp(f\"Input cube coordinates: {list(cube.coords.keys())}\")\n    \n    return (geometries, cube)",
        "version": "3"
      }
    },
    "save9": {
      "process_id": "save_result",
      "arguments": {
        "data": {
          "from_node": "run2"
        },
        "format": "CSV"
      },
      "result": true
    }
  },
  "parameters": []
}

We then get the error:

Server error: Python exception while evaluating processing graph: openeo.udf.OpenEoUdfException: No UDF found.

But it’s clearly there; it seems the backend tries to detect the UDF signature but fails. Then we tried an xarray option:

{
  "process_graph": {
    "1": {
      "process_id": "load_url",
      "arguments": {
        "format": "PARQUET",
        "url": "https://zenodo.org/records/14513586/files/airquality.no2.o3.so2.pm10.pm2p5_4.annual_pnt_20150101_20231231_eu_epsg.3035_v20240718.parquet?download=1"
      }
    },
    "run2": {
      "process_id": "run_udf",
      "arguments": {
        "runtime": "Python",
        "context": {
          "time": [
            "2020-01-01T00:00:00Z",
            "2020-12-31T23:59:59Z"
          ],
          "pollutant": "O3",
          "station-type": "rural-background"
        },
        "data": {
          "from_node": "1"
        },
        "udf": "# Filter station geoParquet file\nimport geopandas\nimport logging\nfrom openeo.udf import XarrayDataCube\n\n# Configure logging\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(message)s'  # Simple format showing only the message\n)\nlogger = logging.getLogger(__name__)\n\ndef lnp(message):\n    \"\"\"Log and print the message\"\"\"\n    logger.info(message)\n    #print(message)\n\ndef apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:\n    # Print context for debugging\n    lnp(f\"UDF CONTEXT: {context}\")\n    \n    # Get the DataArray and ensure the correct dimension order\n    original_array = cube.get_array()\n    lnp(f\"Input cube shape: {original_array.shape}\")\n    lnp(f\"Input cube dimensions: {original_array.dims}\")\n    lnp(f\"Input cube coordinates: {list(original_array.coords.keys())}\")\n\n    return cube\n",
        "version": "3"
      }
    },
    "save9": {
      "process_id": "save_result",
      "arguments": {
        "data": {
          "from_node": "run2"
        },
        "format": "CSV"
      },
      "result": true
    }
  },
  "parameters": []
}

We then get a different error:

Server error: Python exception while evaluating processing graph: TypeError: object of type 'NoneType' has no len() (Upstream ref: 'r-25040209373640c18814ecded8c12d21')

Could it be that we are calling run_udf without wrapping it in any type of apply? However, it’s a vector cube, and no apply process currently supports those. Is there any working example of a vector UDF?

Hi,

General question: which documentation are/were you building on here?

About the first case, basically this:

def apply_vectorcube(geometries, cube, context: dict):
    lnp(geometries.head(10))
    lnp(f"Input cube shape: {cube.shape}")

To fix the UDF detection: both geometries and cube should have a type annotation, e.g. along the lines of:

def apply_vectorcube(
    geometries: geopandas.geodataframe.GeoDataFrame, 
    cube: xarray.DataArray, 
    context: dict
):
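
Note that for those annotations to resolve, the corresponding modules should also be imported in the UDF itself:

import geopandas
import xarray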

The second attempt fails, I guess, because the backend detected a raster cube UDF (apply_datacube) and tried to apply it to vector cube data.

On a more general note, I see you put a lot of effort into setting up and using logging (basicConfig, lnp, print, etc.). This will not work as expected in a UDF context. In general, it is recommended to just use the inspect() helper provided by the openeo package, as discussed in User-Defined Functions (UDF) explained — openEO Python Client 0.40.0a1 documentation
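
For illustration, a minimal version of your UDF using inspect() could look like this (just a sketch, reusing the annotated signature from above):

import geopandas
import xarray
from openeo.udf import inspect

def apply_vectorcube(
    geometries: geopandas.geodataframe.GeoDataFrame,
    cube: xarray.DataArray,
    context: dict,
):
    # Each inspect() call shows up as a structured entry in the job logs
    inspect(data=[cube.shape], message="cube shape")
    inspect(data=context, message="UDF context")
    return (geometries, cube)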

Thanks, that’s useful! On the documentation, well, there isn’t much, so we are trying to figure it out as we go :smiley: For the signatures, I looked at the same page you just linked to, but I was not aware that they had to be that explicit, thanks. However, it still doesn’t run for us; we get a different error message:

Server error: Python exception while evaluating processing graph: ValueError: The provided UDF expects a FeatureCollection and a datacube, but received None and None

Here’s the new process graph:

{
  "process_graph": {
    "1": {
      "process_id": "load_url",
      "arguments": {
        "format": "PARQUET",
        "url": "https://zenodo.org/records/14513586/files/airquality.no2.o3.so2.pm10.pm2p5_4.annual_pnt_20150101_20231231_eu_epsg.3035_v20240718.parquet?download=1"
      }
    },
    "run2": {
      "process_id": "run_udf",
      "arguments": {
        "runtime": "Python",
        "context": {
          "time": [
            "2020-01-01T00:00:00Z",
            "2020-12-31T23:59:59Z"
          ],
          "pollutant": "O3",
          "station-type": "rural-background"
        },
        "data": {
          "from_node": "1"
        },
        "udf": "# Filter station geoParquet file\nimport geopandas\nimport logging\nfrom openeo.udf import XarrayDataCube\nfrom openeo.udf import inspect\n\n# Configure logging\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(message)s'  # Simple format showing only the message\n)\nlogger = logging.getLogger(__name__)\n\ndef lnp(message):\n    \"\"\"Log and print the message\"\"\"\n    logger.info(message)\n    #print(message)\n\t\n\ndef apply_vectorcube(geometries: geopandas.geodataframe.GeoDataFrame, cube: xarray.DataArray, context: dict):\n    inspect(data=[cube.shape], message=\"UDF logging shape of my cube\")\n    lnp(geometries.head(10))\n    lnp(f\"Input cube shape: {cube.shape}\")\n    lnp(f\"Input cube dimensions: {cube.dims}\")\n    lnp(f\"Input cube coordinates: {list(cube.coords.keys())}\")\n    \n    return (geometries, cube)\n",
        "version": "3"
      }
    },
    "save9": {
      "process_id": "save_result",
      "arguments": {
        "data": {
          "from_node": "run2"
        },
        "format": "CSV"
      },
      "result": true
    }
  },
  "parameters": []
}

And thanks for the inspect tip, we will check it out. The rather complex logging setup is there to make it easier to have UDFs that run both in the cloud and locally, and produce the same debug output.
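
For example, since a UDF is just plain Python, we also test it locally by importing the function and calling it on dummy data, roughly like this (module and file names are placeholders):

import geopandas
import numpy
import xarray

from vector_udf import apply_vectorcube  # placeholder module holding the UDF

geometries = geopandas.read_parquet("airquality_local_copy.parquet")  # local copy
cube = xarray.DataArray(numpy.zeros(len(geometries)), dims=["geometry"])  # dummy cube
apply_vectorcube(geometries, cube, context={"pollutant": "O3"})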

Can you include the job id (j-2504022343234254...) or request id (r-345982352353453...) of this failure? That makes it easier for us to look up the relevant logs.

That would be:

Server error: Python exception while evaluating processing graph: ValueError: The provided UDF expects a FeatureCollection and a datacube, but received None and None
Code: Internal
ID: r-25040215202742ff858182fe98806dbf

(Looping in colleague @jeroen.verstraelen who worked on apply_vectorcube before)

Hi, similarly to raster cubes, you should use apply_dimension to run a UDF on a vector cube. The following example should work for your use case:

import openeo
from openeo.rest.datacube import UDF

conn = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()
vectorcube = conn.load_url(
    "https://zenodo.org/records/14513586/files/airquality.no2.o3.so2.pm10.pm2p5_4.annual_pnt_20150101_20231231_eu_epsg.3035_v20240718.parquet?download=1",
    format="parquet",
)
udf = UDF.from_file("vector_udf.py")
# Run the UDF along the vector cube's "geometry" dimension
vectorcube = vectorcube.apply_dimension(process=udf, dimension="geometry")
vectorcube.execute_batch("udf_result.parquet", format="parquet")
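
For reference, the vector_udf.py in this example can be along the lines of your earlier attempts, e.g. (sketch):

# vector_udf.py
import geopandas
import xarray
from openeo.udf import inspect

def apply_vectorcube(
    geometries: geopandas.geodataframe.GeoDataFrame,
    cube: xarray.DataArray,
    context: dict,
):
    inspect(data=[cube.shape], message="cube shape")
    inspect(data=list(cube.dims), message="cube dimensions")
    # Pass the data through unchanged
    return (geometries, cube)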

However, I noticed a small bug in the backend related to this example and pushed a fix just now. It should be live on the staging environment today, and on the production instance sometime next week. The bug only affects saving the vector cube to disk, so you can still use the vector cube for spatial filtering, vector_to_raster, etc. in the meantime.
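
For example, something like this should already work in the meantime (a sketch; the collection id and band are placeholders, not from this thread):

rastercube = conn.load_collection(
    "SENTINEL2_L2A",
    temporal_extent=["2020-01-01", "2020-12-31"],
    bands=["B04"],
)
# Use the vector cube as the geometries argument for spatial filtering
filtered = rastercube.filter_spatial(geometries=vectorcube)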

Hope that helps!