I guess this is related to this thread, but I cannot manage to generate an interpolated time series using masked values… I suspect this is due to the modification of datacube properties when masking. The error I get when launching the second job is:
error processing batch job
Traceback (most recent call last):
  File "batch_job.py", line 328, in main
    run_driver()
  File "batch_job.py", line 301, in run_driver
    run_job(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 43, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "batch_job.py", line 397, in run_job
    assets_metadata = result.write_assets(str(output_file))
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/save_result.py", line 110, in write_assets
    return self.cube.write_assets(filename=directory, format=self.format, format_options=self.options)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 1691, in write_assets
    asset_paths = self._get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(
  File "/opt/spark3_2_0/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/opt/spark3_2_0/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 20.0 failed 4 times, most recent failure: Lost task 58.3 in stage 20.0 (TID 1515) (epod027.vgt.vito.be executor 109): ExecutorLostFailure (executor 109 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 135598 ms
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
  at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.cacheAndRepartition(NetCDFRDDWriter.scala:211)
  at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:123)
  at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:105)
  at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.writeRasters(NetCDFRDDWriter.scala:77)
  at org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(NetCDFRDDWriter.scala)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.base/java.lang.Thread.run(Thread.java:829)
Here is my code, which works as expected when not masking…
import openeo

# Connect to the openEO back-end and authenticate
con = openeo.connect("https://openeo.vito.be").authenticate_oidc(provider_id="egi")

# Load a data cube from the TERRASCOPE_S2_LAI_V2 collection (LAI + scene classification)
datacube = con.load_collection(
    "TERRASCOPE_S2_LAI_V2",
    spatial_extent={"west": 5.60, "south": 50.42, "east": 6.3, "north": 50.7},
    temporal_extent=["2022-01-01", "2022-08-15"],
    bands=["LAI_10M", "SCENECLASSIFICATION_20M"],
)
# Get the scene classification band
SCL = datacube.band("SCENECLASSIFICATION_20M")

# Load the LAI band on its own
LAI = con.load_collection(
    "TERRASCOPE_S2_LAI_V2",
    spatial_extent={"west": 5.60, "south": 50.42, "east": 6.3, "north": 50.7},
    temporal_extent=["2022-01-01", "2022-08-15"],
    bands=["LAI_10M"],
)
# Masking: keep vegetation (4), bare soil (5) and water (6), mask everything else
mask = ~((SCL == 4) | (SCL == 5) | (SCL == 6))
LAI_masked = LAI.mask(mask)
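# (Aside: SCENECLASSIFICATION_20M is a 20 m band while LAI_10M is 10 m. If the
# two grids need to be aligned explicitly, an untested variant would be to
# resample the mask onto the LAI grid first with the client's
# resample_cube_spatial, e.g.:
#     LAI_masked = LAI.mask(mask.resample_cube_spatial(LAI))
# )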
# Temporal aggregation to monthly means
LAI_masked_month = LAI_masked.aggregate_temporal_period(period="month", reducer="mean")
LAI_month = LAI.aggregate_temporal_period(period="month", reducer="mean")
# Interpolation attempts along the time dimension
LAI_masked_month_interpolate = LAI_masked_month.apply_dimension(process="array_interpolate_linear", dimension="t")
LAI_month_interpolate = LAI_month.apply_dimension(process="array_interpolate_linear", dimension="t")
# Save results as netCDF batch jobs
res = LAI_month_interpolate.save_result(format="netCDF")
job = res.create_job(title="LAI_unmasked_monthly2")
job.start_job()

res2 = LAI_masked_month_interpolate.save_result(format="netCDF")
job2 = res2.create_job(title="LAI_masked_monthly2")
job2.start_job()
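For reference, this is how I then inspect the failing job (a minimal sketch using the openeo client's BatchJob methods status() and logs(); I'm treating each log entry as a dict with "level" and "message" keys):

# Check the outcome of the masked job and pull the server-side logs
print(job2.status())  # e.g. "error" once the batch job has failed
for entry in job2.logs():
    print(entry["level"], entry["message"])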