Caching OpenEO results

Hi all,

When I start from a large initial dataset and perform some aggregation, I want to cache the aggregation result by calling “DataCube.save_result” and submitting a batch job (id = vito-1d885703-4970-4cac-a1a8-961f6e0296a3). In this case I use format="gtiff" and my backend is the openEO platform.
I then want to load the result with DataCube.load_result. This works, but raises a warning:

/opt/conda/lib/python3.9/site-packages/openeo/metadata.py:240: UserWarning: No cube:dimensions metadata
  complain("No cube:dimensions metadata")

A job that directly downloads this data (again with a batch job, id=vito-816e931b-aadd-4430-9760-70fc2eee8f31) gives the following error:

error processing batch job
Traceback (most recent call last):
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/utils.py", line 28, in memory_logging_wrapper
    from spark_memlogger import memlogger
ModuleNotFoundError: No module named 'spark_memlogger'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/job_registry.py", line 267, in _read
    data, stat = self._zk.get(path)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/kazoo/client.py", line 1165, in get
    return self.get_async(path, watch=watch).get()
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/kazoo/handlers/utils.py", line 75, in get
    raise self._exception
kazoo.exceptions.NoNodeError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/job_registry.py", line 273, in _read
    data, stat = self._zk.get(path)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/kazoo/client.py", line 1165, in get
    return self.get_async(path, watch=watch).get()
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/kazoo/handlers/utils.py", line 75, in get
    raise self._exception
kazoo.exceptions.NoNodeError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "batch_job.py", line 307, in main
    run_driver()
  File "batch_job.py", line 281, in run_driver
    run_job(
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/utils.py", line 30, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "batch_job.py", line 334, in run_job
    result = ProcessGraphDeserializer.evaluate(process_graph, env=env, do_dry_run=tracer)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 262, in evaluate
    return convert_node(result_node, env=env)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 268, in convert_node
    return apply_process(
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1075, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1075, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 273, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 268, in convert_node
    return apply_process(
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1186, in apply_process
    return process_function(args=args, env=env)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1466, in load_result
    return env.backend_implementation.load_result(job_id=job_id, user=user, load_params=load_params, env=env)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/backend.py", line 498, in load_result
    for _, asset in self.batch_jobs.get_results(job_id=job_id, user_id=user.user_id).items()
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/backend.py", line 1355, in get_results
    job_info = self._get_job_info(job_id=job_id, user_id=user_id)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/backend.py", line 708, in _get_job_info
    job_info = registry.get_job(job_id, user_id)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/job_registry.py", line 186, in get_job
    job_info, _ = self._read(job_id, user_id, include_done=True)
  File "/data2/hadoop/yarn/local/usercache/jaapel/appcache/application_1643116788003_0783/container_e5013_1643116788003_0783_01_000002/venv/lib/python3.8/site-packages/openeogeotrellis/job_registry.py", line 275, in _read
    raise JobNotFoundException(job_id)
openeo_driver.errors.JobNotFoundException: The batch job 'vito-1d885703-4970-4cac-a1a8-961f6e0296a3' does not exist.

I know I can directly download results with job.get_results, but for further processing, I need the load_result to work. Any ideas why it cannot find my previous job? And what is up with the warning that the cube has no dimensions metadata?

Thanks,
Jaap

Just to be sure: you are using the same connection to “openeo.cloud” (= openEO platform) in all cases?
Because “openeo.cloud” will add a prefix (vito- in your case) to the actual job id on the VITO backend.

The aggregator (which is the openEO backend that runs at “openeo.cloud”) should take care of adding/stripping this prefix so that you as a user do not have to worry about that, unless you are mixing “openeo.cloud” and “openeo.vito.be” connections yourself.

(FYI: this job-id-prefix handling for load_result in the aggregator was introduced in Issue #19 of Open-EO/openeo-aggregator on GitHub, "strip backend prefix from job_id in load_result".)
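
The prefix convention described above can be sketched in a few lines. This is only an illustration, not the aggregator's actual code: the helper names `to_platform_job_id` and `split_platform_job_id` are hypothetical, and the only thing taken from this thread is that a platform job id looks like `<backend>-<backend job id>`.

```python
from typing import Tuple


def to_platform_job_id(backend_id: str, backend_job_id: str) -> str:
    """Build the id a user sees on openeo.cloud (hypothetical helper)."""
    return f"{backend_id}-{backend_job_id}"


def split_platform_job_id(platform_job_id: str) -> Tuple[str, str]:
    """Split '<backend>-<job id>' at the first dash (hypothetical helper).

    Assumes the id really is prefixed: a bare UUID also contains dashes
    and would be split at the wrong place.
    """
    backend_id, sep, backend_job_id = platform_job_id.partition("-")
    if not sep or not backend_id or not backend_job_id:
        raise ValueError(f"Not a prefixed job id: {platform_job_id!r}")
    return backend_id, backend_job_id


platform_id = "vito-1d885703-4970-4cac-a1a8-961f6e0296a3"
backend_id, backend_job_id = split_platform_job_id(platform_id)
print(backend_id, backend_job_id)
```

The point of the round trip is that a job id obtained through one connection is only meaningful on that same connection, which is why mixing “openeo.cloud” and “openeo.vito.be” can lead to a job not being found.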

Hi Stefaan!
Yes I did use the same backend for both jobs!
Jaap

Could you try executing the process graph that uses load_result synchronously instead of as a batch job? I suspect that the prefix stripping does not work for batch jobs.

Likewise, can you try running the process graph with load_result as a batch job, but remove the vito- prefix yourself from the job id in load_result()?

It would be handy if you could confirm whether either or both of these workarounds work.

  1. Synchronous download:
OpenEoApiError: [500] Internal: Failed to process synchronously on backend vito: OpenEoApiError('[500] unknown: /data/projects/OpenEO/9ab07fef-24de-454d-8009-c320ba21c7e3/openEO_2021-03-01Z.tif (No such file or directory)')
  2. Removing the prefix works for my testing code!

The No cube:dimensions metadata warning is still there, and when continuing with a datacube from the load_result step, later processes throw an error:

---------------------------------------------------------------------------
MetadataException                         Traceback (most recent call last)
/tmp/ipykernel_82/269978305.py in <module>
      2 from typing import Tuple
      3 
----> 4 green: DataCube = t_bucketed_dc.band("green")
      5 nir: DataCube = t_bucketed_dc.band("nir")
      6 swir: DataCube = t_bucketed_dc.band("swir")

/opt/conda/lib/python3.9/site-packages/openeo/rest/datacube.py in band(self, band)
    439         :return: a DataCube instance
    440         """
--> 441         band_index = self.metadata.get_band_index(band)
    442         return self._reduce_bands(reducer=PGNode(
    443             process_id='array_element',

/opt/conda/lib/python3.9/site-packages/openeo/metadata.py in get_band_index(self, band)
    346 
    347     def get_band_index(self, band: Union[int, str]) -> int:
--> 348         return self.band_dimension.band_index(band)
    349 
    350     def filter_bands(self, band_names: List[Union[int, str]]) -> 'CollectionMetadata':

/opt/conda/lib/python3.9/site-packages/openeo/metadata.py in band_dimension(self)
    315         """Dimension corresponding to spectral/logic/thematic "bands"."""
    316         if not self.has_band_dimension():
--> 317             raise MetadataException("No band dimension")
    318         return self._band_dimension
    319 

MetadataException: No band dimension

I believe that the problem with load_result is that it doesn’t query collection metadata, like we do with load_collection. The consequence is that we get these client-side errors later on when trying to use that metadata.

A quick fix would be to configure that metadata client side, but I believe the client can also query result metadata to get some insight into available bands?

Looking through the REST interface, I cannot quickly find a way to get this metadata client side. Can that be correct?
I was mainly looking at the connection object. The ResultAsset metadata also does not seem to contain this information.
The next step would then be to assign this metadata to the datacube, so I will need some help there.

Hi Jaap,
I had a look as well, and would suggest that we do not try to retrieve it, but fill it in client side. Here is an example that might work:

from openeo.metadata import CollectionMetadata
m = CollectionMetadata({
    "cube:dimensions": {
        "x": {"type": "spatial", "extent": [-10, 10]},
        "y": {"type": "spatial", "extent": [-56, 83], "reference_system": 123},
        "t": {"type": "temporal", "extent": ["2020-02-20", None]},
        "spectral": {"type": "bands", "values": ["r", "g", "b"]},
    },
    "summaries": {
        "eo:bands": [
            {"name": "r", "common_name": "red", "center_wavelength": 5},
            {"name": "g", "center_wavelength": 8},
            {"name": "b", "common_name": "blue"},
        ]
    }
})
yourcube.metadata = m

The band names are the most important part; you may want to skip the other entries in your case.
Note that this is obviously a hack. The caching/load_result functionality is still something we’re working on over the next months, so this is a temporary solution while we’re still building the full feature.
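
If only the band names matter, the metadata dict could be reduced to something like the sketch below. The helper `band_only_metadata` is hypothetical, the band names are the ones used elsewhere in this thread, and whether the client accepts a dict with just a band dimension has not been verified here.

```python
from typing import Dict, List


def band_only_metadata(band_names: List[str], dimension_name: str = "spectral") -> Dict:
    """Build a metadata dict that declares only a band dimension (hypothetical helper)."""
    return {
        "cube:dimensions": {
            dimension_name: {"type": "bands", "values": list(band_names)},
        },
        "summaries": {
            "eo:bands": [{"name": name} for name in band_names],
        },
    }


metadata_dict = band_only_metadata(["green", "nir", "swir"])
print(metadata_dict["cube:dimensions"]["spectral"]["values"])
# The dict could then be wrapped in the same way as in the example above:
#   yourcube.metadata = CollectionMetadata(metadata_dict)
```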

@jeroen.dries In my implementation, I used to look up the previous completed job that had the same job title. Looking at the openEO platform, my jobs now have no titles. Did something change in the last few weeks?

For now I can save some job data locally, but it is something I noticed.
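
Saving job data locally could be as simple as a small JSON file that maps a cache key to a job id. This is a sketch under assumptions: the helper names `remember_job` and `lookup_job`, the key, and the file name are all hypothetical, and nothing here talks to the backend.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def remember_job(cache_file: Path, key: str, job_id: str) -> None:
    """Store job_id under key in a small JSON file (hypothetical helper)."""
    jobs = json.loads(cache_file.read_text()) if cache_file.exists() else {}
    jobs[key] = job_id
    cache_file.write_text(json.dumps(jobs, indent=2))


def lookup_job(cache_file: Path, key: str) -> Optional[str]:
    """Return the remembered job id for key, or None if nothing was stored."""
    if not cache_file.exists():
        return None
    return json.loads(cache_file.read_text()).get(key)


# Demo in a temporary directory so no file is left behind.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_file = Path(tmp_dir) / "openeo_jobs.json"
    remember_job(demo_file, "aggregation-cache", "vito-1d885703-4970-4cac-a1a8-961f6e0296a3")
    print(lookup_job(demo_file, "aggregation-cache"))
```

The looked-up id would then be passed to load_result in place of searching for a job by title.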

FYI: I fixed the issue where the backend id was not stripped from job ids in load_result calls in batch jobs. It should now work correctly on the development instance of the openEO platform aggregator at https://openeocloud-dev.vito.be/openeo/1.0/
It is not yet deployed in production.

@jeroen.dries I finally got around to implementing this. I took the same metadata from the previous cube and inserted it into the loaded cube. Now the backend gives me a ProcessGraphInvalid exception.
The process graph looks like this:

{'loadresult1': {'process_id': 'load_result',
  'arguments': {'id': '-6b3da423-5bc5-40f6-a0e7-3dd5a77b9565'}},
 'reducedimension1': {'process_id': 'reduce_dimension',
  'arguments': {'data': {'from_node': 'loadresult1'},
   'dimension': 'spectral',
   'reducer': {'process_graph': {'arrayelement1': {'process_id': 'array_element',
      'arguments': {'data': {'from_parameter': 'data'}, 'index': 2}},
     'arrayelement2': {'process_id': 'array_element',
      'arguments': {'data': {'from_parameter': 'data'}, 'index': 0}},
     'subtract1': {'process_id': 'subtract',
      'arguments': {'x': {'from_node': 'arrayelement1'},
       'y': {'from_node': 'arrayelement2'}}},
     'add1': {'process_id': 'add',
      'arguments': {'x': {'from_node': 'arrayelement1'},
       'y': {'from_node': 'arrayelement2'}}},
     'divide1': {'process_id': 'divide',
      'arguments': {'x': {'from_node': 'subtract1'},
       'y': {'from_node': 'add1'}}},
     'subtract2': {'process_id': 'subtract',
      'arguments': {'x': {'from_node': 'divide1'}, 'y': -0.6}},
     'divide2': {'process_id': 'divide',
      'arguments': {'x': {'from_node': 'subtract2'}, 'y': 1.2},
      'result': True}}}}},
 'resamplespatial1': {'process_id': 'resample_spatial',
  'arguments': {'align': 'upper-left',
   'data': {'from_node': 'reducedimension1'},
   'method': 'cubic',
   'projection': None,
   'resolution': 1000.0},
  'result': True}}

Any ideas where my mistake can be? Is there a process graph debugging tool?
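
As a rough local sanity check, the structure of a process graph can be inspected with plain Python before sending it. The sketch below only checks two structural rules (exactly one result node per graph, and every `from_node` pointing at an existing node); `check_graph` and its helpers are hypothetical names, and a check like this would not catch a problem with argument values such as an unknown job id.

```python
from typing import Any, Dict, Iterator, List


def iter_from_nodes(value: Any) -> Iterator[str]:
    """Yield every 'from_node' reference, skipping nested process graphs."""
    if isinstance(value, dict):
        if "from_node" in value:
            yield value["from_node"]
        for key, item in value.items():
            if key != "process_graph":
                yield from iter_from_nodes(item)
    elif isinstance(value, list):
        for item in value:
            yield from iter_from_nodes(item)


def iter_child_graphs(value: Any) -> Iterator[dict]:
    """Yield nested process graphs (e.g. a reducer) found in the arguments."""
    if isinstance(value, dict):
        for key, item in value.items():
            if key == "process_graph" and isinstance(item, dict):
                yield item
            else:
                yield from iter_child_graphs(item)
    elif isinstance(value, list):
        for item in value:
            yield from iter_child_graphs(item)


def check_graph(graph: Dict[str, dict], path: str = "root") -> List[str]:
    """Return a list of structural problems; an empty list means none found."""
    problems = []
    result_nodes = [name for name, node in graph.items() if node.get("result") is True]
    if len(result_nodes) != 1:
        problems.append(f"{path}: expected exactly one result node, found {len(result_nodes)}")
    for name, node in graph.items():
        if "process_id" not in node:
            problems.append(f"{path}.{name}: missing process_id")
        args = node.get("arguments", {})
        for ref in iter_from_nodes(args):
            if ref not in graph:
                problems.append(f"{path}.{name}: from_node {ref!r} does not exist")
        for child in iter_child_graphs(args):
            problems.extend(check_graph(child, path=f"{path}.{name}"))
    return problems


# Cut-down graph in the same shape as the one above (the job id is a placeholder).
graph = {
    "loadresult1": {"process_id": "load_result", "arguments": {"id": "some-job-id"}},
    "reducedimension1": {
        "process_id": "reduce_dimension",
        "arguments": {
            "data": {"from_node": "loadresult1"},
            "dimension": "spectral",
            "reducer": {"process_graph": {
                "arrayelement1": {
                    "process_id": "array_element",
                    "arguments": {"data": {"from_parameter": "data"}, "index": 2},
                    "result": True,
                },
            }},
        },
        "result": True,
    },
}
print(check_graph(graph))
```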

Hi Jaap,
if you go to https://editor.openeo.cloud/
you can paste a JSON version of your process graph in the main window. This would already tell us if something's wrong with that graph.

Otherwise it would be useful to know if you got this in a specific batch job, or on which endpoint.

thanks!
Jeroen

Hi @jeroen.dries,
After some trial and error, I got this process running by prefixing the json with "process_graph":.
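
For reference, a process graph printed from Python is not valid JSON yet (it contains `True` and `None`), so it has to be serialized first. The sketch below wraps a cut-down version of the graph under a `process_graph` key and dumps it with the standard `json` module; the shortened graph and the prefixed job id are only for illustration.

```python
import json

# Process graph as a Python dict (note True/None instead of true/null).
process_graph = {
    "loadresult1": {
        "process_id": "load_result",
        "arguments": {"id": "vito-6b3da423-5bc5-40f6-a0e7-3dd5a77b9565"},
    },
    "resamplespatial1": {
        "process_id": "resample_spatial",
        "arguments": {
            "align": "upper-left",
            "data": {"from_node": "loadresult1"},
            "method": "cubic",
            "projection": None,
            "resolution": 1000.0,
        },
        "result": True,
    },
}

# Wrap the graph under "process_graph" and serialize it to JSON text.
editor_json = json.dumps({"process_graph": process_graph}, indent=2)
print(editor_json)
```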
Trying to send this job using the GUI, I got the following error (copied from the UI):

Invalid process graph specified.

    Code: ProcessGraphInvalid
    Data:
        Config:
            Transitional:
                Silent JSONParsing: ✔️
                Forced JSONParsing: ✔️
                Clarify Timeout Error: ❌
            Adapter: JavaScript Function
            Transform Request:
                JavaScript Function
            Transform Response:
                JavaScript Function
            Timeout: 0
            Xsrf Cookie Name: XSRF-TOKEN
            Xsrf Header Name: X-XSRF-TOKEN
            Max Content Length: -1
            Max Body Length: -1
            Validate Status: JavaScript Function
            Headers:
                Accept: application/json, text/plain, */*
                Content Type: application/json
                Authorization: Bearer oidc/egi <<REDACTED>>
            Method: post
            Response Type: json
            Url: /jobs
            Data: {"title":"testcache","description":"Testing load_result functionality","plan":"early-adopter","budget":null,"process":{"process_graph":{"loadresult1":{"process_id":"load_result","arguments":{"id":"-6b3da423-5bc5-40f6-a0e7-3dd5a77b9565"}},"reducedimension1":{"process_id":"reduce_dimension","arguments":{"data":{"from_node":"loadresult1"},"dimension":"spectral","reducer":{"process_graph":{"arrayelement1":{"process_id":"array_element","arguments":{"data":{"from_parameter":"data"},"index":2}},"arrayelement2":{"process_id":"array_element","arguments":{"data":{"from_parameter":"data"},"index":0}},"subtract1":{"process_id":"subtract","arguments":{"x":{"from_node":"arrayelement1"},"y":{"from_node":"arrayelement2"}}},"add1":{"process_id":"add","arguments":{"x":{"from_node":"arrayelement1"},"y":{"from_node":"arrayelement2"}}},"divide1":{"process_id":"divide","arguments":{"x":{"from_node":"subtract1"},"y":{"from_node":"add1"}}},"subtract2":{"process_id":"subtract","arguments":{"x":{"from_node":"divide1"},"y":-0.6}},"divide2":{"process_id":"divide","arguments":{"x":{"from_node":"subtract2"},"y":1.2},"result":true}}}}},"resamplespatial1":{"process_id":"resample_spatial","arguments":{"align":"upper-left","data":{"from_node":"reducedimension1"},"method":"cubic","projection":null,"resolution":1000}},"saveresult1":{"process_id":"save_result","arguments":{"data":{"from_node":"resamplespatial1"},"format":"gtiff","options":{}},"result":true}}}}
            Base URL: https://openeocloud.vito.be/openeo/1.0.0
        Request: Empty
        Response:
            Data:
                Code: ProcessGraphInvalid
                id: ccaf0b12-c1de-4245-a6e8-f1bc04b76ca3
                Message: Invalid process graph specified.
            Status: 400
            Status Text: Bad Request
            Headers:
                Content Length: 120
                Content Type: application/json
            Config:
                Transitional:
                    Silent JSONParsing: ✔️
                    Forced JSONParsing: ✔️
                    Clarify Timeout Error: ❌
                Adapter: JavaScript Function
                Transform Request:
                    JavaScript Function
                Transform Response:
                    JavaScript Function
                Timeout: 0
                Xsrf Cookie Name: XSRF-TOKEN
                Xsrf Header Name: X-XSRF-TOKEN
                Max Content Length: -1
                Max Body Length: -1
                Validate Status: JavaScript Function
                Headers:
                    Accept: application/json, text/plain, */*
                    Content Type: application/json
                    Authorization: Bearer oidc/egi <<REDACTED>>
                Method: post
                Response Type: json
                Url: /jobs
                Data: {"title":"testcache","description":"Testing load_result functionality","plan":"early-adopter","budget":null,"process":{"process_graph":{"loadresult1":{"process_id":"load_result","arguments":{"id":"-6b3da423-5bc5-40f6-a0e7-3dd5a77b9565"}},"reducedimension1":{"process_id":"reduce_dimension","arguments":{"data":{"from_node":"loadresult1"},"dimension":"spectral","reducer":{"process_graph":{"arrayelement1":{"process_id":"array_element","arguments":{"data":{"from_parameter":"data"},"index":2}},"arrayelement2":{"process_id":"array_element","arguments":{"data":{"from_parameter":"data"},"index":0}},"subtract1":{"process_id":"subtract","arguments":{"x":{"from_node":"arrayelement1"},"y":{"from_node":"arrayelement2"}}},"add1":{"process_id":"add","arguments":{"x":{"from_node":"arrayelement1"},"y":{"from_node":"arrayelement2"}}},"divide1":{"process_id":"divide","arguments":{"x":{"from_node":"subtract1"},"y":{"from_node":"add1"}}},"subtract2":{"process_id":"subtract","arguments":{"x":{"from_node":"divide1"},"y":-0.6}},"divide2":{"process_id":"divide","arguments":{"x":{"from_node":"subtract2"},"y":1.2},"result":true}}}}},"resamplespatial1":{"process_id":"resample_spatial","arguments":{"align":"upper-left","data":{"from_node":"reducedimension1"},"method":"cubic","projection":null,"resolution":1000}},"saveresult1":{"process_id":"save_result","arguments":{"data":{"from_node":"resamplespatial1"},"format":"gtiff","options":{}},"result":true}}}}
                Base URL: https://openeocloud.vito.be/openeo/1.0.0
            Request: Empty
        Is Axios Error: ✔️
        To JSON: JavaScript Function
        Code: ProcessGraphInvalid
        id: ccaf0b12-c1de-4245-a6e8-f1bc04b76ca3
        Links: Empty
    ID: ccaf0b12-c1de-4245-a6e8-f1bc04b76ca3

Is something wrong with the timeout error?

The process graph itself looks good, so I guess some arguments are not correct. For example, the id in load_result looks suspicious. Is the id correct? It starts with a dash; maybe it would help to remove the dash?
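
One way an id can end up with a leading dash is a substitution that removes only the text `vito` and leaves the separator behind. This is a guess at the cause, based on the commented-out `re.sub` call in the snippet further down in this thread, not a confirmed diagnosis.

```python
import re

platform_job_id = "vito-6b3da423-5bc5-40f6-a0e7-3dd5a77b9565"

# Removing only the text "vito" leaves the separator dash behind.
only_name_removed = re.sub(r"vito", "", platform_job_id)
print(only_name_removed)

# Anchoring the pattern and including the dash removes the whole prefix.
prefix_removed = re.sub(r"^vito-", "", platform_job_id)
print(prefix_removed)
```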

Re-adding the vito- prefix to the job id made the job valid!
Now that I am one step further, I get a new error:

OpenEoApiError: [500] unknown: [400] ProcessParameterInvalid: The value passed for parameter 'dimension' in process 'reduce_dimension' is invalid: got 'spectral', but should be one of [] (ref: f409cd49-d006-4752-a31f-e35b12705c28)

Maybe something wrong with the load_result metadata? I did add the metadata to the loaded cube as follows:

from copy import deepcopy
from typing import List

from openeo.metadata import Band, CollectionMetadata
from openeo.rest.datacube import DataCube

# cached_cube, job, spatial_extent, temporal_extent and reference_system
# are assumed to be defined earlier in the script.
loaded_cube: DataCube = cached_cube._connection.load_result(
    # id=re.sub(r"vito", r"", job.job_id)  # Temp workaround for broken backend
    id=job.job_id
)

# Set metadata based on previous metadata with matching spatial and temporal extents
bands: List[Band] = deepcopy(cached_cube.metadata).band_dimension.bands

def get_band_meta_dict(b: Band):
    # Convert a client-side Band into an "eo:bands" style dict
    return {
        "name": b.name,
        "common_name": b.common_name,
        "center_wavelength": b.wavelength_um,
        "gsd": b.gsd
    }

m: CollectionMetadata = CollectionMetadata({
    "cube:dimensions": {
        "x": {
            "type": "spatial",
            "extent": spatial_extent["x"],
            "reference_system": reference_system
        },
        "y": {
            "type": "spatial",
            "extent": spatial_extent["y"],
            "reference_system": reference_system
        },
        "t": {
            "type": "temporal",
            "extent": temporal_extent
        },
        "spectral": {
            "type": "bands",
            "values": [band.name for band in bands]
        }
    },
    "summaries": {
        "eo:bands": [get_band_meta_dict(band) for band in bands]
    }
})

I’m afraid we’re now hitting another issue with the experimental version of load_result:
you set the metadata client side as a workaround, but the error is from server side.

The workaround is to also use methods like add_dimension and rename_labels to correctly configure the server side datacube after doing load_result.

Could you try that? Or, if you provide a larger snippet, I could also try to see what exactly you need.

After adding

loaded_cube = loaded_cube \
        .add_dimension("spectral", "some_label", type="bands") \
        .rename_labels("spectral", list(map(lambda band: band.name, bands)))

I now get the following:
[500] Internal: Failed to process synchronously on backend vito: OpenEoApiError('[500] unknown: / by zero')
I am dividing by band values, so I think something is still missing.
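
For reference, the reducer in the process graph posted earlier in this thread can be mirrored in plain Python to see where a zero denominator could occur. This is only a local sketch of the arithmetic: the function name `reducer` is made up here, the backend may treat division by zero differently from Python, and the thread does not establish that this division is the source of the backend error.

```python
from typing import Sequence


def reducer(bands: Sequence[float]) -> float:
    """Plain-Python mirror of the reducer nodes in the process graph."""
    b2 = bands[2]                 # arrayelement1 (index 2)
    b0 = bands[0]                 # arrayelement2 (index 0)
    difference = b2 - b0          # subtract1
    total = b2 + b0               # add1
    ratio = difference / total    # divide1: fails here when b2 + b0 == 0
    shifted = ratio - (-0.6)      # subtract2
    return shifted / 1.2          # divide2 (result node)


print(reducer([1.0, 2.0, 3.0]))

try:
    reducer([0.0, 2.0, 0.0])
except ZeroDivisionError as exc:
    print("division by zero:", exc)
```

The only data-dependent denominator is the sum of the bands at index 0 and index 2, so pixels where both are zero (or missing) would be the ones to look at.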

Apologies, I was too slow, and now I have trouble finding the error.
Could you perhaps run it as a batch job? That makes it easier to look up logs for me.