# Source code for datumaro.plugins.data_formats.coco.exporter

# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: MIT

import json
import logging as log
import os
import os.path as osp
from dataclasses import dataclass
from enum import Enum, auto
from io import BufferedWriter
from itertools import chain, groupby
from typing import Dict, List, Optional, Type, Union

import pycocotools.mask as mask_utils
from json_stream.writer import streamable_dict, streamable_list

import datumaro.util.annotation_util as anno_tools
import datumaro.util.mask_tools as mask_tools
from datumaro.components.annotation import (
    COORDINATE_ROUNDING_DIGITS,
    AnnotationType,
    Ellipse,
    Points,
    Polygon,
)
from datumaro.components.dataset_base import DatasetItem
from datumaro.components.dataset_item_storage import ItemStatus
from datumaro.components.errors import MediaTypeError
from datumaro.components.exporter import Exporter
from datumaro.components.media import Image
from datumaro.util import cast, dump_json, dump_json_file, find, parse_json, str_to_bool
from datumaro.util.image import save_image

from .format import CocoPath, CocoTask

class SegmentationMode(Enum):
    """Selects how instance segmentation is written by the COCO exporter."""

    # Choose between polygons and masks for each instance
    guess = auto()
    # Write polygons
    polygons = auto()
    # Write masks
    mask = auto()
@dataclass
class _Writer:
    """An open temporary file paired with a flag telling whether it has been written to."""

    # Binary handle of the temporary file
    fp: BufferedWriter
    # True until the first record is written through this writer
    is_empty: bool
class TemporaryWriters:
    """Temporary on-disk buffers for the images and annotations of one task and subset.

    Two files (keys ``imgs`` and ``anns``) are created in the annotation
    directory. Records are appended to them one JSON document per line and
    are later combined into the final annotation file by :meth:`merge`.
    """

    def __init__(self, subset: str, task: CocoTask, ann_dir: str):
        self._subset = subset
        self._task = task
        self._ann_dir = ann_dir
        # Must exist before reset(), which starts by removing current writers
        self._writers = tuple()
        self.reset()

    def close(self):
        """Close every temporary file that is still open."""
        for writer in self._writers:
            if not writer.fp.closed:
                writer.fp.close()

    def remove(self):
        """Close the temporary files and delete them from disk if they exist."""
        self.close()
        for writer in self._writers:
            fpath = writer.fp.name
            if osp.exists(fpath):
                os.remove(fpath)

    def reset(self):
        """Drop the current temporary files and open a fresh, empty pair."""
        self.remove()
        self._writers = tuple(
            _Writer(
                fp=open(
                    osp.join(self._ann_dir, f"__{self._task.name}_{self._subset}_{key}.tmp"),
                    "wb",
                ),
                is_empty=True,
            )
            for key in ["imgs", "anns"]
        )

    def __del__(self):
        self.remove()

    @property
    def imgs(self) -> _Writer:
        """Writer holding the image records."""
        return self._writers[0]

    @property
    def anns(self) -> _Writer:
        """Writer holding the annotation records."""
        return self._writers[1]

    def add_item(self, data: Dict) -> None:
        """Append one image record to the ``imgs`` file."""
        self.imgs.is_empty = False
        writer = self.imgs.fp
        writer.write(dump_json(data, append_newline=True))

    def add_anns(self, data: Dict) -> None:
        """Append one annotation record to the ``anns`` file."""
        self.anns.is_empty = False
        writer = self.anns.fp
        writer.write(dump_json(data, append_newline=True))

    def merge(self, path: str, header: Dict, min_ann_id: Optional[int]) -> None:
        """Write the final annotation file from the header and the buffered records.

        Args:
            path: Destination of the resulting JSON file.
            header: Mapping providing the "licenses", "info" and "categories" entries.
            min_ann_id: When not None, annotations with a falsy "id" receive
                consecutive ids starting from this value. When None,
                annotation records are written unchanged.

        The temporary files are removed after the output file is written.
        """
        # Flush and close the buffers before reading them back
        self.close()

        @streamable_list
        def _gen_images():
            with open(self.imgs.fp.name, "rb") as fp:
                for line in fp:
                    yield parse_json(line)

        @streamable_list
        def _gen_anns():
            with open(self.anns.fp.name, "rb") as fp:
                next_id = min_ann_id
                for line in fp:
                    ann = parse_json(line)
                    if min_ann_id is not None and not ann["id"]:
                        ann["id"] = next_id
                        next_id += 1
                    yield ann

        @streamable_dict
        def _gen():
            yield "licenses", header["licenses"]
            yield "info", header["info"]
            yield "categories", header["categories"]

            if not self.imgs.is_empty:
                yield "images", _gen_images()
            else:
                yield "images", []

            if not self.anns.is_empty:
                yield "annotations", _gen_anns()
            else:
                yield "annotations", []

        with open(path, "w", encoding="utf-8") as fp:
            json.dump(_gen(), fp)

        self.remove()
class _TaskExporter:
    """Base class collecting the contents of one COCO annotation file.

    Records are kept in memory (``self._data``) or, in streaming mode,
    appended to temporary files and merged when :meth:`write` is called.
    """

    def __init__(
        self, context: "CocoExporter", subset: str, task: CocoTask, ann_dir: str, stream: bool
    ):
        self._min_ann_id = 1
        self._context = context
        self._subset = subset
        self._task = task
        self._ann_dir = ann_dir
        self._stream = stream

        data = {"licenses": [], "info": {}, "categories": [], "images": [], "annotations": []}

        data["licenses"].append({"name": "", "id": 0, "url": ""})

        data["info"] = {
            "contributor": "",
            "date_created": "",
            "description": "",
            "url": "",
            "version": "",
            "year": "",
        }
        self._data = data
        self._temporary_writers = TemporaryWriters(
            subset=subset,
            task=task,
            ann_dir=ann_dir,
        )

    def is_empty(self):
        """Tell whether no annotation has been saved so far."""
        return (
            len(self._data["annotations"]) == 0
            if not self._stream
            else self._temporary_writers.anns.is_empty
        )

    def _get_image_id(self, item):
        return self._context._get_image_id(item)

    def save_image_info(self, item, filename):
        """Record the "images" entry for the item; the size is 0x0 when unknown."""
        h = 0
        w = 0
        if item.media and item.media.size:
            h, w = item.media.size

        item_desc = {
            "id": self._get_image_id(item),
            "width": int(w),
            "height": int(h),
            "file_name": cast(filename, str, ""),
            "license": 0,
            "flickr_url": "",
            "coco_url": "",
            "date_captured": 0,
        }

        if not self._stream:
            self._data["images"].append(item_desc)
        else:
            self._temporary_writers.add_item(item_desc)

    def save_categories(self, dataset):
        raise NotImplementedError()

    def save_annotations(self, item):
        raise NotImplementedError()

    def write(self, path):
        """Assign ids to annotations lacking one and write the annotation file."""
        next_id = self._min_ann_id
        for ann in self.annotations:
            if not ann["id"]:
                ann["id"] = next_id
                next_id += 1

        dump_json_file(path, self._data)
        if self._stream:
            # Streamed records live in the temporary files, not in self._data
            self._temporary_writers.merge(path, self._data, self._min_ann_id)

    @property
    def annotations(self):
        return self._data["annotations"]

    @property
    def infos(self):
        return self._data["info"]

    @property
    def categories(self):
        return self._data["categories"]

    def _get_ann_id(self, annotation):
        """Return 0 when reindexing, else the annotation's id; track the largest id seen."""
        ann_id = 0 if self._context._reindex else annotation.id
        if ann_id:
            self._min_ann_id = max(ann_id, self._min_ann_id)
        return ann_id

    @staticmethod
    def _convert_attributes(ann):
        """Return the annotation attributes except "is_crowd" and "score"."""
        return {k: v for k, v in ann.attributes.items() if k not in {"is_crowd", "score"}}


class _ImageInfoExporter(_TaskExporter):
    """Writes image entries only; categories and annotations are not saved."""

    def is_empty(self):
        return (
            len(self._data["images"]) == 0
            if not self._stream
            else self._temporary_writers.imgs.is_empty
        )

    def save_categories(self, dataset):
        pass

    def save_annotations(self, item):
        pass


class _CaptionsExporter(_TaskExporter):
    """Writes caption annotations."""

    def save_categories(self, dataset):
        pass

    def save_annotations(self, item):
        for ann_idx, ann in enumerate(item.annotations):
            if ann.type != AnnotationType.caption:
                continue

            elem = {
                "id": self._get_ann_id(ann),
                "image_id": self._get_image_id(item),
                "category_id": 0,  # NOTE: workaround for a bug in cocoapi
                "caption": ann.caption,
            }
            if "score" in ann.attributes:
                try:
                    elem["score"] = float(ann.attributes["score"])
                except Exception as e:
                    # The exception is formatted with %s: %e accepts only
                    # real numbers and would raise TypeError here.
                    log.warning(
                        "Item '%s', ann #%s: failed to convert "
                        "attribute 'score': %s" % (item.id, ann_idx, e)
                    )
            if self._context._allow_attributes:
                attrs = self._convert_attributes(ann)
                if attrs:
                    elem["attributes"] = attrs

            if not self._stream:
                self.annotations.append(elem)
            else:
                self._temporary_writers.add_anns(elem)


class _InstancesExporter(_TaskExporter):
    """Writes instance annotations built from boxes, polygons, ellipses and masks."""

    _polygon_types = {AnnotationType.polygon, AnnotationType.ellipse}
    _allowed_types = {
        AnnotationType.bbox,
        AnnotationType.polygon,
        AnnotationType.mask,
        AnnotationType.ellipse,
    }

    def save_categories(self, dataset):
        label_categories = dataset.categories().get(AnnotationType.label)
        if label_categories is None:
            return

        for idx, cat in enumerate(label_categories.items):
            self.categories.append(
                {
                    "id": 1 + idx,
                    "name": cast(cat.name, str, ""),
                    "supercategory": cast(cat.parent, str, ""),
                }
            )

    @classmethod
    def crop_segments(cls, instances, img_width, img_height):
        """Crop covered segment parts of the instances, processed in z-order.

        Each instance is a ``[leader, polygons, mask, bbox]`` list. Its
        polygons and mask entries are updated in place. The instances are
        returned sorted by the z-order of their leader annotation.
        """
        instances = sorted(instances, key=lambda x: x[0].z_order)

        # segment_map[i] is the index of the instance that owns segments[i]
        segment_map = []
        segments = []
        for inst_idx, (_, polygons, mask, _) in enumerate(instances):
            if polygons:
                segment_map.extend(inst_idx for p in polygons)
                segments.extend(polygons)
            elif mask is not None:
                segment_map.append(inst_idx)
                segments.append(mask)

        segments = mask_tools.crop_covered_segments(segments, img_width, img_height)

        for inst_idx, inst in enumerate(instances):
            new_segments = [s for si_id, s in zip(segment_map, segments) if si_id == inst_idx]

            if not new_segments:
                inst[1] = []
                inst[2] = None
                continue

            if inst[1]:
                inst[1] = sum(new_segments, [])
            else:
                mask = mask_tools.merge_masks(new_segments)
                inst[2] = mask_tools.mask_to_rle(mask)

        return instances

    def find_instance_parts(self, group, img_width, img_height):
        """Reduce a group of annotations to ``[leader, polygons, mask, bbox]``.

        Depending on the segmentation mode, either the polygons are merged
        into the mask (polygons becomes empty) or the masks are converted to
        polygons (mask becomes None).
        """
        boxes = [a for a in group if a.type == AnnotationType.bbox]
        polygons: List[Union[Polygon, Ellipse]] = [
            a for a in group if a.type in self._polygon_types
        ]
        masks = [a for a in group if a.type == AnnotationType.mask]

        anns = boxes + polygons + masks
        leader = anno_tools.find_group_leader(anns)
        if len(boxes) > 0:
            bbox = anno_tools.max_bbox(boxes)
        else:
            bbox = anno_tools.max_bbox(anns)
        mask = None
        polygons = [p.as_polygon() for p in polygons]

        if self._context._segmentation_mode == SegmentationMode.guess:
            # "is_crowd" decides when present; otherwise masks are used
            # if the group has a mask with the leader's label
            use_masks = True is leader.attributes.get(
                "is_crowd", find(masks, lambda x: x.label == leader.label) is not None
            )
        elif self._context._segmentation_mode == SegmentationMode.polygons:
            use_masks = False
        elif self._context._segmentation_mode == SegmentationMode.mask:
            use_masks = True
        else:
            raise NotImplementedError(
                "Unexpected segmentation mode '%s'" % self._context._segmentation_mode
            )

        if use_masks:
            if polygons and img_width > 0 and img_height > 0:
                mask = mask_tools.rles_to_mask(polygons, img_width, img_height)

            if masks:
                masks = (m.image for m in masks)
                if mask is not None:
                    masks = chain(masks, [mask])
                mask = mask_tools.merge_masks(masks)

            if mask is not None:
                mask = mask_tools.mask_to_rle(mask)
            polygons = []
        else:
            if masks:
                mask = mask_tools.merge_masks(m.image for m in masks)
                polygons += mask_tools.mask_to_polygons(mask)
            mask = None

        return [leader, polygons, mask, bbox]

    @staticmethod
    def find_instance_anns(annotations):
        """Return the annotations whose type may be part of an instance."""
        return [a for a in annotations if a.type in _InstancesExporter._allowed_types]

    @classmethod
    def find_instances(cls, annotations):
        return anno_tools.find_instances(cls.find_instance_anns(annotations))

    def save_annotations(self, item):
        instances = self.find_instances(item.annotations)
        if not instances:
            return

        if not item.media or not item.media.size:
            h, w = 0, 0
            log.warning(
                "Item '%s': Mask annotations can be skipped since no image info available"
                % item.id
            )
        else:
            h, w = item.media.size
        instances = [self.find_instance_parts(i, w, h) for i in instances]

        if self._context._crop_covered and w > 0 and h > 0:
            instances = self.crop_segments(instances, w, h)

        for instance in instances:
            elem = self.convert_instance(instance, item)
            if elem is None:
                continue

            if not self._stream:
                self.annotations.append(elem)
            else:
                self._temporary_writers.add_anns(elem)

    def convert_instance(self, instance, item) -> Optional[Dict]:
        """Build the annotation dict for a ``[leader, polygons, mask, bbox]`` instance."""
        ann, polygons, mask, bbox = instance

        is_crowd = mask is not None
        if is_crowd:
            segmentation = {
                "counts": list(int(c) for c in mask["counts"]),
                "size": list(int(c) for c in mask["size"]),
            }
        else:
            segmentation = [list(map(float, p)) for p in polygons]

        area = 0
        if segmentation:
            if item.media and item.media.size:
                h, w = item.media.size
            else:
                # Here we can guess the image size as
                # it is only needed for the area computation
                w = bbox[0] + bbox[2]
                h = bbox[1] + bbox[3]

            rles = mask_utils.frPyObjects(segmentation, h, w)
            if is_crowd:
                rles = [rles]
            else:
                rles = mask_utils.merge(rles)
            area = mask_utils.area(rles)
        else:
            _, _, w, h = bbox
            segmentation = []
            area = w * h

        elem = {
            "id": self._get_ann_id(ann),
            "image_id": self._get_image_id(item),
            "category_id": cast(ann.label, int, -1) + 1,
            "segmentation": segmentation,
            "area": float(area),
            "bbox": [round(float(n), COORDINATE_ROUNDING_DIGITS) for n in bbox],
            "iscrowd": int(is_crowd),
        }
        if "score" in ann.attributes:
            try:
                elem["score"] = float(ann.attributes["score"])
            except Exception as e:
                # %s, not %e: the argument is an exception, not a number
                log.warning(
                    "Item '%s': failed to convert attribute " "'score': %s" % (item.id, e)
                )
        if self._context._allow_attributes:
            attrs = self._convert_attributes(ann)
            if attrs:
                elem["attributes"] = attrs

        return elem


class _KeypointsExporter(_InstancesExporter):
    """Writes instance annotations extended with keypoints."""

    def save_categories(self, dataset):
        label_categories = dataset.categories().get(AnnotationType.label)
        if label_categories is None:
            return
        point_categories = dataset.categories().get(AnnotationType.points)

        for idx, label_cat in enumerate(label_categories.items):
            cat = {
                "id": 1 + idx,
                "name": cast(label_cat.name, str, ""),
                "supercategory": cast(label_cat.parent, str, ""),
                "keypoints": [],
                "skeleton": [],
            }

            if point_categories is not None:
                kp_cat = point_categories.items.get(idx)
                if kp_cat is not None:
                    cat.update(
                        {
                            "keypoints": [str(l) for l in kp_cat.labels],
                            "skeleton": [list(map(int, j)) for j in kp_cat.joints],
                        }
                    )
            self.categories.append(cat)

    def save_annotations(self, item):
        point_annotations = [a for a in item.annotations if a.type == AnnotationType.points]
        if not point_annotations:
            return

        # Create annotations for solitary keypoints annotations
        for points in self.find_solitary_points(item.annotations):
            instance = [points, [], None, points.get_bbox()]
            elem = super().convert_instance(instance, item)
            elem.update(self.convert_points_object(points))

            if not self._stream:
                self.annotations.append(elem)
            else:
                self._temporary_writers.add_anns(elem)

        # Create annotations for complete instance + keypoints annotations
        super().save_annotations(item)

    @classmethod
    def find_solitary_points(cls, annotations):
        """Return points annotations that are ungrouped or whose group has no instance annotation."""
        annotations = sorted(annotations, key=lambda a: a.group)
        solitary_points = []

        for g_id, group in groupby(annotations, lambda a: a.group):
            # groupby yields a one-shot iterator: materialize it so the
            # membership check below does not exhaust it before filtering
            group = list(group)
            if not g_id or not cls.find_instance_anns(group):
                solitary_points.extend(a for a in group if a.type == AnnotationType.points)

        return solitary_points

    @staticmethod
    def convert_points_object(ann):
        """Return the "keypoints" (x, y, visibility triples) and "num_keypoints" fields."""
        keypoints = []
        points = ann.points
        visibility = ann.visibility
        for index in range(0, len(points), 2):
            kp = points[index : index + 2]
            state = visibility[index // 2].value
            keypoints.extend([*kp, state])

        num_annotated = len([v for v in visibility if v != Points.Visibility.absent])

        return {
            "keypoints": keypoints,
            "num_keypoints": num_annotated,
        }

    def convert_instance(self, instance, item) -> Optional[Dict]:
        """Convert an instance, or return None if no points share its non-zero group."""
        points_ann = find(
            item.annotations,
            lambda x: x.type == AnnotationType.points
            and instance[0].group
            and x.group == instance[0].group,
        )
        if not points_ann:
            return None

        elem = super().convert_instance(instance, item)
        elem.update(self.convert_points_object(points_ann))

        return elem


class _LabelsExporter(_TaskExporter):
    """Writes label annotations."""

    def save_categories(self, dataset):
        label_categories = dataset.categories().get(AnnotationType.label)
        if label_categories is None:
            return

        for idx, cat in enumerate(label_categories.items):
            self.categories.append(
                {
                    "id": 1 + idx,
                    "name": cast(cat.name, str, ""),
                    "supercategory": cast(cat.parent, str, ""),
                }
            )

    def save_annotations(self, item):
        for ann in item.annotations:
            if ann.type != AnnotationType.label:
                continue

            elem = {
                "id": self._get_ann_id(ann),
                "image_id": self._get_image_id(item),
                "category_id": int(ann.label) + 1,
            }
            if "score" in ann.attributes:
                try:
                    elem["score"] = float(ann.attributes["score"])
                except Exception as e:
                    # %s, not %e: the argument is an exception, not a number
                    log.warning(
                        "Item '%s': failed to convert attribute " "'score': %s" % (item.id, e)
                    )
            if self._context._allow_attributes:
                attrs = self._convert_attributes(ann)
                if attrs:
                    elem["attributes"] = attrs

            if not self._stream:
                self.annotations.append(elem)
            else:
                self._temporary_writers.add_anns(elem)


class _StuffExporter(_InstancesExporter):
    pass


class _PanopticExporter(_TaskExporter):
    """Writes panoptic annotations: one mask image per item plus its segment descriptions."""

    def write(self, path):
        dump_json_file(path, self._data)
        if self._stream:
            # None: panoptic records carry no "id" field to fill in
            self._temporary_writers.merge(path, self._data, None)

    def save_categories(self, dataset):
        label_categories = dataset.categories().get(AnnotationType.label)
        if label_categories is None:
            return

        for idx, cat in enumerate(label_categories.items):
            self.categories.append(
                {
                    "id": 1 + idx,
                    "name": cast(cat.name, str, ""),
                    "supercategory": cast(cat.parent, str, ""),
                    "isthing": 0,  # TODO: can't represent this information yet
                }
            )

    def save_annotations(self, item):
        if not item.media:
            return

        ann_filename = item.id + CocoPath.PANOPTIC_EXT

        segments_info = list()
        masks = []
        next_id = self._min_ann_id
        for ann in item.annotations:
            if ann.type != AnnotationType.mask:
                continue

            # Masks without an id get one, as the id is written into the image
            if not ann.id:
                ann.id = next_id
                next_id += 1

            segment_info = {}
            segment_info["id"] = ann.id
            segment_info["category_id"] = cast(ann.label, int, -1) + 1
            segment_info["area"] = float(ann.get_area())
            segment_info["bbox"] = [float(p) for p in ann.get_bbox()]
            segment_info["iscrowd"] = cast(ann.attributes.get("is_crowd"), int, 0)
            segments_info.append(segment_info)
            masks.append(ann)

        if not masks:
            return

        pan_format = mask_tools.merge_masks((m.image, m.id) for m in masks)
        save_image(
            osp.join(self._context._segmentation_dir, ann_filename),
            mask_tools.index2bgr(pan_format),
            create_dir=True,
        )

        elem = {
            "image_id": self._get_image_id(item),
            "file_name": ann_filename,
            "segments_info": segments_info,
        }

        if not self._stream:
            self.annotations.append(elem)
        else:
            self._temporary_writers.add_anns(elem)
class CocoExporter(Exporter):
    """Exports a dataset to the COCO format, one annotation file per task and subset."""

    @staticmethod
    def _split_tasks_string(s):
        """Parse a comma-separated list of task names into CocoTask members."""
        return [CocoTask[i.strip()] for i in s.split(",")]

    @classmethod
    def build_cmdline_parser(cls, **kwargs):
        kwargs[
            "description"
        ] = """
        Segmentation mask modes ('--segmentation-mode'):|n
        - '{sm.guess.name}': guess the mode for each instance,|n
        |s|suse 'is_crowd' attribute as hint|n
        - '{sm.polygons.name}': save polygons,|n
        |s|smerge and convert masks, prefer polygons|n
        - '{sm.mask.name}': save masks,|n
        |s|smerge and convert polygons, prefer masks|n
        |n
        The '--reindex' option allows to control if the images and
        annotations must be given new indices. It can be useful, when
        you want to preserve the original indices in the produced dataset.
        Consider having this option enabled when converting from other
        formats.|n
        |n
        The '--allow-attributes' parameter enables or disables writing
        the custom annotation attributes to the "attributes" annotation
        field. This field is an extension to the original COCO format.|n
        |n
        The '--merge-images' parameter controls the output directory for
        images. When enabled, the dataset images are saved into a single
        directory, otherwise they are saved in separate directories by
        subsets.
        """.format(
            sm=SegmentationMode
        )
        parser = super().build_cmdline_parser(**kwargs)

        parser.add_argument(
            "--segmentation-mode",
            choices=[m.name for m in SegmentationMode],
            default=SegmentationMode.guess.name,
            help="Save mode for instance segmentation (default: %(default)s)",
        )
        parser.add_argument(
            "--crop-covered",
            action="store_true",
            help="Crop covered segments so that background objects' "
            "segmentation was more accurate (default: %(default)s)",
        )
        parser.add_argument(
            "--allow-attributes",
            type=str_to_bool,
            default=True,
            help="Allow export of attributes (default: %(default)s)",
        )
        parser.add_argument(
            "--reindex",
            type=str_to_bool,
            default=True,
            help="Assign new indices to images and annotations, "
            "useful to avoid merge conflicts (default: %(default)s)",
        )
        parser.add_argument(
            "--merge-images",
            type=str_to_bool,
            default=False,
            help="Save all images into a single " "directory (default: %(default)s)",
        )
        parser.add_argument(
            "--tasks",
            type=cls._split_tasks_string,
            help="COCO task filter, comma-separated list of {%s} "
            "(default: all)" % ", ".join(t.name for t in CocoTask),
        )

        return parser

    DEFAULT_IMAGE_EXT = CocoPath.IMAGE_EXT

    # Exporter class used for each supported task
    _TASK_CONVERTER: Dict[CocoTask, Type[_TaskExporter]] = {
        CocoTask.image_info: _ImageInfoExporter,
        CocoTask.instances: _InstancesExporter,
        CocoTask.person_keypoints: _KeypointsExporter,
        CocoTask.captions: _CaptionsExporter,
        CocoTask.labels: _LabelsExporter,
        CocoTask.panoptic: _PanopticExporter,
        CocoTask.stuff: _StuffExporter,
    }

    def __init__(
        self,
        extractor,
        save_dir,
        tasks=None,
        segmentation_mode=None,
        crop_covered=False,
        allow_attributes=True,
        reindex=False,
        merge_images=False,
        stream: bool = False,
        **kwargs,
    ):
        """
        Args:
            tasks: None, a CocoTask, a task name, or a list of tasks and/or
                task names. When empty, all the supported tasks are exported.
            segmentation_mode: None (same as ``SegmentationMode.guess``),
                a SegmentationMode or its name.
        """
        super().__init__(extractor, save_dir, stream=stream, **kwargs)

        assert tasks is None or isinstance(tasks, (CocoTask, list, str))
        if isinstance(tasks, CocoTask):
            tasks = [tasks]
        elif isinstance(tasks, str):
            tasks = [CocoTask[tasks]]
        elif tasks:
            # Build a new list so that the caller's list is not modified
            converted = []
            for t in tasks:
                if isinstance(t, str):
                    t = CocoTask[t]
                else:
                    assert t in CocoTask, t
                converted.append(t)
            tasks = converted
        else:
            tasks = set()
        self._tasks = tasks

        assert (
            segmentation_mode is None
            or isinstance(segmentation_mode, str)
            or segmentation_mode in SegmentationMode
        )
        if segmentation_mode is None:
            segmentation_mode = SegmentationMode.guess
        if isinstance(segmentation_mode, str):
            segmentation_mode = SegmentationMode[segmentation_mode]
        self._segmentation_mode = segmentation_mode

        self._crop_covered = crop_covered
        self._allow_attributes = allow_attributes
        self._reindex = reindex
        self._merge_images = merge_images

        # Item id -> image id written to the annotation files
        self._image_ids = {}

        self._patch = None

    def _make_dirs(self):
        self._images_dir = osp.join(self._save_dir, CocoPath.IMAGES_DIR)
        os.makedirs(self._images_dir, exist_ok=True)

        self._ann_dir = osp.join(self._save_dir, CocoPath.ANNOTATIONS_DIR)
        os.makedirs(self._ann_dir, exist_ok=True)

    def _make_segmentation_dir(self, subset_name):
        self._segmentation_dir = osp.join(
            self._save_dir, CocoPath.ANNOTATIONS_DIR, "panoptic_" + subset_name
        )
        os.makedirs(self._segmentation_dir, exist_ok=True)

    def _make_task_converter(self, task: CocoTask, subset: str) -> _TaskExporter:
        if task not in self._TASK_CONVERTER:
            raise NotImplementedError()
        return self._TASK_CONVERTER[task](
            context=self,
            subset=subset,
            task=task,
            ann_dir=self._ann_dir,
            stream=self._stream,
        )

    def _make_task_converters(self, subset: str):
        return {
            task: self._make_task_converter(task, subset)
            for task in (self._tasks or self._TASK_CONVERTER)
        }

    def _get_image_id(self, item):
        """Return the image id of the item, assigning one on first request."""
        image_id = self._image_ids.get(item.id)
        if image_id is None:
            if not self._reindex:
                image_id = cast(item.attributes.get("id"), int, len(self._image_ids) + 1)
            else:
                image_id = len(self._image_ids) + 1
            self._image_ids[item.id] = image_id
        return image_id

    def _apply_impl(self):
        if self._extractor.media_type() and not issubclass(self._extractor.media_type(), Image):
            raise MediaTypeError("Media type is not an image")

        self._make_dirs()

        if self._save_dataset_meta:
            self._save_meta_file(self._save_dir)

        subsets = self._extractor.subsets()
        pbars = self._ctx.progress_reporter.split(len(subsets))
        for pbar, (subset_name, subset) in zip(pbars, subsets.items()):
            task_converters = self._make_task_converters(subset_name)
            for task_conv in task_converters.values():
                task_conv.save_categories(subset)
            if CocoTask.panoptic in task_converters:
                self._make_segmentation_dir(subset_name)

            for item in pbar.iter(subset, desc=f"Exporting '{subset_name}'"):
                try:
                    if self._save_media:
                        if item.media:
                            self._save_image(
                                item,
                                subdir=osp.join(
                                    self._images_dir, "" if self._merge_images else subset_name
                                ),
                            )
                        else:
                            log.debug("Item '%s' has no image info", item.id)
                    for task_conv in task_converters.values():
                        try:
                            task_conv.save_image_info(item, self._make_image_filename(item))
                            task_conv.save_annotations(item)
                        except Exception as e:
                            self._report_annotation_error(e, item_id=(item.id, item.subset))
                except Exception as e:
                    self._ctx.error_policy.report_item_error(e, item_id=(item.id, item.subset))

            for task, task_conv in task_converters.items():
                ann_file = osp.join(self._ann_dir, "%s_%s.json" % (task.name, subset_name))

                # Empty files are skipped unless the task was requested
                # explicitly; when patching they are always skipped.
                if task_conv.is_empty() and (not self._tasks or self._patch):
                    if task == CocoTask.panoptic:
                        os.rmdir(self._segmentation_dir)
                    if self._patch:
                        if osp.isfile(ann_file):
                            # Remove subsets that became empty
                            os.remove(ann_file)
                    continue

                task_conv.write(ann_file)

    @classmethod
    def patch(cls, dataset, patch, save_dir, **kwargs):
        """Re-export the updated subsets and delete images that are no longer needed."""
        for subset in patch.updated_subsets:
            conv = cls(dataset.get_subset(subset), save_dir=save_dir, **kwargs)
            conv._patch = patch
            conv.apply()

        conv = cls(dataset, save_dir=save_dir, **kwargs)
        images_dir = osp.join(save_dir, CocoPath.IMAGES_DIR)
        for (item_id, subset), status in patch.updated_items.items():
            if status != ItemStatus.removed:
                item = patch.data.get(item_id, subset)
            else:
                item = DatasetItem(item_id, subset=subset)

            # Only removed items and items without media have files to delete
            if not (status == ItemStatus.removed or not item.media):
                continue

            # Exporter supports saving in separate dirs and common image dir

            image_path = osp.join(images_dir, conv._make_image_filename(item))
            if osp.isfile(image_path):
                os.unlink(image_path)

            image_path = osp.join(images_dir, subset, conv._make_image_filename(item))
            if osp.isfile(image_path):
                os.unlink(image_path)

    @property
    def can_stream(self) -> bool:
        return True
class CocoInstancesExporter(CocoExporter):
    """COCO exporter restricted to the ``instances`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.instances
        super().__init__(*args, **kwargs)
class CocoImageInfoExporter(CocoExporter):
    """COCO exporter restricted to the ``image_info`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.image_info
        super().__init__(*args, **kwargs)
class CocoPersonKeypointsExporter(CocoExporter):
    """COCO exporter restricted to the ``person_keypoints`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.person_keypoints
        super().__init__(*args, **kwargs)
class CocoCaptionsExporter(CocoExporter):
    """COCO exporter restricted to the ``captions`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.captions
        super().__init__(*args, **kwargs)
class CocoLabelsExporter(CocoExporter):
    """COCO exporter restricted to the ``labels`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.labels
        super().__init__(*args, **kwargs)
class CocoPanopticExporter(CocoExporter):
    """COCO exporter restricted to the ``panoptic`` task."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" value passed by the caller is overridden
        kwargs["tasks"] = CocoTask.panoptic
        super().__init__(*args, **kwargs)
class CocoStuffExporter(CocoExporter):
    """COCO exporter restricted to the ``stuff`` task; segmentation is always saved as masks."""

    def __init__(self, *args, **kwargs):
        # Any "tasks" and "segmentation_mode" values passed by the caller are overridden
        kwargs["tasks"] = CocoTask.stuff
        kwargs["segmentation_mode"] = SegmentationMode.mask
        super().__init__(*args, **kwargs)