Job splitting

Better yet: @daniel.thiex might be able to shed some light on this.

Dear @sulova.andrea,

the problem you are observing is in the data itself (nothing we as data provider can influence) and comes from the Sen2Cor processing (TOA reflectance to BOA reflectance). It's rare and usually appears over smoke/cloud-covered areas (pixels that you should mask out / not include in your analysis anyway).

Depending on your use case, I'd recommend either searching for cloud-free data on another day, filtering out those pixels with the help of the scene classification band, or using the L1C collection, which doesn't have this problem.

Best,
Daniel
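
For reference, the SCL-based filtering Daniel suggests could look roughly like this with the openEO Python client (a minimal sketch; the collection name, extent, and dates are the ones used later in this thread):

import openeo

connection = openeo.connect("openeo.cloud").authenticate_oidc()

cube = connection.load_collection(
    "SENTINEL2_L2A_SENTINELHUB",
    spatial_extent={"west": -74.5, "east": -73.0, "south": 4.5, "north": 5.0},
    temporal_extent=["2021-06-01", "2021-07-01"],
    bands=["B04", "B08", "SCL"])

# Sen2Cor SCL classes: 3 = cloud shadow, 8/9 = cloud medium/high probability, 10 = thin cirrus
scl = cube.band("SCL")
cloud_mask = (scl == 3) | (scl == 8) | (scl == 9) | (scl == 10)
cube_masked = cube.mask(cloud_mask)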

Thanks for your support and explanation - if you are sure that these mysterious groups of pixels happen only in cloudy areas, then it should be fine.

There is another issue: when we tried to scale up more complex code, we seem to run into some limitations. Could you let us know how to overcome this problem?

Job ID: 'j-884f58e6b8d449ce82e1820b03029cd4'

Error message:
**OpenEoApiError** : [400] BadRequest: requirement failed: metadata_properties are not supported yet (ref: r-5845a79f598045d785a8e3a1225d094c)

The code:

start_date           = '2021-06-01'
spatial_extent       = {'west': -74.5, 'east': -73, 'south': 4.5, 'north': 5, 'crs': 'epsg:4326'} #colombia

## Get the Sentinel-2 data for a window around the start date.
start_date_dt_object = datetime.strptime(start_date, '%Y-%m-%d')
end_date             = (start_date_dt_object + relativedelta(months = +1)).date() ## end date, 1 month later (2021-07-01)
start_date_exclusion = (start_date_dt_object + relativedelta(months = -1)).date() ## exclusion date, 1 month earlier (2021-05-01)
bands                = ['B02', 'B03', 'B04', 'B08', 'CLP', 'SCL', 'sunAzimuthAngles', 'sunZenithAngles']


start_date           = '2021-06-01'
spatial_extent       = {'west': -74.5, 'east': -73, 'south': 4.5, 'north': 5, 'crs': 'epsg:4326'} #colombia
zone                 = "tropical"


## Get the Sentinel-2 data for a window around the start date.
start_date_dt_object = datetime.strptime(start_date, '%Y-%m-%d')
end_date             = (start_date_dt_object + relativedelta(months = +3)).date() ## end date, 3 months later (2021-09-01)
start_date_exclusion = (start_date_dt_object + relativedelta(months = -3)).date() ## exclusion date, 3 months earlier (2021-03-01)


LOOKUPTABLE = {
    "tropical": {
        "S1": lambda vv: 1 / (1 + exp(- (-7.17 + (-0.48 * vv)))),
        "S2": lambda ndvi, ndwi: 1 / (1 + exp(- (0.845 + (2.14 * ndvi) + (13.5 * ndwi)))),
        "S1_S2": lambda vv, ndwi: 1 / (1 + exp(- (-2.64 + (-0.23 * vv) + (8.6 * ndwi)))),
    },
    "subtropical":
        {
            "S1": lambda vv, vh: 1 / (1 + exp(- (-8.1 + (-0.13 * vv) + (-0.27 * vh)))),
            "S2": lambda ndvi, ndwi: 1 / (1 + exp(- (0.845 + (2.14 * ndvi) + (13.5 * ndwi)))),
            "S1_S2": lambda vv, ndwi: 1 / (1 + exp(- (-2.64 + (-0.23 * vv) + (8.6 * ndwi)))),
        }
}
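# Each entry is a logistic (sigmoid) model mapping spectral indices / backscatter
# to a water probability in (0, 1). For example,
# LOOKUPTABLE["tropical"]["S2"](ndvi=0.2, ndwi=0.4)
# = 1 / (1 + exp(-(0.845 + 0.428 + 5.4))) ≈ 0.999.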

s2_cube = connection.load_collection(
    'SENTINEL2_L2A_SENTINELHUB',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date_exclusion, end_date],
    bands=['B02', 'B03', 'B04', 'B08', 'sunAzimuthAngles', 'sunZenithAngles'])



# Load the mask bands (CLP, SCL) in a separate cube; this avoids loading Sentinel Hub data that would be discarded later on.

s2_cube_masking = connection.load_collection(
    'SENTINEL2_L2A_SENTINELHUB',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date_exclusion, end_date],
    bands=['CLP', 'SCL'])

# Sen2Cor scene classification (SCL) classes masked out:
# 3 = cloud shadow, 8 = cloud medium probability, 9 = cloud high probability,
# 10 = thin cirrus, 11 = snow/ice
scl = s2_cube_masking.band("SCL")
mask_scl = (scl == 3) | (scl == 8) | (scl == 9) | (scl == 10) | (scl == 11)

# CLP is a cloud probability in 0-255; also mask pixels above 30% probability.
# The parentheses matter: in Python `|` binds tighter than `>`.
clp = s2_cube_masking.band("CLP")
mask_clp = mask_scl | ((clp / 255) > 0.3)
s2_cube = s2_cube.mask(mask_clp.resample_cube_spatial(s2_cube))


# Count valid (unmasked) B08 observations per pixel over time; this count is
# used below to turn the water-occurrence sum into a frequency.
s2_count = s2_cube.filter_bands(bands=["B08"])
s2_count = s2_count.reduce_dimension(reducer=lambda data: data.count(), dimension="t")
s2_count = s2_count.rename_labels("bands", ["count"])

s2_cube = append_indices(s2_cube, ["NDWI","NDVI"]) 
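# NDWI and NDVI are appended after the six loaded bands, i.e. band indices 6 and 7.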

def water_function(data):
    return LOOKUPTABLE[zone]["S2"](ndwi=data[6], ndvi=data[7])

s2_cube_water = s2_cube.reduce_dimension(reducer=water_function, dimension="bands")
s2_cube_water = s2_cube_water.add_dimension("bands", "water_prob", type="bands")


# Keep the water probability where it exceeds 0.75, otherwise set it to 0.
s2_cube_water_threshold = s2_cube_water.apply_dimension(dimension="bands", process=lambda x: if_(x > 0.75, x, 0))
s2_cube_water_threshold = s2_cube_water_threshold.rename_labels("bands", ["w_T75"])



s2_cube_water_sum = s2_cube_water_threshold.reduce_dimension(reducer="sum", dimension="t")
s2_cube_water_sum = s2_cube_water_sum.rename_labels("bands", ["sum"])


# Surface water frequency (swf): water-occurrence sum divided by the count of valid observations.
s2_cube_swf = s2_cube_water_sum.resample_cube_spatial(s2_count) / s2_count
s2_cube_swf = s2_cube_swf.rename_labels("bands", ["swf"])

s2_median_water = s2_cube_water.filter_temporal([start_date, end_date]).median_time()

s2_cube_median = s2_cube.filter_temporal([start_date, end_date]).median_time()

## Get Sentinel-1 GRD data for the analysis window and convert it to ARD (analysis-ready) backscatter.
s1_cube = connection.load_collection(
    'SENTINEL1_GRD',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date, end_date],
    bands=['VH', 'VV'],
    properties={"polarization": lambda p: p == "DV"})

s1_cube = s1_cube.sar_backscatter(coefficient="gamma0-terrain", mask=True, elevation_model="COPERNICUS_30")
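# sar_backscatter computes terrain-corrected gamma0; with mask=True the backend
# appends extra bands, renamed below to "mask" and "incidence_angle".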

s1_cube = s1_cube.rename_labels("bands", ["VH", "VV", "mask", "incidence_angle"])
s1_cube_mask = s1_cube.band("mask")
s1_mask_RS = (s1_cube_mask == 2)  # mask value 2 flags radar shadow ("RS")
s1_cube = s1_cube.mask(s1_mask_RS)

def log_(x):
    # convert linear backscatter to decibels
    return 10 * log(x, 10)

s1_median = s1_cube.median_time().apply(log_)


# "tile_grid": "utm-50km" splits the job into 50 km UTM tiles; on openeo.cloud
# this produces the partitioned "agg-pj-..." sub-jobs discussed further below.
job_options = {
        "tile_grid": "utm-50km",
        "executor-memory": "10G",
        "executor-memoryOverhead": "10G",
        "executor-cores": "4"}

s2_cube_scale_save = s1_median.save_result(format='netCDF')  # GTiff or netCDF; note this saves the S1 median, despite the variable name
my_job  = s2_cube_scale_save.create_job(title="s2_cube_scale_Nov_1", job_options=job_options)
results = my_job.start_and_wait().get_results()
results.download_files("s2_cube_scale_Nov_1") 

Andrea, this is a known issue on openeo-dev.vito.be; in the meantime, openeo.cloud has been updated with the fix, so please target that instead.

openeo.cloud does not provide postprocessing, meaning the output files are not merged. Thus, an openEO.nc file is missing in the outputs (see images below).

openeo-dev.vito.be produces a merged file (openEO.nc).

Are you planning to implement "postprocessing" on openeo.cloud?

openeo-dev.vito.be: [image]
openeo.cloud: [image]

When I tried to run the script, I got the error message below.
Is there any problem scaling up my code, which works for a small area?

I have run it 5 times but get the same error message:
job ID: agg-pj-20221107-133630

ConnectionResetError                      Traceback (most recent call last)
File c:\Users\ansu\Miniconda3\envs\py39\lib\site-packages\urllib3\connectionpool.py:703, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    702 # Make the request on the httplib connection object.
--> 703 httplib_response = self._make_request(
    704     conn,
    705     method,
    706     url,
    707     timeout=timeout_obj,
    708     body=body,
    709     headers=headers,
    710     chunked=chunked,
    711 )
    713 # If we're going to release the connection in ``finally:``, then
    714 # the response doesn't need to know about the connection. Otherwise
    715 # it will also try to release it and we'll have a double-release
    716 # mess.

File c:\Users\ansu\Miniconda3\envs\py39\lib\site-packages\urllib3\connectionpool.py:449, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    445         except BaseException as e:
    446             # Remove the TypeError from the exception chain in
    447             # Python 3 (including for exceptions like SystemExit).
    448             # Otherwise it looks like a bug in the code.
--> 449             six.raise_from(e, None)
    450 except (SocketTimeout, BaseSSLError, SocketError) as e:
...
    549 except MaxRetryError as e:
    550     if isinstance(e.reason, ConnectTimeoutError):
    551         # TODO: Remove this in 3.0.0: see #2811

ConnectionError: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 

This is the code:

start_date           = '2021-06-01'
spatial_extent       = {'west': -74.5, 'east': -73, 'south': 4.5, 'north': 5, 'crs': 'epsg:4326'} #colombia

## Get the Sentinel-2 data for a window around the start date.
start_date_dt_object = datetime.strptime(start_date, '%Y-%m-%d')
end_date             = (start_date_dt_object + relativedelta(months = +1)).date() ## end date, 1 month later (2021-07-01)
start_date_exclusion = (start_date_dt_object + relativedelta(months = -1)).date() ## exclusion date, 1 month earlier (2021-05-01)
bands                = ['B02', 'B03', 'B04', 'B08', 'CLP', 'SCL', 'sunAzimuthAngles', 'sunZenithAngles']


start_date           = '2021-06-01'
spatial_extent       = {'west': -74.5, 'east': -73, 'south': 4.5, 'north': 5, 'crs': 'epsg:4326'} #colombia
zone                 = "tropical"


## Get the Sentinel-2 data for a window around the start date.
start_date_dt_object = datetime.strptime(start_date, '%Y-%m-%d')
end_date             = (start_date_dt_object + relativedelta(months = +3)).date() ## end date, 3 months later (2021-09-01)
start_date_exclusion = (start_date_dt_object + relativedelta(months = -3)).date() ## exclusion date, 3 months earlier (2021-03-01)


LOOKUPTABLE = {
    "tropical": {
        "S1": lambda vv: 1 / (1 + exp(- (-7.17 + (-0.48 * vv)))),
        "S2": lambda ndvi, ndwi: 1 / (1 + exp(- (0.845 + (2.14 * ndvi) + (13.5 * ndwi)))),
        "S1_S2": lambda vv, ndwi: 1 / (1 + exp(- (-2.64 + (-0.23 * vv) + (8.6 * ndwi)))),
    },
    "subtropical":
        {
            "S1": lambda vv, vh: 1 / (1 + exp(- (-8.1 + (-0.13 * vv) + (-0.27 * vh)))),
            "S2": lambda ndvi, ndwi: 1 / (1 + exp(- (0.845 + (2.14 * ndvi) + (13.5 * ndwi)))),
            "S1_S2": lambda vv, ndwi: 1 / (1 + exp(- (-2.64 + (-0.23 * vv) + (8.6 * ndwi)))),
        }
}

s2_cube = connection.load_collection(
    'SENTINEL2_L2A_SENTINELHUB',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date_exclusion, end_date],
    bands=['B02', 'B03', 'B04', 'B08', 'sunAzimuthAngles', 'sunZenithAngles'])

# Load the mask bands (CLP, SCL) in a separate cube; this avoids loading Sentinel Hub data that would be discarded later on.

s2_cube_masking = connection.load_collection(
    'SENTINEL2_L2A_SENTINELHUB',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date_exclusion, end_date],
    bands=['CLP', 'SCL'])

scl = s2_cube_masking.band("SCL")
mask_scl = (scl == 3) | (scl == 8) | (scl == 9) | (scl == 10) | (scl == 11)

clp = s2_cube_masking.band("CLP")
mask_clp = mask_scl | ((clp / 255) > 0.3)  # parentheses needed, as noted above
s2_cube = s2_cube.mask(mask_clp.resample_cube_spatial(s2_cube))


s2_count = s2_cube.filter_bands(bands=["B08"])
s2_count = s2_count.reduce_dimension(reducer=lambda data: data.count(), dimension="t")
s2_count = s2_count.rename_labels("bands", ["count"])

s2_cube = append_indices(s2_cube, ["NDWI","NDVI"]) 

def water_function(data):
    return LOOKUPTABLE[zone]["S2"](ndwi=data[6], ndvi=data[7])

s2_cube_water = s2_cube.reduce_dimension(reducer=water_function, dimension="bands")
s2_cube_water = s2_cube_water.add_dimension("bands", "water_prob", type="bands")


s2_cube_water_threshold = s2_cube_water.apply_dimension(dimension="bands", process=lambda x: if_(x > 0.75, x, 0))
s2_cube_water_threshold = s2_cube_water_threshold.rename_labels("bands", ["w_T75"])
s2_cube_water_sum = s2_cube_water_threshold.reduce_dimension(reducer="sum", dimension="t")
s2_cube_water_sum = s2_cube_water_sum.rename_labels("bands", ["sum"])

s2_cube_swf = s2_cube_water_sum.resample_cube_spatial(s2_count) / s2_count
s2_cube_swf = s2_cube_swf.rename_labels("bands", ["swf"])

s2_median_water = s2_cube_water.filter_temporal([start_date, end_date]).median_time()
s2_cube_median = s2_cube.filter_temporal([start_date, end_date]).median_time()


s1_cube = connection.load_collection(
    'SENTINEL1_GRD',
    spatial_extent=spatial_extent,
    temporal_extent=[start_date, end_date],
    bands=['VH', 'VV'],
    properties={"polarization": lambda p: p == "DV"})

s1_cube = s1_cube.sar_backscatter(coefficient="gamma0-terrain", mask=True, elevation_model="COPERNICUS_30")

s1_cube = s1_cube.rename_labels("bands", ["VH", "VV", "mask", "incidence_angle"])
s1_cube_mask = s1_cube.band("mask")
s1_mask_RS = (s1_cube_mask == 2)
s1_cube = s1_cube.mask(s1_mask_RS)

def log_(x):
    return 10 * log(x, 10)
    
s1_median = s1_cube.median_time().apply(log_)
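# Band order in s1_median is ["VH", "VV", "mask", "incidence_angle"], so data[1] below is VV.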

def s1_water_function(data):
    return LOOKUPTABLE[zone]["S1"](vv=data[1])
s1_median_water = s1_median.reduce_dimension(reducer=s1_water_function, dimension="bands")

# Where S1 alone predicts water (> 0.5) but the S2 surface water frequency is low (< 0.33),
# the S1 detection is apparently considered unreliable and masked out.
exclusion_mask = (s1_median_water.resample_cube_spatial(s2_cube_swf) > 0.5) & (s2_cube_swf < 0.33)
s1_median_water_mask = s1_median_water.mask(exclusion_mask.resample_cube_spatial(s1_median_water))

def s1_s2_water_function(data):
    return LOOKUPTABLE[zone]["S1_S2"](vv=data[0], ndwi=data[1])

s1_s2_cube = s1_median.filter_bands(['VV']).resample_cube_spatial(s2_cube_median).merge_cubes(s2_cube_median.filter_bands(['NDWI'])) 
s1_s2_water = s1_s2_cube.reduce_dimension(reducer=s1_s2_water_function, dimension="bands").add_dimension("bands", "var", type="bands")
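# The masking below appears to build a fallback hierarchy: use the fused S1_S2
# probability where it exists, else the S2-only one, else the S1-only one; the
# masked-out contributions are set to 0 so the sum-merge keeps one source per pixel.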
                      
s1_s2_mask = (s1_s2_water >= 0)
s2_mask = s2_median_water.mask(s1_s2_mask) >= 0
s1_mask = s1_median_water.mask(s1_s2_mask).mask(s2_mask) >= 0

s1_s2_masked = s1_s2_water.mask(s1_s2_mask.apply(lambda x: x.eq(0)), replacement = 0)
s2_masked = s2_median_water.mask(s2_mask.apply(lambda x: x.eq(0)), replacement = 0)
s1_masked = s1_median_water.mask(s1_mask.apply(lambda x: x.eq(0)), replacement = 0)

merge_all = s1_s2_masked.merge_cubes(s2_masked, overlap_resolver='sum').merge_cubes(s1_masked, overlap_resolver='sum')

worldcover_cube = connection.load_collection("ESA_WORLDCOVER_10M_2020_V1", 
                                            temporal_extent = ['2020-12-30', '2021-01-01'], 
                                            spatial_extent = spatial_extent, 
                                            bands = ["MAP"])
                                         
builtup_mask = worldcover_cube.band("MAP") == 50
water_probability = merge_all.mask(builtup_mask.max_time().resample_cube_spatial(merge_all))
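# Final product: binary water map, thresholding the merged probability at 0.75.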
water_T75 = water_probability > 0.75

job_options = {
        "tile_grid": "utm-50km",
        "executor-memory": "10G",
        "executor-memoryOverhead": "10G",
        "executor-cores": "4"}

water_T75_save = water_T75.save_result(format='netCDF') #GTiff #netCDF
my_job  = water_T75_save.create_job(title="water_T75_0711", job_options=job_options)
results = my_job.start_and_wait().get_results()
results.download_files("water_T75_0711") 

Hi Andrea,
there indeed seems to be a problem with the job splitting. The jobs also take very long now, because the splitting resulted in so many smaller jobs that none of them actually gets enough resources to finish properly. We'll look into addressing that on our side.

I will also look at your script to see if we can somehow get better performance, which would also improve the situation.
This does mean that some of the currently running jobs may need to be stopped to allow for some testing. If you do submit new jobs, please be careful not to run too many at the same time.

thanks!
Jeroen

Hi Jeroen,

Thank you for your effort to look at this problem. It is highly appreciated.

You highlighted a good point regarding the open jobs. I have a problem deleting agg-pj jobs, while sub-jobs (e.g. "…part 0002 (3/12)") can be deleted in the Web Editor. There are a few jobs running, some of them originating from Sep 2022.

This is an overview of currently running jobs which I cannot delete via the Delete button in the Web Editor:

Examples:

  • Url: /jobs/agg-pj-20220923-141409
  • Url: /jobs/agg-pj-20221107-130714

This is the error message I get when the Delete button is pressed:
Server error: AttributeError("'JobAdapter' object has no attribute 'delete_job'")

I am using openeo.cloud, but the info shows Base URL: https://openeocloud.vito.be/openeo/1.0.0.
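
For reference, the failing deletion can also be attempted from the openEO Python client (a minimal sketch; the job id is one of the partitioned jobs listed above, and the method name is per the 0.x client):

import openeo

connection = openeo.connect("https://openeocloud.vito.be/openeo/1.0.0").authenticate_oidc()
job = connection.job("agg-pj-20220923-141409")
job.delete_job()  # for partitioned (agg-pj) jobs this currently fails with the server error above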


That is indeed a known issue: Partitioned jobs: support job cancel/delete · Issue #39 · Open-EO/openeo-aggregator · GitHub
(also mentioned here: https://github.com/openEOPlatform/architecture-docs/issues/192)