# Source code for datumaro.components.annotations.matcher

# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: MIT

from typing import List, Optional, Union

import numpy as np
from attr import attrib, attrs

from datumaro.components.abstracts import IMergerContext
from datumaro.components.abstracts.merger import IMatcherContext
from datumaro.components.annotation import Annotation, Points
from datumaro.util.annotation_util import (
    OKS,
    approximate_line,
    bbox_iou,
    max_bbox,
    mean_bbox,
    segment_iou,
)

# Names exported by ``from ... import *``. Every public matcher class and
# helper function defined in this module is listed here.
__all__ = [
    "match_segments_pair",
    "match_segments_more_than_pair",
    "AnnotationMatcher",
    "LabelMatcher",
    "ShapeMatcher",
    "BboxMatcher",
    "PolygonMatcher",
    "MaskMatcher",
    "PointsMatcher",
    "LineMatcher",
    "CaptionsMatcher",
    "Cuboid3dMatcher",
    "ImageAnnotationMatcher",
    "HashKeyMatcher",
    "FeatureVectorMatcher",
    "Cuboid2DMatcher",
    # Defined in this module but previously missing from the export list.
    "TabularMatcher",
    "RotatedBboxMatcher",
]


def match_segments_pair(
    a_segms,
    b_segms,
    distance=segment_iou,
    dist_thresh=1.0,
    label_matcher=lambda a, b: a.label == b.label,
):
    """Match segments and return pairs of the two matched segments.

    Both input lists are sorted **in place** by descending ``score``
    attribute (a missing score counts as 1). Every segment of ``a_segms`` is
    then paired with at most one still-unpaired segment of ``b_segms``: the
    one with the largest ``distance`` value that is not below
    ``dist_thresh``. Candidates whose label matches are examined first, so on
    equal distance values they win over label-mismatching ones.

    Args:
        a_segms: List of annotations (sorted in place).
        b_segms: List of annotations (sorted in place).
        distance: Callable ``(a, b) -> number``; larger means more similar.
        dist_thresh: Minimal ``distance`` value accepted for a pair.
        label_matcher: Callable ``(a, b) -> bool`` telling whether labels agree.

    Returns:
        A tuple ``(matches, mispred, a_unmatched, b_unmatched)`` where
        ``matches`` holds paired segments with agreeing labels, ``mispred``
        holds paired segments with disagreeing labels, and the last two
        lists hold the segments of each input that were not paired.
    """
    assert callable(distance), distance
    assert callable(label_matcher), label_matcher

    def _score_rank(ann):
        # Ascending rank == descending score.
        return 1 - ann.attributes.get("score", 1)

    a_segms.sort(key=_score_rank)
    b_segms.sort(key=_score_rank)

    # pair_of_a[i]: index in b_segms paired with a_segms[i], or -1 if none
    # pair_of_b[j]: index in a_segms paired with b_segms[j], or -1 if none
    pair_of_a = -np.ones(len(a_segms), dtype=int)
    pair_of_b = -np.ones(len(b_segms), dtype=int)

    distances = np.array([[distance(a, b) for b in b_segms] for a in a_segms])

    # matches: pairs with agreeing labels
    # mispred: pairs with disagreeing labels
    matches = []
    mispred = []

    # Pairing is only attempted when there is something in b_segms
    if len(b_segms) > 0:
        for a_idx, a_segm in enumerate(a_segms):
            # Stable sort on "label differs": same-label candidates come
            # first, score order is kept inside each group.
            label_differs = [not label_matcher(a_segm, b_segm) for b_segm in b_segms]
            candidates = np.argsort(label_differs, kind="stable")

            best_b = -1
            best_dist = -1
            for b_idx in candidates:
                if pair_of_b[b_idx] >= 0:
                    # already taken by an earlier (higher-score) a segment
                    continue

                d = distances[a_idx, b_idx]
                if not (d < dist_thresh or d <= best_dist):
                    best_dist = d
                    best_b = b_idx

            if best_b < 0:
                continue

            pair_of_a[a_idx] = best_b
            pair_of_b[best_b] = a_idx

            pair = (a_segm, b_segms[best_b])
            if label_matcher(*pair):
                matches.append(pair)
            else:
                mispred.append(pair)

    # Segments that never received a partner
    a_unmatched = [segm for segm, partner in zip(a_segms, pair_of_a) if partner < 0]
    b_unmatched = [segm for segm, partner in zip(b_segms, pair_of_b) if partner < 0]

    return matches, mispred, a_unmatched, b_unmatched
def match_segments_more_than_pair(
    a_segms,
    b_segms,
    distance=segment_iou,
    dist_thresh=1.0,
    label_matcher=lambda a, b: a.label == b.label,
):
    """Match segments and return sets of the matched segments which can be more than two.

    Every ``(a, b)`` combination whose ``distance`` value is not below
    ``dist_thresh`` is reported; a segment is not reserved once matched, so
    it may appear in several returned pairs.

    Args:
        a_segms: Sequence of annotations.
        b_segms: Sequence of annotations.
        distance: Callable ``(a, b) -> number``; larger means more similar.
        dist_thresh: Minimal ``distance`` value accepted for a pair.
        label_matcher: Callable ``(a, b) -> bool`` telling whether labels agree.

    Returns:
        A tuple ``(matches, mispred, a_unmatched, b_unmatched)`` where
        ``matches`` holds accepted pairs with agreeing labels, ``mispred``
        holds accepted pairs with disagreeing labels, and the last two lists
        hold the segments of each input that took part in no accepted pair.
    """
    assert callable(distance), distance
    assert callable(label_matcher), label_matcher

    # last_b_of_a[i]: index in b_segms of the last pair accepted for a_segms[i], or -1
    # last_a_of_b[j]: index in a_segms of the last pair accepted for b_segms[j], or -1
    last_b_of_a = -np.ones(len(a_segms), dtype=int)
    last_a_of_b = -np.ones(len(b_segms), dtype=int)

    distances = np.array([[distance(a, b) for b in b_segms] for a in a_segms])

    # matches: accepted pairs with agreeing labels
    # mispred: accepted pairs with disagreeing labels
    matches = []
    mispred = []

    # Pairing is only attempted when there is something in b_segms
    if len(b_segms) > 0:
        for a_idx, a_segm in enumerate(a_segms):
            # Stable sort on "label differs": same-label candidates come
            # first, input order is kept inside each group.
            label_differs = [not label_matcher(a_segm, b_segm) for b_segm in b_segms]
            candidates = np.argsort(label_differs, kind="stable")

            for b_idx in candidates:
                if distances[a_idx, b_idx] < dist_thresh:
                    continue

                last_b_of_a[a_idx] = b_idx
                last_a_of_b[b_idx] = a_idx

                pair = (a_segm, b_segms[b_idx])
                if label_matcher(*pair):
                    matches.append(pair)
                else:
                    mispred.append(pair)

    # Segments that took part in no accepted pair
    a_unmatched = [segm for segm, partner in zip(a_segms, last_b_of_a) if partner < 0]
    b_unmatched = [segm for segm, partner in zip(b_segms, last_a_of_b) if partner < 0]

    return matches, mispred, a_unmatched, b_unmatched
@attrs(kw_only=True)
class AnnotationMatcher:
    """Base class of the annotation matchers defined in this module.

    Subclasses override :meth:`match_annotations`.
    """

    # Optional matcher/merger context; subclasses in this module use it to
    # resolve label names via ``get_any_label_name``.
    _context: Optional[Union[IMatcherContext, IMergerContext]] = attrib(default=None)

    def match_annotations(self, sources):
        """Match annotations across ``sources``; not implemented in the base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class LabelMatcher(AnnotationMatcher):
    """Matcher that puts all annotations of all sources into one cluster."""

    def distance(self, a, b):
        """Return whether ``a`` and ``b`` resolve to the same label name.

        Label names are resolved through the matcher context's
        ``get_any_label_name``.
        """
        a_label = self._context.get_any_label_name(a, a.label)
        b_label = self._context.get_any_label_name(b, b.label)
        return a_label == b_label

    def match_annotations(self, sources):
        """Return a single cluster holding every annotation of every source.

        Args:
            sources: Iterable of per-source annotation collections.

        Returns:
            A one-element list whose only item is a new flat list of all
            annotations, in source order.
        """
        # Flatten in a single pass. ``sum(sources, [])`` re-copies the
        # accumulated list on every addition (quadratic) and fails for
        # non-list sources.
        return [[ann for source in sources for ann in source]]
@attrs(kw_only=True)
class ShapeMatcher(AnnotationMatcher):
    """Cluster shape annotations from several sources by pairwise similarity.

    Subclasses customise the similarity by overriding :meth:`distance`.
    """

    # Minimal distance() value for two segments to be matched pairwise.
    pairwise_dist = attrib(converter=float, default=0.9)
    # Minimal distance() value between a candidate and every cluster member;
    # a negative value means "use pairwise_dist".
    cluster_dist = attrib(converter=float, default=-1.0)
    # Pairwise matching routine called for each pair of sources.
    _match_segments = attrib(default=match_segments_pair)

    def match_annotations(self, sources: List[List[Annotation]]) -> List[List[Annotation]]:
        """Group the annotations of ``sources`` into clusters of matching shapes.

        Args:
            sources: One list of annotations per source.

        Returns:
            A list of clusters; each cluster is a list of annotations. Every
            annotation appears in exactly one cluster, and unmatched ones
            form single-element clusters.
        """
        dist_fn = self.distance
        labels_agree = self.label_matcher
        pair_thresh = self.pairwise_dist
        cluster_thresh = pair_thresh if self.cluster_dist < 0 else self.cluster_dist

        # id(segment) -> (segment, id(source list it came from))
        id_segm = {id(a): (a, id(s)) for s in sources for a in s}

        def _is_close_enough(cluster, extra_id):
            # The candidate must not be farther than the cluster threshold
            # from any segment already in the cluster.
            candidate = id_segm[extra_id][0]
            return not any(
                dist_fn(id_segm[member_id][0], candidate) < cluster_thresh
                for member_id in cluster
            )

        def _has_same_source(cluster, extra_id):
            # A cluster may contain at most one segment per source.
            candidate_source = id_segm[extra_id][1]
            return any(id_segm[member_id][1] == candidate_source for member_id in cluster)

        # Match segments in sources, pairwise.
        # id(segment) -> [id(adjacent segment), ...]
        adjacent = {segm_id: [] for segm_id in id_segm}
        for a_idx, src_a in enumerate(sources):
            for src_b in sources[a_idx + 1 :]:
                matches, _, _, _ = self._match_segments(
                    src_a,
                    src_b,
                    dist_thresh=pair_thresh,
                    distance=dist_fn,
                    label_matcher=labels_agree,
                )

                for a, b in matches:
                    adjacent[id(a)].append(id(b))

        # Join all segments into matching clusters by walking the adjacency
        # lists from every not-yet-visited segment.
        clusters = []
        visited = set()
        for seed in adjacent:
            if seed in visited:
                continue

            cluster = set()
            pending = {seed}
            while pending:
                current = pending.pop()
                cluster.add(current)
                visited.add(current)

                for neighbor in adjacent[current]:
                    if (
                        neighbor in visited
                        or (0 < cluster_thresh and not _is_close_enough(cluster, neighbor))
                        or _has_same_source(cluster, neighbor)
                    ):
                        continue

                    pending.add(neighbor)

            clusters.append([id_segm[segm_id][0] for segm_id in cluster])

        return clusters

    def distance(self, a, b):
        """Return the ``segment_iou`` of the two shapes."""
        return segment_iou(a, b)

    def label_matcher(self, a, b):
        """Return whether ``a`` and ``b`` resolve to the same label name.

        Label names are resolved through the matcher context's
        ``get_any_label_name``.
        """
        resolve = self._context.get_any_label_name
        return resolve(a, a.label) == resolve(b, b.label)
@attrs
class BboxMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` subclass that adds no overrides."""
@attrs
class PolygonMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` subclass that adds no overrides."""
@attrs
class MaskMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` subclass that adds no overrides."""
@attrs(kw_only=True)
class PointsMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` whose similarity is the OKS of two point sets."""

    # Passed through to OKS() as its ``sigma`` argument.
    sigma: Optional[list] = attrib(default=None)
    # Mapping keyed by id(annotation); element [1] of each value is used as
    # that annotation's bounding box.
    instance_map = attrib(converter=dict)

    def distance(self, a, b):
        """Return the OKS of ``a`` and ``b``, or 0 when their boxes do not overlap.

        The bounding boxes are looked up in ``instance_map``; OKS is
        computed against the mean of the two boxes.
        """
        boxes = [self.instance_map[id(ann)][1] for ann in (a, b)]
        if bbox_iou(*boxes) <= 0:
            return 0

        return OKS(a, b, sigma=self.sigma, bbox=mean_bbox(boxes))
@attrs
class LineMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` whose similarity is based on the area between two lines."""

    def distance(self, a, b):
        """Return a similarity value for two polylines.

        The area enclosed between the two lines is estimated with the
        trapezoidal rule (https://en.wikipedia.org/wiki/Trapezoidal_rule),
        normalized by an upper bound of that area, and the result is
        ``abs(1 - normalized_area)``. Returns 1 early when the common
        bounding box of the two annotations has zero area.
        """
        # The box area is an early-exit filter for non-intersected figures.
        # NOTE(review): bbox[2] and bbox[3] are presumably width and height.
        common_box = max_bbox([a, b])
        if not common_box[2] * common_box[3]:
            return 1

        # Both lines are brought to the same number of segments (at least 4)
        # so that their points can be compared one to one.
        segments = max(len(a.points) // 2, len(b.points) // 2, 5) - 1

        def _resample(line):
            if len(line) // 2 != segments + 1:
                line = approximate_line(line, segments=segments)
            return np.reshape(line, (-1, 2))

        a_pts = _resample(a.points)
        b_pts = _resample(b.points)

        # Sum of the point-to-point gaps at both ends of every segment
        gaps = np.linalg.norm(a_pts - b_pts, axis=1)
        gaps = gaps[:-1] + gaps[1:]

        a_steps = np.linalg.norm(a_pts[1:] - a_pts[:-1], axis=1)
        b_steps = np.linalg.norm(b_pts[1:] - b_pts[:-1], axis=1)

        # For the normalization we can't use
        # - the AABB (axis-aligned bbox) of a point set
        # - the exterior of a point set
        # - the convex hull of a point set
        # because these solutions won't be correctly normalized.
        # The lines can have multiple self-intersections, which can give
        # the inter-line area more than the internal area of the options
        # above, producing a distance value outside of the [0; 1] range.
        #
        # Instead, we compute the upper boundary for the inter-line
        # area based on the maximum point distance and line length.
        max_area = np.max(gaps) * max(np.sum(a_steps), np.sum(b_steps))

        area = np.dot(gaps, a_steps + b_steps) * 0.5 * 0.5 / max(max_area, 1.0)

        return abs(1 - area)
@attrs
class CaptionsMatcher(AnnotationMatcher):
    """Placeholder matcher; matching is not implemented."""

    def match_annotations(self, sources):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class Cuboid3dMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` subclass whose distance is not implemented."""

    def distance(self, a, b):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class ImageAnnotationMatcher(AnnotationMatcher):
    """Placeholder matcher; matching is not implemented."""

    def match_annotations(self, sources):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class HashKeyMatcher(AnnotationMatcher):
    """Placeholder matcher; matching is not implemented."""

    def match_annotations(self, sources):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class FeatureVectorMatcher(AnnotationMatcher):
    """Placeholder matcher; matching is not implemented."""

    def match_annotations(self, sources):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class TabularMatcher(AnnotationMatcher):
    """Placeholder matcher; matching is not implemented."""

    def match_annotations(self, sources):
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
@attrs
class RotatedBboxMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` whose similarity is the OKS of the polygon vertices."""

    # Passed through to OKS() as its ``sigma`` argument.
    sigma: Optional[list] = attrib(default=None)

    def distance(self, a, b):
        """Return the OKS of the two shapes' polygon vertices.

        Each shape's ``as_polygon()`` result is flattened into a ``Points``
        annotation before being handed to ``OKS``.
        """

        def _vertices_as_points(shape):
            return Points([coord for vertex in shape.as_polygon() for coord in vertex])

        return OKS(_vertices_as_points(a), _vertices_as_points(b), sigma=self.sigma)
@attrs
class Cuboid2DMatcher(ShapeMatcher):
    """:class:`ShapeMatcher` subclass that adds no overrides."""