Source code for datumaro.components.exporter

# Copyright (C) 2019-2024 Intel Corporation
#
# SPDX-License-Identifier: MIT

import logging as log
import os
import os.path as osp
import shutil
from tempfile import mkdtemp
from typing import NoReturn, Optional, Tuple, TypeVar, Union

import attr
from attrs import define, field

from datumaro.components.annotation import HashKey
from datumaro.components.cli_plugin import CliPlugin
from datumaro.components.crypter import NULL_CRYPTER, Crypter
from datumaro.components.dataset_base import DatasetItem, IDataset
from datumaro.components.errors import (
    AnnotationExportError,
    DatasetExportError,
    DatumaroError,
    ItemExportError,
)
from datumaro.components.media import Image, PointCloud, Video, VideoFrame
from datumaro.components.progress_reporting import NullProgressReporter, ProgressReporter
from datumaro.util.meta_file_util import save_hashkey_file, save_meta_file
from datumaro.util.os_util import rmtree
from datumaro.util.scope import on_error_do, scoped

T = TypeVar("T")


class _ExportFail(DatumaroError):
    """Internal exception raised by ``ExportErrorPolicy.fail()`` to abort an export."""


class ExportErrorPolicy:
    """Policy object that decides how export-time errors are handled.

    Errors are reported through :meth:`report_item_error` and
    :meth:`report_annotation_error`. The default handlers defined here
    abort the export by calling :meth:`fail`; subclasses may override the
    ``_handle_*`` hooks to return instead.
    """

    def report_item_error(self, error: Exception, *, item_id: Tuple[str, str]) -> None:
        """
        Allows to report a problem with a dataset item.
        If this function returns, the converter must skip the item.
        """
        # An already-raised export failure is propagated untouched.
        if isinstance(error, _ExportFail):
            raise error

        wrapped = ItemExportError(item_id)
        wrapped.__cause__ = error
        return self._handle_item_error(wrapped)

    def report_annotation_error(self, error: Exception, *, item_id: Tuple[str, str]) -> None:
        """
        Allows to report a problem with a dataset item annotation.
        If this function returns, the converter must skip the annotation.
        """
        # An already-raised export failure is propagated untouched.
        if isinstance(error, _ExportFail):
            raise error

        wrapped = AnnotationExportError(item_id)
        wrapped.__cause__ = error
        return self._handle_annotation_error(wrapped)

    def _handle_item_error(self, error: ItemExportError) -> None:
        """This function must either call fail() or return."""
        self.fail(error)

    def _handle_annotation_error(self, error: AnnotationExportError) -> None:
        """This function must either call fail() or return."""
        self.fail(error)

    def fail(self, error: Exception) -> NoReturn:
        """Abort the export by raising ``_ExportFail`` chained from ``error``."""
        raise _ExportFail from error
class FailingExportErrorPolicy(ExportErrorPolicy):
    """Error policy that inherits the base behavior without any overrides."""
@define(eq=False)
class ExportContext:
    """Bundle of the progress reporter and the error policy used during an export."""

    # A ``None`` value is replaced with a fresh NullProgressReporter by the converter.
    progress_reporter: ProgressReporter = field(
        converter=attr.converters.default_if_none(factory=NullProgressReporter),
        default=None,
    )

    # A ``None`` value is replaced with a fresh FailingExportErrorPolicy by the converter.
    error_policy: ExportErrorPolicy = field(
        converter=attr.converters.default_if_none(factory=FailingExportErrorPolicy),
        default=None,
    )
class NullExportContext(ExportContext):
    """Export context that relies solely on the defaults of ``ExportContext``."""
class Exporter(CliPlugin):
    """Base class for dataset exporters.

    Subclasses implement :meth:`_apply_impl`. A default image extension must
    be available, either through the ``DEFAULT_IMAGE_EXT`` class attribute or
    the ``default_image_ext`` constructor argument, because the constructor
    asserts on it.
    """

    DEFAULT_IMAGE_EXT = None

    @classmethod
    def build_cmdline_parser(cls, **kwargs):
        """Extend the parent parser with the common exporter options."""
        parser = super().build_cmdline_parser(**kwargs)

        parser.add_argument(
            "--save-media",
            action="store_true",
            help="Save media (default: False)",
        )
        parser.add_argument(
            "--image-ext",
            default=None,
            help="Image extension (default: keep or use format default%s)"
            % (" " + cls.DEFAULT_IMAGE_EXT if cls.DEFAULT_IMAGE_EXT else ""),
        )
        parser.add_argument(
            "--save-dataset-meta",
            action="store_true",
            help="Save dataset meta file (default: %(default)s)",
        )

        return parser

    @classmethod
    def convert(cls, extractor, save_dir, **options):
        """Instantiate the exporter and run it, returning the result of ``apply()``."""
        converter = cls(extractor, save_dir, **options)
        return converter.apply()

    @classmethod
    @scoped
    def patch(cls, dataset, patch, save_dir, **options):
        """Write ``dataset`` to ``save_dir``, replacing an existing directory.

        If ``save_dir`` is not an existing directory, this is a plain
        ``convert()``. Otherwise the dataset is exported to a sibling
        temporary directory, which then replaces ``save_dir``.
        """
        # This solution is not any better in performance than just
        # writing a dataset, but in case of patching (i.e. writing
        # to the previous location), it allows to avoid many problems
        # with removing and replacing existing files. Surely, this
        # approach also has problems with removal of the given directory.
        # Problems can occur if we can't remove the directory,
        # or want to reuse the given directory. It can happen if it
        # is mounted or (sym-)linked.
        # Probably, a better solution could be to wipe directory
        # contents and write new data there. Note that directly doing this
        # also doesn't work, because images may be needed for writing.

        if not osp.isdir(save_dir):
            return cls.convert(dataset, save_dir, **options)

        # Normalize before splitting: for a path with a trailing separator
        # (e.g. "out/") dirname() yields "out" and basename() yields "", which
        # would place the temporary directory INSIDE save_dir; the rmtree()
        # below would then delete the freshly exported data.
        target_dir = osp.abspath(save_dir)

        tmpdir = mkdtemp(
            dir=osp.dirname(target_dir), prefix=osp.basename(target_dir), suffix=".tmp"
        )
        on_error_do(rmtree, tmpdir, ignore_errors=True)
        shutil.copymode(target_dir, tmpdir)

        retval = cls.convert(dataset, tmpdir, **options)

        rmtree(target_dir)
        os.replace(tmpdir, target_dir)

        return retval

    def apply(self):
        """Execute the data-format conversion"""
        if self._save_hashkey_meta:
            self._save_hashkey_file(self._save_dir)
        return self._apply_impl()

    def _apply_impl(self):
        raise NotImplementedError("Should be implemented in a subclass")

    def __init__(
        self,
        extractor: IDataset,
        save_dir: str,
        *,
        save_media: Optional[bool] = None,
        image_ext: Optional[str] = None,
        default_image_ext: Optional[str] = None,
        save_dataset_meta: bool = False,
        save_hashkey_meta: bool = False,
        stream: bool = False,
        ctx: Optional[ExportContext] = None,
    ):
        """Store the export settings.

        Args:
            extractor: Dataset to export.
            save_dir: Output directory.
            save_media: Stored as-is for use by subclasses.
            image_ext: Extension that overrides the source image extension.
            default_image_ext: Fallback extension; defaults to
                ``DEFAULT_IMAGE_EXT``. One of the two must be truthy.
            save_dataset_meta: Stored as-is for use by subclasses.
            save_hashkey_meta: If true, ``apply()`` writes the hash key file.
            stream: Request a streaming export.
            ctx: Export context; a ``NullExportContext`` is used when falsy.

        Raises:
            DatasetExportError: If ``stream`` is requested but the exporter
                reports ``can_stream == False``.
        """
        default_image_ext = default_image_ext or self.DEFAULT_IMAGE_EXT
        assert default_image_ext
        self._default_image_ext = default_image_ext

        self._save_media = save_media
        self._image_ext = image_ext

        self._extractor = extractor
        self._save_dir = save_dir

        self._save_dataset_meta = save_dataset_meta
        self._save_hashkey_meta = save_hashkey_meta

        # TODO: refactor this variable.
        # Can be used by a subclass to store the current patch info
        # Imported here rather than at module level, as in the original code.
        from datumaro.components.dataset import DatasetPatch

        if isinstance(extractor, DatasetPatch.DatasetPatchWrapper):
            self._patch = extractor.patch
        else:
            self._patch = None

        if stream and not self.can_stream:
            raise DatasetExportError(
                f"{self.__class__.__name__} cannot export a dataset in a stream manner"
            )

        self._stream = stream

        self._ctx: ExportContext = ctx or NullExportContext()

    def _find_image_ext(self, item: Union[DatasetItem, Image]):
        """Pick the image extension: explicit override, then source, then default."""
        src_ext = None

        if isinstance(item, DatasetItem) and isinstance(item.media, Image):
            src_ext = item.media.ext
        elif isinstance(item, Image):
            src_ext = item.ext

        return self._image_ext or src_ext or self._default_image_ext

    def _make_item_filename(self, item, *, name=None, subdir=None):
        """Join ``subdir`` (or "") with ``name`` (or the item id), without extension."""
        name = name or item.id
        subdir = subdir or ""
        return osp.join(subdir, name)

    def _make_image_filename(self, item, *, name=None, subdir=None):
        """Item filename with the image extension chosen by ``_find_image_ext``."""
        return self._make_item_filename(item, name=name, subdir=subdir) + self._find_image_ext(item)

    def _make_pcd_filename(self, item, *, name=None, subdir=None):
        """Item filename with the ``.pcd`` extension."""
        return self._make_item_filename(item, name=name, subdir=subdir) + ".pcd"

    def _save_image(
        self,
        item,
        path=None,
        *,
        name=None,
        subdir=None,
        basedir=None,
        crypter: Crypter = NULL_CRYPTER,
    ):
        """Save the item's image, either to ``path`` or to a path built from
        ``basedir``/``subdir``/``name``. Items without image data are skipped
        with a warning.
        """
        assert not (
            (subdir or name or basedir) and path
        ), "Can't use both subdir or name or basedir and path arguments"

        if not isinstance(item.media, Image) or not item.media.has_data:
            log.warning("Item '%s' has no image", item.id)
            return

        basedir = basedir or self._save_dir
        path = path or osp.join(basedir, self._make_image_filename(item, name=name, subdir=subdir))
        path = osp.abspath(path)

        # Make sure the target directory exists, as the other save helpers do.
        os.makedirs(osp.dirname(path), exist_ok=True)
        item.media.save(path, crypter=crypter)

    def _save_point_cloud(self, item=None, path=None, *, name=None, subdir=None, basedir=None):
        """Save the item's point cloud, either to ``path`` or to a path built
        from ``basedir``/``subdir``/``name``. Items without a point cloud are
        skipped with a warning.
        """
        assert not (
            (subdir or name or basedir) and path
        ), "Can't use both subdir or name or basedir and path arguments"

        if not item.media or not isinstance(item.media, PointCloud):
            log.warning("Item '%s' has no pcd", item.id)
            return

        basedir = basedir or self._save_dir
        path = path or osp.join(basedir, self._make_pcd_filename(item, name=name, subdir=subdir))
        path = osp.abspath(path)

        os.makedirs(osp.dirname(path), exist_ok=True)
        item.media.save(path, crypter=NULL_CRYPTER)

    def _save_meta_file(self, path):
        """Write the dataset categories to a meta file at ``path``."""
        save_meta_file(path, self._extractor.categories())

    def _save_hashkey_file(self, path):
        """Write the hash key file for the dataset at ``path``."""
        save_hashkey_file(path, self._extractor)

    def _check_hash_key_existence(self, item):
        """Turn on hash key saving once an item with a ``HashKey`` annotation is seen."""
        if self._save_hashkey_meta:
            return
        if any(isinstance(annotation, HashKey) for annotation in item.annotations):
            self._save_hashkey_meta = True

    @property
    def can_stream(self) -> bool:
        """Flag to indicate whether the exporter can export the dataset in a stream manner or not."""
        return False
# TODO: Currently, ExportContextComponent is used only by the Datumaro and DatumaroBinary formats
# (for multi-processing). It should be propagated to the other exporters in Datumaro 1.2.0.
class ExportContextComponent:
    """Holds output locations and media settings and saves item media to disk."""

    def __init__(
        self,
        save_dir: str,
        save_media: bool,
        images_dir: str,
        pcd_dir: str,
        video_dir: str,
        crypter: Crypter = NULL_CRYPTER,
        image_ext: Optional[str] = None,
        default_image_ext: Optional[str] = None,
        source_path: Optional[str] = None,
    ):
        """Store the given settings; no other work is done here."""
        # Output locations
        self._save_dir = save_dir
        self._images_dir = images_dir
        self._pcd_dir = pcd_dir
        self._video_dir = video_dir
        self._source_path = source_path

        # Media options
        self._save_media = save_media
        self._crypter = crypter
        self._image_ext = image_ext
        self._default_image_ext = default_image_ext

    def find_image_ext(self, item: Union[DatasetItem, Image]):
        """Pick the image extension: explicit override, then source, then default."""
        if isinstance(item, DatasetItem) and isinstance(item.media, Image):
            image = item.media
        elif isinstance(item, Image):
            image = item
        else:
            image = None

        source_ext = image.ext if image is not None else None
        return self._image_ext or source_ext or self._default_image_ext

    def _make_item_filename(self, item, *, name=None, subdir=None):
        """Join ``subdir`` (or "") with ``name`` (or the item id), without extension."""
        stem = name or item.id
        folder = subdir or ""
        return osp.join(folder, stem)

    def make_image_filename(self, item, *, name=None, subdir=None):
        """Item filename with the image extension chosen by ``find_image_ext``."""
        stem = self._make_item_filename(item, name=name, subdir=subdir)
        return stem + self.find_image_ext(item)

    def make_pcd_filename(self, item, *, name=None, subdir=None):
        """Item filename with the ``.pcd`` extension."""
        stem = self._make_item_filename(item, name=name, subdir=subdir)
        return stem + ".pcd"

    def make_pcd_extra_image_filename(self, item, idx, image, *, name=None, subdir=None):
        """Filename for the ``idx``-th extra image of a point cloud item.

        Without ``name`` the file goes to ``<item.id>/extra_image_<idx>``;
        the extension is resolved from ``image``.
        """
        if not name:
            name = f"{item.id}/extra_image_{idx}"
        stem = self._make_item_filename(item, name=name, subdir=subdir)
        return stem + self.find_image_ext(image)

    def make_video_filename(self, item, *, name=None):
        """Return the base name of the video file behind a Video/VideoFrame item."""
        STR_WRONG_MEDIA_TYPE = "Video item's media type should be Video or VideoFrame"
        assert isinstance(item, DatasetItem), STR_WRONG_MEDIA_TYPE

        media = item.media
        if isinstance(media, VideoFrame):
            # A frame refers to its parent video for the file path.
            video_file_name = osp.basename(media.video.path)
        elif isinstance(media, Video):
            video_file_name = osp.basename(media.path)
        else:
            assert False, STR_WRONG_MEDIA_TYPE

        return video_file_name

    def save_image(
        self,
        item: DatasetItem,
        *,
        encryption: bool = False,
        basedir: Optional[str] = None,
        subdir: Optional[str] = None,
        fname: Optional[str] = None,
    ):
        """Save the item's image under ``basedir`` (default: the images dir).

        The configured crypter is used only when ``encryption`` is true.
        Items without image data are skipped with a warning.
        """
        if not (isinstance(item.media, Image) and item.media.has_data):
            log.warning("Item '%s' has no image", item.id)
            return

        if basedir is None:
            basedir = self._images_dir
        if subdir is not None:
            basedir = osp.join(basedir, subdir)
        if fname is None:
            fname = self.make_image_filename(item)

        path = osp.abspath(osp.join(basedir, fname))
        os.makedirs(osp.dirname(path), exist_ok=True)

        chosen_crypter = self._crypter if encryption else NULL_CRYPTER
        item.media.save(path, crypter=chosen_crypter)

    def save_point_cloud(
        self,
        item: DatasetItem,
        *,
        basedir: Optional[str] = None,
        subdir: Optional[str] = None,
        fname: Optional[str] = None,
    ):
        """Save the item's point cloud under ``basedir`` (default: the pcd dir).

        Items without a point cloud are skipped with a warning.
        """
        if not (item.media and isinstance(item.media, PointCloud)):
            log.warning("Item '%s' has no pcd", item.id)
            return

        if basedir is None:
            basedir = self._pcd_dir
        if subdir is not None:
            basedir = osp.join(basedir, subdir)
        if fname is None:
            fname = self.make_pcd_filename(item)

        path = osp.abspath(osp.join(basedir, fname))
        os.makedirs(osp.dirname(path), exist_ok=True)

        def helper(i, image):
            # Extra images are placed under the images dir, not the pcd dir.
            extra_dir = self._images_dir
            if subdir is not None:
                extra_dir = osp.join(extra_dir, subdir)
            extra_name = self.make_pcd_extra_image_filename(item, i, image)
            return {"fp": osp.join(extra_dir, extra_name)}

        item.media.save(path, helper, crypter=NULL_CRYPTER)

    def save_video(
        self,
        item: DatasetItem,
        *,
        basedir: Optional[str] = None,
        subdir: Optional[str] = None,
        fname: Optional[str] = None,
    ):
        """Save the item's video under ``basedir`` (default: the video dir).

        An existing file at the target path is left untouched. Items without
        a video are skipped with a warning.
        """
        if not (item.media and isinstance(item.media, (Video, VideoFrame))):
            log.warning("Item '%s' has no video", item.id)
            return

        if basedir is None:
            basedir = self._video_dir
        if subdir is not None:
            basedir = osp.join(basedir, subdir)
        if fname is None:
            fname = self.make_video_filename(item)

        path = osp.abspath(osp.join(basedir, fname))

        # To prevent the video from being overwritten
        # (A video can have same path but different start/end frames)
        if osp.exists(path):
            return

        os.makedirs(osp.dirname(path), exist_ok=True)
        if isinstance(item.media, VideoFrame):
            item.media.video.save(path, crypter=NULL_CRYPTER)
        else:  # Video
            item.media.save(path, crypter=NULL_CRYPTER)

    @property
    def images_dir(self) -> str:
        return self._images_dir

    @property
    def pcd_dir(self) -> str:
        return self._pcd_dir

    @property
    def save_dir(self) -> str:
        return self._save_dir

    @property
    def save_media(self) -> bool:
        return self._save_media

    @property
    def crypter(self) -> Crypter:
        return self._crypter

    @property
    def source_path(self) -> str:
        # A falsy source path is reported as an empty string.
        return self._source_path if self._source_path else ""