UDP creation

Hi all,

I am working on creating a User-Defined Process (UDP). Initially, I generate a process graph and save it in a .json file, and I can successfully execute it using datacube_from_process, as shown below:

start_date = Parameter(name="start_date", description = "The start date.", schema=DATE_SCHEMA)
spatial_extent = Parameter(name="bbox", schema=bbox_schema, description="The spatial extent, as a bounding box")
region = Parameter.string("region",description="Eco-Region on which to compute water probability", default="Deserts", values=list(wwt.LOOKUPTABLE.keys()))
output, s2_cube = wwt._water_extent_for_month(connection, spatial_extent, region, start_date, date_shift(start_date,value=1,unit="month"), 85, 75, True)

udp = build_process_dict(output,"worldwater_water_extent","Computes water extent for a given month.")
cube = connection.datacube_from_process(udp, bbox = spatial_extent, region = 'Tundra',  start_date = '2023-07-01')
cube.download("extent_udp.tif")

Now I am trying to save the UDP directly on the openEO back-end with the following code, but I get an error: Preflight process graph validation failed: ‘process’. Unfortunately, I don’t get any additional information about what might be causing it.

process_id = "water_extent"
udp_result = connection.save_user_defined_process(
    user_defined_process_id=process_id,
    process_graph=output,
    parameters=[start_date, spatial_extent, region]
)

Could anyone provide insights into what this error message means or how I can identify what causes it?

Hi,

I’m still looking into it, but I think you hit a bug in the Python client here.
However, the “error” you see is essentially just a warning, and the UDP should still be stored on the back-end.
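
To confirm that the UDP was stored despite the warning, you could list the UDPs on the back-end. This is only a minimal sketch: `udp_is_stored` is a hypothetical helper name, and it assumes an authenticated `connection` whose `list_user_defined_processes()` returns a list of dicts that each carry an "id" key.

```python
def udp_is_stored(connection, process_id: str) -> bool:
    """Return True if a UDP with the given id is listed on the back-end.

    Assumption: connection.list_user_defined_processes() returns a list
    of dicts, each with an "id" entry.
    """
    stored = connection.list_user_defined_processes()
    return any(udp.get("id") == process_id for udp in stored)


# Usage (requires an authenticated openEO connection):
# print(udp_is_stored(connection, "water_extent"))
```

If this returns True, the preflight message can be treated as a warning rather than a failed save.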


I created a bug report here: "broken pre-flight validation in `con.save_user_defined_process`", Issue #526 in Open-EO/openeo-python-client on GitHub.

Thanks for the quick reply.
Unfortunately I still get errors, and now I also get a different error when I try to load the graph from the JSON file:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_274/877011331.py in <module>
     10 udp = build_process_dict(output,"worldwater_water_extent","Computes water extent for a given month.")
     11 cube = connection.datacube_from_process(udp, bbox = spatial_extent, region = 'Tundra',  start_date = '2023-07-01')
---> 12 cube.download("extent_udp.tif")

/opt/conda/lib/python3.9/site-packages/openeo/rest/datacube.py in download(self, outputfile, format, options, validate)
   2100             format = guess_format(outputfile)
   2101         cube = self._ensure_save_result(format=format, options=options)
-> 2102         return self._connection.download(cube.flat_graph(), outputfile, validate=validate)
   2103 
   2104     def validate(self) -> List[dict]:

/opt/conda/lib/python3.9/site-packages/openeo/rest/_datacube.py in flat_graph(self)
     58         """
     59         # TODO: wrap in {"process_graph":...} by default/optionally?
---> 60         return self._pg.flat_graph()
     61 
     62     @property

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in flat_graph(self)
    200     def flat_graph(self) -> Dict[str, dict]:
    201         """Get the process graph in internal flat dict representation."""
--> 202         return GraphFlattener().flatten(node=self)
    203 
    204     @staticmethod

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in flatten(self, node)
    315     def flatten(self, node: PGNode) -> Dict[str, dict]:
    316         """Consume given nested process graph and return flat dict representation"""
--> 317         self.accept_node(node)
    318         assert len(self._argument_stack) == 0
    319         self._flattened[self._last_node_id]["result"] = True

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in accept_node(self, node)
    324         node_id = id(node)
    325         if node_id not in self._node_cache:
--> 326             super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace)
    327             self._node_cache[node_id] = self._last_node_id
    328         else:

/opt/conda/lib/python3.9/site-packages/openeo/internal/process_graph_visitor.py in _accept_process(self, process_id, arguments, namespace)
     98             elif isinstance(value, dict):
     99                 self.enterArgument(argument_id=arg_id, value=value)
--> 100                 self._accept_argument_dict(value)
    101                 self.leaveArgument(argument_id=arg_id, value=value)
    102             else:

/opt/conda/lib/python3.9/site-packages/openeo/internal/process_graph_visitor.py in _accept_argument_dict(self, value)
    118             self.accept_node(value['node'])
    119         elif value.get("from_node"):
--> 120             self.accept_node(value['from_node'])
    121         elif "process_id" in value:
    122             self.accept_node(value)

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in accept_node(self, node)
    324         node_id = id(node)
    325         if node_id not in self._node_cache:
--> 326             super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace)
    327             self._node_cache[node_id] = self._last_node_id
    328         else:

/opt/conda/lib/python3.9/site-packages/openeo/internal/process_graph_visitor.py in _accept_process(self, process_id, arguments, namespace)
    102             else:
    103                 self.constantArgument(argument_id=arg_id, value=value)
--> 104         self.leaveProcess(process_id=process_id, arguments=arguments, namespace=namespace)
    105         assert self.process_stack.pop() == process_id
    106 

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in leaveProcess(self, process_id, arguments, namespace)
    333 
    334     def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
--> 335         node_id = self._node_id_generator.generate(process_id)
    336         self._flattened[node_id] = dict_no_none(
    337             process_id=process_id,

/opt/conda/lib/python3.9/site-packages/openeo/internal/graph_building.py in generate(self, process_id)
    299     def generate(self, process_id: str):
    300         """Generate new key for given process id."""
--> 301         self._counters[process_id] += 1
    302         return "{p}{c}".format(p=process_id.replace('_', ''), c=self._counters[process_id])
    303 

TypeError: unhashable type: 'dict'

When I try to save it as a UDP, I get: Preflight process graph validation raised: [UpstreamValidationInfo] Backend ‘vito’ reported validation errors [ProcessParameterRequired] Process ‘n/a’ parameter ‘bbox’ is required. And if I try to run:

geometry = '/home/jovyan/work/WorldWater/aoi_greenland.geojson'
spatial_extent = wwt._get_spatial_extent(geometry)

start_date = '2023-08-01'
region = 'Tundra'

wt_detections = connection.datacube_from_process(
    process_id="water_extent_estimation",
    start_date=start_date,
    region=region,
    bbox=spatial_extent
)
job = wt_detections.execute_batch(outputfile="greenland_udp.tif")

I get the following error:

Your batch job 'vito-j-240117dc8a9d4762a7d98588ee59dfa1' failed. Error logs:
[{'id': '[1705492664563, 200462]', 'time': '2024-01-17T11:57:44.563Z', 'level': 'error', 'message': 'OpenEO batch job failed: org.openeo.geotrellissentinelhub.package$NoSuchFeaturesException: no features found for criteria:\ncollection ID "sentinel-1-grd"\n1 polygon(s)\n[2023-08-01T00:00:00+00:00, 2023-09-01T00:00:00+00:00)\nmetadata properties {polarization={eq=DV}}'}, {'id': '[1705492685854, 9395497]', 'time': '2024-01-17T11:58:05.854Z', 'level': 'error', 'message': 'YARN application status reports error diagnostics: User application exited with status 1'}]
Full logs can be inspected in an openEO (web) editor or with `connection.job('vito-j-240117dc8a9d4762a7d98588ee59dfa1').logs()`.
---------------------------------------------------------------------------
JobFailedException                        Traceback (most recent call last)
/tmp/ipykernel_274/1079812014.py in <module>
      7 # wt_detections.download("test/wt_test.nc")
      8 # job = wt_detections.download("test/wt_33WXP_2_1.nc")
----> 9 job = wt_detections.execute_batch(outputfile="greenland_udp.tif")

/opt/conda/lib/python3.9/site-packages/openeo/rest/datacube.py in execute_batch(self, outputfile, out_format, print, max_poll_interval, connection_retry_interval, job_options, validate, **format_options)
   2224 
   2225         job = self.create_job(out_format=out_format, job_options=job_options, validate=validate, **format_options)
-> 2226         return job.run_synchronous(
   2227             outputfile=outputfile,
   2228             print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval

/opt/conda/lib/python3.9/site-packages/openeo/rest/job.py in run_synchronous(self, outputfile, print, max_poll_interval, connection_retry_interval)
    231     ) -> BatchJob:
    232         """Start the job, wait for it to finish and download result"""
--> 233         self.start_and_wait(
    234             print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
    235         )

/opt/conda/lib/python3.9/site-packages/openeo/rest/job.py in start_and_wait(self, print, max_poll_interval, connection_retry_interval, soft_error_max)
    313                 f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
    314             )
--> 315             raise JobFailedException(
    316                 f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).",
    317                 job=self,

JobFailedException: Batch job 'vito-j-240117dc8a9d4762a7d98588ee59dfa1' didn't finish successfully. Status: error (after 0:03:07).

Finally, if I run the following code, I get the expected result without any errors, so I guess there is nothing wrong with the pipeline itself.

output, s2_cube = wwt._water_extent_for_month(connection, spatial_extent, region, start_date, date_shift(start_date,value=1,unit="month"), 85, 75, True, False)
output.execute_batch(outputfile="local_test.tif")

Sorry for the delay, I somehow missed the notification for this thread.

This looks invalid:

udp = build_process_dict(output, ...)
cube = connection.datacube_from_process(udp, bbox = spatial_extent, ...)

build_process_dict produces a Python dictionary, whereas the first argument of datacube_from_process should be a string (the process id).
That is why you get the TypeError: unhashable type: 'dict' error.
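
The last frames of the traceback show why: the process id is used as a dictionary key (`self._counters[process_id] += 1`), and a dict cannot be a key. A small reproduction, where `NodeIdGenerator` is a simplified stand-in written for illustration (not the client's actual class) and the dict passed in is made up:

```python
from collections import defaultdict


class NodeIdGenerator:
    """Simplified stand-in for the client's node id generator (illustration only)."""

    def __init__(self):
        self._counters = defaultdict(int)

    def generate(self, process_id):
        # process_id is used as a dictionary key, so it must be hashable
        self._counters[process_id] += 1
        return "{p}{c}".format(p=process_id.replace("_", ""), c=self._counters[process_id])


gen = NodeIdGenerator()

# A string process id works
string_id = gen.generate("worldwater_water_extent")

# A dict (like the output of build_process_dict) fails at the counter lookup
try:
    gen.generate({"id": "worldwater_water_extent", "process_graph": {}})
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(string_id)
print(error_message)
```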

I think some steps are missing here: you should first save your UDP on the backend before you try to load it with datacube_from_process. Normally you should also not have to use build_process_dict explicitly here. Something like (roughly):

# Store UDP on backend
connection.save_user_defined_process(
    user_defined_process_id="worldwater_water_extent",
    process_graph=output,
    parameters=[start_date, spatial_extent, region],
)

# Use UDP
cube = connection.datacube_from_process(
    "worldwater_water_extent", 
    bbox = {"west": ...}, 
    region = 'Tundra', 
    start_date = '2023-07-01',
)

Also see this workflow example: User-Defined Processes — openEO Python Client 0.29.0a1 documentation

I also noticed this in the snippet you provided (the datacube_from_process(udp, bbox = spatial_extent, ...) call): there you pass spatial_extent, which is a non-concrete Parameter placeholder, as input to datacube_from_process, which expects concrete values. I think that is the reason you get an error like "Process ‘n/a’ parameter ‘bbox’ is required".
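
To illustrate the difference, here is a rough sketch using plain dicts. As far as I know, a Parameter placeholder ends up in the process graph as a {"from_parameter": ...} reference, so the argument stays unresolved. The helper `placeholder_arguments` is hypothetical and the bounding box coordinates are made-up example values:

```python
def placeholder_arguments(arguments: dict) -> list:
    """Return the names of arguments that are still parameter placeholders.

    Only top-level arguments are checked; nested process graphs are ignored.
    """
    return sorted(
        name
        for name, value in arguments.items()
        if isinstance(value, dict) and "from_parameter" in value
    )


# What the arguments look like when a Parameter object is passed as bbox
with_placeholder = {
    "bbox": {"from_parameter": "bbox"},
    "region": "Tundra",
    "start_date": "2023-07-01",
}

# What the back-end needs: concrete values (coordinates are illustrative only)
with_concrete_values = {
    "bbox": {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1},
    "region": "Tundra",
    "start_date": "2023-07-01",
}

print(placeholder_arguments(with_placeholder))
print(placeholder_arguments(with_concrete_values))
```

The first call reports bbox as unresolved, the second reports nothing.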