# Source code for datumaro.plugins.openvino_plugin.launcher

# Copyright (C) 2019-2021 Intel Corporation
#
# SPDX-License-Identifier: MIT

# pylint: disable=exec-used


import logging as log
import os.path as osp
import shutil
import urllib
from dataclasses import dataclass, fields
from typing import Dict, List, Optional

import numpy as np
from openvino.runtime import Core
from tqdm import tqdm

from datumaro.components.abstracts.model_interpreter import LauncherInputType, ModelPred
from datumaro.components.cli_plugin import CliPlugin
from datumaro.components.launcher import LauncherWithModelInterpreter
from datumaro.errors import DatumaroError
from datumaro.util.definitions import get_datumaro_cache_dir
from datumaro.util.samples import get_samples_path


class _OpenvinoImporter(CliPlugin):
    """Command-line plugin describing the options needed to register an OpenVINO model."""

    @staticmethod
    def _parse_output_layers(s):
        """Split a comma-separated string into a list of whitespace-stripped names."""
        return [name.strip() for name in s.split(",")]

    @classmethod
    def build_cmdline_parser(cls, **kwargs):
        """Extend the parser built by the parent class with the OpenVINO model options.

        Args:
            **kwargs: Forwarded unchanged to the parent ``build_cmdline_parser``.

        Returns:
            The parser with the model description, weights, interpreter, device
            and extra output layer arguments added.
        """
        parser = super().build_cmdline_parser(**kwargs)
        parser.add_argument(
            "-d", "--description", required=True, help="Path to the model description file (.xml)"
        )
        parser.add_argument(
            "-w", "--weights", required=True, help="Path to the model weights file (.bin)"
        )
        parser.add_argument(
            "-i",
            "--interpreter",
            required=True,
            help="Path to the network output interpreter script (.py)",
        )
        parser.add_argument("--device", default="CPU", help="Target device (default: %(default)s)")
        parser.add_argument(
            "--output-layers",
            type=cls._parse_output_layers,
            help="A comma-separated list of extra output layers",
        )
        return parser

    @staticmethod
    def copy_model(model_dir, model):
        """Copy the model files into ``model_dir`` and make the entries relative.

        Args:
            model_dir: Destination directory for the copied files.
            model: Mapping with the "description", "weights" and "interpreter"
                paths. It is modified in place: after each copy the entry is
                replaced by the base name of the copied file.
        """
        # Same order as the files are declared on the command line.
        for key in ("description", "weights", "interpreter"):
            file_name = osp.basename(model[key])
            shutil.copy(model[key], osp.join(model_dir, file_name))
            model[key] = file_name


@dataclass
class OpenvinoModelInfo:
    """File locations that describe an OpenVINO model.

    ``interpreter``, ``description`` and ``weights`` are file paths. A path that
    does not point to an existing file on its own is looked up inside
    ``model_dir`` when :meth:`validate` is called.
    """

    interpreter: Optional[str]
    description: Optional[str]
    weights: Optional[str]
    model_dir: Optional[str]

    def validate(self):
        """Validate integrity of the member variables.

        Every field except ``model_dir`` must resolve to an existing file, either
        as given or relative to ``model_dir``. Each field is rewritten with the
        path that was found.

        Raises:
            DatumaroError: If a path is not set, or no file exists at the given
                path nor under ``model_dir``.
        """

        def _validate(key: str):
            path = getattr(self, key)
            # The fields are Optional: report an unset path explicitly instead of
            # letting osp.isfile(None) fail with an unrelated TypeError.
            if path is None:
                raise DatumaroError(f"Model {key} file is not specified")

            if not osp.isfile(path):
                # Fall back to model_dir only when there is one to join with.
                if self.model_dir is not None:
                    path = osp.join(self.model_dir, path)
                if not osp.isfile(path):
                    raise DatumaroError(f'Failed to open model {key} file "{path}"')

            setattr(self, key, path)

        for field in fields(self):
            if field.name != "model_dir":
                _validate(field.name)
@dataclass
class BuiltinOpenvinoModelInfo(OpenvinoModelInfo):
    """Model info for a model that is addressed by its builtin name.

    The interpreter script is taken from the samples directory, while the
    description (.xml) and weights (.bin) files are taken from the Datumaro cache
    directory and downloaded there first when the model is downloadable.
    """

    # Models whose .xml/.bin files may be fetched from the remote model storage.
    downloadable_models = {
        "clip_text_ViT-B_32",
        "clip_visual_ViT-B_32",
        "googlenet-v4-tf",
    }

    @classmethod
    def create_from_model_name(cls, model_name: str) -> "BuiltinOpenvinoModelInfo":
        """Create the model info of the builtin model called ``model_name``.

        Args:
            model_name: Name of the model; used as the file name stem of the
                interpreter, description and weights files.

        Returns:
            The model info. ``description`` and ``weights`` are ``None`` when the
            file is not cached and the model is not in ``downloadable_models``.
        """
        openvino_plugin_samples_dir = get_samples_path()
        # The path is kept even if the script does not exist, so that a later
        # validation can report which file was expected.
        interpreter = osp.join(openvino_plugin_samples_dir, model_name + "_interp.py")

        model_dir = get_datumaro_cache_dir()

        # Please visit open-model-zoo repository for OpenVINO public models if you are interested in
        # https://github.com/openvinotoolkit/open_model_zoo/blob/master/models/public/index.md
        url_folder = "https://storage.openvinotoolkit.org/repositories/datumaro/models/"

        return cls(
            interpreter=interpreter,
            description=cls._locate_model_file(model_name, ".xml", model_dir, url_folder),
            weights=cls._locate_model_file(model_name, ".bin", model_dir, url_folder),
            model_dir=model_dir,
        )

    @classmethod
    def _locate_model_file(
        cls, model_name: str, extension: str, model_dir: str, url_folder: str
    ) -> Optional[str]:
        """Return the cached model file, downloading it first if that is possible."""
        file_path = osp.join(model_dir, model_name + extension)
        if osp.exists(file_path):
            return file_path
        if model_name not in cls.downloadable_models:
            return None
        # URLs always use "/" (url_folder already ends with one); osp.join would
        # insert a backslash on Windows.
        return cls._download_file(url_folder + model_name + extension, file_path)

    @staticmethod
    def _download_file(url: str, file_root: str) -> str:
        """Download ``url`` to the file path ``file_root`` and return ``file_root``.

        The data is written to a temporary ``.part`` file which replaces
        ``file_root`` only after the download has finished, so an interrupted
        download does not leave a truncated file that looks like a cached model.
        """
        # Imported here because the module only does ``import urllib``, which
        # does not guarantee that the ``urllib.request`` submodule is loaded.
        import os
        import urllib.request

        log.info('Downloading: "{}" to {}\n'.format(url, file_root))
        req = urllib.request.Request(url)
        partial_path = file_root + ".part"

        try:
            with urllib.request.urlopen(req) as source, open(  # nosec B310
                partial_path, "wb"
            ) as output:
                # The header may be absent; tqdm accepts total=None in that case.
                content_length = source.info().get("Content-Length")
                with tqdm(
                    total=int(content_length) if content_length is not None else None,
                    ncols=80,
                    unit="iB",
                    unit_scale=True,
                    unit_divisor=1024,
                ) as loop:
                    while True:
                        buffer = source.read(8192)
                        if not buffer:
                            break

                        output.write(buffer)
                        loop.update(len(buffer))
            os.replace(partial_path, file_root)
        finally:
            # Only present here if the download or the rename failed.
            if osp.exists(partial_path):
                os.remove(partial_path)

        return file_root

    def override(self, other: OpenvinoModelInfo) -> None:
        """Fill the members of ``other`` that are ``None`` with the builtin values.

        Only ``other`` is modified; members it already defines are left alone,
        and so are members for which this builtin info has no value.
        """
        for field in fields(self):
            key = field.name
            builtin_item = getattr(self, key)
            if getattr(other, key) is None and builtin_item:
                log.info(f"Override {key} with the builtin model {key}: {builtin_item}.")
                setattr(other, key, builtin_item)
class OpenvinoLauncher(LauncherWithModelInterpreter):
    """Launcher that runs inference of a model with the OpenVINO runtime."""

    cli_plugin = _OpenvinoImporter

    def __init__(
        self,
        description: Optional[str] = None,
        weights: Optional[str] = None,
        interpreter: Optional[str] = None,
        model_dir: Optional[str] = None,
        model_name: Optional[str] = None,
        output_layers: Optional[List[str]] = None,
        device: Optional[str] = None,
        compile_model_config: Optional[Dict] = None,
    ):
        """Read the model, check that the device supports it and compile it.

        Args:
            description: Path to the model description file.
            weights: Path to the model weights file.
            interpreter: Path to the model interpreter script.
            model_dir: Directory used to resolve the paths above when they are
                not existing files on their own.
            model_name: Name of a builtin model. When given, the paths that are
                ``None`` are filled with the ones of the builtin model.
            output_layers: Extra outputs to add to the model outputs.
            device: Device name to run the model on; "CPU" when not given.
            compile_model_config: Config passed to the model compilation.
        """
        model_info = OpenvinoModelInfo(
            interpreter=interpreter,
            description=description,
            weights=weights,
            model_dir=model_dir,
        )
        if model_name:
            builtin_model_info = BuiltinOpenvinoModelInfo.create_from_model_name(model_name)
            builtin_model_info.override(model_info)

        model_info.validate()

        super().__init__(model_interpreter_path=model_info.interpreter)

        self.model_info = model_info

        self._device = device or "CPU"
        self._compile_model_config = compile_model_config

        self._core = Core()
        self._network = self._core.read_model(model_info.description, model_info.weights)

        if output_layers:
            log.info(f"Add additional output layers {output_layers} to the model outputs.")
            self._network.add_outputs(output_layers)

        self._check_model_support(self._network, self._device)
        self._load_executable_net()

    @property
    def inputs(self):
        """Inputs of the loaded model."""
        return self._network.inputs

    @property
    def outputs(self):
        """Outputs of the loaded model."""
        return self._network.outputs

    def _check_model_support(self, net, device):
        """Raise ``NotImplementedError`` if ``device`` cannot run some layer of ``net``."""
        # query_model maps each layer name to a device; a falsy device is treated
        # as "not supported".
        not_supported_layers = {
            name for name, dev in self._core.query_model(net, device).items() if not dev
        }

        if not_supported_layers:
            log.error(
                "The following layers are not supported "
                "by the plugin for device '%s': %s.",
                device,
                ", ".join(not_supported_layers),
            )
            raise NotImplementedError("Some layers are not supported on the device")

    def _load_executable_net(self, batch_size: int = 1):
        """Compile the model for ``batch_size`` and create the inference request.

        A model with a dynamic input shape is reshaped to ``batch_size``. For a
        static input shape the batch size of the model is used instead and a
        warning is logged when it differs from the requested one.
        """
        network = self._network

        # Only the first input of the model is considered.
        self._input_blob = next(iter(network.inputs))

        is_dynamic_layout = False
        try:
            self._input_layout = self._input_blob.shape
        except ValueError:
            # In case of that the input has dynamic shape
            # NOTE(review): confirm that the runtime raises ValueError (and not
            # another exception type) when the static shape is unavailable.
            self._input_layout = self._input_blob.partial_shape
            is_dynamic_layout = True

        if is_dynamic_layout:
            self._input_layout[0] = batch_size
            network.reshape({self._input_blob: self._input_layout})
        else:
            model_batch_size = self._input_layout[0]
            if batch_size != model_batch_size:
                log.warning(
                    "Input layout of the model is static, so that we cannot change "
                    f"the model batch size ({model_batch_size}) to batch size ({batch_size})! "
                    f"Set the batch size to {model_batch_size}."
                )
                batch_size = model_batch_size

        self._batch_size = batch_size

        self._net = self._core.compile_model(
            model=network,
            device_name=self._device,
            config=self._compile_model_config,
        )
        self._request = self._net.create_infer_request()

    def infer(self, inputs: LauncherInputType) -> List[ModelPred]:
        """Run the model on ``inputs`` and return the outputs grouped per item.

        Args:
            inputs: Either a numpy array, which is fed to the first model input,
                or an object that is passed to the inference request as is.

        Returns:
            A list of dictionaries mapping an output name to the output value.
            NOTE(review): assumes the first axis of every model output is the
            batch axis, since the outputs are zipped item by item.
        """
        batch_size = len(inputs)
        # The model is compiled again only when the batch does not fit.
        if self._batch_size < batch_size:
            self._load_executable_net(batch_size)

        inputs = (
            {self._input_blob.get_any_name(): inputs} if isinstance(inputs, np.ndarray) else inputs
        )
        results = self._request.infer(inputs=inputs)

        outputs_group_by_item = [
            {key.any_name: output for key, output in zip(results.keys(), outputs)}
            for outputs in zip(*results.values())
        ]

        return outputs_group_by_item