Source code for datumaro.plugins.data_formats.sly_pointcloud.base

# Copyright (C) 2022-2023 Intel Corporation
#
# SPDX-License-Identifier: MIT

import errno
import os.path as osp
from glob import iglob
from typing import List, Optional

from datumaro.components.annotation import AnnotationType, Cuboid3d, LabelCategories
from datumaro.components.dataset_base import DatasetItem, SubsetBase
from datumaro.components.errors import InvalidFieldError, UndeclaredLabelError
from datumaro.components.importer import ImportContext, Importer
from datumaro.components.media import Image, PointCloud
from datumaro.util import parse_json_file
from datumaro.util.image import find_images

from .format import PointCloudPath


[docs] class SuperviselyPointCloudBase(SubsetBase): NAME = "sly_pointcloud" _SUPPORTED_SHAPES = "cuboid" def __init__( self, path: str, *, subset: Optional[str] = None, ctx: Optional[ImportContext] = None, ): if not osp.isfile(path): raise FileNotFoundError(errno.ENOENT, "Can't find annotations file", path) rootdir = osp.abspath(osp.dirname(path)) self._rootdir = rootdir super().__init__(subset=subset, media_type=PointCloud, ctx=ctx) items, categories = self._parse(rootdir) self._categories = categories self._items = list(self._load_items(items).values()) @classmethod def _parse(cls, rootpath): mapping = parse_json_file(osp.join(rootpath, PointCloudPath.KEY_ID_FILE)) meta = parse_json_file(osp.join(rootpath, PointCloudPath.META_FILE)) label_cat = LabelCategories() for label in meta.get("classes", []): label_cat.add(label["title"]) tags = {} for tag in meta.get("tags", []): # See reference at: # https://github.com/supervisely/supervisely/blob/047e52ebe407cfee61464c1bd0beb9c906892253/supervisely_lib/annotation/tag_meta.py#L139 tags[tag["name"]] = tag applicable_to = tag.get("applicable_type", "all") if applicable_to == "imagesOnly": continue # an image attribute elif applicable_to not in {"all", "objectsOnly"}: raise InvalidFieldError(applicable_to) applicable_classes = tag.get("classes", []) if not applicable_classes: label_cat.attributes.add(tag["name"]) else: for label_name in applicable_classes: _, label = label_cat.find(label_name) if label is None: raise UndeclaredLabelError(label_name) label.attributes.add(tag["name"]) categories = {AnnotationType.label: label_cat} def _get_label_attrs(label_id): attrs = set(label_cat.attributes) attrs.update(label_cat[label_id].attributes) return attrs def _parse_tag(tag): if tag["value"] == "true": value = True elif tag["value"] == "false": value = False else: value = tag["value"] return value ann_dir = osp.join(rootpath, PointCloudPath.BASE_DIR, PointCloudPath.ANNNOTATION_DIR) items = {} for ann_file in iglob(osp.join(ann_dir, "**", "*.json"), recursive=True): ann_data = parse_json_file(ann_file) objects = {} for obj in ann_data["objects"]: obj["id"] = mapping["objects"][obj["key"]] objects[obj["key"]] = obj frame_attributes = {"description": ann_data.get("description", "")} for tag in ann_data["tags"]: frame_attributes[tag["name"]] = _parse_tag(tag) frame = mapping["videos"][ann_data["key"]] frame_desc = items.setdefault( frame, { "name": osp.splitext(osp.relpath(ann_file, ann_dir))[0], "annotations": [], "attributes": frame_attributes, }, ) for figure in ann_data["figures"]: geometry = { dst_field: [ float(figure["geometry"][src_field][axis]) for axis in ["x", "y", "z"] ] for src_field, dst_field in { "position": "position", "rotation": "rotation", "dimensions": "scale", }.items() } ann_id = mapping["figures"][figure["key"]] obj = objects[figure["objectKey"]] label = categories[AnnotationType.label].find(obj["classTitle"])[0] attributes = {} attributes["track_id"] = obj["id"] for tag in obj.get("tags", []): attributes[tag["name"]] = _parse_tag(tag) for attr in _get_label_attrs(label): if attr in attributes: continue if tags[attr]["value_type"] == "any_string": value = "" elif tags[attr]["value_type"] == "oneof_string": value = (tags[attr]["values"] or [""])[0] elif tags[attr]["value_type"] == "any_number": value = 0 else: value = None attributes[attr] = value shape = Cuboid3d(**geometry, label=label, id=ann_id, attributes=attributes) frame_desc["annotations"].append(shape) return items, categories def _load_items(self, parsed): for frame_id, frame_desc in parsed.items(): pcd_name = frame_desc["name"] name = osp.splitext(pcd_name)[0] pcd_path = osp.join( self._rootdir, PointCloudPath.BASE_DIR, PointCloudPath.POINT_CLOUD_DIR, pcd_name ) assert pcd_path.endswith(".pcd"), pcd_path related_images_dir = osp.join( self._rootdir, PointCloudPath.BASE_DIR, PointCloudPath.RELATED_IMAGES_DIR, name + "_pcd", ) related_images = None if osp.isdir(related_images_dir): related_images = [ Image.from_file(path=image) for image in find_images(related_images_dir) ] parsed[frame_id] = DatasetItem( id=name, subset=self._subset, media=PointCloud.from_file(path=pcd_path, extra_images=related_images), annotations=frame_desc.get("annotations"), attributes={"frame": int(frame_id), **frame_desc["attributes"]}, ) for ann in frame_desc.get("annotations"): self._ann_types.add(ann.type) return parsed
[docs] class SuperviselyPointCloudImporter(Importer): NAME = "sly_pointcloud" _ANNO_EXT = ".json"
[docs] @classmethod def find_sources(cls, path): return cls._find_sources_recursive(path, cls._ANNO_EXT, "sly_pointcloud", filename="meta")
[docs] @classmethod def get_file_extensions(cls) -> List[str]: return [cls._ANNO_EXT]