Source code for otx.algorithms.common.utils.data

"""Collections of Dataset utils for common OTX algorithms."""
# Copyright (C) 2022-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=invalid-name

import glob
import os
import random
from typing import Any, Dict, List, Optional, Union

import cv2
import numpy as np

from otx.api.entities.annotation import NullAnnotationSceneEntity
from otx.api.entities.dataset_item import DatasetItemEntity
from otx.api.entities.datasets import DatasetEntity
from otx.api.entities.image import Image
from otx.api.entities.subset import Subset
from otx.api.utils.argument_checks import IMAGE_FILE_EXTENSIONS
from otx.utils.logger import get_logger

logger = get_logger()


def get_unlabeled_filename(base_root: str, file_list_path: str):
    """This method checks and gets image file paths, which are listed in file_list_path.

    The content of file_list_path is expected to specify relative paths of each image file to base_root line by line.
    It returns the list of image filenames only which will compose unlabeled dataset.

    Args:
        base_root (str): path of base root dir where unlabeled images are.
        file_list_path (str) : path of file which contains relative paths of unlabeled data to base_root.

    Returns:
        List[str]: a list of existing image file paths which will be unlabeled data items.
    """

    def is_valid(file_path):
        return file_path.lower().endswith(tuple(IMAGE_FILE_EXTENSIONS))

    with open(file_list_path, "r", encoding="UTF-8") as f:
        file_names = f.read().splitlines()
    unlabeled_files = []
    for fn in file_names:
        file_path = os.path.join(base_root, fn.strip())
        if is_valid(file_path) and os.path.isfile(file_path):
            unlabeled_files.append(file_path)
    return unlabeled_files
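
# Illustrative sketch (not part of the original module): typical use of get_unlabeled_filename.
# The directory and file-list paths below are hypothetical placeholders.
#
#   # unlabeled_list.txt holds one path per line, relative to the base root, e.g. "cam0/frame_0001.jpg"
#   files = get_unlabeled_filename("/data/unlabeled", "/data/unlabeled/unlabeled_list.txt")
#   # -> ["/data/unlabeled/cam0/frame_0001.jpg", ...]  (only existing image files are kept)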


def load_unlabeled_dataset_items(
    data_root_dir: str,
    file_list_path: Optional[str] = None,
):
    """This method loads unlabeled dataset items from images in data_root_dir.

    Args:
        data_root_dir (str): path of base root directory where unlabeled images are.
        file_list_path (str) : path of a file which contains relative paths of unlabeled data to base_root.
        subset (Subset) : Entity subset category
    Returns:
        List[DatasetItemEntity]: a list of unlabeled dataset item entity.
    """
    if file_list_path is not None:
        data_list = get_unlabeled_filename(data_root_dir, file_list_path)

    else:
        data_list = []

        for ext in IMAGE_FILE_EXTENSIONS:
            data_list.extend(glob.glob(f"{data_root_dir}/**/*{ext}", recursive=True))

    dataset_items = []

    for filename in data_list:
        dataset_item = DatasetItemEntity(
            media=Image(file_path=filename),
            annotation_scene=NullAnnotationSceneEntity(),
            subset=Subset.UNLABELED,
        )
        dataset_items.append(dataset_item)
    return dataset_items
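
# Illustrative sketch (not part of the original module): wrapping the loaded items in a DatasetEntity.
# "/data/unlabeled" is a hypothetical directory.
#
#   items = load_unlabeled_dataset_items("/data/unlabeled")  # recursive glob over IMAGE_FILE_EXTENSIONS
#   unlabeled_dataset = DatasetEntity(items=items)
#   assert all(item.subset == Subset.UNLABELED for item in unlabeled_dataset)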


def get_dataset(dataset: DatasetEntity, subset: Subset):
    """Get dataset from datasetentity."""
    data = dataset.get_subset(subset)
    return data if len(data) > 0 else None
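
# Illustrative sketch (not part of the original module): fetching a single split, e.g. validation.
#
#   val_dataset = get_dataset(dataset, Subset.VALIDATION)
#   if val_dataset is None:
#       ...  # the dataset contains no validation items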


def get_cls_img_indices(labels, dataset):
    """Function for getting image indices per class.

    Args:
        labels (List[LabelEntity]): List of labels
        dataset (DatasetEntity): dataset entity

    Returns:
        Dict[str, List[int]]: dataset item indices grouped by label name
    """
    img_indices = {label.name: [] for label in labels}
    for i, item in enumerate(dataset):
        item_labels = item.annotation_scene.get_labels()
        for i_l in item_labels:
            if i_l in labels:
                img_indices[i_l.name].append(i)

    return img_indices
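

# Illustrative sketch (not part of the original module): shape of the mapping returned by
# get_cls_img_indices. The label names "cat" and "dog" are hypothetical.
#
#   img_indices = get_cls_img_indices(labels, dataset)
#   # -> {"cat": [0, 3, 7], "dog": [1, 2]}  (dataset item indices per label name)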


def get_old_new_img_indices(labels, new_classes, dataset):
    """Function for getting old & new indices of dataset.

    Args:
        labels (List[LabelEntity]): List of labels
        new_classes (List[str]): List of new classes
        dataset (DatasetEntity): dataset entity

    Returns:
        Dict[str, List[int]]: "old" and "new" dataset item indices
    """
    ids_old, ids_new = [], []
    _dataset_label_schema_map = {label.name: label for label in labels}
    new_classes = [_dataset_label_schema_map[new_class] for new_class in new_classes]
    for i, item in enumerate(dataset):
        if item.annotation_scene.contains_any(new_classes):
            ids_new.append(i)
        else:
            ids_old.append(i)
    return {"old": ids_old, "new": ids_new}
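

# Illustrative sketch (not part of the original module): splitting items for class-incremental
# training. The new class name "truck" is hypothetical.
#
#   split = get_old_new_img_indices(labels, new_classes=["truck"], dataset=dataset)
#   old_ids, new_ids = split["old"], split["new"]  # indices of items without / with the new classes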


def get_image(results: Dict[str, Any], cache_dir: str, to_float32=False) -> np.ndarray:
    """Load an image and cache it if it's a training video frame.

    Args:
        results (Dict[str, Any]): A dictionary that contains information about the dataset item.
        cache_dir (str): A directory path where the cached images will be stored.
        to_float32 (bool, optional): A flag indicating whether to convert the image to float32. Defaults to False.

    Returns:
        np.ndarray: The loaded image.
    """

    def is_training_video_frame(subset, media) -> bool:
        return subset.name in ["TRAINING", "VALIDATION"] and "VideoFrame" in repr(media)

    def load_image_from_cache(filename: str, to_float32=False) -> Union[np.ndarray, None]:
        try:
            cached_img = cv2.imread(filename)
            if to_float32:
                cached_img = cached_img.astype(np.float32)
            return cached_img
        except Exception as e:  # pylint: disable=broad-except
            logger.warning(f"Skip loading cached {filename} \nError msg: {e}")
            return None

    def save_image_to_cache(img: np.array, filename: str):
        tmp_filename = filename.replace(".png", "-tmp.png")
        if os.path.exists(filename) or os.path.exists(tmp_filename):  # if image is cached or caching
            return
        try:
            cv2.imwrite(tmp_filename, img=img)
        except Exception as e:  # pylint: disable=broad-except
            logger.warning(f"Skip caching for {filename} \nError msg: {e}")
            return

        if os.path.exists(tmp_filename) and not os.path.exists(filename):
            try:
                os.replace(tmp_filename, filename)
            except Exception as e:  # pylint: disable=broad-except
                os.remove(tmp_filename)
                logger.warning(f"Failed to rename {tmp_filename} -> {filename} \nError msg: {e}")

    subset = results["dataset_item"].subset
    media = results["dataset_item"].media
    if is_training_video_frame(subset, media):
        index = results["index"]
        filename = os.path.join(cache_dir, f"{subset}-{index:06d}.png")
        if os.path.exists(filename):
            loaded_img = load_image_from_cache(filename, to_float32=to_float32)
            if loaded_img is not None:
                return loaded_img

    img = results["dataset_item"].numpy  # this takes long for VideoFrame
    if to_float32:
        img = img.astype(np.float32)

    if is_training_video_frame(subset, media):
        save_image_to_cache(img, filename)

    return img
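

# Illustrative sketch (not part of the original module): the `results` dict expected by get_image
# and the cache layout it produces. The cache directory path is a hypothetical placeholder.
#
#   results = {"dataset_item": dataset[0], "index": 0}
#   img = get_image(results, cache_dir="/tmp/otx_cache", to_float32=True)
#   # For a training/validation VideoFrame the decoded frame is cached as
#   # "/tmp/otx_cache/<subset>-000000.png" and reused on later calls; other media are decoded directly.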


class OTXOpenVinoDataLoader:
    """DataLoader implementation for ClassificationOpenVINOTask."""

    def __init__(self, dataset: DatasetEntity, inferencer: Any, shuffle: bool = True):
        super().__init__()
        self.dataset = dataset
        self.inferencer = inferencer
        self.shuffler = None
        if shuffle:
            self.shuffler = list(range(len(dataset)))
            random.shuffle(self.shuffler)

    def __getitem__(self, index: int):
        """Get item from dataset."""
        if self.shuffler is not None:
            index = self.shuffler[index]

        image = self.dataset[index].numpy
        annotation = self.dataset[index].annotation_scene
        resized_image = self.inferencer.model.resize(image, (self.inferencer.model.w, self.inferencer.model.h))
        resized_image = self.inferencer.model.input_transform(resized_image)
        resized_image = self.inferencer.model._change_layout(resized_image)

        return resized_image, annotation

    def __len__(self):
        """Get length of dataset."""
        return len(self.dataset)
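

# Illustrative sketch (not part of the original module): feeding the loader to a calibration or
# evaluation loop. `inferencer` is assumed to expose a model with resize, input_transform and
# _change_layout, as used in __getitem__ above.
#
#   loader = OTXOpenVinoDataLoader(dataset, inferencer, shuffle=True)
#   preprocessed, annotation = loader[0]  # model-ready tensor plus its annotation scene
#   for i in range(len(loader)):
#       batch, _ = loader[i]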


def compute_robust_statistics(values: np.array) -> Dict[str, float]:
    """Computes robust statistics of given samples.

    Args:
        values (np.array): Array of samples

    Returns:
        Dict[str, float]: Robust avg, min, max values
    """
    stat: Dict = {}
    if values.size == 0:
        return stat

    avg_value = np.mean(values)
    std_value = np.std(values)
    avg_3std_min_value = avg_value - 3 * std_value
    avg_3std_max_value = avg_value + 3 * std_value
    min_value = np.min(values)
    max_value = np.max(values)

    # Refine min/max to reduce outlier effect
    robust_min_value = max(min_value, avg_3std_min_value)
    robust_max_value = min(max_value, avg_3std_max_value)

    stat["avg"] = float(avg_value)
    stat["std"] = float(std_value)
    stat["min"] = float(min_value)
    stat["max"] = float(max_value)
    stat["robust_min"] = float(robust_min_value)
    stat["robust_max"] = float(robust_max_value)
    return stat


def compute_robust_scale_statistics(values: np.array) -> Dict[str, float]:
    """Computes robust statistics of scale values.

    Average of 0.5x scale and 2x scale should be 1x

    Args:
        values (np.array): Array of positive scale values

    Returns:
        Dict[str, float]: Robust avg, min, max values
    """
    # Compute stat in log scale & convert back to original scale
    if values.size == 0:
        return {}
    stat = compute_robust_statistics(np.log(values))
    stat = {k: float(np.exp(v)) for k, v in stat.items()}
    stat["std"] = float(np.std(values))  # Normal scale std is better for understanding
    return stat


def compute_robust_dataset_statistics(dataset: DatasetEntity, ann_stat=False, max_samples=1000) -> Dict[str, Any]:
    """Computes robust statistics of image & annotation sizes.

    Args:
        dataset (DatasetEntity): Input dataset.
        ann_stat (bool, optional): Whether to compute annotation size statistics. Defaults to False.
        max_samples (int, optional): Maximum number of dataset subsamples to analyze. Defaults to 1000.

    Returns:
        Dict[str, Any]: Robust avg, min, max values for images, and annotations optionally.
            ex) stat = {
                    "image": {"avg": ...},
                    "annotation": {
                        "num_per_image": {"avg": ...},
                        "size_of_shape": {"avg": ...},
                    }
                }
    """
    stat: Dict = {}
    if len(dataset) == 0 or max_samples <= 0:
        return stat

    max_image_samples = min(max_samples, len(dataset))
    image_indices = np.random.permutation(len(dataset))[:max_image_samples]

    image_sizes = []
    for i in image_indices:
        data = dataset[int(i)]
        image_sizes.append(np.sqrt(data.width * data.height))
    stat["image"] = compute_robust_scale_statistics(np.array(image_sizes))

    if ann_stat:
        stat["annotation"] = {}
        num_per_images: List[int] = []
        size_of_shapes: List[float] = []
        for i in image_indices:
            data = dataset[int(i)]
            annotations = data.get_annotations()
            num_per_images.append(len(annotations))

            if len(size_of_shapes) >= max_samples:
                continue

            image_area = data.width * data.height

            def scale_of(ann):
                return np.sqrt(image_area * ann.shape.get_area())

            size_of_shapes.extend(
                filter(lambda x: x >= 1, map(scale_of, annotations))
            )  # Filter out shapes smaller than 1 pixel as outlier

        stat["annotation"]["num_per_image"] = compute_robust_statistics(np.array(num_per_images))
        stat["annotation"]["size_of_shape"] = compute_robust_scale_statistics(np.array(size_of_shapes))

    return stat
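

# Illustrative sketch (not part of the original module): reading the statistics, e.g. to choose an
# input resolution. Key names follow the docstrings above; the values shown are hypothetical.
#
#   stat = compute_robust_dataset_statistics(dataset, ann_stat=True, max_samples=1000)
#   typical_image_size = stat["image"]["avg"]                         # e.g. 1024.0 = sqrt(W * H)
#   typical_object_size = stat["annotation"]["size_of_shape"]["avg"]  # geometric-mean object scale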