Source code for otx.algorithms.detection.utils.utils

"""Utils for OTX Detection."""
# Copyright (C) 2021 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

import colorsys
import random
from typing import Any, List, Optional, Sequence, Tuple

import cv2
import numpy as np
import pycocotools.mask as mask_util

from otx.api.entities.annotation import Annotation
from otx.api.entities.color import Color
from otx.api.entities.id import ID
from otx.api.entities.label import Domain, LabelEntity
from otx.api.entities.label_schema import LabelGroup, LabelGroupType, LabelSchemaEntity
from otx.api.entities.model_template import TaskType
from otx.api.entities.scored_label import ScoredLabel
from otx.api.entities.shapes.ellipse import Ellipse
from otx.api.entities.shapes.polygon import Point, Polygon
from otx.api.entities.shapes.rectangle import Rectangle

# pylint: disable=invalid-name


class ColorPalette:
    """ColorPalette class."""

    def __init__(self, n: int, rng: Optional[random.Random] = None):
        assert n > 0

        if rng is None:
            rng = random.Random(0xACE)  # nosec B311  # rng is used only for color generation, not for security

        candidates_num = 100
        hsv_colors = [(1.0, 1.0, 1.0)]
        for _ in range(1, n):
            colors_candidates = [
                (rng.random(), rng.uniform(0.8, 1.0), rng.uniform(0.5, 1.0)) for _ in range(candidates_num)
            ]
            min_distances = [self._min_distance(hsv_colors, c) for c in colors_candidates]
            arg_max = np.argmax(min_distances)
            hsv_colors.append(colors_candidates[arg_max])

        self.palette = [Color(*self._hsv2rgb(*hsv)) for hsv in hsv_colors]

    @staticmethod
    def _dist(c1, c2):
        dh = min(abs(c1[0] - c2[0]), 1 - abs(c1[0] - c2[0])) * 2
        ds = abs(c1[1] - c2[1])
        dv = abs(c1[2] - c2[2])
        return dh * dh + ds * ds + dv * dv

    @classmethod
    def _min_distance(cls, colors_set, color_candidate):
        distances = [cls._dist(o, color_candidate) for o in colors_set]
        return np.min(distances)

    @staticmethod
    def _hsv2rgb(h, s, v):
        return tuple(round(c * 255) for c in colorsys.hsv_to_rgb(h, s, v))

    def __getitem__(self, n: int):
        """Return item from index function ColorPalette."""
        return self.palette[n % len(self.palette)]

    def __len__(self):
        """Return length of ColorPalette."""
        return len(self.palette)

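# Illustrative usage sketch (not part of the original module): the palette is
# deterministic (fixed seed) and indexing wraps around its length, so any
# label index maps to a color. The helper name below is hypothetical.
def _example_color_palette_usage():
    """Sketch: build a three-color palette and look up colors by label index."""
    palette = ColorPalette(3)
    assert len(palette) == 3
    # Index 4 wraps around to palette[1].
    return [palette[i] for i in range(5)]
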

def generate_label_schema(label_names: Sequence[str], label_domain: Domain = Domain.DETECTION):
    """Generate a label schema for the given label names and task domain."""
    colors = ColorPalette(len(label_names)) if len(label_names) > 0 else []
    not_empty_labels = [
        LabelEntity(name=name, color=colors[i], domain=label_domain, id=ID(f"{i:08}"))  # type: ignore
        for i, name in enumerate(label_names)
    ]
    emptylabel = LabelEntity(
        name="Empty label",
        color=Color(42, 43, 46),
        is_empty=True,
        domain=label_domain,
        id=ID(f"{len(not_empty_labels):08}"),
    )

    label_schema = LabelSchemaEntity()
    exclusive_group = LabelGroup(name="labels", labels=not_empty_labels, group_type=LabelGroupType.EXCLUSIVE)
    empty_group = LabelGroup(name="empty", labels=[emptylabel], group_type=LabelGroupType.EMPTY_LABEL)
    label_schema.add_group(exclusive_group)
    label_schema.add_group(empty_group)
    return label_schema
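

# Illustrative usage sketch (not part of the original module): the schema holds
# one exclusive group with the real classes plus a separate empty-label group.
# The helper name and the class names are hypothetical example values.
def _example_label_schema_usage():
    """Sketch: build a two-class detection schema and list its label names."""
    schema = generate_label_schema(["person", "car"], Domain.DETECTION)
    names = [label.name for label in schema.get_labels(include_empty=False)]
    # names == ["person", "car"]; include_empty=True would also return "Empty label".
    return schema, names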


def get_det_model_api_configuration(
    label_schema: LabelSchemaEntity,
    task_type: TaskType,
    confidence_threshold: float,
    tiling_parameters: Any,
    use_ellipse_shapes: bool,
    nms_iou_threshold: float,
):
    """Get ModelAPI config."""
    omz_config = {}
    all_labels = ""
    all_label_ids = ""
    if task_type == TaskType.DETECTION:
        omz_config[("model_info", "model_type")] = "ssd"
        omz_config[("model_info", "task_type")] = "detection"
    if task_type == TaskType.INSTANCE_SEGMENTATION:
        omz_config[("model_info", "model_type")] = "MaskRCNN"
        omz_config[("model_info", "task_type")] = "instance_segmentation"
        all_labels = "otx_empty_lbl "
        all_label_ids = "None "
        if tiling_parameters.enable_tiling:
            omz_config[("model_info", "resize_type")] = "fit_to_window_letterbox"
    if task_type == TaskType.ROTATED_DETECTION:
        omz_config[("model_info", "model_type")] = "MaskRCNN"
        omz_config[("model_info", "task_type")] = "rotated_detection"
        all_labels = "otx_empty_lbl "
        all_label_ids = "None "
        if tiling_parameters.enable_tiling:
            omz_config[("model_info", "resize_type")] = "fit_to_window_letterbox"

    omz_config[("model_info", "confidence_threshold")] = str(confidence_threshold)
    omz_config[("model_info", "iou_threshold")] = str(nms_iou_threshold)
    omz_config[("model_info", "use_ellipse_shapes")] = str(use_ellipse_shapes)

    if tiling_parameters.enable_tiling:
        omz_config[("model_info", "tile_size")] = str(
            int(tiling_parameters.tile_size * tiling_parameters.tile_ir_scale_factor)
        )
        omz_config[("model_info", "tiles_overlap")] = str(
            tiling_parameters.tile_overlap / tiling_parameters.tile_ir_scale_factor
        )
        omz_config[("model_info", "max_pred_number")] = str(tiling_parameters.tile_max_number)

    for lbl in label_schema.get_labels(include_empty=False):
        all_labels += lbl.name.replace(" ", "_") + " "
        all_label_ids += f"{lbl.id_} "
    omz_config[("model_info", "labels")] = all_labels.strip()
    omz_config[("model_info", "label_ids")] = all_label_ids.strip()

    return omz_config
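

# Illustrative usage sketch (not part of the original module): the function
# returns a dict keyed by ("model_info", <name>) tuples with string values.
# SimpleNamespace below is a hypothetical stand-in exposing only the tiling
# attributes the function reads; real callers pass the task's tiling parameters.
def _example_model_api_configuration():
    """Sketch: assemble a ModelAPI config for a plain (untiled) detection task."""
    from types import SimpleNamespace

    schema = generate_label_schema(["person", "car"], Domain.DETECTION)
    tiling = SimpleNamespace(
        enable_tiling=False,  # tile_* attributes are only read when this is True
        tile_size=400,
        tile_ir_scale_factor=1.0,
        tile_overlap=0.2,
        tile_max_number=100,
    )
    config = get_det_model_api_configuration(
        label_schema=schema,
        task_type=TaskType.DETECTION,
        confidence_threshold=0.35,
        tiling_parameters=tiling,
        use_ellipse_shapes=False,
        nms_iou_threshold=0.5,
    )
    # e.g. config[("model_info", "labels")] == "person car"
    return config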


def expand_box(box: np.ndarray, scale_h: float, scale_w: float):
    """Expand the box.

    Args:
        box (np.ndarray): bounding box
        scale_h (float): scaling factor for height
        scale_w (float): scaling factor for width

    Returns:
        expanded box (np.ndarray): x1, y1, x2, y2 coordinates of the expanded box
    """
    w_half = (box[2] - box[0]) * 0.5
    h_half = (box[3] - box[1]) * 0.5
    x_c = (box[2] + box[0]) * 0.5
    y_c = (box[3] + box[1]) * 0.5
    w_half *= scale_w
    h_half *= scale_h
    expanded_box = np.zeros(box.shape)
    expanded_box[0] = x_c - w_half
    expanded_box[2] = x_c + w_half
    expanded_box[1] = y_c - h_half
    expanded_box[3] = y_c + h_half
    return expanded_box


def mask_resize(box: np.ndarray, mask: np.ndarray, img_height: int, img_width: int):
    """Resize a box-local mask to the bounding-box size and paste it into a full-size image mask.

    Args:
        box (np.ndarray): bounding box enclosing the mask
        mask (np.ndarray): mask to be resized
        img_height (int): image height
        img_width (int): image width

    Returns:
        bit_mask (np.ndarray): full-size binary mask
    """
    # pad the mask and scale the box accordingly to prevent up-sampling artifacts on segment borders
    mask = np.pad(mask, ((1, 1), (1, 1)), "constant", constant_values=0)
    scale_h = mask.shape[0] / (mask.shape[0] - 2.0)
    scale_w = mask.shape[1] / (mask.shape[1] - 2.0)
    extended_box = expand_box(box, scale_h=scale_h, scale_w=scale_w).astype(int)
    w, h = np.maximum(extended_box[2:] - extended_box[:2] + 1, 1)

    x0, y0 = np.clip(extended_box[:2], a_min=0, a_max=[img_width, img_height])
    x1, y1 = np.clip(extended_box[2:] + 1, a_min=0, a_max=[img_width, img_height])

    mask = cv2.resize(mask.astype(np.float32), (w, h)) > 0.5
    mask = mask.astype(np.uint8)
    bit_mask = np.zeros((img_height, img_width), dtype=np.uint8)
    bit_mask[y0:y1, x0:x1] = mask[
        (y0 - extended_box[1]) : (y1 - extended_box[1]),
        (x0 - extended_box[0]) : (x1 - extended_box[0]),
    ]
    return bit_mask
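

# Illustrative usage sketch (not part of the original module): mask_resize
# pastes a small box-local mask into a full-resolution bit mask. All sizes and
# coordinates are arbitrary example values; the helper name is hypothetical.
def _example_mask_resize_usage():
    """Sketch: paste a 28x28 box-local mask into a 480x640 image canvas."""
    img_height, img_width = 480, 640
    box = np.array([100.0, 120.0, 220.0, 260.0])  # x1, y1, x2, y2 in pixels
    local_mask = np.ones((28, 28), dtype=np.uint8)  # e.g. a thresholded Mask R-CNN output
    bit_mask = mask_resize(box, local_mask, img_height, img_width)
    # Pixels inside the (slightly expanded) box are 1; the rest of the canvas stays 0.
    assert bit_mask.shape == (img_height, img_width)
    return bit_mask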


def create_detection_shapes(
    pred_results: List[np.ndarray],
    width: int,
    height: int,
    confidence_threshold: float,
    use_ellipse_shapes: bool,
    labels: List,
):
    """Create prediction detection shapes.

    Args:
        pred_results (list(np.ndarray)): per class predicted boxes
        width (int): image width
        height (int): image height
        confidence_threshold (float): confidence threshold for filtering predictions
        use_ellipse_shapes (bool): if True, use ellipse shapes
        labels (list): dataset labels

    Returns:
        shapes: list of prediction shapes (Annotation)
    """
    shapes = []
    for label_idx, detections in enumerate(pred_results):
        for det in detections:
            probability = float(det[4])
            coords = det[:4].astype(float).copy()
            coords /= np.array([width, height, width, height], dtype=float)
            coords = np.clip(coords, 0, 1)

            if (probability < confidence_threshold) or (coords[3] - coords[1] <= 0 or coords[2] - coords[0] <= 0):
                continue

            assigned_label = [ScoredLabel(labels[label_idx], probability=probability)]
            if not use_ellipse_shapes:
                shapes.append(
                    Annotation(
                        Rectangle(x1=coords[0], y1=coords[1], x2=coords[2], y2=coords[3]),
                        labels=assigned_label,
                    )
                )
            else:
                shapes.append(
                    Annotation(
                        Ellipse(coords[0], coords[1], coords[2], coords[3]),
                        labels=assigned_label,
                    )
                )
    return shapes
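

# Illustrative usage sketch (not part of the original module): raw per-class
# detections (x1, y1, x2, y2, score in pixels) become Annotation shapes with
# normalized Rectangle coordinates. All numbers are made-up example values.
def _example_detection_shapes_usage():
    """Sketch: two classes, one box each, filtered at a 0.3 confidence threshold."""
    labels = generate_label_schema(["person", "car"], Domain.DETECTION).get_labels(include_empty=False)
    pred_results = [
        np.array([[10.0, 20.0, 110.0, 220.0, 0.9]]),  # "person" box, kept
        np.array([[50.0, 60.0, 150.0, 160.0, 0.1]]),  # "car" box, dropped (score < threshold)
    ]
    shapes = create_detection_shapes(
        pred_results, width=640, height=480, confidence_threshold=0.3, use_ellipse_shapes=False, labels=labels
    )
    # One Annotation with a Rectangle in [0, 1] coordinates relative to the 640x480 image.
    return shapes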


def create_mask_shapes(
    pred_results: Tuple,
    width: int,
    height: int,
    confidence_threshold: float,
    use_ellipse_shapes: bool,
    labels: List,
    rotated_polygon: bool = False,
):
    """Create prediction mask shapes.

    Args:
        pred_results (tuple): tuple of predicted boxes and masks for each dataset item
        width (int): image width
        height (int): image height
        confidence_threshold (float): confidence threshold for filtering predictions
        use_ellipse_shapes (bool): if True, use ellipse shapes
        labels (list): dataset labels
        rotated_polygon (bool, optional): if True, use rotated polygons for mask shapes

    Returns:
        shapes: list of prediction shapes (Annotation)
    """
    shapes = []
    for label_idx, (boxes, masks) in enumerate(zip(*pred_results)):
        for mask, box in zip(masks, boxes):
            probability = float(box[4])
            if probability < confidence_threshold:
                continue

            assigned_label = [ScoredLabel(labels[label_idx], probability=probability)]
            if not use_ellipse_shapes:
                if isinstance(mask, dict):
                    mask = mask_util.decode(mask)

                if mask.shape[0] != height or mask.shape[1] != width:
                    # resize mask to the size of the bounding box
                    coords = box[:4].astype(float).copy()
                    mask = mask_resize(coords, mask, height, width)

                if mask.sum() == 0:
                    continue

                contours, hierarchies = cv2.findContours(mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
                if hierarchies is None:
                    continue

                for contour, hierarchy in zip(contours, hierarchies[0]):
                    # skip inner contours
                    if hierarchy[3] != -1 or len(contour) <= 2:
                        continue

                    if rotated_polygon:
                        box_points = cv2.boxPoints(cv2.minAreaRect(contour))
                        points = [Point(x=point[0] / width, y=point[1] / height) for point in box_points]
                    else:
                        points = [Point(x=point[0][0] / width, y=point[0][1] / height) for point in contour]

                    polygon = Polygon(points=points)
                    if cv2.contourArea(contour) > 0 and polygon.get_area() > 1e-12:
                        shapes.append(Annotation(polygon, labels=assigned_label, id=ID(f"{label_idx:08}")))
            else:
                ellipse = Ellipse(box[0] / width, box[1] / height, box[2] / width, box[3] / height)
                shapes.append(Annotation(ellipse, labels=assigned_label, id=ID(f"{label_idx:08}")))
    return shapes
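

# Illustrative usage sketch (not part of the original module): one predicted
# box/mask pair becomes a polygon Annotation. The mask here is already a
# full-size binary array, so the mask_resize branch is skipped; an RLE dict
# from pycocotools would be decoded first. All values are made-up examples.
def _example_mask_shapes_usage():
    """Sketch: a single full-image mask converted into a polygon annotation."""
    height, width = 480, 640
    labels = generate_label_schema(["person"], Domain.INSTANCE_SEGMENTATION).get_labels(include_empty=False)
    mask = np.zeros((height, width), dtype=np.uint8)
    mask[100:200, 150:300] = 1  # a filled rectangle standing in for a real segment
    boxes = [np.array([150.0, 100.0, 300.0, 200.0, 0.8])]  # x1, y1, x2, y2, score
    pred_results = ([boxes], [[mask]])  # per-class lists of boxes and masks
    shapes = create_mask_shapes(
        pred_results, width, height, confidence_threshold=0.3, use_ellipse_shapes=False, labels=labels
    )
    # One Annotation holding a four-point Polygon in normalized coordinates.
    return shapes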