Source code for s1tiling.libs.api

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =========================================================================
#   Program:   S1Processor
#
#   All rights reserved.
#   Copyright 2017-2026 (c) CNES.
#   Copyright 2022-2024 (c) CS GROUP France.
#
#   This file is part of S1Tiling project
#       https://gitlab.orfeo-toolbox.org/s1-tiling/s1tiling
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       https://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# =========================================================================
#
# Authors: Thierry KOLECK (CNES)
#          Luc HERMITTE (CS Group)
#          Fabien CONTIVAL (CS Group)
#
# =========================================================================

"""
Submodule that defines all API related functions and classes.
"""

from collections.abc import Callable
import contextlib
import logging
import os
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union, cast

from distributed.scheduler import KilledWorker
from dask.distributed import Client
from eodag.api.core import EODataAccessGateway

from .S1DateAcquisition import S1DateAcquisition
from .S1FileManager import (
    S1FileManager,
    EODAG_DEFAULT_DOWNLOAD_WAIT,
    EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
)
from . import exits
from . import exceptions
from . import Utils
from .configuration import (
    Configuration,
    LIAConfiguration,
    dname_fmt_filtered,
    dname_fmt_gamma_area_product,
    dname_fmt_lia_product,
    dname_fmt_tiled,
    fname_fmt_concatenation,
    fname_fmt_filtered,
    fname_fmt_gamma_area_corrected,
    fname_fmt_gamma_area_product,
    fname_fmt_lia_corrected,
)
from .utils.FileManager import FileManager
from .otbpipeline import (
    FirstStep,
    PipelineDescription,
    PipelineDescriptionSequence,
    StepFactory,
    AbstractStep,
)
from .otbwrappers import (
    # Main S1 -> S2 Step Factories
    ExtractSentinel1Metadata,
    AnalyseBorders,
    Calibrate,
    CorrectDenoising,
    CutBorders,
    OrthoRectify,
    Concatenate,
    BuildBorderMask,
    SmoothBorderMask,
    GenerateQuickLook,
    # LIA related Step Factories
    AgglomerateDEMOnS2,
    ProjectDEMToS2Tile,
    ProjectGeoidToS2Tile,
    SumAllHeights,
    ComputeGroundAndSatPositionsOnDEM,
    ComputeGroundAndSatPositionsOnDEMFromEOF,
    ComputeLIAOnS2,
    filter_LIA,
    ComputeNormalsOnS2,
    ComputeEllipsoidNormalsOnS2,
    ComputeIAOnS2,
    ComputeGroundAndSatPositionsOnEllipsoid,
    ApplyLIACalibration,
    # Deprecated LIA related Step Factories
    AgglomerateDEMOnS1,
    SARDEMProjection,
    SARCartesianMeanEstimation,
    ComputeNormalsOnS1,
    OrthoRectifyLIA,
    ComputeLIAOnS1,
    ConcatenateLIA,
    SelectBestCoverage,
    # Gamma Area related Step Factories
    ResampleDEM,
    SARDEMProjectionImageEstimation,
    SARGammaAreaImageEstimation,
    OrthoRectifyGAMMA_AREA,
    ConcatenateGAMMA_AREA,
    SelectGammaNaughtAreaBestCoverage,
    ApplyGammaNaughtRTCCalibration,
    NaNifyNoData,
    ProjectGeoidToDEM,
    # Filter Step Factories
    SpatialDespeckle,
)
from .outcome     import Outcome
from .orbit       import EOFFileManager
from .utils.dask  import DaskContext
from .utils       import eodag
from .utils.layer import filter_existing_tiles
from .utils.timer import timethis

from .vis import SimpleComputationGraph  # Graphs
from .workspace import DEMWorkspace, WorkspaceKinds, ensure_tiled_workspaces_exist


# Result type of the search for the S1 products intersecting an S2 tile: it holds either the list
# of S1 image information dictionaries, or the error that prevented from building that list.
IntersectingS1FilesOutcome = Outcome[List[Dict]]
# Signature of the callables that build a file manager from the configuration and from the
# (optional) EODAG gateway.
FileManagerBuilder         = Callable[[Configuration, Optional[EODataAccessGateway]], FileManager]


# Logger shared by all the functions of this module.
logger = logging.getLogger('s1tiling.api')


def main_output_name_formats(configuration: Configuration) -> List[Tuple[str,str]]:
    """
    Build the list of ``(dirname format, filename format)`` pairs of the products generated in the
    main case scenarios: :ref:`scenario.S1Processor`, :ref:`scenario.S1ProcessorLIA` and
    :ref:`scenario.S1ProcessorRTC`.

    :param configuration: configuration object; its ``calibration_type`` and ``filter`` attributes
                          select which formats are returned.
    :return: the formats of the calibrated tiled product first; then the formats of the gamma area
             product when the calibration is ``gamma_naught_rtc``; and last the formats of the
             filtered product when a filter is configured.
    """
    calibration = configuration.calibration_type
    is_rtc = calibration == 'gamma_naught_rtc'

    # Only the filename format of the main tiled product depends on the calibration
    if calibration == 'normlim':
        main_fname_fmt = fname_fmt_lia_corrected
    elif is_rtc:
        main_fname_fmt = fname_fmt_gamma_area_corrected
    else:
        main_fname_fmt = fname_fmt_concatenation

    formats = [(dname_fmt_tiled(configuration), main_fname_fmt(configuration))]
    if is_rtc:
        formats.append((dname_fmt_gamma_area_product(configuration), fname_fmt_gamma_area_product(configuration)))
    if configuration.filter:
        formats.append((dname_fmt_filtered(configuration), fname_fmt_filtered(configuration)))
    return formats


@timethis("Sanitizing requested tiles", log_level=logging.INFO)
def extract_tiles_to_process(cfg: Configuration, s1_file_manager: Optional[S1FileManager]) -> List[str]:
    """
    Deduce from the configuration the tiles that need to be processed.

    Two modes are supported:

    - an explicit tile list, which is filtered through :func:`filter_existing_tiles` against the
      output grid;
    - ``"ALL"`` (first element of ``cfg.tile_list``): the tiles are the ones covered by the
      products known by the S1 file manager.

    :param cfg:             configuration object
    :param s1_file_manager: S1 file manager; mandatory in ``"ALL"`` mode
    :return: the list of the names of the tiles to process
    :raise ConfigurationError: if ``"ALL"`` is requested while no S1 file manager is available
    """
    logger.info('Requested tiles: %s', cfg.tile_list)

    if cfg.tile_list[0] != "ALL":
        tiles_to_process = filter_existing_tiles(cfg.output_grid, cfg.tile_list)
    else:
        if not s1_file_manager:
            raise exceptions.ConfigurationError("tile_list=ALL mode is not compatible with this scenario", "")
        # We can not require both to process all tiles covered by downloaded products
        # and download all tiles.
        # The check has already been done in the configuration object => simple sanity check
        assert (
            not cfg.download or "ALL" not in cfg.roi_by_tiles
        ), "Can not request to download 'ROI_by_tiles : ALL' if 'Tiles : ALL'. Change either value or deactivate download instead"
        tiles_to_process = s1_file_manager.get_tiles_covered_by_products()
        logger.info("All tiles for which more than %s%% of the surface is covered by products will be produced: %s",
                100 * cfg.tile_to_product_overlap_ratio, tiles_to_process)

    logger.info('The following tiles will be processed: %s', tiles_to_process)
    return tiles_to_process


def _how2str(how: Union[Tuple, AbstractStep]) -> str:
    """
    Render a task definition in a logger friendly way.

    :param how: either a step, which is rendered with ``str()``, or a task tuple of which the
                elements #1 (pipeline) and #2 (keys) are displayed.
    """
    return str(how) if isinstance(how, AbstractStep) else f"Task(pipeline: {how[1]}; keys: {how[2]})"


def _execute_tasks_debug(dsk: Dict, tile_name: str) -> List:
    """
    Execute the tasks directly, one after the other, without the Dask layer.

    The objective is to be able to debug OTB applications.

    Products are ordered with :func:`Utils.tsort`, and they are processed in the reverse of the
    order it returns. :class:`FirstStep` entries are only logged: they have nothing to execute.

    :param dsk:       task graph: product -> :class:`FirstStep`, or ``(function, *arguments)`` tuple
    :param tile_name: name of the tile, used for logging only
    :return: the list of the results returned by the executed tasks, in execution order
    """
    def dependencies_of(dasktask_data):
        # FirstStep instances have no dependency; for the other tasks, they are stored in element #2
        return [] if isinstance(dasktask_data, FirstStep) else dasktask_data[2]

    execution_order = list(Utils.tsort(dsk, dsk.keys(), dependencies_of))[::-1]

    logger.debug('Debug execution of %s tasks', len(execution_order))
    for product in execution_order:
        logger.debug('- task: %s <-- %s', product, _how2str(dsk[product]))

    logger.info('Executing tasks one after the other for %s (debugging OTB)', tile_name)
    results = []
    for product in execution_order:
        task = dsk[product]
        logger.info('- execute: %s <-- %s', product, _how2str(task))
        if isinstance(task, FirstStep):
            continue
        task_function, *task_arguments = task
        results.append(task_function(*task_arguments))
    return results


def _execute_tasks_with_dask(  # pylint: disable=too-many-arguments
    *,
    dsk:                   Dict[str, Union[Tuple, "FirstStep"]],
    tile_name:             str,
    tile_idx:              int,
    required_products:     List[str],
    client:                Client,
    pipelines:             PipelineDescriptionSequence,
    do_watch_ram:          bool,
    debug_tasks:           bool
) -> List:
    """
    Execute the tasks in parallel through Dask.

    When a worker is killed, the Dask workers are restarted, the task graph is regenerated from
    ``pipelines``, and the execution is attempted again. Two attempts are made at most.

    :param dsk:               task graph to execute
    :param tile_name:         name of the tile, used for logging and for naming the graph file
    :param tile_idx:          0-based index of the tile, used for naming the graph file
    :param required_products: keys of ``dsk`` for which results are requested
    :param client:            Dask client that executes the graph
    :param pipelines:         pipeline descriptions used to regenerate the tasks before a retry
    :param do_watch_ram:      forwarded to ``pipelines.generate_tasks()`` when tasks are regenerated
    :param debug_tasks:       when set, the task graph is dumped into ``tasks-{idx}-{tile}.svg``
    :return: what ``client.get()`` returns for ``required_products``
    :raise KilledWorker: if a worker is still killed during the last attempt
    """
    if debug_tasks:
        SimpleComputationGraph().simple_graph(
                dsk, filename=f'tasks-{tile_idx+1}-{tile_name}.svg')
    logger.info('Start S1 -> S2 transformations for %s', tile_name)
    nb_tries = 2
    for run_attempt in range(1, nb_tries + 1):
        try:
            logger.debug("  Execute tasks, attempt #%s", run_attempt)
            return client.get(dsk, required_products)
        except KilledWorker as e:
            # The introspection dump of the exception only helps when debugging:
            # it has nothing to do at CRITICAL level.
            logger.debug('KilledWorker attributes: %s', dir(e))
            logger.exception("Worker %s has been killed when processing %s on %s tile: (%s). Workers will be restarted: %s/%s",
                    e.last_worker.name, e.task, tile_name, e, run_attempt, nb_tries)
            # TODO: don't overwrite previous logs
            # And we'll need to use the synchronous=False parameter to be able to check
            # successful executions but then, how do we clean up futures and all??
            # Workers are restarted even on the last attempt, before the error is propagated.
            client.restart()
            if run_attempt >= nb_tries:
                raise
            # Update the list of remaining tasks
            dsk, required_products, errors = pipelines.generate_tasks(do_watch_ram=do_watch_ram)
            # it's unlikely for errors to appear here
            assert not errors, f"No errors regarding task generation shall appear here: {errors}"
    # Only reachable if no attempt is made at all; kept to always return a list
    return []


def get_s1_files_for_tile(
    s1_file_manager:     S1FileManager,
    tile_name:           str,
    output_name_formats: List[Tuple[str, str]],
    dryrun:              bool,
) -> IntersectingS1FilesOutcome:
    """
    Download (if need be) and return the list of all the S1 files that intersect the given S2 MGRS
    tile.

    :param s1_file_manager:     manager of the S1 products
    :param tile_name:           name of the S2 MGRS tile
    :param output_name_formats: list of (dirname format, filename format) of the output products
    :param dryrun:              forwarded to ``S1FileManager.download_images()``
    :return: An :class:`Outcome` of list of S1 image information, or the :class:`RuntimeError`
             that has happened while downloading.
    :raise DownloadS1FileError: if anything else than a :class:`RuntimeError` is raised while
                                downloading

    .. note:: ``BaseException`` is intercepted: interruptions like ``KeyboardInterrupt`` are
       also converted into :class:`DownloadS1FileError`.
    """
    s1_file_manager.keep_X_latest_S1_files(1000, tile_name, output_name_formats)

    try:
        s1_file_manager.download_images(tiles=[tile_name], output_name_formats=output_name_formats, dryrun=dryrun)
    except RuntimeError as download_error:
        # Non critical failure: it's reported to the caller through the outcome
        logger.warning('Cannot download S1 images associated to %s: %s', tile_name, download_error)
        return IntersectingS1FilesOutcome(download_error)
    except BaseException as critical_error:
        logger.debug('Download error intercepted: %s', critical_error)
        raise exceptions.DownloadS1FileError(tile_name) from critical_error

    # At this point, download_images() will have updated the list of known products
    intersecting_rasters = s1_file_manager.get_s1_intersect_by_tile(tile_name, output_name_formats)
    logger.debug('%s products found to intersect %s: %s', len(intersecting_rasters), tile_name, intersecting_rasters)
    return IntersectingS1FilesOutcome(intersecting_rasters)


@timethis("Processing of tile {tile_name}", log_level=logging.INFO)
def process_one_tile(  # pylint: disable=too-many-arguments
    *,
    tile_name:               str,
    tile_idx:                int,
    tiles_nb:                int,
    cfg:                     Configuration,
    pipelines:               PipelineDescriptionSequence,
    client:                  Optional[Client],
    required_workspaces:     List[WorkspaceKinds],
    debug_otb:               bool = False,
    do_watch_ram:            bool = False,
    debug_tasks:             bool = False
) -> List[Outcome]:
    """
    Process one S2 tile.

    I.E. generate the tasks of the registered pipelines for the tile, and execute them.

    :param tile_name:           name of the S2 tile
    :param tile_idx:            0-based index of the tile among the tiles to process
    :param tiles_nb:            total number of tiles to process, used for logging
    :param cfg:                 configuration object
    :param pipelines:           registered pipeline descriptions
    :param client:              Dask client; mandatory unless ``debug_otb`` is set
    :param required_workspaces: kinds of workspaces to create for the tile
    :param debug_otb:           execute the tasks sequentially, without Dask
    :param do_watch_ram:        forwarded to the task generation
    :param debug_tasks:         dump the task graph before executing it with Dask
    :return: the errors detected while generating the tasks if any, the results of the execution
             of the tasks otherwise
    """
    ensure_tiled_workspaces_exist(cfg, tile_name, required_workspaces)

    logger.info("Processing tile %s (%s/%s)", tile_name, tile_idx + 1, tiles_nb)

    pipelines.register_extra_parameters_for_input_factories(tile_name=tile_name)
    task_graph, required_products, errors = pipelines.generate_tasks(do_watch_ram)
    if errors:
        # Nothing is executed when the task generation has failed
        return errors

    logger.debug('######################################################################')
    logger.debug('Summary of %s tasks related to S1 -> S2 transformations of %s', len(task_graph), tile_name)
    for product, task in task_graph.items():
        logger.debug('- task: %s <-- %s', product, _how2str(task))

    if not debug_otb:
        assert client, "Dask client shall exist when not debugging calls to OTB applications"
        return _execute_tasks_with_dask(
            dsk=task_graph,
            tile_name=tile_name,
            tile_idx=tile_idx,
            required_products=required_products,
            client=client,
            pipelines=pipelines,
            do_watch_ram=do_watch_ram,
            debug_tasks=debug_tasks,
        )
    return _execute_tasks_debug(task_graph, tile_name)


def read_config(
    config_opt          : Union[str, Configuration],
    extra_config_checks : Sequence[Tuple[Callable[[Configuration], bool], str]] = (),
) -> Configuration:
    """
    Return the configuration object that matches ``config_opt``.

    :param config_opt:          either the configuration filename, or an already initialized
                                configuration object
    :param extra_config_checks: sequence of ``(predicate, error message)`` pairs. They are
                                forwarded to :class:`Configuration` when a filename is received,
                                and they are applied here on an already initialized object.
    :return: the configuration object
    :raise ConfigurationError: when a predicate is not satisfied by the configuration object
                               received
    """
    if not isinstance(config_opt, str):
        # Already a configuration object => only the extra checks are left to do
        for predicate, message in extra_config_checks:
            if not predicate(config_opt):
                raise exceptions.ConfigurationError(message, "")
        return config_opt

    return Configuration(config_opt, extra_config_checks=extra_config_checks)


def _extend_config(config: Configuration, extra_opts: Dict, overwrite: bool = False) -> Configuration:
    """
    Adds attributes to configuration object.

    .. todo:: Configuration object shall be closer to a dictionary to avoid these workarounds...
    """
    for k in extra_opts:
        if overwrite or not hasattr(config, k):
            setattr(config, k, extra_opts[k])
    return config


def do_process_with_pipeline(  # pylint: disable=too-many-arguments, too-many-locals
    config_opt             : Union[str, Configuration],
    pipeline_builder,
    *,
    ctx_managers           : Sequence[Type] = (),
    extra_config_checks    : Sequence[Tuple[Callable[[Configuration], bool], str]] = (),
    dl_wait                : int  = EODAG_DEFAULT_DOWNLOAD_WAIT,
    dl_timeout             : int  = EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    searched_items_per_page: int  = EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
    nb_max_search_retries  : int  = EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    dryrun                 : bool = False,
    debug_caches           : bool = False,
    debug_otb              : bool = False,
    watch_ram              : bool = False,
    debug_tasks            : bool = False,
    file_manager_builders  : Optional[dict[str, FileManagerBuilder]] = None,
) -> exits.Situation:
    """
    Internal function for executing pipelines.

    It reads the configuration, prepares the environment (thread and geoid related environment
    variables, EODAG gateway, file managers), deduces the tiles to process, builds the pipelines
    with ``pipeline_builder``, processes the tiles one after the other, and eventually reports
    what happened.

    :param config_opt:              configuration filename, or already initialized configuration object
    :param pipeline_builder:        callable invoked as
                                    ``pipeline_builder(config, dryrun=..., debug_caches=...)``; it
                                    shall return the pair ``(pipelines, required_workspaces)``
    :param ctx_managers:            context manager factories; each one is called with
                                    ``(config, tiles_to_process)`` and entered for the duration of
                                    the processing
    :param extra_config_checks:     extra ``(predicate, error message)`` checks applied on the configuration
    :param dl_wait:                 stored as ``config.dl_wait`` unless the attribute already exists
    :param dl_timeout:              stored as ``config.dl_timeout`` unless the attribute already exists
    :param searched_items_per_page: stored as ``config.searched_items_per_page`` unless the
                                    attribute already exists
    :param nb_max_search_retries:   stored as ``config.nb_max_search_retries`` unless the
                                    attribute already exists
    :param dryrun:                  forwarded to the pipeline builder and to the input factories
    :param debug_caches:            forwarded to the pipeline builder
    :param debug_otb:               execute the tasks sequentially, without Dask
    :param watch_ram:               forwarded to the task generation
    :param debug_tasks:             dump the task graph of each tile
    :param file_manager_builders:   file manager builders; the manager built from key ``k`` is
                                    registered as the ``{k}_file_manager`` extra parameter of the
                                    input factories
    :return: the :class:`exits.Situation` that summarizes computation errors, search failures,
             download failures and download timeouts
    :raise MissingGeoidError: if the geoid file of the configuration doesn't exist
    :raise NoS2TileError:     if there is no tile to process

    .. note:: When a configuration object is received, it's modified in place (extra download
       and search options), and ``os.environ`` is modified as well.

    .. todo:: parametrize tile loop, product download...
    """
    config: Configuration  = read_config(config_opt, extra_config_checks)
    extra_opts = {
        "dl_wait"                : dl_wait,
        "dl_timeout"             : dl_timeout,
        "searched_items_per_page": searched_items_per_page,
        "nb_max_search_retries"  : nb_max_search_retries,
    }
    # Values already present in the configuration take precedence over the function parameters
    _extend_config(config, extra_opts, overwrite=False)

    # The number of threads of ITK (=> OTB) and GDAL is controlled through the environment
    os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = str(config.OTBThreads)
    os.environ["GDAL_NUM_THREADS"] = str(config.OTBThreads)

    # For the OTB applications that don't receive the path as a parameter (like SARDEMProjection)
    # -> we set $OTB_GEOID_FILE
    if not os.path.exists(config.GeoidFile):
        raise exceptions.MissingGeoidError(config.GeoidFile)
    os.environ["OTB_GEOID_FILE"] = config.GeoidFile

    # Build all the requested file managers around a single EODAG gateway
    dag = eodag.create(config)
    file_manager_builders = file_manager_builders or {}
    file_managers = {}
    for key, fmb in file_manager_builders.items():
        file_managers[f"{key}_file_manager"] = fmb(config, dag)

    # Special case for the S1 File Manager
    # NB: it is None when no "s1" builder has been provided, in spite of the cast
    s1_file_manager = cast(S1FileManager, file_managers.get('s1_file_manager', None))
    # -> sometimes the tile list shall be deduced from the S1 files found on disk
    tiles_to_process = extract_tiles_to_process(config, s1_file_manager)
    nb_tiles = len(tiles_to_process)
    if s1_file_manager:
        logger.info("%s images to process on %s tiles: %s", s1_file_manager.nb_images, nb_tiles, tiles_to_process)
    else:
        logger.info("%s tiles to process: %s", nb_tiles, tiles_to_process)

    if nb_tiles == 0:
        raise exceptions.NoS2TileError()

    # Prepare directories where to store temporary files
    # These directories won't be cleaned up automatically
    S1_tmp_dir = os.path.join(config.tmpdir, 'S1')
    os.makedirs(S1_tmp_dir, exist_ok=True)

    # The ExitStack guarantees that all the context managers entered are exited, whatever happens
    with contextlib.ExitStack() as context:
        for cm in ctx_managers:
            context.enter_context(cm(config, tiles_to_process))

        pipelines, required_workspaces = pipeline_builder(config, dryrun=dryrun, debug_caches=debug_caches)

        # Used by eof
        pipelines.register_extra_parameters_for_input_factories(
            **file_managers,  # names are: "{key}_file_manager"
            dryrun=dryrun,
            # tile_name will be done in process_one_tile
        )

        # Tiles are processed one after the other; outcomes of all the tiles are accumulated
        results = []
        with DaskContext(config, debug_otb) as dask_client:
            for idx, tile_it in enumerate(tiles_to_process):
                res = process_one_tile(
                    tile_name=tile_it,
                    tile_idx=idx,
                    tiles_nb=nb_tiles,
                    cfg=config,
                    pipelines=pipelines,
                    client=dask_client.client,
                    required_workspaces=required_workspaces,
                    debug_otb=debug_otb,
                    do_watch_ram=watch_ram,
                    debug_tasks=debug_tasks,
                )
                results.extend(res)

        # Falsy results are the failed ones.
        # The count is done before the skipped products are appended to the results.
        nb_errors_detected = sum(not bool(res) for res in results)

        skipped_for_download_failures = s1_file_manager.get_skipped_S2_products() if s1_file_manager else []
        results.extend(skipped_for_download_failures)

        logger.debug('#############################################################################')
        nb_issues = nb_errors_detected + len(skipped_for_download_failures)
        if nb_issues > 0:
            logger.warning('Execution report: %s errors detected', nb_issues)
        else:
            logger.info('Execution report: no error detected')

        if results:
            # Successes are reported at INFO level, failures at WARNING level
            log_level : Callable[[Any], int] = lambda res: logging.INFO if bool(res) else logging.WARNING
            for res in results:
                logger.log(log_level(res), ' - %s', res)
        else:
            logger.info(' -> Nothing has been executed')

        # Gather search and download statistics from all the file managers
        search_failures   = 0
        download_failures = []
        download_timeouts = []
        for fm in file_managers.values():
            search_failures   += fm.get_search_failures()
            download_failures .extend(fm.get_download_failures())
            download_timeouts .extend(fm.get_download_timeouts())

        # NOTE(review): search failures are presumably also present among the failed results,
        # hence the subtraction -- to be confirmed against the file managers.
        return exits.Situation(
            nb_computation_errors=nb_errors_detected - search_failures,
            nb_search_failures=search_failures,
            nb_download_failures=len(download_failures),
            nb_download_timeouts=len(download_timeouts),
        )


def register_LIA_pipelines_v0(pipelines: PipelineDescriptionSequence, produce_angles: bool) -> PipelineDescription:
    """
    Internal function that registers all the pipelines that produce the LIA map and the sin(LIA)
    map, computed in the geometry of the S1 images and then orthorectified.

    :param pipelines:      sequence where the pipelines are registered
    :param produce_angles: tells whether the selected LIA map (in degrees) is a required product
    :return: the description of the last pipeline: the one that selects the sin(LIA) map
    """
    register = pipelines.register_pipeline

    # DEM -> projection of the DEM -> cartesian coordinates -> normals and LIA
    dem_vrt = register(
        [AgglomerateDEMOnS1],
        'AgglomerateDEM',
        inputs={'insar': 'basename'},
    )
    projected_dem = register(
        [ExtractSentinel1Metadata, SARDEMProjection],
        'SARDEMProjection',
        is_name_incremental=True,
        inputs={'insar': 'basename', 'indem': dem_vrt},
    )
    cartesian = register(
        [SARCartesianMeanEstimation],
        'SARCartesianMeanEstimation',
        inputs={'insar': 'basename', 'indem': dem_vrt, 'indemproj': projected_dem},
    )
    lia_on_s1 = register(
        [ComputeNormalsOnS1, ComputeLIAOnS1],
        'Normals|LIA',
        is_name_incremental=True,
        inputs={'xyz': cartesian},
    )

    # "inputs" parameter doesn't need to be specified in the following pipeline declarations
    # but we still use it for clarity!

    # LIA in degrees: only required on demand
    ortho_lia = register(
        [filter_LIA('LIA'), OrthoRectifyLIA],
        'OrthoLIA',
        is_name_incremental=True,
        inputs={'in': lia_on_s1},
    )
    concat_lia = register(
        [ConcatenateLIA],
        'ConcatLIA',
        inputs={'in': ortho_lia},
    )
    register(
        [SelectBestCoverage],
        'SelectLIA',
        product_required=produce_angles,
        inputs={'in': concat_lia},
    )

    # sin(LIA): always required
    ortho_sin_lia = register(
        [filter_LIA('sin_LIA'), OrthoRectifyLIA],
        'OrthoSinLIA',
        is_name_incremental=True,
        inputs={'in': lia_on_s1},
    )
    concat_sin_lia = register(
        [ConcatenateLIA],
        'ConcatSinLIA',
        inputs={'in': ortho_sin_lia},
    )
    return register(
        [SelectBestCoverage],
        'SelectSinLIA',
        product_required=True,
        inputs={'in': concat_sin_lia},
    )


def register_LIA_pipelines_v1_1(
    pipelines: PipelineDescriptionSequence,
    produce_angles: bool,
) -> PipelineDescription:
    """
    Internal function that registers all the pipelines related to LIA map and sin(LIA) map,
    computed directly on the S2 tile, from positions deduced from the S1 image metadata.

    :param pipelines:      sequence where the pipelines are registered
    :param produce_angles: ignored, as there is no extra LIA selection step
    :return: the description of the pipeline that computes the LIA on the S2 tile
    """
    pipelines.register_inputs('tilename', tilename_first_inputs_factory)
    register = pipelines.register_pipeline

    vrt_of_dem = register(
        [AgglomerateDEMOnS2],
        'AgglomerateDEM',
        inputs={'tilename': 'tilename'},
    )
    dem_on_s2 = register(
        [ProjectDEMToS2Tile],
        "ProjectDEMToS2Tile",
        inputs={"indem": vrt_of_dem},
        is_name_incremental=True,
    )

    sum_heights = SumAllHeights(
        product_key='height_on_s2',
        key_map={'indem': 'in_s2_dem', 'ingeoid': 'in_s2_geoid'},
        fname_fmt_default='DEM+GEOID_projected_on_{tile_name}.tiff',
        dname_fmt_default='S2/{tile_name}',
        image_description='DEM + GEOID height info projected on S2 tile',
    )
    height_on_s2 = register(
        [ProjectGeoidToS2Tile, sum_heights],
        "GenerateHeightForS2Tile",
        inputs={"in_s2_dem": dem_on_s2},
        is_name_incremental=True,
    )

    # Notes:
    # * ComputeGroundAndSatPositionsOnDEM cannot be merged in memory with
    #   normals production AND LIA production: indeed the XYZ, and satposXYZ
    #   data needs to be reused several times, and in-memory pipeline can't
    #   support that (yet?)
    # * ExtractSentinel1Metadata needs to be in its own pipeline to make sure
    #   all meta are available later on to filter on the coverage.
    # * ComputeGroundAndSatPositionsOnDEM takes care of filtering on the
    #   coverage. We don't need any SelectBestS1onS2Coverage prior to this step.
    sar_with_metadata = register(
        [ExtractSentinel1Metadata],
        inputs={'inrawsar': 'basename'},
    )
    positions = register(
        [ComputeGroundAndSatPositionsOnDEM],
        "ComputeGroundAndSatPositionsOnDEM",
        inputs={'insar': sar_with_metadata, 'inheight': height_on_s2},
    )

    # Always generate sin(LIA). If LIA° is requested, then it's also a
    # final/requested product.
    # produce_angles is ignored as there is no extra select_LIA step
    return register(
        [ComputeNormalsOnS2, ComputeLIAOnS2],
        'ComputeLIAOnS2',
        inputs={'xyz': positions},
        is_name_incremental=True,
        product_required=True,
    )



def register_GAMMA_AREA_pipelines(
    pipelines: PipelineDescriptionSequence,
    config: Configuration
) -> PipelineDescription:
    """
    Internal function that registers all the pipelines that produce the GAMMA AREA map.

    :param pipelines: sequence where the pipelines are registered
    :param config:    configuration object; ``use_resampled_dem`` tells whether the DEM is
                      resampled before being combined with the geoid
    :return: the description of the last pipeline: the one that selects the gamma area map
    """
    register = pipelines.register_pipeline

    # Build VRT
    dem_vrt = register(
        [AgglomerateDEMOnS1],
        'AgglomerateDEM',
        inputs={'insar': 'basename'},
    )

    # Resample DEM, on demand only
    if config.use_resampled_dem:
        dem_for_heights = register(
            [NaNifyNoData, ResampleDEM],
            'RigidTransformResample',
            inputs={'indem': dem_vrt},
        )
    else:
        dem_for_heights = dem_vrt

    # Combine dem + geoid in order to optimize SARDEMProjection execution
    # 66% of its execution time would be lost in mutexes for accessing Geoid elevation on each point
    # otherwise.
    sum_heights = SumAllHeights(
        product_key='height_4rtc',
        key_map={'indem': 'indem', 'ingeoid': 'ingeoid'},
        fname_fmt_default='DEM+GEOID_{polarless_basename}',
        dname_fmt_default='S1',
        image_description='DEM + GEOID',
    )
    heights = register(
        [ProjectGeoidToDEM, sum_heights],
        'Combine DEM+height',
        inputs={'indem': dem_for_heights},
        is_name_incremental=True,
    )

    # Project DEM: the heights (DEM + geoid) are used, not the (resampled) DEM alone
    projected_dem = register(
        [ExtractSentinel1Metadata, SARDEMProjectionImageEstimation],
        'SARDEMProjection',
        inputs={'insar': 'basename', 'indem': heights},
        is_name_incremental=True,
    )

    # gamma area
    # TODO: indem parameter doesn't make sense in the application code...
    area = register(
        [SARGammaAreaImageEstimation],
        'SARGammaAreaImageEstimation',
        inputs={'insar': 'basename', 'indem': heights, 'indemproj': projected_dem},
    )

    # ortho gamma area -> concatenation -> selection of the best coverage
    ortho_area = register(
        [OrthoRectifyGAMMA_AREA],
        'OrthoGAMMA_AREA',
        is_name_incremental=True,
        inputs={'in': area},
    )
    concat_area = register(
        [ConcatenateGAMMA_AREA],
        'ConcatGAMMA_AREA',
        inputs={'in': ortho_area},
    )
    return register(
        [SelectGammaNaughtAreaBestCoverage],
        'SelectGAMMA_AREA',
        product_required=True,
        inputs={'in': concat_area},
    )


def s1_raster_first_inputs_factory(
    *,
    tile_name          : str,
    s1_file_manager    : S1FileManager,
    output_name_formats: List[Tuple[str, str]],
    dryrun             : bool,
    **kwargs,  # pylint: disable=unused-argument
) -> List[Outcome[FirstStep]]:
    """
    :class:`FirstStepFactory` hook dedicated to S1 images.

    :param tile_name:           name of the S2 tile
    :param s1_file_manager:     manager used to download and find the S1 products
    :param output_name_formats: list of (dirname format, filename format) of the output products
    :param dryrun:              forwarded to :func:`get_s1_files_for_tile`
    :return: the failed outcome alone when the S1 products cannot be obtained; an empty list when
             no S1 product intersects the tile; one :class:`FirstStep` outcome per S1 image
             otherwise.
    """
    files_outcome = get_s1_files_for_tile(s1_file_manager, tile_name, output_name_formats, dryrun)
    if not files_outcome:
        # The error is forwarded as is
        return [cast(Outcome[FirstStep], files_outcome)]

    rasters = files_outcome.value()
    if len(rasters) > 0:
        return s1_raster_first_inputs_factory_from_rasters(tile_name, rasters)

    logger.info("No intersection with tile %s", tile_name)
    return []


def s1_raster_first_inputs_factory_from_rasters(
    tile_name      : str,
    raster_list : List[Dict],
    **kwargs,  # pylint: disable=unused-argument
) -> List[Outcome[FirstStep]]:
    """
    :class:`FirstStepFactory` hook dedicated to S1 images: converts S1 raster list into
    :class:`FirstStep` instance list.

    :param tile_name:   name of the S2 tile
    :param raster_list: non empty list of dictionaries with the keys ``'raster'``,
                        ``'tile_origin'`` and ``'tile_coverage'``
    :return: one successful outcome per image of each raster
    """
    assert raster_list
    first_inputs = []
    for info in raster_list:
        acquisition: S1DateAcquisition = info['raster']
        # The manifest is shared by all the images of a same acquisition
        manifest = acquisition.get_manifest()
        first_inputs.extend(
            Outcome(FirstStep(
                tile_name=tile_name,
                tile_origin=info['tile_origin'],
                tile_coverage=info['tile_coverage'],
                manifest=manifest,
                basename=image,
            ))
            for image in acquisition.get_images_list()
        )

    # Log commented and kept for filling in unit tests
    # logger.debug('Generate first steps from: %s', raster_list)
    return first_inputs


def tilename_first_inputs_factory(
    *,
    tile_name    : str,
    configuration: Configuration,
    **kwargs,  # pylint: disable=unused-argument
) -> List[Outcome[FirstStep]]:
    """
    :class:`FirstStepFactory` hook dedicated to S2 MGRS tile information: name and footprint origin.

    :param tile_name:     name of the S2 tile
    :param configuration: configuration object; its ``output_grid`` is the tile database searched
    :return: a list with a single :class:`FirstStep` outcome
    :raise RuntimeError: if the tile cannot be found in the output grid
    """
    # TODO: avoid to search this information multiple times
    tiles_db  = configuration.output_grid
    tile_info = Utils.Layer(tiles_db).find_tile_named(tile_name)
    if not tile_info:
        raise RuntimeError(f"Tile {tile_name} cannot be found in {tiles_db!r}")

    area_polygon = tile_info.GetGeometryRef().GetGeometryRef(0)
    # The last point is dropped -- presumably the one that closes the ring, TODO confirm
    corners      = area_polygon.GetPoints()[:-1]
    first_step   = FirstStep(
        tile_name=tile_name,
        tile_origin=[(corner[0], corner[1]) for corner in corners],  # S2 tile footprint
        basename=f"S2info_{tile_name}",
        out_filename=tiles_db,  # Trick existing file detection
        does_product_exist=lambda: True,
    )
    return [Outcome(first_step)]


def eof_first_inputs_factory(
    tile_name        : str,
    configuration    : Configuration,
    eof_file_manager : EOFFileManager,
    **kwargs,  # pylint: disable=unused-argument
) -> List[Outcome[FirstStep]]:
    """
    :class:`FirstStepFactory` hook dedicated to precise orbit inputs.

    It takes care of returning or downloading the EOF files on-the-fly according to the relative
    orbit numbers requested in the configuration.

    :param tile_name:        name of the S2 tile the steps are attached to
    :param configuration:    configuration object; ``relative_orbit_list`` is searched for
    :param eof_file_manager: manager used to search for the EOF files
    :return: one successful :class:`FirstStep` outcome per (relative orbit, EOF product) found
    :raise DownloadEOFFileError: if any of the searches for EOF files has failed

    :precondition: at least one relative orbit number must have been requested in the
                   configuration (asserted).

    .. note:: The former documentation required one and only one relative orbit number and one
       and only one mission; neither is checked here.
    """
    assert len(configuration.relative_orbit_list) >= 1
    relative_orbits = configuration.relative_orbit_list
    logger.debug("Configure EOF inputs for tile %s, orbit %s", tile_name, relative_orbits)
    eof_founds = eof_file_manager.search_for(relative_orbits)
    assert len(eof_founds) > 0

    # Validate ALL the search outcomes before accessing any value: the log below calls value()
    # on every outcome, which is only meaningful on successful ones.
    for eof_entry in eof_founds:
        if not eof_entry:
            error = eof_entry.error()
            raise exceptions.DownloadEOFFileError(str(error)) from error

    logger.info("Orbit %s OSVs will be taken from %s", relative_orbits, ",".join([f"{eof_file.value()}" for eof_file in eof_founds]))
    # Duplicate the first step for all tile_name (as this is what will be used to attach dropped inputs)
    # TODO: see how to support the case where all inputs are dropped...
    # TODO: keep only one eof_file per series of consecutive files related to a same orbit
    #       => associate orbit+mission to a single EOF file
    steps = []
    for eof_entry in eof_founds:
        for relorb, product in eof_entry.value().items():
            # Checked before the first use of the product
            assert product, f"Here, we should have a non null instance for {product=}"
            logger.debug("#  orb=%03d, product=%s", relorb, product.filename)
            step = FirstStep(
                orbit=f"{relorb:0>3d}",
                basename=str(product.filename),
                flying_unit_code=product.mission.lower(),
                tile_name=tile_name,
            )
            steps.append(step)
    for step in steps:
        logger.debug("- EOF FirstStep = %s", step)
    return [Outcome(step) for step in steps]


def register_LIA_pipelines(
    pipelines: PipelineDescriptionSequence,
    produce_angles: bool,
) -> PipelineDescription:
    """
    Register, in ``pipelines``, the chain of pipelines that leads to the LIA
    map and the sin(LIA) map.

    The pipelines are registered in this order:

    1. ``AgglomerateDEM``, fed with the ``tilename`` first inputs;
    2. ``ProjectDEMToS2Tile``, fed with the result of 1.;
    3. ``GenerateHeightForS2Tile`` (``ProjectGeoidToS2Tile`` + ``SumAllHeights``),
       fed with the result of 2.;
    4. ``ComputeGroundAndSatPositionsOnDEM``, fed with the ``eof`` first
       inputs and the result of 3.;
    5. ``ComputeLIAOnS2`` (``ComputeNormalsOnS2`` + ``ComputeLIAOnS2``), fed
       with the result of 4., and flagged as a required product.

    :param pipelines:      Sequence of pipeline descriptions to complete.
    :param produce_angles: Ignored: there is no extra "select LIA" step.
    :return: The description of the last registered pipeline (``ComputeLIAOnS2``).
    """
    # --- DEM related pipelines, driven by the names of the S2 tiles
    pipelines.register_inputs('tilename', tilename_first_inputs_factory)
    agglomerated_dem = pipelines.register_pipeline(
        [AgglomerateDEMOnS2],
        'AgglomerateDEM',
        inputs={'tilename': 'tilename'},
    )
    dem_on_s2 = pipelines.register_pipeline(
        [ProjectDEMToS2Tile],
        "ProjectDEMToS2Tile",
        is_name_incremental=True,
        inputs={"indem": agglomerated_dem},
    )

    # --- Heights on the S2 tile
    sum_all_heights = SumAllHeights(
        product_key='height_on_s2',
        key_map={'indem': 'in_s2_dem', 'ingeoid': 'in_s2_geoid'},
        fname_fmt_default='DEM+GEOID_projected_on_{tile_name}.tiff',
        dname_fmt_default='S2/{tile_name}',
        image_description='DEM + GEOID height info projected on S2 tile',
    )
    height_on_s2 = pipelines.register_pipeline(
        [ProjectGeoidToS2Tile, sum_all_heights],
        "GenerateHeightForS2Tile",
        is_name_incremental=True,
        inputs={"in_s2_dem": dem_on_s2},
    )

    # --- Ground and satellite positions, which also depend on the EOF files
    pipelines.register_inputs('eof', eof_first_inputs_factory)
    positions = pipelines.register_pipeline(
        [ComputeGroundAndSatPositionsOnDEMFromEOF],
        "ComputeGroundAndSatPositionsOnDEM",
        inputs={'ineof': 'eof', 'inheight': height_on_s2},
    )

    # --- sin(LIA) is always generated. When LIA° is requested, it is a
    # final/requested product as well.
    # `produce_angles` has no influence here: no extra select_LIA step exists.
    return pipelines.register_pipeline(
        [ComputeNormalsOnS2, ComputeLIAOnS2],
        'ComputeLIAOnS2',
        is_name_incremental=True,
        inputs={'xyz': positions},
        product_required=True,
    )


def register_IA_pipelines(
    pipelines: PipelineDescriptionSequence,
    # produce_angles: bool,
) -> PipelineDescription:
    """
    Register, in ``pipelines``, the chain of pipelines that leads to the IA
    map and the sin(IA) map.

    Two pipelines are registered:

    1. ``ComputeGroundAndSatPositionsOnEllipsoid``, fed with the ``tilename``
       and ``eof`` first inputs;
    2. ``ComputeIAOnS2`` (``ComputeEllipsoidNormalsOnS2`` + ``ComputeIAOnS2``),
       fed with the ``tilename`` first inputs and the result of 1., and
       flagged as a required product.

    :param pipelines: Sequence of pipeline descriptions to complete.
    :return: The description of the last registered pipeline (``ComputeIAOnS2``).
    """
    # Both kinds of first inputs are needed: S2 tile names and EOF files.
    pipelines.register_inputs('tilename', tilename_first_inputs_factory)
    pipelines.register_inputs('eof', eof_first_inputs_factory)

    positions = pipelines.register_pipeline(
        [ComputeGroundAndSatPositionsOnEllipsoid],
        "ComputeGroundAndSatPositionsOnEllipsoid",
        inputs={'tilename': 'tilename', 'ineof': 'eof'},
    )

    # This time, the normals are computed from the S2 tile.
    # sin(IA) is always generated. When IA° is requested, it is a
    # final/requested product as well.
    # There is no `produce_angles` switch as no extra select_IA step exists.
    ia = pipelines.register_pipeline(
        [ComputeEllipsoidNormalsOnS2, ComputeIAOnS2],
        'ComputeIAOnS2',
        is_name_incremental=True,
        inputs={'tilename': 'tilename', 'xyz': positions},
        product_required=True,
    )
    return ia


def s1_process(  # pylint: disable=too-many-arguments, too-many-locals
    config_opt              : Union[str, Configuration],
    *,
    dl_wait                 : int = EODAG_DEFAULT_DOWNLOAD_WAIT,
    dl_timeout              : int = EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    searched_items_per_page : int = EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
    nb_max_search_retries   : int = EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    dryrun                  : bool = False,
    debug_otb               : bool = False,
    debug_caches            : bool = False,
    watch_ram               : bool = False,
    debug_tasks             : bool = False,
    cache_before_ortho      : bool = False,
    lia_process             = None,
    gamma_area_process      = None,
) -> exits.Situation:
    """
    Entry point to :ref:`S1Tiling classic scenario <scenario.S1Processor>` and
    :ref:`S1Tiling NORMLIM scenario <scenario.S1ProcessorLIA>` of on demand
    Ortho-rectification of Sentinel-1 data on Sentinel-2 grid for all
    calibration kinds.

    It performs the following steps:

    1. Download S1 images from S1 data provider (through eodag)
        This step may be ignored if ``config_opt`` *download* option is false;
    2. Calibrate the S1 images according to the *calibration* option from ``config_opt``;
    3. Orthorectify S1 images and cut them on geometric tiles;
    4. Concatenate images from the same orbit on the same tile;
    5. Build mask files;
    6. Despeckle final images.

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dl_wait:
        Permits to override EODAG default wait time in minutes between two
        download tries.
    :param dl_timeout:
        Permits to override EODAG default maximum time in mins before stop
        retrying to download (default=20)
    :param searched_items_per_page:
        Tells how many items are to be returned by EODAG when searching for S1
        images.
    :param nb_max_search_retries:
        Permits to override ``EODAG_DEFAULT_SEARCH_MAX_RETRIES``. The value is
        forwarded unchanged to ``do_process_with_pipeline`` (presumably the
        maximum number of retries of an EODAG search -- to be confirmed).
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows
    :param cache_before_ortho:
        Cutting, calibration and orthorectification are chained in memory
        unless this option is true. In that case, :ref:`Cut and calibrated
        (aka "OrthoReady") files <orthoready-files>` are stored in
        :ref:`%(tmp) <paths.tmp>`:samp:`/S1/` directory.
        Do not forget to regularly clean up this space.
    :param lia_process:
        Optional callable used instead of :func:`register_LIA_pipelines` when
        the calibration type is ``'normlim'``. It is called as
        ``lia_process(pipelines, config.produce_lia_map)`` and its result is
        used as the input of the ``SelectSinLIA`` pipeline.
    :param gamma_area_process:
        Optional callable used instead of ``register_GAMMA_AREA_pipelines``
        when the calibration type is ``'gamma_naught_rtc'``. It is called as
        ``gamma_area_process(pipelines, config)``.

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(
        config:       Configuration,
        dryrun:       bool,
        debug_caches: bool,
    ) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        # Describe all the pipelines of the scenario, and list the kinds of
        # workspace they require.
        assert (not config.filter) or (config.keep_non_filtered_products or not config.mask_cond), \
                'Cannot purge non filtered products when mask are also produced!'
        output_name_formats = main_output_name_formats(config)

        chain_LIA_and_despeckle_inmemory        = config.filter and not config.keep_non_filtered_products
        chain_GAMMA_AREA_and_despeckle_inmemory = config.filter and not config.keep_non_filtered_products
        chain_concat_and_despeckle_inmemory     = False  # See issue #118

        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        pipelines.register_inputs('basename', s1_raster_first_inputs_factory)
        pipelines.register_extra_parameters_for_input_factories(output_name_formats=output_name_formats)

        # Calibration ... OrthoRectification
        calib_seq = [ExtractSentinel1Metadata, AnalyseBorders, Calibrate]
        if config.removethermalnoise:
            calib_seq += [CorrectDenoising]
        calib_seq += [CutBorders]

        if cache_before_ortho:
            pipelines.register_pipeline(calib_seq, 'PrepareForOrtho', product_required=False, is_name_incremental=True)
            pipelines.register_pipeline([OrthoRectify], 'OrthoRectify', product_required=False)
        else:
            calib_seq += [OrthoRectify]
            pipelines.register_pipeline(calib_seq, 'FullOrtho', product_required=False, is_name_incremental=True)

        calibration_is_done_in_S1 = config.calibration_type in ['sigma', 'beta', 'gamma', 'dn']

        # Concatenation (... + Despeckle)  // not working yet, see issue #118
        concat_seq : List[Type[StepFactory]] = [Concatenate]
        if chain_concat_and_despeckle_inmemory:
            concat_seq.append(SpatialDespeckle)
            need_to_keep_non_filtered_products = False
        else:
            need_to_keep_non_filtered_products = True

        concat_S2 = pipelines.register_pipeline(
            concat_seq,
            product_required=calibration_is_done_in_S1,
            is_name_incremental=True
        )
        last_product_S2 = concat_S2

        required_workspaces = [WorkspaceKinds.TILE]

        # LIA Calibration (...+ Despeckle)
        if config.calibration_type == 'normlim':
            apply_LIA_seq : List[Type[StepFactory]] = [ApplyLIACalibration]
            if chain_LIA_and_despeckle_inmemory:
                apply_LIA_seq.append(SpatialDespeckle)
                need_to_keep_non_filtered_products = False
            else:
                need_to_keep_non_filtered_products = True

            LIA_registration = lia_process or register_LIA_pipelines
            lias = LIA_registration(pipelines, config.produce_lia_map)

            # This steps helps forwarding sin(LIA) (only) to the next step
            # that corrects the β° with sin(LIA) map.
            sin_LIA = pipelines.register_pipeline(
                [filter_LIA('sin_LIA')],
                'SelectSinLIA',
                is_name_incremental=True,
                inputs={'in': lias},
            )
            # TODO: Merge filter_LIA in apply_LIA_seq!
            apply_LIA = pipelines.register_pipeline(
                apply_LIA_seq,
                product_required=True,
                inputs={'sin_LIA': sin_LIA, 'concat_S2': concat_S2},
                is_name_incremental=True,
            )
            last_product_S2 = apply_LIA
            required_workspaces.append(WorkspaceKinds.LIA)

        # GAMMA AREA Calibration (...+ Despeckle)
        elif config.calibration_type == 'gamma_naught_rtc':
            apply_GAMMA_AREA_seq: List[Type[StepFactory]] = [ApplyGammaNaughtRTCCalibration]
            if chain_GAMMA_AREA_and_despeckle_inmemory:
                apply_GAMMA_AREA_seq.append(SpatialDespeckle)
                need_to_keep_non_filtered_products = False
            else:
                need_to_keep_non_filtered_products = True

            GammaNaughtArea_registration = gamma_area_process or register_GAMMA_AREA_pipelines
            gammanaughtareas = GammaNaughtArea_registration(pipelines, config)

            apply_GAMMA_AREA = pipelines.register_pipeline(
                apply_GAMMA_AREA_seq,
                product_required=True,
                inputs={'gamma_area': gammanaughtareas, 'concat_S2': concat_S2},
                is_name_incremental=True,
            )
            last_product_S2 = apply_GAMMA_AREA
            required_workspaces.append(WorkspaceKinds.GAMMA_AREA)

        # Masking
        if config.mask_cond:
            pipelines.register_pipeline(
                [BuildBorderMask, SmoothBorderMask],
                'GenerateMask',
                product_required=True,
                inputs={'in': last_product_S2})
            required_workspaces.append(WorkspaceKinds.MASK)

        # Quicklook
        if config.generate_quicklook:
            pipelines.register_pipeline(
                [GenerateQuickLook],
                'GenerateQuickLook',
                product_required=True,
                inputs={'in': last_product_S2})
            required_workspaces.append(WorkspaceKinds.QUICKLOOK)

        # Despeckle in non-inmemory case
        if config.filter:
            # Use SpatialDespeckle, only if filter ∈ [lee, gammamap, frost, kuan]
            required_workspaces.append(WorkspaceKinds.FILTER)
            if need_to_keep_non_filtered_products:  # config.keep_non_filtered_products:
                # Define another pipeline if chaining cannot be done in memory
                pipelines.register_pipeline(
                    [SpatialDespeckle],
                    product_required=True,
                    inputs={'in': last_product_S2})

        return pipelines, required_workspaces

    def _check_requested_number_of_orbits(config: LIAConfiguration) -> bool:
        # The check is positive: failing to comply => there is an issue in the options
        return config.calibration_type != 'normlim' or len(config.relative_orbit_list) > 0

    return do_process_with_pipeline(
        config_opt,
        builder,
        extra_config_checks=[(
            _check_requested_number_of_orbits,
            "At least one relative orbit is required for LIA map generation"
        )],
        ctx_managers=[DEMWorkspace],
        dl_wait=dl_wait,
        dl_timeout=dl_timeout,
        searched_items_per_page=searched_items_per_page,
        nb_max_search_retries=nb_max_search_retries,
        dryrun=dryrun,
        debug_otb=debug_otb,
        debug_caches=debug_caches,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'s1': S1FileManager},
    )
def s1_process_lia_v0(  # pylint: disable=too-many-arguments
    config_opt             : Union[str, Configuration],
    *,
    dl_wait                : int = EODAG_DEFAULT_DOWNLOAD_WAIT,
    dl_timeout             : int = EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    searched_items_per_page: int = EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
    nb_max_search_retries  : int = EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    dryrun                 : bool = False,
    debug_otb              : bool = False,
    debug_caches           : bool = False,
    watch_ram              : bool = False,
    debug_tasks            : bool = False,
) -> exits.Situation:
    """
    Entry point to :ref:`LIA Map production scenario <scenario.S1LIAMap>` that
    generates Local Incidence Angle Maps on S2 geometry.

    This variant registers its pipelines through ``register_LIA_pipelines_v0``
    and takes its inputs from S1 rasters (``S1FileManager``).

    It performs the following steps:

    1. Determine the S1 products to process
        Given a list of S2 tiles, we first determine the day that'll give the
        best coverage of each S2 tile in terms of S1 products.

        In case there is no single day that gives the best coverage for all
        S2 tiles, we try to determine the best solution that minimizes the
        number of S1 products to download and process.
    2. Process these S1 products

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dl_wait:
        Permits to override EODAG default wait time in minutes between two
        download tries.
    :param dl_timeout:
        Permits to override EODAG default maximum time in mins before stop
        retrying to download (default=20)
    :param searched_items_per_page:
        Tells how many items are to be returned by EODAG when searching for S1
        images.
    :param nb_max_search_retries:
        Permits to override ``EODAG_DEFAULT_SEARCH_MAX_RETRIES``. The value is
        forwarded unchanged to ``do_process_with_pipeline`` (presumably the
        maximum number of retries of an EODAG search -- to be confirmed).
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(config: Configuration, dryrun: bool, debug_caches: bool) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        pipelines.register_inputs('basename', s1_raster_first_inputs_factory)
        # sin(LIA) is always expected; LIA° only when explicitly requested.
        output_name_formats = [
            (dname_fmt_lia_product(config), 'sin_LIA_{flying_unit_code}_{tile_name}_{orbit_direction}_{orbit}.tif'),
        ]
        if config.produce_lia_map:
            output_name_formats.append(
                (dname_fmt_lia_product(config), 'LIA_{flying_unit_code}_{tile_name}_{orbit_direction}_{orbit}.tif')
            )
        pipelines.register_extra_parameters_for_input_factories(output_name_formats=output_name_formats)
        register_LIA_pipelines_v0(pipelines, produce_angles=config.produce_lia_map)
        required_workspaces = [WorkspaceKinds.LIA]
        return pipelines, required_workspaces

    return do_process_with_pipeline(
        config_opt,
        builder,
        ctx_managers=[DEMWorkspace],
        dl_wait=dl_wait,
        dl_timeout=dl_timeout,
        searched_items_per_page=searched_items_per_page,
        nb_max_search_retries=nb_max_search_retries,
        dryrun=dryrun,
        debug_caches=debug_caches,
        debug_otb=debug_otb,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'s1': S1FileManager},
    )


def s1_process_lia_v1_1(  # pylint: disable=too-many-arguments
    config_opt             : Union[str, Configuration],
    *,
    dl_wait                : int = EODAG_DEFAULT_DOWNLOAD_WAIT,
    dl_timeout             : int = EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    searched_items_per_page: int = EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
    nb_max_search_retries  : int = EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    dryrun                 : bool = False,
    debug_otb              : bool = False,
    debug_caches           : bool = False,
    watch_ram              : bool = False,
    debug_tasks            : bool = False,
) -> exits.Situation:
    """
    Entry point to :ref:`LIA Map production scenario <scenario.S1LIAMap>` that
    generates Local Incidence Angle Maps on S2 geometry.

    This variant registers its pipelines through
    ``register_LIA_pipelines_v1_1`` and takes its inputs from S1 rasters
    (``S1FileManager``).

    It performs the following steps:

    1. Determine the S1 products to process
        Given a list of S2 tiles, we first determine the day that'll give the
        best coverage of each S2 tile in terms of S1 products.

        In case there is no single day that gives the best coverage for all
        S2 tiles, we try to determine the best solution that minimizes the
        number of S1 products to download and process.
    2. Process these S1 products

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dl_wait:
        Permits to override EODAG default wait time in minutes between two
        download tries.
    :param dl_timeout:
        Permits to override EODAG default maximum time in mins before stop
        retrying to download (default=20)
    :param searched_items_per_page:
        Tells how many items are to be returned by EODAG when searching for S1
        images.
    :param nb_max_search_retries:
        Permits to override ``EODAG_DEFAULT_SEARCH_MAX_RETRIES``. The value is
        forwarded unchanged to ``do_process_with_pipeline`` (presumably the
        maximum number of retries of an EODAG search -- to be confirmed).
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(config: Configuration, dryrun: bool, debug_caches: bool) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        pipelines.register_inputs('basename', s1_raster_first_inputs_factory)
        # sin(LIA) is always expected; LIA° only when explicitly requested.
        output_name_formats = [
            (dname_fmt_lia_product(config), 'sin_LIA_{flying_unit_code}_{tile_name}_{orbit_direction}_{orbit}.tif'),
        ]
        if config.produce_lia_map:
            output_name_formats.append(
                (dname_fmt_lia_product(config), 'LIA_{flying_unit_code}_{tile_name}_{orbit_direction}_{orbit}.tif')
            )
        pipelines.register_extra_parameters_for_input_factories(output_name_formats=output_name_formats)
        register_LIA_pipelines_v1_1(pipelines, produce_angles=config.produce_lia_map)
        required_workspaces = [WorkspaceKinds.LIA]
        return pipelines, required_workspaces

    return do_process_with_pipeline(
        config_opt,
        builder,
        ctx_managers=[DEMWorkspace],
        dl_wait=dl_wait,
        dl_timeout=dl_timeout,
        searched_items_per_page=searched_items_per_page,
        nb_max_search_retries=nb_max_search_retries,
        dryrun=dryrun,
        debug_caches=debug_caches,
        debug_otb=debug_otb,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'s1': S1FileManager},
    )


def s1_process_lia_v1_2(  # pylint: disable=too-many-arguments
    config_opt  : Union[str, Configuration],
    *,
    dryrun      : bool = False,
    debug_otb   : bool = False,
    debug_caches: bool = False,
    watch_ram   : bool = False,
    debug_tasks : bool = False,
) -> exits.Situation:
    """
    Entry point to :ref:`LIA Map production scenario <scenario.S1LIAMap>` that
    generates :ref:`Local Incidence Angle Maps on S2 geometry <lia-files>`.

    It performs the following steps:

    1. Register the downloading of missing EOF matching the requested
       (relative) orbit number
    2. Generate the LIA maps

    At least one relative orbit must be requested in the configuration: this
    is enforced through an extra configuration check.

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(config: Configuration, dryrun: bool, debug_caches: bool) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        register_LIA_pipelines(pipelines, produce_angles=config.produce_lia_map)
        required_workspaces = [WorkspaceKinds.LIA]
        return pipelines, required_workspaces

    return do_process_with_pipeline(
        config_opt,
        builder,
        extra_config_checks=[(
            lambda config: len(config.relative_orbit_list) > 0,
            "At least one relative orbit is required for LIA map generation"
        )],
        ctx_managers=[DEMWorkspace],
        dryrun=dryrun,
        debug_caches=debug_caches,
        debug_otb=debug_otb,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'eof': EOFFileManager},
    )


# Public name of the LIA map scenario: bound to the latest implementation.
s1_process_lia = s1_process_lia_v1_2
def s1_process_ia(  # pylint: disable=too-many-arguments
    config_opt  : Union[str, Configuration],
    *,
    dryrun      : bool = False,
    debug_otb   : bool = False,
    debug_caches: bool = False,
    watch_ram   : bool = False,
    debug_tasks : bool = False,
) -> exits.Situation:
    """
    Entry point to :ref:`IA Map production scenario <scenario.S1IAMap>` that
    generates :ref:`Incidence Angle Maps on S2 geometry <ia-files>`.

    It performs the following steps:

    1. Register the downloading of missing EOF matching the requested
       (relative) orbit number
    2. Generate the IA maps

    At least one relative orbit must be requested in the configuration: this
    is enforced through an extra configuration check.

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(config: Configuration, dryrun: bool, debug_caches: bool) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        register_IA_pipelines(pipelines)
        required_workspaces = [WorkspaceKinds.IA]
        return pipelines, required_workspaces

    # No context manager is requested here, unlike the LIA scenario which
    # passes ctx_managers=[DEMWorkspace].
    return do_process_with_pipeline(
        config_opt,
        builder,
        extra_config_checks=[(
            lambda config: len(config.relative_orbit_list) > 0,
            "At least one relative orbit is required for Ellipsoid IA map generation"
        )],
        dryrun=dryrun,
        debug_caches=debug_caches,
        debug_otb=debug_otb,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'eof': EOFFileManager},
    )
def s1_process_gamma_area(  # pylint: disable=too-many-arguments
    config_opt             : Union[str, Configuration],
    *,
    dl_wait                : int = EODAG_DEFAULT_DOWNLOAD_WAIT,
    dl_timeout             : int = EODAG_DEFAULT_DOWNLOAD_TIMEOUT,
    searched_items_per_page: int = EODAG_DEFAULT_SEARCH_ITEMS_PER_PAGE,
    nb_max_search_retries  : int = EODAG_DEFAULT_SEARCH_MAX_RETRIES,
    dryrun                 : bool = False,
    debug_otb              : bool = False,
    debug_caches           : bool = False,
    watch_ram              : bool = False,
    debug_tasks            : bool = False,
) -> exits.Situation:
    """
    Entry point to :ref:`GAMMA_AREA Map production scenario
    <scenario.S1GammaAreaMap>` that generates Gamma Area Maps on S2 geometry.

    It performs the following steps:

    1. Determine the S1 products to process
        Given a list of S2 tiles, we first determine the day that'll give the
        best coverage of each S2 tile in terms of S1 products.

        In case there is no single day that gives the best coverage for all
        S2 tiles, we try to determine the best solution that minimizes the
        number of S1 products to download and process.
    2. Process these S1 products

    :param config_opt:
        Either a :ref:`request configuration file <request-config-file>` or a
        :class:`s1tiling.libs.configuration.Configuration` instance.
    :param dl_wait:
        Permits to override EODAG default wait time in minutes between two
        download tries.
    :param dl_timeout:
        Permits to override EODAG default maximum time in mins before stop
        retrying to download (default=20)
    :param searched_items_per_page:
        Tells how many items are to be returned by EODAG when searching for S1
        images.
    :param nb_max_search_retries:
        Permits to override ``EODAG_DEFAULT_SEARCH_MAX_RETRIES``. The value is
        forwarded unchanged to ``do_process_with_pipeline`` (presumably the
        maximum number of retries of an EODAG search -- to be confirmed).
    :param dryrun:
        Used for debugging: external (OTB/GDAL) application aren't executed.
    :param debug_otb:
        Used for debugging: Don't execute processing tasks in DASK workers but
        directly in order to be able to analyse OTB/external application
        through a debugger.
    :param debug_caches:
        Used for debugging: Don't delete the intermediary files but leave them
        behind.
    :param watch_ram:
        Used for debugging: Monitoring Python/Dask RAM consumption.
    :param debug_tasks:
        Generate SVG images showing task graphs of the processing flows

    :return:
        A *nominal* exit code depending of whether everything could have
        been downloaded and produced.
    :rtype: :class:`s1tiling.libs.exits.Situation`

    :exception Error: A variety of exceptions. See below (follow the link).
    """
    def builder(config: Configuration, dryrun: bool, debug_caches: bool) -> Tuple[PipelineDescriptionSequence, List[WorkspaceKinds]]:
        pipelines = PipelineDescriptionSequence(config, dryrun=dryrun, debug_caches=debug_caches)
        pipelines.register_inputs('basename', s1_raster_first_inputs_factory)
        # A single kind of product is expected: the gamma area map.
        output_name_formats = [(dname_fmt_gamma_area_product(config), fname_fmt_gamma_area_product(config))]
        pipelines.register_extra_parameters_for_input_factories(output_name_formats=output_name_formats)
        register_GAMMA_AREA_pipelines(pipelines, config=config)
        required_workspaces = [WorkspaceKinds.GAMMA_AREA]
        return pipelines, required_workspaces

    return do_process_with_pipeline(
        config_opt,
        builder,
        ctx_managers=[DEMWorkspace],
        dl_wait=dl_wait,
        dl_timeout=dl_timeout,
        searched_items_per_page=searched_items_per_page,
        nb_max_search_retries=nb_max_search_retries,
        dryrun=dryrun,
        debug_caches=debug_caches,
        debug_otb=debug_otb,
        watch_ram=watch_ram,
        debug_tasks=debug_tasks,
        file_manager_builders={'s1': S1FileManager},
    )