Source code for datumaro.plugins.data_formats.mpii.mpii_json

# Copyright (C) 2022-2023 Intel Corporation
#
# SPDX-License-Identifier: MIT

import errno
import os.path as osp
from typing import List, Optional

import numpy as np

from datumaro.components.annotation import (
    AnnotationType,
    Bbox,
    LabelCategories,
    Points,
    PointsCategories,
)
from datumaro.components.dataset_base import DatasetItem, SubsetBase
from datumaro.components.format_detection import FormatDetectionContext
from datumaro.components.importer import ImportContext, Importer
from datumaro.components.media import Image
from datumaro.util import parse_json_file

from .format import MPII_POINTS_JOINTS, MPII_POINTS_LABELS


class MpiiJsonPath:
    """Well-known file names that make up an MPII JSON dataset directory."""

    ANNOTATION_FILE = "mpii_annotations.json"
    HEADBOXES_FILE = "mpii_headboxes.npy"
    VISIBILITY_FILE = "jnt_visible.npy"
    POS_GT_FILE = "mpii_pos_gt.npy"
class MpiiJsonBase(SubsetBase):
    """Reads MPII Human Pose annotations from a JSON file.

    Optional ``.npy`` companion files located next to the JSON file provide
    ground-truth head boxes, per-joint visibility, and joint positions; when
    present they override the values embedded in the JSON.
    """

    def __init__(
        self,
        path: str,
        *,
        subset: Optional[str] = None,
        ctx: Optional[ImportContext] = None,
    ):
        """Load the annotation file at ``path``.

        Raises:
            FileNotFoundError: if ``path`` does not point to an existing file.
        """
        if not osp.isfile(path):
            raise FileNotFoundError(errno.ENOENT, "Can't find annotations file", path)

        super().__init__(subset=subset, ctx=ctx)

        self._categories = {
            AnnotationType.label: LabelCategories.from_iterable(["human"]),
            AnnotationType.points: PointsCategories.from_iterable(
                [(0, MPII_POINTS_LABELS, MPII_POINTS_JOINTS)]
            ),
        }

        self._items = list(self._load_items(path).values())

    def _load_items(self, path):
        """Parse the JSON annotations (plus optional .npy files) into items.

        Returns a dict mapping item id to :class:`DatasetItem`.
        """
        items = {}

        root_dir = osp.dirname(path)

        # Head boxes: indexed by the third axis, one 2x2 corner pair per image.
        hb_path = osp.join(root_dir, MpiiJsonPath.HEADBOXES_FILE)
        if osp.isfile(hb_path):
            headboxes = np.load(hb_path)
        else:
            headboxes = np.array([[[]]])  # shape (1, 1, 0): no head boxes

        # Visibility: transposed so rows are indexed by image.
        vis_path = osp.join(root_dir, MpiiJsonPath.VISIBILITY_FILE)
        if osp.isfile(vis_path):
            visibility = np.load(vis_path).T
        else:
            visibility = np.array([])

        # Ground-truth poses: transposed so the first axis is the image index.
        pos_gt_path = osp.join(root_dir, MpiiJsonPath.POS_GT_FILE)
        if osp.isfile(pos_gt_path):
            gt_pose = np.transpose(np.load(pos_gt_path), (2, 0, 1))
        else:
            gt_pose = np.array([])

        for i, ann in enumerate(parse_json_file(path)):
            item_id = osp.splitext(ann.get("img_paths", ""))[0]

            center = ann.get("objpos", [])
            scale = float(ann.get("scale_provided", 0))

            if i < gt_pose.shape[0]:
                # Prefer the .npy ground truth over the JSON keypoints.
                points = gt_pose[i].ravel()

                if i < visibility.shape[0]:
                    vis = visibility[i]
                else:
                    # No visibility data: assume every joint is visible.
                    vis = np.ones(len(points) // 2, dtype=np.int8)
            else:
                # Fall back to the keypoints embedded in the JSON entry,
                # stored as (x, y, visibility) triplets.
                keypoints = np.array(ann.get("joint_self", []))
                points = keypoints[:, 0:2].ravel()

                vis = keypoints[:, 2]
                if i < visibility.shape[0]:
                    vis = visibility[i]

            vis = [int(val) for val in vis]

            # Annotations of the same person share a group number.
            group_num = 1

            annotations = [
                Points(
                    points,
                    vis,
                    label=0,
                    group=group_num,
                    attributes={"center": center, "scale": scale},
                )
            ]

            if i < headboxes.shape[2]:
                bbox = headboxes[:, :, i]
                annotations.append(
                    Bbox(
                        bbox[0][0],
                        bbox[0][1],
                        bbox[1][0] - bbox[0][0],
                        bbox[1][1] - bbox[0][1],
                        label=0,
                        group=group_num,
                    )
                )

            group_num += 1

            joint_others = ann.get("joint_others")
            if joint_others:
                num_others = int(ann.get("numOtherPeople", 1))
                center = ann.get("objpos_other", [])
                scale = ann.get("scale_provided_other", 0)

                if num_others == 1:
                    # A single extra person is stored unwrapped; normalize
                    # to lists so the loop below can index uniformly.
                    center = [center]
                    scale = [scale]
                    joint_others = [joint_others]

                # Use a dedicated index so the enclosing enumerate counter
                # `i` is not clobbered (the original code shadowed it).
                for j in range(num_others):
                    keypoints = np.array(joint_others[j])
                    points = keypoints[:, 0:2].ravel()

                    vis = keypoints[:, 2]
                    vis = [int(val) for val in vis]

                    attributes = {}
                    if j < len(center):
                        attributes["center"] = center[j]
                    if j < len(scale):
                        attributes["scale"] = scale[j]

                    annotations.append(
                        Points(points, vis, label=0, group=group_num, attributes=attributes)
                    )

                    group_num += 1

            items[item_id] = DatasetItem(
                id=item_id,
                subset=self._subset,
                media=Image.from_file(path=osp.join(root_dir, ann.get("img_paths", ""))),
                annotations=annotations,
            )
            # Separate name so the JSON entry bound to `ann` above is not
            # shadowed (the original code reused `ann` here).
            for annotation in annotations:
                self._ann_types.add(annotation.type)

        return items
class MpiiJsonImporter(Importer):
    """Discovers MPII JSON dataset sources on the file system."""

    @classmethod
    def find_sources(cls, path):
        # Every *.json file under `path` is registered as an "mpii_json" source.
        return cls._find_sources_recursive(path, ".json", "mpii_json")

    @classmethod
    def detect(cls, context: FormatDetectionContext) -> None:
        # The presence of the well-known annotation file identifies the format.
        context.require_file(MpiiJsonPath.ANNOTATION_FILE)

    @classmethod
    def get_file_extensions(cls) -> List[str]:
        # Derive the extension (".json") from the canonical annotation name.
        return [osp.splitext(MpiiJsonPath.ANNOTATION_FILE)[1]]