# Source code for otx.core.ov.omz_wrapper

"""OMZ wrapper-related code for otx.core.ov."""
# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

import hashlib
import os
import shutil
import string
import sys
import time
from pathlib import Path
from typing import Dict, List

import requests
from openvino.model_zoo import _common, _reporting
from openvino.model_zoo._configuration import load_models
from openvino.model_zoo.download_engine.downloader import Downloader
from openvino.model_zoo.download_engine.postprocessing import PostprocUnpackArchive
from openvino.model_zoo.omz_converter import ModelOptimizerProperties, convert_to_onnx
from requests.exceptions import HTTPError

from otx.core.file import OTX_CACHE

# pylint: disable=too-many-locals, too-many-branches
# Dedicated cache directory for OpenVINO Model Zoo downloads/conversions,
# nested under the global OTX cache directory. Created eagerly at import time
# so later download/convert calls can assume it exists.
OMZ_CACHE = os.path.join(OTX_CACHE, "omz")
os.makedirs(OMZ_CACHE, exist_ok=True)


# OpenVINO Model Zoo public models known to work with OTX, grouped by task
# type (cls = classification, det = detection, seg = segmentation).
# Commented-out entries document models that were evaluated and excluded,
# with the reason for exclusion.
OMZ_PUBLIC_MODELS: Dict[str, List[str]] = dict(
    cls=[
        "alexnet",
        "caffenet",
        #  "convnext-tiny",                # omz_downloader does not support
        "densenet-121",
        "densenet-121-tf",
        "dla-34",
        "efficientnet-b0",
        "efficientnet-b0-pytorch",
        "efficientnet-v2-b0",
        "efficientnet-v2-s",
        "hbonet-1.0",
        "hbonet-0.25",
        "googlenet-v1",
        "googlenet-v1-tf",
        "googlenet-v2",
        "googlenet-v2-tf",
        "googlenet-v3",
        "googlenet-v3-pytorch",
        "googlenet-v4-tf",
        "inception-resnet-v2-tf",
        #  "levit-128s",                   # IR has hard-coded batch size of 1
        "mixnet-l",
        "mobilenet-v1-0.25-128",
        "mobilenet-v1-1.0-224",
        "mobilenet-v1-1.0-224-tf",
        "mobilenet-v2",
        "mobilenet-v2-1.0-224",
        "mobilenet-v2-pytorch",
        "mobilenet-v2-1.4-224",
        "mobilenet-v3-small-1.0-224-tf",
        "mobilenet-v3-large-1.0-224-tf",
        #  "nfnet-f0",                     # mo 2022.2 bug
        #  "regnetx-3.2gf",                # omz_converter does not support
        "octave-resnet-26-0.25",
        #  "repvgg-a0",                    # training and inference architectures are different
        #  "repvgg-b1",                    # training and inference architectures are different
        #  "repvgg-b3",                    # training and inference architectures are different
        #  "resnest-50-pytorch",           # IR has hard-coded batch size of 1
        "resnet-18-pytorch",
        "resnet-34-pytorch",
        "resnet-50-pytorch",
        "resnet-50-tf",
        #  "rexnet-v1-x1.0",               # IR has hard-coded batch size of 1
        "se-inception",
        "se-resnet-50",
        "se-resnext-50",
        "shufflenet-v2-x0.5",
        #  "shufflenet-v2-x1.0",           # IR has hard-coded batch size of 1
        "squeezenet1.0",
        "squeezenet1.1",
        #  "swin-tiny-patch4-window7-224", # IR has hard-coded batch size of 1
        #  "t2t-vit-14",                   # IR has hard-coded batch size of 1
        "vgg16",
        "vgg19",
    ],
    det=[],
    seg=[],
)


# Flat list of every supported OMZ model name across all task types.
AVAILABLE_OMZ_MODELS: List[str] = [name for names in OMZ_PUBLIC_MODELS.values() for name in names]


class NameSpace:
    """Lightweight attribute container for otx.core.ov.omz_wrapper.

    Every keyword argument passed to the constructor becomes an instance
    attribute, mimicking ``argparse.Namespace``.
    """

    def __init__(self, **attributes):
        for key, value in attributes.items():
            setattr(self, key, value)
def _get_etag(url):
    """Return the ETag header for ``url``, or None if it cannot be fetched.

    Best-effort: any request failure (connection error, timeout, HTTP error)
    or a non-200 response yields None instead of raising.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=100)
        if response.status_code != 200:
            return None
        return response.headers.get("ETag", None)
    # FIX: was `except HTTPError` only — `requests.head` failures raise
    # ConnectionError/Timeout etc., which are RequestException subclasses but
    # NOT HTTPError, so network failures escaped this best-effort getter.
    except requests.RequestException:
        return None


def _get_ir_path(directory):
    """Return dict(model_path=..., weight_path=...) of the single IR found under ``directory``.

    Searches recursively for exactly one ``.xml``/``.bin`` pair; returns None
    when either file is missing.
    """
    directory = Path(directory)
    model_path = list(directory.glob("**/*.xml"))
    weight_path = list(directory.glob("**/*.bin"))
    if model_path and weight_path:
        assert len(model_path) == 1 and len(weight_path) == 1
        return dict(model_path=model_path[0], weight_path=weight_path[0])
    return None


def _run_pre_convert(reporter, model, output_dir, args):
    """Run the model's OMZ ``pre-convert.py`` script, if one exists.

    Returns True on success (or when there is nothing to run / dry run),
    False when the subprocess fails.
    """
    # NOTE: uses `subdirectory_ori` (set by _update_model) to locate the
    # script in the original OMZ model tree.
    script = _common.MODEL_ROOT / model.subdirectory_ori / "pre-convert.py"
    if not script.exists():
        return True

    reporter.print_section_heading(
        "{}Running pre-convert script for {}",
        "(DRY RUN) " if args.dry_run else "",
        model.name,
    )

    cmd = [
        str(args.python),
        "--",
        str(script),
        "--",
        str(args.download_dir / model.subdirectory),
        str(output_dir / model.subdirectory),
    ]

    reporter.print("Pre-convert command: {}", _common.command_string(cmd))
    reporter.print(flush=True)

    success = True if args.dry_run else reporter.job_context.subprocess(cmd)
    reporter.print()
    return success


def _update_model(model):
    """Update model configs for omz_wrapper.

    Rewrites ``model.subdirectory`` to a content-derived hash (built from the
    ETags of the model's source files) so cached downloads are invalidated
    when the upstream files change; the original value is preserved in
    ``model.subdirectory_ori``.
    """
    m_hash = hashlib.sha256()
    for file in model.files:
        url = file.source.url
        etag = _get_etag(url)
        if etag is not None:
            m_hash.update(bytes(etag, "utf-8"))
    model.subdirectory_ori = model.subdirectory
    model.subdirectory = Path(m_hash.hexdigest())

    # FIXME: a bug from openvino-dev==2022.3.0
    # It has been fixed on master branch.
    # After upgrading openvino-dev, we can remove this temporary patch
    # FIX: pass a default to getattr so a missing attribute cannot raise
    # AttributeError here.
    if getattr(model, "conversion_to_onnx_args", None) and not [
        arg for arg in model.conversion_to_onnx_args if arg.startswith("--model-path")
    ]:
        model.conversion_to_onnx_args.append("--model-path=")
def get_model_configuration(model_name):
    """Return the OMZ model configuration matching ``model_name``, or None.

    The matching configuration is patched via ``_update_model`` (hashed cache
    subdirectory, onnx-args workaround) before being returned.
    """
    for candidate in load_models(_common.MODEL_ROOT, {}):
        if candidate.name != model_name:
            continue
        _update_model(candidate)
        return candidate
    return None
def download_model(model, download_dir=OMZ_CACHE, precisions=None, force=False):
    """Function for downloading model from directory.

    Downloads the OMZ ``model`` files into ``download_dir`` unless a complete
    cached copy already exists. Exits the process (``sys.exit(1)``) when the
    download fails.
    """
    download_dir = Path("") if download_dir is None else Path(download_dir)
    precisions = precisions if precisions else {"FP32"}

    # TODO: need delicate cache management
    # Skip downloading when every expected file is already present in the
    # model's (hash-based) cache subdirectory.
    if not force and (download_dir / model.subdirectory).exists():
        # Files that are archives get unpacked by post-processing, so the
        # archive itself may no longer exist; collect their names to treat
        # them as done.
        target_file_names = []
        for postprocessing in model.postprocessing:
            if isinstance(postprocessing, PostprocUnpackArchive):
                target_file_names.append(postprocessing.file)

        done = [False for _ in model.files]
        for i, file in enumerate(model.files):
            filename = file.name
            if filename in target_file_names:
                # TODO
                # here, we assume unarchive is done
                done[i] = True
                continue
            if os.path.exists(download_dir / model.subdirectory / filename):
                done[i] = True
        if all(done):
            return

    # Delegate the actual download to the OMZ downloader engine.
    reporter = Downloader.make_reporter("text")
    downloader = Downloader(precisions, download_dir)
    failed_models = downloader.bulk_download_model([model], reporter, 1, "text")
    if failed_models:
        reporter.print("FAILED:")
        for failed_model_name in failed_models:
            reporter.print(failed_model_name)
        sys.exit(1)
def _convert(reporter, model, output_dir, namespace, mo_props, requested_precisions):
    """Convert function for OMZ wrapper.

    Runs Model Optimizer for every requested precision of ``model`` and
    returns True on success (including the "nothing to do" cases), False on
    failure.
    """
    # No mo_args means this model defines no IR conversion — nothing to do.
    if model.mo_args is None:
        reporter.print_section_heading("Skipping {} (no conversions defined)", model.name)
        reporter.print()
        return True

    # Only convert precisions both requested and supported by the model.
    model_precisions = requested_precisions & model.precisions
    if not model_precisions:
        reporter.print_section_heading("Skipping {} (all conversions skipped)", model.name)
        reporter.print()
        return True

    (output_dir / model.subdirectory).mkdir(parents=True, exist_ok=True)
    if not _run_pre_convert(reporter, model, output_dir, namespace):
        return False

    model_format = model.framework
    mo_extension_dir = mo_props.base_dir / "extensions"
    if not mo_extension_dir.exists():
        mo_extension_dir = mo_props.base_dir

    # Variables substituted into the $-templates found in model.mo_args.
    template_variables = {
        "config_dir": _common.MODEL_ROOT / model.subdirectory_ori,
        "conv_dir": output_dir / model.subdirectory,
        "dl_dir": namespace.download_dir / model.subdirectory,
        "mo_dir": mo_props.base_dir,
        "mo_ext_dir": mo_extension_dir,
    }

    # Some frameworks are first exported to ONNX, then fed to mo as ONNX.
    if model.conversion_to_onnx_args:
        if not convert_to_onnx(reporter, model, output_dir, namespace, template_variables):
            return False
        model_format = "onnx"

    expanded_mo_args = [string.Template(arg).substitute(template_variables) for arg in model.mo_args]

    for model_precision in sorted(model_precisions):
        data_type = model_precision.split("-")[0]
        layout_string = ",".join(f"{input.name}({input.layout})" for input in model.input_info if input.layout)
        shape_string = ",".join(str(input.shape) for input in model.input_info if input.shape)

        # NOTE(review): these appends mutate the shared expanded_mo_args list
        # inside the precision loop, so with more than one precision the
        # --layout/--input_shape flags are duplicated in later mo commands —
        # looks unintended; verify before relying on multi-precision runs.
        if layout_string:
            expanded_mo_args.append(f"--layout={layout_string}")
        if shape_string:
            expanded_mo_args.append(f"--input_shape={shape_string}")

        mo_cmd = [
            *mo_props.cmd_prefix,
            f"--framework={model_format}",
            f"--output_dir={output_dir / model.subdirectory / model_precision}",
            f"--model_name={model.name}",
            f"--input={','.join(input.name for input in model.input_info)}".format(),
            *expanded_mo_args,
            *mo_props.extra_args,
        ]
        if "FP16" in data_type:
            mo_cmd.append("--compress_to_fp16")

        reporter.print_section_heading(
            "{}Converting {} to IR ({})",
            "(DRY RUN) " if namespace.dry_run else "",
            model.name,
            model_precision,
        )
        reporter.print("Conversion command: {}", _common.command_string(mo_cmd))

        if not namespace.dry_run:
            reporter.print(flush=True)
            if not reporter.job_context.subprocess(mo_cmd):
                # NOTE: mo returns non zero return code (245) even though it successfully generate IR
                # Workaround: accept the run if both IR files exist and were
                # written recently (within time_threshold seconds of now).
                cur_time = time.time()
                time_threshold = 5
                xml_path = output_dir / model.subdirectory / model_precision / f"{model.name}.xml"
                bin_path = output_dir / model.subdirectory / model_precision / f"{model.name}.bin"
                if not (
                    os.path.exists(xml_path)
                    and os.path.exists(bin_path)
                    and os.path.getmtime(xml_path) - cur_time < time_threshold
                    and os.path.getmtime(bin_path) - cur_time < time_threshold
                ):
                    return False

        reporter.print()

    return True
def convert_model(
    model,
    download_dir=OMZ_CACHE,
    output_dir=OMZ_CACHE,
    precisions=None,
    force=False,
    *args,
):  # pylint: disable=keyword-arg-before-vararg
    """Converting model for OMZ wrapping.

    Locates Model Optimizer, converts ``model`` (or each of its stages) to IR
    under ``output_dir``, and returns the resulting IR paths as
    ``dict(model_path=..., weight_path=...)``. Returns the cached IR without
    converting when one already exists and ``force`` is False. Exits the
    process via ``sys.exit`` when mo cannot be found or a conversion fails.
    """
    download_dir = Path("") if download_dir is None else Path(download_dir)
    output_dir = Path("") if output_dir is None else Path(output_dir)
    precisions = precisions if precisions else {"FP32"}

    # Reuse a previously converted IR unless the caller forces reconversion.
    out = _get_ir_path(output_dir / model.subdirectory)
    if out and not force:
        return out

    namespace = NameSpace(
        python=shutil.which("python"),
        dry_run=False,
        download_dir=download_dir,
    )

    # Find Model Optimizer: prefer the `mo` entry point on PATH, then fall
    # back to the mo.py script inside an OpenVINO install.
    mo_executable = shutil.which("mo")
    if mo_executable:
        mo_path = Path(mo_executable)
    else:
        try:
            mo_path = Path(os.environ["INTEL_OPENVINO_DIR"]) / "tools/mo/openvino/tools/mo/mo.py"
            if not mo_path.exists():
                mo_path = Path(os.environ["INTEL_OPENVINO_DIR"]) / "tools/model_optimizer/mo.py"
        except KeyError:
            sys.exit(
                "Unable to locate Model Optimizer. "
                + "Use --mo or run setupvars.sh/setupvars.bat from the OpenVINO toolkit."
            )

    mo_path = mo_path.resolve()
    mo_cmd_prefix = [namespace.python, "--", str(mo_path)]

    if str(mo_path).lower().endswith(".py"):
        mo_dir = mo_path.parent
    else:
        mo_package_path, stderr = _common.get_package_path(namespace.python, "openvino.tools.mo")
        mo_dir = mo_package_path
        if mo_package_path is None:
            # FIX: was `_common.get_package_path(args.python, "mo")` — `args`
            # is the *varargs tuple here and has no `.python` attribute, which
            # made this fallback raise AttributeError; use the interpreter
            # stored on `namespace` instead.
            mo_package_path, stderr = _common.get_package_path(namespace.python, "mo")
            if mo_package_path is None:
                sys.exit(f"Unable to load Model Optimizer. Errors occurred: {stderr}")
            mo_dir = mo_package_path.parent

    reporter = _reporting.Reporter(_reporting.DirectOutputContext())
    mo_props = ModelOptimizerProperties(
        cmd_prefix=mo_cmd_prefix,
        extra_args=[],
        base_dir=mo_dir,
    )
    shared_convert_args = (output_dir, namespace, mo_props, precisions)

    # Multi-stage models are converted stage by stage.
    results = []
    models = []
    if model.model_stages:
        for model_stage in model.model_stages:
            results.append(_convert(reporter, model_stage, *shared_convert_args))
            models.append(model_stage)
    else:
        results.append(_convert(reporter, model, *shared_convert_args))
        models.append(model)

    failed_models = [model.name for model, successful in zip(models, results) if not successful]
    if failed_models:
        reporter.print("FAILED:")
        for failed_model_name in failed_models:
            reporter.print(failed_model_name)
        sys.exit(1)

    return _get_ir_path(output_dir / model.subdirectory)
def get_omz_model(model_name, download_dir=OMZ_CACHE, output_dir=OMZ_CACHE, force=False):
    """Get OMZ model from name and download_dir.

    Resolves the model configuration, downloads its files, converts them to
    IR, and returns the IR paths produced by ``convert_model``.
    """
    configuration = get_model_configuration(model_name)
    download_model(configuration, download_dir=download_dir, force=force)
    return convert_model(configuration, download_dir=download_dir, output_dir=output_dir, force=force)