# Source code for datumaro.plugins.data_formats.voc.base

# Copyright (C) 2019-2023 Intel Corporation
# SPDX-License-Identifier: MIT

import logging as log
import os.path as osp
from typing import Dict, List, Optional, Tuple, Type, TypeVar

import numpy as np
from defusedxml import ElementTree

from datumaro.components.annotation import (
    Annotation,
    AnnotationType,
    Bbox,
    CompiledMask,
    Label,
    Mask,
)
from datumaro.components.dataset_base import DatasetItem, SubsetBase
from datumaro.components.errors import (
    DatasetImportError,
    InvalidAnnotationError,
    InvalidFieldError,
    MissingFieldError,
    UndeclaredLabelError,
)
from datumaro.components.importer import ImportContext
from datumaro.components.media import Image
from datumaro.util.image import find_images
from datumaro.util.mask_tools import invert_colormap, lazy_mask
from datumaro.util.meta_file_util import has_meta_file, parse_meta_file

from .format import (
    VocImporterType,
    VocInstColormap,
    VocPath,
    VocTask,
    make_voc_categories,
    parse_label_map,
)

# Precomputed inverse of the VOC instance colormap: maps an RGB color back to
# its instance index when decoding instance segmentation masks.
_inverse_inst_colormap = invert_colormap(VocInstColormap)

# Generic return type for _parse_field().
T = TypeVar("T")

class VocBase(SubsetBase):
    """Base reader for a single Pascal VOC subset.

    ``path`` points to a subset list file (e.g. ``ImageSets/Main/train.txt``);
    the dataset root and the image/annotation/mask directories are derived
    from it according to ``voc_importer_type``. Subclasses only fix the
    ``task`` argument.
    """

    def __init__(
        self,
        path: str,
        task: Optional[VocTask] = VocTask.voc,
        *,
        subset: Optional[str] = None,
        voc_importer_type: VocImporterType = VocImporterType.default,
        ctx: Optional[ImportContext] = None,
        **kwargs,
    ):
        # The subset name defaults to the list file's basename (e.g. "train").
        if not subset:
            subset = osp.splitext(osp.basename(path))[0]
        super().__init__(subset=subset, ctx=ctx)

        if voc_importer_type == VocImporterType.default:
            # <root>/ImageSets/<task>/<subset>.txt -> <root>
            dataset_dir = osp.dirname(osp.dirname(osp.dirname(path)))
            self._image_dir = osp.join(dataset_dir, VocPath.IMAGES_DIR)
            self._anno_dir = osp.join(dataset_dir, VocPath.ANNOTATIONS_DIR)
            self._mask_dir = osp.join(dataset_dir, VocPath.SEGMENTATION_DIR)
            self._inst_dir = osp.join(dataset_dir, VocPath.INSTANCES_DIR)
        elif voc_importer_type == VocImporterType.roboflow:
            # Roboflow exports keep images and XML annotations flat in one dir.
            # NOTE(review): mask/instance dirs are not set on this path, so
            # tasks that reach _parse_masks() are presumably not used with this
            # importer type — confirm against the Roboflow importer.
            dataset_dir = path
            self._image_dir = dataset_dir
            self._anno_dir = dataset_dir
        else:
            raise DatasetImportError(f"Not supported type: {voc_importer_type}")

        self._path = path
        self._task = task

        self._categories = self._load_categories(dataset_dir)

        if self._task in [VocTask.voc, VocTask.voc_segmentation, VocTask.voc_instance_segmentation]:
            label_color = lambda label_idx: self._categories[AnnotationType.mask].colormap.get(
                label_idx, None
            )
            log.debug(
                "Loaded labels: %s",
                ", ".join(
                    "'%s' %s" % (l.name, ("(%s, %s, %s)" % c) if c else "")
                    for i, l, c in (
                        (i, l, label_color(i))
                        for i, l in enumerate(self._categories[AnnotationType.label].items)
                    )
                ),
            )
        # Items are discovered from the subset list; annotations are loaded
        # lazily in __iter__().
        self._items = {item: None for item in self._load_subset_list(path)}

    def _get_label_id(self, label: str) -> int:
        """Return the index of ``label``, raising if it was not declared."""
        label_id, _ = self._categories[AnnotationType.label].find(label)
        if label_id is None:
            raise UndeclaredLabelError(label)
        return label_id

    def _load_categories(self, dataset_path):
        """Build VOC categories from a meta file or labelmap file, if present."""
        label_map = None
        if has_meta_file(dataset_path):
            label_map = parse_meta_file(dataset_path)
        else:
            label_map_path = osp.join(dataset_path, VocPath.LABELMAP_FILE)
            if osp.isfile(label_map_path):
                label_map = parse_label_map(label_map_path)
        return make_voc_categories(label_map, self._task)

    def _load_subset_list(self, subset_path):
        """Read item ids from the subset list file, skipping blank/comment lines."""
        if not osp.isfile(subset_path):
            raise DatasetImportError(f"Can't find txt subset list file at '{subset_path}'")

        subset_list = []
        with open(subset_path, encoding="utf-8") as f:
            for i, line in enumerate(f):
                line = line.strip()
                if not line or line[0] == "#":
                    continue

                if self._task == VocTask.voc_layout:
                    # Layout lists may quote the filename: "<id>" <n>
                    objects = line.split('"')
                    if 1 < len(objects):
                        if len(objects) == 3:
                            line = objects[1]
                        else:
                            raise InvalidAnnotationError(
                                f"{osp.basename(subset_path)}:{i+1}: "
                                "unexpected number of quotes in filename, expected 0 or 2"
                            )
                    else:
                        line = line.split()[0]
                else:
                    line = line.strip()
                subset_list.append(line)
        return subset_list

    def __iter__(self):
        # Map of "relative id" -> image path, used when the XML does not name
        # the image file explicitly.
        if osp.isdir(self._image_dir):
            images = {
                osp.splitext(osp.relpath(p, self._image_dir))[0].replace("\\", "/"): p
                for p in find_images(self._image_dir, recursive=True)
            }
        else:
            images = {}

        annotations = (
            self._parse_labels() if self._task in [VocTask.voc, VocTask.voc_classification] else {}
        )

        for item_id in self._ctx.progress_reporter.iter(
            self._items, desc=f"Importing '{self._subset}'"
        ):
            log.debug("Reading item '%s'" % item_id)
            size = None

            try:
                # Copy so that "anns +=" below does not mutate the cached list
                # stored in the annotations dict.
                anns = list(annotations.get(item_id, []))
                image = None

                ann_file = osp.join(self._anno_dir, item_id + ".xml")
                if osp.isfile(ann_file) and self._task not in [
                    VocTask.voc_classification,
                    VocTask.voc_segmentation,
                ]:
                    root_elem = ElementTree.parse(ann_file).getroot()
                    if root_elem.tag != "annotation":
                        raise MissingFieldError("annotation")

                    height = self._parse_field(root_elem, "size/height", int, required=False)
                    width = self._parse_field(root_elem, "size/width", int, required=False)
                    if height and width:
                        size = (height, width)

                    filename_elem = root_elem.find("filename")
                    if filename_elem is not None:
                        image = osp.join(self._image_dir, filename_elem.text)

                    anns += self._parse_annotations(root_elem, item_id=(item_id, self._subset))

                if self._task in [
                    VocTask.voc,
                    VocTask.voc_segmentation,
                    VocTask.voc_instance_segmentation,
                ]:
                    anns += self._parse_masks(item_id)

                if image is None:
                    image = images.pop(item_id, None)
                if image or size:
                    image = Image.from_file(path=image, size=size)

                yield DatasetItem(id=item_id, subset=self._subset, media=image, annotations=anns)
            except ElementTree.ParseError as e:
                # Re-wrap the parse error so the report carries a readable message.
                readable_wrapper = InvalidAnnotationError("Failed to parse XML file")
                readable_wrapper.__cause__ = e
                self._ctx.error_policy.report_item_error(
                    readable_wrapper, item_id=(item_id, self._subset)
                )
            except Exception as e:
                self._ctx.error_policy.report_item_error(e, item_id=(item_id, self._subset))

    @staticmethod
    def _parse_field(root, xpath: str, cls: Type[T] = str, required: bool = True) -> Optional[T]:
        """Find ``xpath`` under ``root`` and convert its text with ``cls``.

        Raises MissingFieldError when absent and ``required``; InvalidFieldError
        when the conversion fails.
        """
        elem = root.find(xpath)
        if elem is None:
            if required:
                raise MissingFieldError(xpath)
            else:
                return None

        if cls is str:
            return elem.text

        try:
            return cls(elem.text)
        except Exception as e:
            raise InvalidFieldError(xpath) from e

    @staticmethod
    def _parse_bool_field(root, xpath: str, default: bool = False) -> Optional[bool]:
        """Parse a VOC boolean field, which must be exactly "0" or "1"."""
        elem = root.find(xpath)
        if elem is None:
            return default

        if elem.text not in ["0", "1"]:
            raise InvalidFieldError(xpath)
        return elem.text == "1"

    def _parse_attribute(self, object_elem):
        """Collect VOC object attributes (flags, pose, point, custom attributes)."""
        attributes = {}

        for key in ["difficult", "truncated", "occluded"]:
            attributes[key] = self._parse_bool_field(object_elem, key, default=False)

        pose_elem = object_elem.find("pose")
        if pose_elem is not None:
            attributes["pose"] = pose_elem.text

        point_elem = object_elem.find("point")
        if point_elem is not None:
            point_x = self._parse_field(point_elem, "x", float)
            point_y = self._parse_field(point_elem, "y", float)
            attributes["point"] = (point_x, point_y)

        attributes_elem = object_elem.find("attributes")
        if attributes_elem is not None:
            for attr_elem in attributes_elem.iter("attribute"):
                attributes[self._parse_field(attr_elem, "name")] = self._parse_field(
                    attr_elem, "value"
                )

        return attributes

    @classmethod
    def _parse_bbox(cls, object_elem):
        """Return ``[x, y, w, h]`` parsed from the <bndbox> child element."""
        bbox_elem = object_elem.find("bndbox")
        # Explicit None check: an Element with no children is falsy, so
        # "if not bbox_elem" would wrongly treat an empty element as missing.
        if bbox_elem is None:
            raise MissingFieldError("bndbox")

        xmin = cls._parse_field(bbox_elem, "xmin", float)
        xmax = cls._parse_field(bbox_elem, "xmax", float)
        ymin = cls._parse_field(bbox_elem, "ymin", float)
        ymax = cls._parse_field(bbox_elem, "ymax", float)
        return [xmin, ymin, xmax - xmin, ymax - ymin]

    def _parse_annotations(self, root_elem, *, item_id: Tuple[str, str]) -> List[Annotation]:
        """Parse <object> elements into Bbox annotations; per-object errors are reported."""
        item_annotations = []

        obj_id = 0
        for object_elem in root_elem.iterfind("object"):
            try:
                label_name = self._parse_field(object_elem, "name")

                # person_layout and action_classification are only available for background and person
                if self._task in [VocTask.voc_layout, VocTask.voc_action] and (
                    label_name not in ["person", "background"]
                ):
                    continue

                obj_label_id = self._get_label_id(label_name)
                obj_bbox = self._parse_bbox(object_elem)
                attributes = self._parse_attribute(object_elem)

                # Parts and the object box share a group id.
                group = obj_id

                if self._task in [VocTask.voc, VocTask.voc_layout]:
                    for part_elem in object_elem.findall("part"):
                        part_label_id = self._get_label_id(self._parse_field(part_elem, "name"))
                        part_bbox = self._parse_bbox(part_elem)
                        item_annotations.append(Bbox(*part_bbox, label=part_label_id, group=group))

                if self._task in [VocTask.voc, VocTask.voc_action]:
                    actions_elem = object_elem.find("actions")
                    # Start with every declared action set to False, then flip
                    # the ones present in the XML.
                    actions = {
                        a: False
                        for a in self._categories[AnnotationType.label]
                        .items[obj_label_id]
                        .attributes
                    }
                    if actions_elem is not None:
                        for action_elem in actions_elem:
                            actions[action_elem.tag] = self._parse_bool_field(
                                actions_elem, action_elem.tag
                            )
                    for action, present in actions.items():
                        attributes[action] = present

                item_annotations.append(
                    Bbox(
                        *obj_bbox, label=obj_label_id, attributes=attributes, id=obj_id, group=group
                    )
                )
                obj_id += 1
            except Exception as e:
                self._ctx.error_policy.report_annotation_error(e, item_id=item_id)

        return item_annotations

    @staticmethod
    def _lazy_extract_mask(mask, c):
        # Deferred binary-mask extraction for a single class value.
        return lambda: mask == c

    def _parse_masks(self, item_id):
        """Load class/instance segmentation masks for ``item_id`` as Mask annotations."""
        item_annotations = []

        class_mask = None
        segm_path = osp.join(self._mask_dir, item_id + VocPath.SEGM_EXT)
        if osp.isfile(segm_path):
            inverse_cls_colormap = self._categories[AnnotationType.mask].inverse_colormap
            class_mask = lazy_mask(segm_path, inverse_cls_colormap)

        instances_mask = None
        inst_path = osp.join(self._inst_dir, item_id + VocPath.SEGM_EXT)
        if osp.isfile(inst_path):
            instances_mask = lazy_mask(inst_path, _inverse_inst_colormap)

        label_cat = self._categories[AnnotationType.label]

        if instances_mask is not None:
            compiled_mask = CompiledMask(class_mask, instances_mask)

            if class_mask is not None:
                instance_labels = compiled_mask.get_instance_labels()
            else:
                # Without a class mask the per-instance labels are unknown.
                instance_labels = {i: None for i in range(compiled_mask.instance_count)}

            for instance_id, label_id in instance_labels.items():
                # label_id may be None when no class mask is available; comparing
                # None with an int would raise TypeError, so guard explicitly.
                if label_id is not None and len(label_cat) <= label_id:
                    self._ctx.error_policy.report_annotation_error(
                        UndeclaredLabelError(str(label_id)), item_id=(item_id, self._subset)
                    )

                image = compiled_mask.lazy_extract(instance_id)
                item_annotations.append(Mask(image=image, label=label_id, group=instance_id))
        elif class_mask is not None:
            log.warning("Item %s: only class segmentations available", item_id)

            class_mask = class_mask()
            classes = np.unique(class_mask)
            for label_id in classes:
                if len(label_cat) <= label_id:
                    self._ctx.error_policy.report_annotation_error(
                        UndeclaredLabelError(str(label_id)), item_id=(item_id, self._subset)
                    )

                image = self._lazy_extract_mask(class_mask, label_id)
                item_annotations.append(Mask(image=image, label=label_id))

        return item_annotations

    def _parse_labels(self) -> Dict[str, List[Label]]:
        """Read per-class ``<name>_<subset>.txt`` presence files (classification task)."""
        annotations = {}
        task_dir = osp.dirname(self._path)
        for label_id, label in enumerate(self._categories[AnnotationType.label]):
            ann_file = osp.join(task_dir, f"{label.name}_{self._subset}.txt")
            if not osp.isfile(ann_file):
                continue

            with open(ann_file, encoding="utf-8") as f:
                for i, line in enumerate(f):
                    line = line.strip()
                    if not line or line[0] == "#":
                        continue

                    parts = line.rsplit(maxsplit=1)
                    if len(parts) != 2:
                        raise InvalidAnnotationError(
                            f"{osp.basename(ann_file)}:{i+1}: "
                            "invalid number of fields in line, expected 2"
                        )

                    item, present = parts
                    # Both -1 and 0 are used in the original VOC, they mean the same
                    if present not in ["-1", "0", "1"]:
                        raise InvalidAnnotationError(
                            f"{osp.basename(ann_file)}:{i+1}: "
                            f"unexpected class existence value '{present}', expected -1, 0 or 1"
                        )

                    if present == "1":
                        annotations.setdefault(item, []).append(Label(label_id))

        return annotations

    @property
    def is_stream(self) -> bool:
        # Items are produced lazily by __iter__, so the base is streamable.
        return True
class VocClassificationBase(VocBase):
    """VOC reader preconfigured for the classification task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_classification, **kwargs)
class VocDetectionBase(VocBase):
    """VOC reader preconfigured for the detection task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_detection, **kwargs)
class VocSegmentationBase(VocBase):
    """VOC reader preconfigured for the semantic segmentation task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_segmentation, **kwargs)
class VocInstanceSegmentationBase(VocBase):
    """VOC reader preconfigured for the instance segmentation task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_instance_segmentation, **kwargs)
class VocLayoutBase(VocBase):
    """VOC reader preconfigured for the person layout task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_layout, **kwargs)
class VocActionBase(VocBase):
    """VOC reader preconfigured for the action classification task."""

    def __init__(self, path, **kwargs):
        super().__init__(path, task=VocTask.voc_action, **kwargs)