"""Openvino Task of Segmentation."""

# Copyright (C) 2021 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

import io
import json
import os
import tempfile
import time
from typing import Any, Dict, List, Optional, Tuple, Union
from zipfile import ZipFile

import attr
import nncf
import numpy as np
import openvino.runtime as ov
from addict import Dict as ADDict
from nncf.common.quantization.structs import QuantizationPreset
from openvino.model_api.adapters import OpenvinoAdapter, create_core
from openvino.model_api.models import Model
from openvino.model_api.models.utils import ImageResultWithSoftPrediction

from otx.algorithms.common.utils import OTXOpenVinoDataLoader, get_default_async_reqs_num, read_py_config
from otx.algorithms.common.utils.ir import check_if_quantized
from otx.algorithms.segmentation.adapters.openvino import model_wrappers
from otx.algorithms.segmentation.configs.base import SegmentationConfig
from otx.algorithms.segmentation.utils import get_activation_map
from otx.api.entities.annotation import AnnotationSceneEntity
from otx.api.entities.datasets import DatasetEntity
from otx.api.entities.inference_parameters import (
    InferenceParameters,
    default_progress_callback,
)
from otx.api.entities.label_schema import LabelSchemaEntity
from otx.api.entities.model import (
    ModelEntity,
    ModelFormat,
    ModelOptimizationType,
    ModelPrecision,
    OptimizationMethod,
)
from otx.api.entities.optimization_parameters import OptimizationParameters
from otx.api.entities.result_media import ResultMediaEntity
from otx.api.entities.resultset import ResultSetEntity
from otx.api.entities.subset import Subset
from otx.api.entities.task_environment import TaskEnvironment
from otx.api.entities.tensor import TensorEntity
from otx.api.serialization.label_mapper import LabelSchemaMapper, label_schema_to_bytes
from otx.api.usecases.evaluation.metrics_helper import MetricsHelper
from otx.api.usecases.exportable_code import demo
from otx.api.usecases.exportable_code.inference import IInferencer
from otx.api.usecases.exportable_code.prediction_to_annotation_converter import (
    SegmentationToAnnotationConverter,
)
from otx.api.usecases.tasks.interfaces.deployment_interface import IDeploymentTask
from otx.api.usecases.tasks.interfaces.evaluate_interface import IEvaluationTask
from otx.api.usecases.tasks.interfaces.inference_interface import IInferenceTask
from otx.api.usecases.tasks.interfaces.optimization_interface import (
    IOptimizationTask,
    OptimizationType,
)
from otx.utils.logger import get_logger

logger = get_logger()


# pylint: disable=too-many-locals, too-many-statements, unused-argument
class OpenVINOSegmentationInferencer(IInferencer):
    """Inferencer implementation for Segmentation using OpenVINO backend."""

    def __init__(
        self,
        hparams: SegmentationConfig,
        label_schema: LabelSchemaEntity,
        model_file: Union[str, bytes],
        weight_file: Union[str, bytes, None] = None,
        device: str = "CPU",
        num_requests: int = 1,
    ):
        """Inferencer implementation for Segmentation using OpenVINO backend.

        :param hparams: Hyper parameters that the model should use.
        :param label_schema: LabelSchemaEntity that was used during model training.
        :param model_file: Path to model to load, `.xml`, `.bin` or `.onnx` file.
        :param device: Device to run inference on, such as CPU, GPU or MYRIAD. Defaults to "CPU".
        :param num_requests: Maximum number of requests that the inferencer can make.
            A good value is the number of available cores. Defaults to 1.
        """
        model_adapter = OpenvinoAdapter(
            create_core(),
            model_file,
            weight_file,
            device=device,
            max_num_requests=num_requests,
            plugin_config={"PERFORMANCE_HINT": "THROUGHPUT"},
        )
        self.configuration = {
            **attr.asdict(
                hparams.postprocessing,
                filter=lambda attr, value: attr.name
                not in ["header", "description", "type", "visible_in_ui", "class_name"],
            )
        }
        self.model = Model.create_model(
            model_adapter,
            "Segmentation",
            self.configuration,
            preload=True,
        )
        self.converter = SegmentationToAnnotationConverter(label_schema)
        self.callback_exceptions: List[Exception] = []
        self.model.inference_adapter.set_callback(self._async_callback)

    def predict(self, image: np.ndarray) -> Tuple[ImageResultWithSoftPrediction, AnnotationSceneEntity]:
        """Perform a prediction for a given input image."""
        result = self.model(image)
        return result, self.converter.convert_to_annotation(result)

    def enqueue_prediction(self, image: np.ndarray, id: int, result_handler: Any) -> None:
        """Runs async inference."""
        if not self.model.is_ready():
            self.model.await_any()
        image, metadata = self.model.preprocess(image)
        callback_data = id, metadata, result_handler
        self.model.inference_adapter.infer_async(image, callback_data)

    def await_all(self) -> None:
        """Await all running infer requests if any."""
        self.model.await_all()

    def _async_callback(self, request: Any, callback_args: tuple) -> None:
        """Fetches the results of async inference."""
        try:
            id, preprocessing_meta, result_handler = callback_args
            raw_prediction = self.model.inference_adapter.copy_raw_result(request)
            processed_prediction = self.model.postprocess(raw_prediction, preprocessing_meta)
            annotation = self.converter.convert_to_annotation(processed_prediction, preprocessing_meta)
            result_handler(id, annotation, processed_prediction.feature_vector, processed_prediction.saliency_map)
        except Exception as e:
            self.callback_exceptions.append(e)
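

# Illustrative usage sketch (an assumption, not part of the original module): how a
# caller might drive the inferencer both synchronously and asynchronously. The model
# paths, the `frames` sequence of numpy images, and `_on_result` are hypothetical
# placeholders; `hparams` and `label_schema` are assumed to come from a loaded task
# environment.
def _example_inferencer_usage(hparams, label_schema, frames):  # pragma: no cover
    inferencer = OpenVINOSegmentationInferencer(
        hparams,
        label_schema,
        model_file="openvino.xml",  # hypothetical path
        weight_file="openvino.bin",  # hypothetical path
        num_requests=get_default_async_reqs_num(),
    )

    # Synchronous path: one image in, (raw result, annotation scene) out.
    result, annotation_scene = inferencer.predict(frames[0])

    # Asynchronous path: the handler receives the same artifacts that the task's
    # `add_prediction` callback consumes (id, annotation, feature vector, saliency map).
    def _on_result(frame_id, annotation, feature_vector, saliency_map):
        print(frame_id, len(annotation.annotations))

    for i, frame in enumerate(frames):
        inferencer.enqueue_prediction(frame, i, _on_result)
    inferencer.await_all()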


class OpenVINOSegmentationTask(IDeploymentTask, IInferenceTask, IEvaluationTask, IOptimizationTask):
    """Task implementation for Segmentation using OpenVINO backend."""

    def __init__(self, task_environment: TaskEnvironment):
        self.task_environment = task_environment
        self.model = self.task_environment.model
        self.model_name = self.task_environment.model_template.model_template_id
        self.inferencer = self.load_inferencer()
        self._avg_time_per_image: Optional[float] = None

        labels = task_environment.get_labels(include_empty=False)
        self._label_dictionary = dict(enumerate(labels, 1))
        template_file_path = self.task_environment.model_template.model_template_path
        self._base_dir = os.path.abspath(os.path.dirname(template_file_path))

    @property
    def hparams(self):
        """Hparams of OpenVINO Segmentation Task."""
        return self.task_environment.get_hyper_parameters(SegmentationConfig)

    @property
    def avg_time_per_image(self) -> Optional[float]:
        """Average inference time per image."""
        return self._avg_time_per_image

    def load_inferencer(self) -> OpenVINOSegmentationInferencer:
        """load_inferencer function of OpenVINO Segmentation Task."""
        if self.model is None:
            raise RuntimeError("load_inferencer failed, model is None")
        return OpenVINOSegmentationInferencer(
            self.hparams,
            self.task_environment.label_schema,
            self.model.get_data("openvino.xml"),
            self.model.get_data("openvino.bin"),
            num_requests=get_default_async_reqs_num(),
        )

    def infer(
        self,
        dataset: DatasetEntity,
        inference_parameters: Optional[InferenceParameters] = None,
    ) -> DatasetEntity:
        """Infer function of OpenVINOSegmentationTask."""
        if inference_parameters is not None:
            update_progress_callback = inference_parameters.update_progress
            dump_soft_prediction = not inference_parameters.is_evaluation
            process_soft_prediction = inference_parameters.process_saliency_maps
            enable_async_inference = inference_parameters.enable_async_inference
        else:
            update_progress_callback = default_progress_callback
            dump_soft_prediction = True
            process_soft_prediction = False
            enable_async_inference = True

        # Result handler shared by the sync and async paths: attaches annotations,
        # the representation vector and per-label soft predictions to a dataset item.
        def add_prediction(
            id: int,
            predicted_scene: AnnotationSceneEntity,
            feature_vector: Union[np.ndarray, None],
            soft_prediction: Union[np.ndarray, None],
        ):
            dataset_item = dataset[id]
            dataset_item.append_annotations(predicted_scene.annotations)

            if feature_vector is not None:
                feature_vector_media = TensorEntity(name="representation_vector", numpy=feature_vector.reshape(-1))
                dataset_item.append_metadata_item(feature_vector_media, model=self.model)

            if dump_soft_prediction and soft_prediction is not None and soft_prediction.ndim > 1:
                for label_index, label in self._label_dictionary.items():
                    current_label_soft_prediction = soft_prediction[:, :, label_index]
                    if process_soft_prediction:
                        current_label_soft_prediction = get_activation_map(
                            current_label_soft_prediction, normalize=False
                        )
                    result_media = ResultMediaEntity(
                        name=label.name,
                        type="soft_prediction",
                        label=label,
                        annotation_scene=dataset_item.annotation_scene,
                        roi=dataset_item.roi,
                        numpy=current_label_soft_prediction,
                    )
                    dataset_item.append_metadata_item(result_media, model=self.model)

        total_time = 0.0
        dataset_size = len(dataset)
        for i, dataset_item in enumerate(dataset, 1):
            start_time = time.perf_counter()
            if enable_async_inference:
                self.inferencer.enqueue_prediction(dataset_item.numpy, i - 1, add_prediction)
            else:
                result, predicted_scene = self.inferencer.predict(dataset_item.numpy)
                add_prediction(i - 1, predicted_scene, result.feature_vector, result.saliency_map)
            end_time = time.perf_counter() - start_time
            total_time += end_time
            update_progress_callback(int(i / dataset_size * 100), None)

        self.inferencer.await_all()

        self._avg_time_per_image = total_time / len(dataset)
        logger.info(f"Avg time per image: {self._avg_time_per_image} secs")
        logger.info(f"Total time: {total_time} secs")
        logger.info("Segmentation OpenVINO inference completed")

        return dataset

    def evaluate(self, output_resultset: ResultSetEntity, evaluation_metric: Optional[str] = None):
        """Evaluate function of OpenVINOSegmentationTask."""
        logger.info("Computing mDice")
        metrics = MetricsHelper.compute_dice_averaged_over_pixels(output_resultset)
        logger.info(f"mDice after evaluation: {metrics.overall_dice.value}")
        output_resultset.performance = metrics.get_performance()

    def deploy(self, output_model: ModelEntity) -> None:
        """Deploy function of OpenVINOSegmentationTask."""
        logger.info("Deploying the model")
        if self.model is None:
            raise RuntimeError("deploy failed, model is None")

        work_dir = os.path.dirname(demo.__file__)
        parameters: Dict[str, Any] = {}
        parameters["type_of_model"] = "Segmentation"
        parameters["converter_type"] = "SEGMENTATION"
        parameters["model_parameters"] = self.inferencer.configuration
        parameters["model_parameters"]["labels"] = LabelSchemaMapper.forward(self.task_environment.label_schema)

        zip_buffer = io.BytesIO()
        with ZipFile(zip_buffer, "w") as arch:
            # model files
            arch.writestr(os.path.join("model", "model.xml"), self.model.get_data("openvino.xml"))
            arch.writestr(os.path.join("model", "model.bin"), self.model.get_data("openvino.bin"))
            arch.writestr(
                os.path.join("model", "config.json"),
                json.dumps(parameters, ensure_ascii=False, indent=4),
            )
            # model_wrappers files
            for root, _, files in os.walk(os.path.dirname(model_wrappers.__file__)):
                if "__pycache__" in root:
                    continue
                for file in files:
                    file_path = os.path.join(root, file)
                    arch.write(
                        file_path,
                        os.path.join(
                            "python",
                            "model_wrappers",
                            file_path.split("model_wrappers/")[1],
                        ),
                    )
            # other python files
            arch.write(os.path.join(work_dir, "requirements.txt"), os.path.join("python", "requirements.txt"))
            arch.write(os.path.join(work_dir, "LICENSE"), os.path.join("python", "LICENSE"))
            arch.write(os.path.join(work_dir, "demo.py"), os.path.join("python", "demo.py"))
            arch.write(os.path.join(work_dir, "README.md"), os.path.join(".", "README.md"))
        output_model.exportable_code = zip_buffer.getvalue()
        logger.info("Deploying completed")

    def optimize(
        self,
        optimization_type: OptimizationType,
        dataset: DatasetEntity,
        output_model: ModelEntity,
        optimization_parameters: Optional[OptimizationParameters] = None,
    ):
        """Optimize function of OpenVINOSegmentationTask."""
        logger.info("Start PTQ optimization")
        if self.model is None:
            raise RuntimeError("PTQ optimize failed, model is None")
        if optimization_type is not OptimizationType.POT:
            raise ValueError("PTQ is the only supported optimization type for OpenVino models")

        # Calibration data is drawn from the training and unlabeled subsets.
        dataset = dataset.get_combined_subset([Subset.TRAINING, Subset.UNLABELED])
        data_loader = OTXOpenVinoDataLoader(dataset, self.inferencer)
        quantization_dataset = nncf.Dataset(data_loader, lambda data: data[0])

        with tempfile.TemporaryDirectory() as tempdir:
            xml_path = os.path.join(tempdir, "model.xml")
            bin_path = os.path.join(tempdir, "model.bin")
            with open(xml_path, "wb") as f:
                f.write(self.model.get_data("openvino.xml"))
            with open(bin_path, "wb") as f:
                f.write(self.model.get_data("openvino.bin"))

            ov_model = ov.Core().read_model(xml_path)
            if check_if_quantized(ov_model):
                raise RuntimeError("Model is already optimized by PTQ")

        if optimization_parameters is not None:
            optimization_parameters.update_progress(10, None)

        # An optional per-template PTQ config can override the defaults below.
        optimization_config_path = os.path.join(self._base_dir, "ptq_optimization_config.py")
        ptq_config = ADDict()
        if os.path.exists(optimization_config_path):
            ptq_config = read_py_config(optimization_config_path)
        ptq_config.update(
            subset_size=min(self.hparams.pot_parameters.stat_subset_size, len(data_loader)),
            preset=QuantizationPreset(self.hparams.pot_parameters.preset.name.lower()),
        )

        compressed_model = nncf.quantize(
            ov_model,
            quantization_dataset,
            **ptq_config,
        )

        if optimization_parameters is not None:
            optimization_parameters.update_progress(90, None)

        with tempfile.TemporaryDirectory() as tempdir:
            xml_path = os.path.join(tempdir, "model.xml")
            ov.save_model(compressed_model, xml_path)
            with open(xml_path, "rb") as f:
                output_model.set_data("openvino.xml", f.read())
            with open(os.path.join(tempdir, "model.bin"), "rb") as f:
                output_model.set_data("openvino.bin", f.read())
            output_model.set_data(
                "label_schema.json",
                label_schema_to_bytes(self.task_environment.label_schema),
            )

        # set model attributes for quantized model
        output_model.model_format = ModelFormat.OPENVINO
        output_model.optimization_type = ModelOptimizationType.POT
        output_model.optimization_methods = [OptimizationMethod.QUANTIZATION]
        output_model.precision = [ModelPrecision.INT8]

        self.model = output_model
        self.inferencer = self.load_inferencer()
        if optimization_parameters is not None:
            optimization_parameters.update_progress(100, None)
        logger.info("PTQ optimization completed")
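

# Illustrative sketch (an assumption, not from the original file): invoking `infer`
# with explicit InferenceParameters. `task` and `dataset` are assumed to be an already
# constructed OpenVINOSegmentationTask and DatasetEntity; `_report_progress` is a
# hypothetical callback matching the (progress, score) signature used above.
def _example_infer_call(task: OpenVINOSegmentationTask, dataset: DatasetEntity):  # pragma: no cover
    def _report_progress(progress: int, score=None):
        print(f"inference progress: {progress}%")

    params = InferenceParameters(
        is_evaluation=False,  # keep soft-prediction media attached to dataset items
        update_progress=_report_progress,
    )
    predicted_dataset = task.infer(dataset, params)
    print(f"avg time per image: {task.avg_time_per_image} secs")
    return predicted_dataset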
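

# Illustrative sketch (an assumption, not from the original file): persisting the
# exportable code produced by `deploy`. `output_model` is assumed to be a prepared
# ModelEntity; the output path is a hypothetical placeholder.
def _example_deploy(task: OpenVINOSegmentationTask, output_model: ModelEntity):  # pragma: no cover
    task.deploy(output_model)
    # The archive bundles model.xml/model.bin, config.json, the model_wrappers
    # package, and a standalone demo.py, as assembled in `deploy` above.
    with open("exportable_code.zip", "wb") as f:  # hypothetical path
        f.write(output_model.exportable_code)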
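

# Illustrative sketch (an assumption, not from the original file): running PTQ through
# `optimize`. POT is the only accepted OptimizationType; the dataset supplies the
# TRAINING/UNLABELED subsets used for calibration. `_report_progress` is hypothetical.
def _example_optimize(
    task: OpenVINOSegmentationTask, dataset: DatasetEntity, output_model: ModelEntity
):  # pragma: no cover
    def _report_progress(progress: int, score=None):
        print(f"PTQ progress: {progress}%")

    task.optimize(
        OptimizationType.POT,
        dataset,
        output_model,
        OptimizationParameters(update_progress=_report_progress),
    )
    # After optimization the task swaps in the INT8 model and reloads its inferencer.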