"""OpenVINO Visual Prompting Task."""
# Copyright (C) 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
import io
import json
import os
import pickle # nosec B403
import random
import tempfile
import time
from collections import defaultdict
from copy import deepcopy
from itertools import product
from pathlib import Path
from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Type, Union
from zipfile import ZipFile
import attr
import cv2
import nncf
import numpy as np
import openvino.runtime as ov
from addict import Dict as ADDict
from nncf.common.quantization.structs import QuantizationPreset
from openvino.model_api.adapters import OpenvinoAdapter, create_core
from openvino.model_api.models import Model
from otx.algorithms.common.utils import get_default_async_reqs_num, read_py_config
from otx.algorithms.common.utils.ir import check_if_quantized
from otx.algorithms.visual_prompting.adapters.openvino import model_wrappers
from otx.algorithms.visual_prompting.adapters.pytorch_lightning.datasets.dataset import (
OTXVisualPromptingDataset,
get_transform,
)
from otx.algorithms.visual_prompting.configs.base import VisualPromptingBaseConfig
from otx.api.entities.annotation import Annotation
from otx.api.entities.dataset_item import DatasetItemEntity
from otx.api.entities.datasets import DatasetEntity
from otx.api.entities.inference_parameters import (
InferenceParameters,
default_progress_callback,
)
from otx.api.entities.label_schema import LabelSchemaEntity
from otx.api.entities.model import (
ModelEntity,
ModelFormat,
ModelOptimizationType,
ModelPrecision,
OptimizationMethod,
)
from otx.api.entities.model_template import TaskType
from otx.api.entities.optimization_parameters import OptimizationParameters
from otx.api.entities.resultset import ResultSetEntity
from otx.api.entities.subset import Subset
from otx.api.entities.task_environment import TaskEnvironment
from otx.api.serialization.label_mapper import LabelSchemaMapper, label_schema_to_bytes
from otx.api.usecases.evaluation.metrics_helper import MetricsHelper
from otx.api.usecases.exportable_code import demo
from otx.api.usecases.exportable_code.inference.inference import IInferencer
from otx.api.usecases.exportable_code.prediction_to_annotation_converter import (
VisualPromptingToAnnotationConverter,
)
from otx.api.usecases.tasks.interfaces.deployment_interface import IDeploymentTask
from otx.api.usecases.tasks.interfaces.evaluate_interface import IEvaluationTask
from otx.api.usecases.tasks.interfaces.inference_interface import IInferenceTask
from otx.api.usecases.tasks.interfaces.optimization_interface import (
IOptimizationTask,
OptimizationType,
)
from otx.utils.logger import get_logger
logger = get_logger()
[docs]
class OpenVINOVisualPromptingInferencer(IInferencer):
"""Inferencer implementation for Visual Prompting using OpenVINO backend.
This inferencer has two models, image encoder and decoder.
Args:
hparams (VisualPromptingBaseConfig): Hyper parameters that the model should use.
label_schema (LabelSchemaEntity): LabelSchemaEntity that was used during model training.
model_files (Dict[str, Union[str, Path, bytes]]): Path or bytes to model to load,
`.xml`, `.bin` or `.onnx` file.
weight_files (Dict[str, Union[str, Path, bytes, None]], optional): Path or bytes to weights to load,
`.xml`, `.bin` or `.onnx` file. Defaults to None.
device (str): Device to run inference on, such as CPU, GPU or MYRIAD. Defaults to "CPU".
num_requests (int) : Maximum number of requests that the inferencer can make.
Good value is the number of available cores. Defaults to 1.
"""
def __init__(
    self,
    hparams: VisualPromptingBaseConfig,
    label_schema: LabelSchemaEntity,
    model_files: Dict[str, Union[str, Path, bytes]],
    weight_files: Optional[Dict[str, Union[str, Path, bytes, None]]] = None,
    device: str = "CPU",
    num_requests: int = 1,
):
    """Build the image encoder and decoder wrappers and the annotation converter.

    Args:
        hparams (VisualPromptingBaseConfig): Hyper parameters; only ``hparams.postprocessing`` is read here.
        label_schema (LabelSchemaEntity): Label schema, used to collect the non-empty labels.
        model_files (Dict[str, Union[str, Path, bytes]]): Model per module; must contain the keys
            ``image_encoder`` and ``decoder``.
        weight_files (Dict[str, Union[str, Path, bytes, None]], optional): Weights per module.
            ``None`` (the default) is treated as an empty mapping.
        device (str): Device to run inference on. Defaults to "CPU".
        num_requests (int): Maximum number of infer requests per model adapter. Defaults to 1.
    """
    assert all(module in model_files for module in ["image_encoder", "decoder"])
    # A literal ``{}`` default is a single dict shared by every call, and an explicit ``None``
    # used to crash on ``.get`` below; normalise both cases to a fresh empty mapping.
    weight_files = {} if weight_files is None else weight_files

    self.model = {}
    model_parameters = {"decoder": {"input_layouts": "image_embeddings:NCHW"}}
    self.configuration = {
        # the encoder only receives the resize-related post-processing settings
        "image_encoder": {
            **attr.asdict(
                hparams.postprocessing,
                filter=lambda attr, value: attr.name in ["image_size", "resize_type", "downsizing"],
            )
        },
        # the decoder receives every post-processing setting except the excluded fields below
        "decoder": {
            **attr.asdict(
                hparams.postprocessing,
                filter=lambda attr, value: attr.name
                not in [
                    "header",
                    "description",
                    "type",
                    "visible_in_ui",
                    "class_name",
                    "downsizing",
                ],
            )
        },
    }
    for name in ["image_encoder", "decoder"]:
        model_adapter = OpenvinoAdapter(
            core=create_core(),
            model=model_files.get(name),
            weights_path=weight_files.get(name, None),
            model_parameters=model_parameters.get(name, {}),
            device=device,
            max_num_requests=num_requests,
            plugin_config={"PERFORMANCE_HINT": "THROUGHPUT"},
        )
        self.model[name] = Model.create_model(model_adapter, name, self.configuration.get(name, {}), preload=True)
    self.converter = VisualPromptingToAnnotationConverter()
    self.labels = label_schema.get_labels(include_empty=False)
    self.transform = get_transform()  # TODO (sungchul): insert args
[docs]
def pre_process(
    self,
    dataset_item: DatasetItemEntity,
    extra_processing: bool = False,
    use_bbox: bool = False,
    use_point: bool = False,
) -> Tuple[Dict[str, Any], Dict[str, Any], List[Dict[str, Any]]]:
    """Prepare encoder inputs, image metadata and decoder prompts for one dataset item.

    Args:
        dataset_item (DatasetItemEntity): Item providing the image (``.numpy``) and its annotations.
        extra_processing (bool): Forwarded to the image encoder's ``preprocess``.
        use_bbox (bool): Request bounding-box prompts.
        use_point (bool): Request point prompts.

    Returns:
        Tuple of (encoder inputs, metadata, list of pre-processed prompts).
    """
    if use_bbox and use_point:
        logger.warning("If both use_bbox and use_point are set, bboxes and points will be generated randomly.")

    # ``prob`` is handed to ``get_prompts``; presumably the probability of emitting a bbox
    # rather than a point prompt -- verify against OTXVisualPromptingDataset.get_prompts.
    if not use_point:
        prob = 1.0
    elif not use_bbox:
        prob = 0.0
    else:
        prob = 0.5

    images, meta = self.model["image_encoder"].preprocess(dataset_item.numpy, extra_processing)
    raw_prompts = OTXVisualPromptingDataset.get_prompts(dataset_item, self.labels, prob=prob)
    prompts = self.model["decoder"].preprocess(raw_prompts, meta)
    return images, meta, prompts  # type: ignore
[docs]
def post_process(
    self, prediction: Dict[str, np.ndarray], metadata: Dict[str, Any]
) -> Tuple[List[Annotation], Any, Any]:
    """Turn a raw decoder prediction into annotations.

    Args:
        prediction (Dict[str, np.ndarray]): Raw decoder outputs.
        metadata (Dict[str, Any]): Metadata forwarded to the decoder post-processing and the converter.

    Returns:
        Tuple of (annotations, hard prediction, soft prediction).
    """
    decoder_outputs = self.model["decoder"].postprocess(prediction, metadata)
    hard_prediction, soft_prediction = decoder_outputs
    # only the hard prediction is converted into annotations
    converted = self.converter.convert_to_annotation(hard_prediction, metadata)
    return converted, hard_prediction, soft_prediction
[docs]
def predict(self, dataset_item: DatasetItemEntity) -> List[Annotation]:  # type: ignore
    """Perform a prediction for a given input image.

    The image encoder runs once; the decoder then runs once per prompt and the
    resulting annotations are concatenated.

    Args:
        dataset_item (DatasetItemEntity): Item to run inference on.

    Returns:
        List[Annotation]: Annotations predicted for all prompts of the item.
    """
    # forward image encoder
    images, meta, prompts = self.pre_process(dataset_item)
    image_embeddings = self.forward_image_encoder(images)

    annotations: List[Annotation] = []
    for prompt in prompts:
        # the label is not a decoder input; it is only needed to tag the annotation
        label = prompt.pop("label")
        prompt.update(image_embeddings)

        # forward decoder to get predicted mask
        prediction = self.forward_decoder(prompt)
        prediction["scores"] = prediction["iou_predictions"]

        # set annotation for eval; the hard/soft masks are not kept because nothing
        # consumes them here (they used to be accumulated for every prompt and discarded)
        annotation, _, _ = self.post_process(prediction, {"label": label})
        annotations.extend(annotation)
    return annotations
[docs]
def forward_image_encoder(self, inputs: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Run the image encoder synchronously on ``inputs`` and return its outputs."""
    image_encoder = self.model["image_encoder"]
    return image_encoder.infer_sync(inputs)
[docs]
def forward_decoder(self, inputs: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Run the decoder synchronously on ``inputs`` and return its outputs."""
    decoder = self.model["decoder"]
    return decoder.infer_sync(inputs)
[docs]
def await_all(self) -> None:
    """Wait for pending infer requests of the image encoder, then of the decoder."""
    for module_name in ("image_encoder", "decoder"):
        self.model[module_name].await_all()
[docs]
class OpenVINOZeroShotVisualPromptingInferencer(OpenVINOVisualPromptingInferencer):
"""Inferencer implementation for Zero-shot Visual Prompting using OpenVINO backend.
This inferencer has two models, image encoder and decoder.
Args:
hparams (VisualPromptingBaseConfig): Hyper parameters that the model should use.
label_schema (LabelSchemaEntity): LabelSchemaEntity that was used during model training.
model_files (Dict[str, Union[str, Path, bytes]]): Path or bytes to model to load,
`.xml`, `.bin` or `.onnx` file.
weight_files (Dict[str, Union[str, Path, bytes, None]], optional): Path or bytes to weights to load,
`.xml`, `.bin` or `.onnx` file. Defaults to None.
device (str): Device to run inference on, such as CPU, GPU or MYRIAD. Defaults to "CPU".
num_requests (int) : Maximum number of requests that the inferencer can make.
Good value is the number of available cores. Defaults to 1.
"""
def __init__(
    self,
    hparams: VisualPromptingBaseConfig,
    label_schema: LabelSchemaEntity,
    model_files: Dict[str, Union[str, Path, bytes]],
    weight_files: Optional[Dict[str, Union[str, Path, bytes, None]]] = None,
    device: str = "CPU",
    num_requests: int = 1,
):
    """Create the underlying inferencer and the zero-shot specific state.

    Args:
        hparams (VisualPromptingBaseConfig): Hyper parameters that the model should use.
        label_schema (LabelSchemaEntity): LabelSchemaEntity that was used during model training.
        model_files (Dict[str, Union[str, Path, bytes]]): Model per module (``image_encoder``, ``decoder``).
        weight_files (Dict[str, Union[str, Path, bytes, None]], optional): Weights per module.
            ``None`` (the default) is treated as an empty mapping.
        device (str): Device to run inference on. Defaults to "CPU".
        num_requests (int): Maximum number of infer requests. Defaults to 1.
    """
    # Never share one ``{}`` default between instances, and never forward ``None`` to the
    # parent, which calls ``.get`` on this mapping.
    weight_files = {} if weight_files is None else weight_files
    super().__init__(hparams, label_schema, model_files, weight_files, device, num_requests)

    # point labels appended together with the two box corners in the cascaded refinement
    self.point_labels_box = np.array([[2, 3]], dtype=np.float32)
    # index 0: no mask prompt is supplied to the decoder, index 1: a mask prompt is supplied
    self.has_mask_inputs = [np.array([[0.0]]), np.array([[1.0]])]

    # filled by ``initialize_reference_info`` / ``learn``
    self.reference_feats: Optional[np.ndarray] = None
    self.used_indices: Optional[np.ndarray] = None
[docs]
def pre_process_image_encoder(
    self, inputs: np.ndarray, extra_processing: bool = False
) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
    """Pre-process an image for the image encoder only.

    Args:
        inputs (np.ndarray): Image to be pre-processed.
        extra_processing (bool): Forwarded to the encoder's ``preprocess``.

    Returns:
        Whatever the image encoder's ``preprocess`` returns (encoder inputs and metadata).
    """
    image_encoder = self.model["image_encoder"]
    return image_encoder.preprocess(inputs, extra_processing)
[docs]
def learn(
    self,
    dataset_item: DatasetItemEntity,
    reset_feat: bool = False,
    use_bbox: bool = False,
    use_point: bool = False,
    path_reference_info: str = "vpm_zsl_reference_infos/{}/reference_info.pickle",
    default_threshold_reference: float = 0.3,
) -> Tuple[Dict[str, np.ndarray], np.ndarray]:
    """Learn reference features from one annotated item and persist them.

    Args:
        dataset_item (DatasetItemEntity): Item whose prompts are used as references.
        reset_feat (bool): Re-initialize the stored reference info before learning.
        use_bbox (bool): Forwarded to ``pre_process``.
        use_point (bool): Forwarded to ``pre_process``.
        path_reference_info (str): Output path template; ``{}`` is replaced by a timestamp.
        default_threshold_reference (float): Initial mask threshold used to extract reference
            features; it is lowered in steps of 0.05 until a feature can be extracted.

    Returns:
        Tuple of (reference info dict with ``reference_feats`` and ``used_indices``, reference masks).
    """
    ref_masks: np.ndarray

    if reset_feat or self.reference_feats is None:
        self.initialize_reference_info()

    # Keyword arguments are required: the second positional parameter of ``pre_process`` is
    # ``extra_processing``, so passing the flags positionally shifted them by one slot.
    images, meta, prompts = self.pre_process(dataset_item, use_bbox=use_bbox, use_point=use_point)
    largest_label: int = max(int(p["label"].id) for p in prompts)
    self.expand_reference_info(largest_label)

    image_embeddings = self.forward_image_encoder(images)
    # channels-last layout so the embedding can be indexed with a 2D boolean mask
    processed_embedding = image_embeddings["image_embeddings"].squeeze().transpose(1, 2, 0)
    original_size = meta["original_shape"][:2]

    ref_masks = np.zeros((largest_label + 1, *map(int, original_size)), dtype=np.uint8)
    for prompt in prompts:
        if "point_coords" not in prompt:
            logger.warning("annotation and polygon will be supported.")
            continue

        # bboxes and points
        label = prompt.pop("label")
        original_size = prompt.get("orig_size")
        prompt.update(image_embeddings)

        prediction = self.forward_decoder(prompt, original_size, is_cascade=False)
        # The decoder yields a boolean mask, or a float all-zero mask when nothing was found;
        # cast explicitly because a float array cannot be added in place to a uint8 buffer.
        ref_masks[int(label.id)] += prediction["upscaled_masks"].astype(np.uint8)

    ref_masks = np.clip(ref_masks, 0, 1)
    for label in range(largest_label + 1):
        ref_mask = ref_masks[label]
        if ref_mask.sum() == 0:
            # empty prediction
            continue

        ref_feat = None
        cur_default_threshold_reference = default_threshold_reference
        # relax the threshold until at least one embedding location is selected
        while ref_feat is None:
            logger.info(f"[*] default_threshold_reference : {cur_default_threshold_reference:.4f}")
            ref_feat = self._generate_masked_features(
                processed_embedding, ref_mask, cur_default_threshold_reference
            )
            cur_default_threshold_reference -= 0.05

        self.reference_feats[label] = ref_feat
        self.used_indices = np.concatenate((self.used_indices, np.array([label])))

    reference_info = {"reference_feats": self.reference_feats, "used_indices": self.used_indices}
    path_reference_info = path_reference_info.format(time.strftime("%Y%m%d-%H%M%S"))

    # The timestamped directory normally does not exist yet.
    save_dir = os.path.dirname(path_reference_info)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    # Close the handle deterministically instead of relying on garbage collection.
    with open(path_reference_info, "wb") as stream:
        pickle.dump(reference_info, stream)
    logger.info(f"Saved reference info at {path_reference_info}.")
    return reference_info, ref_masks
[docs]
def infer(
    self,
    images: np.ndarray,
    reference_feats: np.ndarray,
    used_indices: np.ndarray,
    is_cascade: bool = False,
    threshold: float = 0.0,
    num_bg_points: int = 1,
    default_threshold_target: float = 0.65,
) -> Tuple[List[Any], DefaultDict[Any, Any], DefaultDict[Any, Any]]:
    """Predict masks for an image from previously learned reference features.

    Args:
        images (np.ndarray): Input image.
        reference_feats (np.ndarray): Reference features indexed by label.
        used_indices (np.ndarray): Labels for which reference features exist.
        is_cascade (bool): Forwarded to ``forward_decoder``.
        threshold (float): Similarity threshold; 0 selects ``default_threshold_target``.
        num_bg_points (int): Number of background points per prompt.
        default_threshold_target (float): Threshold used when ``threshold`` is 0.

    Returns:
        Tuple of (flat list of annotations, predicted masks per label, used points per label).
    """
    points_score: np.ndarray

    # forward image encoder
    images, meta = self.pre_process_image_encoder(images)
    original_shape = np.asarray(meta["original_shape"][:2], dtype=np.int64)
    image_embeddings = self.forward_image_encoder(images)

    # get point candidates
    total_points_scores, total_bg_coords = self._get_prompt_candidates(
        image_embeddings=image_embeddings["image_embeddings"],
        reference_feats=reference_feats,
        used_indices=used_indices,
        original_shape=original_shape,
        threshold=threshold,
        num_bg_points=num_bg_points,
        default_threshold_target=default_threshold_target,
        image_size=self.model["image_encoder"].image_size,
        downsizing=self.model["image_encoder"].downsizing,
    )

    annotations: DefaultDict = defaultdict(list)
    predicted_masks: DefaultDict = defaultdict(list)
    used_points: DefaultDict = defaultdict(list)
    for label, points_scores in total_points_scores.items():
        bg_coords = total_bg_coords[label]
        for points_score in points_scores:
            if points_score[-1] in [-1.0, 0.0]:
                continue

            x, y = points_score[:2]
            # skip the candidate when an earlier mask of this label already covers it
            if any(pm[int(y), int(x)] > 0 for pm in predicted_masks.get(label, [])):
                continue

            # one foreground point followed by the background points
            point_coords = np.concatenate((np.array([[x, y]]), bg_coords), axis=0, dtype=np.float32)
            point_coords = self.model["decoder"]._apply_coords(point_coords, original_shape)
            point_labels = np.array([1] + [0] * len(bg_coords), dtype=np.float32)
            inputs_decoder = {
                "point_coords": point_coords[None],
                "point_labels": point_labels[None],
                "orig_size": original_shape[None],
                **image_embeddings,
            }

            prediction = self.forward_decoder(inputs_decoder, original_shape, is_cascade)
            prediction["scores"] = points_score[-1]

            predicted_masks[label].append(prediction[self.model["decoder"].output_blob_name])
            used_points[label].append(points_score)

    # resolves overlaps between labels in place (both dicts may be modified)
    self._inspect_overlapping_areas(predicted_masks, used_points)

    for label, predictions in predicted_masks.items():
        if not predictions:
            continue
        metadata = {
            "label": [_label for _label in self.labels if int(_label.id_) == label][0],
            "original_size": original_shape,
        }
        for prediction, used_point in zip(predictions, used_points[label]):
            annotation, _, _ = self.post_process(
                {self.model["decoder"].output_blob_name: prediction, "scores": used_point[-1]}, metadata
            )
            annotations[label].extend(annotation)

    flat_annotations = [annotation for per_label in annotations.values() for annotation in per_label]
    return flat_annotations, predicted_masks, used_points
[docs]
def forward_decoder(  # type: ignore
    self,
    inputs: Dict[str, np.ndarray],
    original_size: np.ndarray,
    is_cascade: bool = True,
) -> Dict[str, np.ndarray]:
    """Run the decoder once, or three times when cascaded refinement is enabled.

    Note that ``inputs`` is updated in place (mask inputs and, in the last refinement
    step, additional box prompts).

    Args:
        inputs (Dict[str, np.ndarray]): Decoder inputs including ``image_embeddings``.
        original_size (np.ndarray): Original image size, used to transform box coordinates.
        is_cascade (bool): Run the two post-refinement steps after the first prediction.

    Returns:
        Dict[str, np.ndarray]: ``{"upscaled_masks": mask}`` with the selected mask.
    """
    masks: np.ndarray
    logits: np.ndarray
    scores: np.ndarray

    for step in range(3 if is_cascade else 1):
        if step == 0:
            # First-step prediction: an all-zero mask prompt, four times the embedding resolution
            embedding_hw = inputs["image_embeddings"].shape[2:]
            mask_input = np.zeros((1, 1, *(side * 4 for side in embedding_hw)), dtype=np.float32)
            has_mask_input = self.has_mask_inputs[0]
        else:
            # Cascaded Post-refinement-1 (step 1) uses the first mask only,
            # Cascaded Post-refinement-2 (step 2) picks the best-scored mask
            mask_input, masks = self._postprocess_masks(  # noqa: F821
                masks, logits, scores, is_single=(step == 1)
            )
            if masks.sum() == 0:
                return {"upscaled_masks": masks}
            has_mask_input = self.has_mask_inputs[1]

            if step == 2:
                # additionally prompt with the bounding box of the current mask
                rows, cols = np.nonzero(masks)
                corners = np.array([[[cols.min(), rows.min()], [cols.max(), rows.max()]]], dtype=np.float32)
                box_coords = self.model["decoder"]._apply_coords(corners, original_size)
                inputs.update(
                    {
                        "point_coords": np.concatenate((inputs["point_coords"], box_coords), axis=1),
                        "point_labels": np.concatenate((inputs["point_labels"], self.point_labels_box), axis=1),
                    }
                )

        inputs.update({"mask_input": mask_input, "has_mask_input": has_mask_input})
        prediction = self.model["decoder"].infer_sync(inputs)
        upscaled_masks = prediction["upscaled_masks"]
        scores = prediction["iou_predictions"]
        logits = prediction["low_res_masks"]
        masks = upscaled_masks > self.model["decoder"].mask_threshold

    _, masks = self._postprocess_masks(masks, logits, scores)
    return {"upscaled_masks": masks}
def _get_prompt_candidates(
self,
image_embeddings: np.ndarray,
reference_feats: np.ndarray,
used_indices: np.ndarray,
original_shape: np.ndarray,
threshold: float = 0.0,
num_bg_points: int = 1,
default_threshold_target: float = 0.65,
image_size: int = 1024,
downsizing: int = 64,
) -> Tuple[Dict[int, np.ndarray], Dict[int, np.ndarray]]:
"""Get prompt candidates."""
target_feat = image_embeddings.squeeze()
c_feat, h_feat, w_feat = target_feat.shape
target_feat = target_feat / np.linalg.norm(target_feat, axis=0, keepdims=True)
target_feat = target_feat.reshape(c_feat, h_feat * w_feat)
total_points_scores: Dict[int, np.ndarray] = {}
total_bg_coords: Dict[int, np.ndarray] = {}
for label in used_indices:
sim = reference_feats[label] @ target_feat
sim = sim.reshape(h_feat, w_feat)
sim = self._resize_to_original_shape(sim, image_size, original_shape)
threshold = (threshold == 0) * default_threshold_target + threshold
points_scores, bg_coords = self._point_selection(
mask_sim=sim,
original_shape=original_shape,
threshold=threshold,
num_bg_points=num_bg_points,
image_size=image_size,
downsizing=downsizing,
)
if points_scores is not None:
total_points_scores[label] = points_scores
total_bg_coords[label] = bg_coords
return total_points_scores, total_bg_coords
def _point_selection(
self,
mask_sim: np.ndarray,
original_shape: np.ndarray,
threshold: float = 0.0,
num_bg_points: int = 1,
image_size: int = 1024,
downsizing: int = 64,
) -> Tuple[np.ndarray, np.ndarray]:
"""Select point used as point prompts."""
_, w_sim = mask_sim.shape
# Top-first point selection
point_coords = np.where(mask_sim > threshold)
fg_coords_scores = np.stack(point_coords[::-1] + (mask_sim[point_coords],), axis=0).T
## skip if there is no point coords
if len(fg_coords_scores) == 0:
return None, None
ratio = image_size / original_shape.max()
width = (original_shape[1] * ratio).astype(np.int64)
n_w = width // downsizing
## get grid numbers
idx_grid = fg_coords_scores[:, 1] * ratio // downsizing * n_w + fg_coords_scores[:, 0] * ratio // downsizing
idx_grid_unique = np.unique(idx_grid.astype(np.int64))
## get matched indices
matched_matrix = np.expand_dims(idx_grid, axis=-1) == idx_grid_unique # (totalN, uniqueN)
## sample fg_coords_scores matched by matched_matrix
matched_grid = np.expand_dims(fg_coords_scores, axis=1) * np.expand_dims(matched_matrix, axis=-1)
## sample the highest score one of the samples that are in the same grid
matched_indices = self._topk_numpy(matched_grid[..., -1], k=1, axis=0, largest=True)[1][0].astype(np.int64)
points_scores = matched_grid[matched_indices].diagonal().T
## sort by the highest score
sorted_points_scores_indices = np.flip(np.argsort(points_scores[:, -1]), axis=-1).astype(np.int64)
points_scores = points_scores[sorted_points_scores_indices]
# Top-last point selection
bg_indices = self._topk_numpy(mask_sim.flatten(), num_bg_points, largest=False)[1]
bg_x = np.expand_dims(bg_indices // w_sim, axis=0)
bg_y = bg_indices - bg_x * w_sim
bg_coords = np.concatenate((bg_y, bg_x), axis=0).transpose(1, 0)
bg_coords = bg_coords.astype(np.float32)
return points_scores, bg_coords
def _postprocess_masks(
self, masks: np.ndarray, logits: np.ndarray, scores: np.ndarray, is_single: bool = False
) -> Tuple[np.ndarray, ...]:
"""Post-process logits for resized masks according to best index based on scores."""
if is_single:
best_idx = 0
else:
# skip the first index components
scores, masks, logits = map(lambda x: x[:, 1:], (scores, masks, logits))
# filter zero masks
while len(scores[0]) > 0 and masks[0, (best_idx := np.argmax(scores[0]))].sum() == 0:
scores, masks, logits = map(
lambda x: np.concatenate((x[:, :best_idx], x[:, best_idx + 1 :]), axis=1), (scores, masks, logits)
)
if len(scores[0]) == 0:
# all predicted masks were zero masks, ignore them.
return None, np.zeros(masks.shape[-2:])
best_idx = np.argmax(scores[0])
return logits[:, [best_idx]], masks[0, best_idx]
def _resize_to_original_shape(self, masks: np.ndarray, image_size: int, original_shape: np.ndarray) -> np.ndarray:
    """Bring a feature-resolution map back to the original image resolution.

    Args:
        masks (np.ndarray): Map at feature resolution.
        image_size (int): Side length of the (square) encoder input.
        original_shape (np.ndarray): Original image shape (height, width).

    Returns:
        np.ndarray: Map resized to ``original_shape``.
    """
    # resize feature size to input size
    at_input_size = cv2.resize(masks, (image_size, image_size), interpolation=cv2.INTER_LINEAR)

    # remove pad
    valid_extent = self._get_prepadded_size(original_shape, image_size)
    unpadded = at_input_size[..., : valid_extent[0], : valid_extent[1]]

    # resize unpadded one to original shape; cv2 expects (width, height)
    target = original_shape.astype(np.int64)
    return cv2.resize(unpadded, (target[1], target[0]), interpolation=cv2.INTER_LINEAR)
def _get_prepadded_size(self, original_shape: int, image_size: int) -> np.ndarray:
"""Get pre-padded size."""
scale = image_size / np.max(original_shape)
transformed_size = scale * original_shape
return np.floor(transformed_size + 0.5).astype(np.int64)
def _inspect_overlapping_areas(
self,
predicted_masks: Dict[int, List[np.ndarray]],
used_points: Dict[int, List[np.ndarray]],
threshold_iou: float = 0.8,
):
def _calculate_mask_iou(mask1: np.ndarray, mask2: np.ndarray):
assert mask1.ndim == 2 and mask2.ndim == 2
intersection = np.logical_and(mask1, mask2).sum().item()
union = np.logical_or(mask1, mask2).sum().item()
# Avoid division by zero
if union == 0:
return 0.0
iou = intersection / union
return iou
for (label, masks), (other_label, other_masks) in product(predicted_masks.items(), predicted_masks.items()):
if other_label <= label:
continue
overlapped_label = []
overlapped_other_label = []
for (im, mask), (jm, other_mask) in product(enumerate(masks), enumerate(other_masks)):
_mask_iou = _calculate_mask_iou(mask, other_mask)
if _mask_iou > threshold_iou:
if used_points[label][im][2] > used_points[other_label][jm][2]:
overlapped_other_label.append(jm)
else:
overlapped_label.append(im)
elif _mask_iou > 0:
# refine the slightly overlapping region
overlapped_coords = np.where(np.logical_and(mask, other_mask))
if used_points[label][im][2] > used_points[other_label][jm][2]:
other_mask[overlapped_coords] = 0.0
else:
mask[overlapped_coords] = 0.0
for im in sorted(list(set(overlapped_label)), reverse=True):
masks.pop(im)
used_points[label].pop(im)
for jm in sorted(list(set(overlapped_other_label)), reverse=True):
other_masks.pop(jm)
used_points[other_label].pop(jm)
[docs]
def predict(self, dataset_item: DatasetItemEntity) -> List[Annotation]:  # type: ignore
    """Predict annotations for an item using the stored reference features.

    Args:
        dataset_item (DatasetItemEntity): Item whose image (``.numpy``) is processed.

    Returns:
        List[Annotation]: First element of the tuple returned by ``infer``.
    """
    # the predicted masks and used points returned by ``infer`` are not needed here
    return self.infer(dataset_item.numpy, self.reference_feats, self.used_indices)[0]
def _find_latest_reference_info(self, root: str = "vpm_zsl_reference_infos") -> Union[str, None]:
"""Find latest reference info to be used."""
if not os.path.isdir(root):
return None
if len(stamps := sorted(os.listdir(root), reverse=True)) > 0:
return stamps[0]
return None
def _get_reference_info(
self, root: str = "vpm_zsl_reference_infos", path_reference_info: str = "{}/reference_info.pickle"
) -> Union[Tuple[np.ndarray, np.ndarray], None]:
"""Get reference info through loading previously saved one or running `learn`."""
if (latest_stamp := self._find_latest_reference_info(root)) is not None:
# load previously saved reference info
latest_reference_info = os.path.join(root, path_reference_info.format(latest_stamp))
# pickle.load() used for getting the latest reference info from the previously dumped object
reference_info = pickle.load(open(latest_reference_info, "rb")) # nosec B301
return reference_info["reference_feats"], reference_info["used_indices"]
return None, None
[docs]
def initialize_reference_info(self) -> None:
    """Reset the reference features and used label indices to empty arrays."""
    # no labels yet: zero rows of shape (1, 256)
    self.reference_feats = np.empty((0, 1, 256), dtype=np.float32)
    self.used_indices = np.empty((0,), dtype=np.int64)
[docs]
def expand_reference_info(self, new_largest_label: int) -> None:
    """Expand reference info dimensions if newly given processed prompts have more labels.

    Args:
        new_largest_label (int): Largest label index that must be addressable afterwards.
    """
    missing_rows = new_largest_label - (len(self.reference_feats) - 1)
    if missing_rows > 0:
        # append zero-filled rows along the label axis only
        self.reference_feats = np.pad(
            self.reference_feats, ((0, missing_rows), (0, 0), (0, 0)), constant_values=0.0
        )
def _generate_masked_features(
    self,
    feats: np.ndarray,
    masks: np.ndarray,
    threshold_mask: float,
) -> Optional[np.ndarray]:
    """Generate masked features.

    Args:
        feats (np.ndarray): Raw reference features. It will be filtered with masks.
            Its first two axes are used as the spatial size the mask is resized to.
        masks (np.ndarray): Reference masks used to filter features.
        threshold_mask (float): Threshold to control masked region.

    Returns:
        (Optional[np.ndarray]): L2-normalized mean of the selected features with a leading
        axis of size 1, or ``None`` if no location exceeds ``threshold_mask``.
    """
    # scale the mask so that its longest side matches the encoder input size; cv2 wants (w, h)
    target_shape = self.model["image_encoder"].image_size / max(masks.shape) * np.array(masks.shape)
    target_shape = target_shape[::-1].astype(np.int32)

    # Post-process masks
    masks = cv2.resize(masks, target_shape, interpolation=cv2.INTER_LINEAR)
    masks = self._pad_to_square(masks)
    masks = cv2.resize(masks, feats.shape[:2][::-1], interpolation=cv2.INTER_LINEAR)

    # Target feature extraction; evaluate the comparison once and reuse it for indexing
    selected = masks > threshold_mask
    if not selected.any():
        # (for stability) there is no area to be extracted
        return None

    masked_feat = feats[selected].mean(0)[None]
    return masked_feat / np.linalg.norm(masked_feat, axis=-1, keepdims=True)
def _pad_to_square(self, x: np.ndarray) -> np.ndarray:
"""Pad to a square input.
Args:
x (np.ndarray): Mask to be padded.
Returns:
(np.ndarray): Padded mask.
"""
h, w = x.shape[-2:]
padh = self.model["image_encoder"].image_size - h
padw = self.model["image_encoder"].image_size - w
x = np.pad(x, ((0, padh), (0, padw)), constant_values=0.0)
return x
def _topk_numpy(self, x: np.ndarray, k: int, axis: int = -1, largest: bool = True) -> np.ndarray:
"""Top-k function for numpy same with torch.topk."""
if largest:
k = -k
indices = range(k, 0)
else:
indices = range(k)
partitioned_ind = np.argpartition(x, k, axis=axis).take(indices=indices, axis=axis)
partitioned_scores = np.take_along_axis(x, partitioned_ind, axis=axis)
sorted_trunc_ind = np.flip(np.argsort(partitioned_scores, axis=axis), axis=axis)
ind = np.take_along_axis(partitioned_ind, sorted_trunc_ind, axis=axis)
scores = np.take_along_axis(partitioned_scores, sorted_trunc_ind, axis=axis)
return scores, ind
[docs]
class OTXOpenVinoDataLoader:
"""DataLoader implementation for VisualPromptingOpenVINOTask."""
def __init__(
    self,
    dataset: Any,
    inferencer: OpenVINOVisualPromptingInferencer,
    module_name: str,
    shuffle: bool = True,
    output_model: Optional[ModelEntity] = None,
    **kwargs,
):
    """Set up the loader for one module of the visual prompting model.

    Args:
        dataset (Any): Indexable dataset with a length.
        inferencer (OpenVINOVisualPromptingInferencer): Inferencer used for pre-processing.
        module_name (str): Module the data is prepared for, e.g. ``image_encoder``.
        shuffle (bool): Visit the dataset in a random order. Defaults to True.
        output_model (Optional[ModelEntity]): Model holding the image encoder files; read
            when ``module_name`` is not ``image_encoder``.
        **kwargs: Accepted and ignored.
    """
    self.dataset = dataset
    self.inferencer = inferencer
    self.module_name = module_name

    if shuffle:
        # fixed random permutation of the dataset indices
        order = list(range(len(dataset)))
        random.shuffle(order)
        self.shuffler = order
    else:
        self.shuffler = None

    self.target_length = self.inferencer.model["image_encoder"].orig_width
    if self.module_name != "image_encoder":
        # other modules consume image embeddings, so the image encoder has to be loaded
        self.image_encoder = self._load_module("image_encoder", output_model)
def _load_module(self, module_name: str, output_model: ModelEntity, core=None):
    """Load specific module.

    Args:
        module_name (str): Module to load; selects ``visual_prompting_{module_name}.xml/.bin``.
        output_model (ModelEntity): Model entity that stores the module's files.
        core: OpenVINO core used to read and compile the model. A new ``ov.Core()`` is
            created when omitted.

    Returns:
        The model compiled for the device of the inferencer's ``module_name`` adapter.
    """
    # Instantiate lazily: a ``core=ov.Core()`` default is evaluated once when the function
    # is defined, i.e. as a side effect of importing this module.
    if core is None:
        core = ov.Core()

    compressed_model = core.read_model(
        output_model.get_data(f"visual_prompting_{module_name}.xml"),
        output_model.get_data(f"visual_prompting_{module_name}.bin"),
    )
    return core.compile_model(
        model=compressed_model, device_name=self.inferencer.model[module_name].inference_adapter.device
    )
def __getitem__(self, index: int):
"""Get item from dataset."""
if self.shuffler is not None:
index = self.shuffler[index]
items = self.dataset[index]
images, _, prompts = self.inferencer.pre_process(items, extra_processing=True)
_, _, h, w = images["images"].shape
pad_width = ((0, 0), (0, 0), (0, self.target_length - h), (0, self.target_length - w))
images["images"] = np.pad(images["images"], pad_width, mode="constant", constant_values=0)
if self.module_name == "image_encoder":
return images
else:
image_embeddings = self.image_encoder(images["images"])
prompt = prompts[0] # only use the first prompt
prompt.pop("label")
prompt.update({"image_embeddings": image_embeddings["image_embeddings"]})
return prompt
# TODO (sungchul): change has_mask_input
def __len__(self):
"""Get length of dataset."""
return len(self.dataset)
[docs]
class OpenVINOVisualPromptingTask(IInferenceTask, IEvaluationTask, IOptimizationTask, IDeploymentTask):
"""Task implementation for Visual Prompting using OpenVINO backend."""
def __init__(self, task_environment: TaskEnvironment) -> None:
    """Store the task environment and load the inferencer from its model.

    Args:
        task_environment (TaskEnvironment): Environment providing the model, the model
            template, the labels and the hyper parameters.
    """
    self.task_environment = task_environment
    self.model = self.task_environment.model
    self.model_name = self.task_environment.model_template.model_template_id
    self.inferencer = self.load_inferencer()
    self._avg_time_per_image: Optional[float] = None

    # labels are numbered starting from 1
    self._label_dictionary = dict(enumerate(task_environment.get_labels(include_empty=False), 1))

    # directory that contains the model template file
    template_dir = os.path.dirname(self.task_environment.model_template.model_template_path)
    self._base_dir = os.path.abspath(template_dir)
    self.task_type = TaskType.VISUAL_PROMPTING
@property
def hparams(self):
    """Hyper parameters of the task, read from the task environment as ``VisualPromptingBaseConfig``."""
    environment = self.task_environment
    return environment.get_hyper_parameters(VisualPromptingBaseConfig)
@property
def avg_time_per_image(self) -> Optional[float]:
    """Average inference time per image in seconds; ``None`` until it has been measured."""
    measured = self._avg_time_per_image
    return measured
[docs]
def load_inferencer(self) -> OpenVINOVisualPromptingInferencer:
    """Create the OpenVINO inferencer from the image encoder and decoder stored in the model.

    Returns:
        OpenVINOVisualPromptingInferencer: Inferencer built from the ``.xml``/``.bin`` data.

    Raises:
        RuntimeError: If the task has no model.
    """
    model = self.model
    if model is None:
        raise RuntimeError("load_inferencer failed, model is None")

    return OpenVINOVisualPromptingInferencer(
        self.hparams,
        self.task_environment.label_schema,
        # model definitions
        dict(
            image_encoder=model.get_data("visual_prompting_image_encoder.xml"),
            decoder=model.get_data("visual_prompting_decoder.xml"),
        ),
        # model weights
        dict(
            image_encoder=model.get_data("visual_prompting_image_encoder.bin"),
            decoder=model.get_data("visual_prompting_decoder.bin"),
        ),
        num_requests=get_default_async_reqs_num(),
    )
[docs]
def infer(
    self,
    dataset: DatasetEntity,
    inference_parameters: Optional[InferenceParameters] = None,
) -> DatasetEntity:
    """Infer function of OpenVINOVisualPromptingTask.

    Currently, asynchronous execution is not supported, synchronous execution will be executed instead.

    Args:
        dataset (DatasetEntity): Dataset to run inference on.
        inference_parameters (Optional[InferenceParameters]): Provides the progress callback
            and the async flag; defaults are used when omitted.

    Returns:
        DatasetEntity: Copy of ``dataset`` with empty annotations, filled with the predictions.
    """
    if inference_parameters is not None:
        update_progress_callback = inference_parameters.update_progress
        enable_async_inference = inference_parameters.enable_async_inference
    else:
        update_progress_callback = default_progress_callback
        enable_async_inference = True

    # FIXME (sungchul): Support async inference.
    if enable_async_inference:
        logger.warning("Asynchronous inference doesn't work, synchronous inference will be executed.")

    predicted_validation_dataset = dataset.with_empty_annotations()

    total_time = 0.0
    dataset_size = len(dataset)
    for index, dataset_item in enumerate(dataset):
        start_time = time.perf_counter()

        annotations = self.inferencer.predict(dataset_item)
        predicted_validation_dataset[index].append_annotations(annotations)

        total_time += time.perf_counter() - start_time
        update_progress_callback(int((index + 1) / dataset_size * 100), None)

    self.inferencer.await_all()

    # An empty dataset has no meaningful average; avoid dividing by zero.
    self._avg_time_per_image = total_time / dataset_size if dataset_size else None

    if self._avg_time_per_image is not None:
        logger.info(f"Avg time per image: {self._avg_time_per_image} secs")
    logger.info(f"Total time: {total_time} secs")
    logger.info("Visual Prompting OpenVINO inference completed")

    return predicted_validation_dataset
[docs]
def evaluate(self, output_resultset: ResultSetEntity, evaluation_metric: Optional[str] = None):
    """Score ``output_resultset`` with the pixel-averaged Dice metric and store the result on it.

    Args:
        output_resultset (ResultSetEntity): Result set passed to
            ``MetricsHelper.compute_dice_averaged_over_pixels``; its ``performance`` attribute is overwritten.
        evaluation_metric (Optional[str]): Not used by this implementation.
    """
    logger.info("Computing mDice")
    dice_metric = MetricsHelper.compute_dice_averaged_over_pixels(output_resultset)
    overall_dice = dice_metric.overall_dice.value
    logger.info(f"mDice after evaluation: {overall_dice}")
    output_resultset.performance = dice_metric.get_performance()
[docs]
def deploy(self, output_model: ModelEntity) -> None:
    """Pack the model, its configuration and the demo code into ``output_model.exportable_code``.

    The zip archive is built in memory with the following layout:

    - ``model/``: the image encoder and decoder ``.xml``/``.bin`` files read from ``self.model`` plus
      ``config.json`` (converter type, the inferencer configuration and the serialized labels).
    - ``python/model_wrappers/``: every file of the ``model_wrappers`` package (``__pycache__`` skipped),
      stored under its path relative to the package directory.
    - ``python/requirements.txt``, ``python/LICENSE``, ``python/demo.py`` and ``README.md``, all taken
      from the directory that contains the ``demo`` module.

    Args:
        output_model (ModelEntity): Model entity that receives the archive bytes.

    Raises:
        RuntimeError: If ``self.model`` is None.
    """
    logger.info("Deploying the model")
    if self.model is None:
        raise RuntimeError("deploy failed, model is None")

    work_dir = os.path.dirname(demo.__file__)
    parameters: Dict[str, Any] = {}
    parameters["converter_type"] = f"{self.task_type}"
    # NOTE: the inferencer's configuration object is referenced directly, so "labels" is added to it in place.
    parameters["model_parameters"] = self.inferencer.configuration
    parameters["model_parameters"]["labels"] = LabelSchemaMapper.forward(self.task_environment.label_schema)

    model_file_names = (
        "visual_prompting_image_encoder.xml",
        "visual_prompting_image_encoder.bin",
        "visual_prompting_decoder.xml",
        "visual_prompting_decoder.bin",
    )
    wrappers_dir = os.path.dirname(model_wrappers.__file__)

    zip_buffer = io.BytesIO()
    with ZipFile(zip_buffer, "w") as arch:
        # model files
        for file_name in model_file_names:
            arch.writestr(os.path.join("model", file_name), self.model.get_data(file_name))
        arch.writestr(
            os.path.join("model", "config.json"),
            json.dumps(parameters, ensure_ascii=False, indent=4),
        )
        # model_wrappers files
        for root, _, files in os.walk(wrappers_dir):
            if "__pycache__" in root:
                continue
            for file in files:
                file_path = os.path.join(root, file)
                # Use the path relative to the package directory as the archive name. Taking the text
                # *before* "model_wrappers/" produced an absolute prefix that os.path.join treats as the
                # whole path, so every file ended up with the same, wrong archive name. relpath is also
                # independent of the platform's path separator.
                arch.write(
                    file_path,
                    os.path.join("python", "model_wrappers", os.path.relpath(file_path, wrappers_dir)),
                )
        # other python files
        arch.write(os.path.join(work_dir, "requirements.txt"), os.path.join("python", "requirements.txt"))
        arch.write(os.path.join(work_dir, "LICENSE"), os.path.join("python", "LICENSE"))
        arch.write(os.path.join(work_dir, "demo.py"), os.path.join("python", "demo.py"))
        arch.write(os.path.join(work_dir, "README.md"), os.path.join(".", "README.md"))
    output_model.exportable_code = zip_buffer.getvalue()
    logger.info("Deploying completed")
[docs]
def optimize(
    self,
    optimization_type: OptimizationType,
    dataset: DatasetEntity,
    output_model: ModelEntity,
    optimization_parameters: Optional[OptimizationParameters] = None,
    module_names: List[str] = ["image_encoder", "decoder"],
    ov_dataloader: Type[OTXOpenVinoDataLoader] = OTXOpenVinoDataLoader,
    **kwargs,
):
    """Quantize every module in ``module_names`` with NNCF and store the results in ``output_model``.

    For each module, the IR files held by ``self.model`` are written to a temporary directory, read
    with OpenVINO, quantized with ``nncf.quantize`` on the training subset of ``dataset`` and saved back
    into ``output_model``. Afterwards ``output_model`` replaces ``self.model`` and the inferencer is reloaded.

    Args:
        optimization_type (OptimizationType): Must be ``OptimizationType.POT``.
        dataset (DatasetEntity): Dataset whose ``Subset.TRAINING`` part feeds the calibration data loader.
        output_model (ModelEntity): Receives the quantized IR files, the label schema and model attributes.
        optimization_parameters (Optional[OptimizationParameters]): If given, its ``update_progress`` is
            called after each module and once more with 100 at the end.
        module_names (List[str]): Modules to quantize, in order.
        ov_dataloader (Type[OTXOpenVinoDataLoader]): Data loader class instantiated once per module.
        **kwargs: Forwarded to ``ov_dataloader``.

    Raises:
        RuntimeError: If ``self.model`` is None or a module is already quantized.
        ValueError: If ``optimization_type`` is not ``OptimizationType.POT``.
    """
    # NOTE: the default ``module_names`` list is shared between calls; it is only read here, never mutated.
    logger.info("Start PTQ optimization")
    if self.model is None:
        raise RuntimeError("PTQ optimize failed, model is None")
    if optimization_type is not OptimizationType.POT:
        raise ValueError("PTQ is the only supported optimization type for OpenVino models")

    training_subset = dataset.get_subset(Subset.TRAINING)

    for step, module_name in enumerate(module_names, start=1):
        xml_name = f"visual_prompting_{module_name}.xml"
        bin_name = f"visual_prompting_{module_name}.bin"

        calibration_loader = ov_dataloader(
            training_subset, self.inferencer, module_name=module_name, output_model=output_model, **kwargs
        )
        quantization_dataset = nncf.Dataset(calibration_loader, lambda data: data)

        # The IR is only available as bytes, so it is dumped to disk for OpenVINO to read.
        with tempfile.TemporaryDirectory() as source_dir:
            xml_path = os.path.join(source_dir, xml_name)
            bin_path = os.path.join(source_dir, bin_name)
            with open(xml_path, "wb") as xml_file:
                xml_file.write(self.model.get_data(xml_name))
            with open(bin_path, "wb") as bin_file:
                bin_file.write(self.model.get_data(bin_name))

            ov_model = ov.Core().read_model(xml_path, bin_path)
            if check_if_quantized(ov_model):
                raise RuntimeError("Model is already optimized by PTQ")

        # An optional per-model config file may supply extra arguments for nncf.quantize.
        config_path = os.path.join(self._base_dir, "ptq_optimization_config.py")
        ptq_config = read_py_config(config_path) if os.path.exists(config_path) else ADDict()
        pot_parameters = self.hparams.pot_parameters
        ptq_config.update(
            subset_size=min(pot_parameters.stat_subset_size, len(calibration_loader)),
            preset=QuantizationPreset(pot_parameters.preset.name.lower()),
        )

        compressed_model = nncf.quantize(ov_model, quantization_dataset, **ptq_config)

        if optimization_parameters is not None:
            # Progress is spread evenly over the modules up to 90; the remainder is reported at the end.
            optimization_parameters.update_progress(90 // len(module_names) * step, None)

        with tempfile.TemporaryDirectory() as export_dir:
            xml_path = os.path.join(export_dir, xml_name)
            bin_path = os.path.join(export_dir, bin_name)
            ov.save_model(compressed_model, xml_path)
            for artifact_name, artifact_path in ((xml_name, xml_path), (bin_name, bin_path)):
                with open(artifact_path, "rb") as artifact:
                    output_model.set_data(artifact_name, artifact.read())

    output_model.set_data(
        "label_schema.json",
        label_schema_to_bytes(self.task_environment.label_schema),
    )

    # set model attributes for quantized model
    output_model.model_format = ModelFormat.OPENVINO
    output_model.optimization_type = ModelOptimizationType.POT
    output_model.optimization_methods = [OptimizationMethod.QUANTIZATION]
    output_model.precision = [ModelPrecision.INT8]

    self.model = output_model
    self.inferencer = self.load_inferencer()

    if optimization_parameters is not None:
        optimization_parameters.update_progress(100, None)
    logger.info("PTQ optimization completed")
[docs]
class OpenVINOZeroShotVisualPromptingTask(OpenVINOVisualPromptingTask):
"""Task implementation for Zero-shot Visual Prompting using OpenVINO backend."""
[docs]
def load_inferencer(self) -> OpenVINOZeroShotVisualPromptingInferencer:
    """Create the zero-shot inferencer from the IR files stored in ``self.model``.

    Returns:
        OpenVINOZeroShotVisualPromptingInferencer: Inferencer built from the hyper-parameters, the label
            schema and the image encoder / decoder ``.xml`` and ``.bin`` data.

    Raises:
        RuntimeError: If ``self.model`` is None.
    """
    model = self.model
    if model is None:
        raise RuntimeError("load_inferencer failed, model is None")

    module_names = ("image_encoder", "decoder")
    model_files = {name: model.get_data(f"visual_prompting_{name}.xml") for name in module_names}
    weight_files = {name: model.get_data(f"visual_prompting_{name}.bin") for name in module_names}

    return OpenVINOZeroShotVisualPromptingInferencer(
        self.hparams,
        self.task_environment.label_schema,
        model_files=model_files,
        weight_files=weight_files,
        num_requests=get_default_async_reqs_num(),
    )
[docs]
def infer(
    self,
    dataset: DatasetEntity,
    inference_parameters: Optional[InferenceParameters] = None,
    root: str = "vpm_zsl_reference_infos",
    path_reference_info: str = "{}/reference_info.pickle",
) -> DatasetEntity:
    """Infer function of OpenVINOZeroShotVisualPromptingTask.

    Currently, asynchronous execution is not supported, synchronous execution will be executed instead.

    Args:
        dataset (DatasetEntity): Items to run ``self.inferencer.predict`` on.
        inference_parameters (Optional[InferenceParameters]): Provides the progress callback and the
            async-inference flag. When None, ``default_progress_callback`` is used and async inference
            is treated as requested (which only triggers the warning below).
        root (str): Forwarded to ``self.inferencer._get_reference_info`` when the inferencer holds no
            reference information yet.
        path_reference_info (str): Forwarded to ``self.inferencer._get_reference_info`` together with
            ``root``.

    Returns:
        DatasetEntity: ``dataset.with_empty_annotations()``, whose items received the predicted
            annotations at the matching index. It is returned without predictions when no reference
            information is available.
    """
    if inference_parameters is not None:
        update_progress_callback = inference_parameters.update_progress
        enable_async_inference = inference_parameters.enable_async_inference
    else:
        update_progress_callback = default_progress_callback
        enable_async_inference = True

    # FIXME (sungchul): Support async inference.
    if enable_async_inference:
        logger.warning("Asynchronous inference doesn't work, synchronous inference will be executed.")

    predicted_validation_dataset = dataset.with_empty_annotations()

    def add_prediction(index: int, annotations: List[Annotation]):
        """Append ``annotations`` to the item stored at ``index`` in the output dataset."""
        dataset_item = predicted_validation_dataset[index]
        dataset_item.append_annotations(annotations)

    total_time = 0.0
    dataset_size = len(dataset)

    if self.inferencer.reference_feats is None and self.inferencer.used_indices is None:
        # set reference_feats and used_indices from previously saved reference_info
        self.inferencer.reference_feats, self.inferencer.used_indices = self.inferencer._get_reference_info(
            root, path_reference_info
        )
        if self.inferencer.reference_feats is None and self.inferencer.used_indices is None:
            # if they are empty, stop inference and return empty dataset
            logger.warning(
                (
                    "reference_feats and used_indices are empty, stop inference and return empty dataset. "
                    "Please run learn function first."
                )
            )
            return predicted_validation_dataset

    # enumerate starts at 1 so that ``i`` can be used directly for the progress percentage.
    for i, dataset_item in enumerate(dataset, 1):
        start_time = time.perf_counter()

        annotations = self.inferencer.predict(dataset_item)
        add_prediction(i - 1, annotations)

        elapsed = time.perf_counter() - start_time
        total_time += elapsed
        update_progress_callback(int(i / dataset_size * 100), None)

    self.inferencer.await_all()

    # An empty dataset must not crash the task with a ZeroDivisionError.
    self._avg_time_per_image = total_time / dataset_size if dataset_size else 0.0

    logger.info(f"Avg time per image: {self._avg_time_per_image} secs")
    logger.info(f"Total time: {total_time} secs")
    logger.info("Visual Prompting OpenVINO inference completed")

    return predicted_validation_dataset
[docs]
def optimize(
    self,
    optimization_type: OptimizationType,
    dataset: DatasetEntity,
    output_model: ModelEntity,
    optimization_parameters: Optional[OptimizationParameters] = None,
    module_names: List[str] = ["image_encoder", "decoder"],
    ov_dataloader: Type[OTXOpenVinoDataLoader] = OTXOpenVinoDataLoader,
    **kwargs,
):
    """Run the parent ``optimize`` with the inferencer's reference information attached.

    The reference features and used indices returned by ``self.inferencer._get_reference_info()`` are
    passed to the parent implementation as the ``reference_feats`` and ``used_indices`` keyword arguments.
    All named parameters are forwarded unchanged; extra ``**kwargs`` are accepted but not forwarded.

    Returns:
        Whatever the parent ``optimize`` returns.
    """
    self.inferencer: OpenVINOZeroShotVisualPromptingInferencer

    reference_info = self.inferencer._get_reference_info()
    reference_feats, used_indices = reference_info

    return super().optimize(
        optimization_type=optimization_type,
        dataset=dataset,
        output_model=output_model,
        optimization_parameters=optimization_parameters,
        module_names=module_names,
        ov_dataloader=ov_dataloader,
        reference_feats=reference_feats,
        used_indices=used_indices,
    )