Source code for datumaro.plugins.data_formats.datumaro_binary.mapper.media

# Copyright (C) 2024 Intel Corporation
#
# SPDX-License-Identifier: MIT

import os.path as osp
import struct
from typing import Dict, Optional, Tuple

from datumaro.components.errors import DatumaroError
from datumaro.components.media import Image, MediaElement, MediaType, PointCloud, Video, VideoFrame

from .common import Mapper, StringMapper


class MediaMapper(Mapper):
    """Top-level media mapper that dispatches on the media type tag.

    The first four bytes of every serialized media record are a
    little-endian unsigned 32-bit media type. This class only inspects that
    tag (or ``obj._type`` when serializing) and hands the work over to the
    mapper registered for it.
    """

    @classmethod
    def forward(cls, obj: Optional[MediaElement]) -> bytes:
        """Serialize ``obj`` with the mapper matching its media type.

        Args:
            obj: Media element to serialize, or ``None`` for "no media".

        Returns:
            The serialized bytes. ``None`` is encoded as the bare
            ``MediaType.NONE`` tag.

        Raises:
            DatumaroError: If ``obj._type`` has no mapper registered here.
        """
        if obj is None:
            return struct.pack("<I", MediaType.NONE)

        # Ordered (tag, mapper) table; compared with ``==`` in this order.
        dispatch = (
            (MediaType.IMAGE, ImageMapper),
            (MediaType.POINT_CLOUD, PointCloudMapper),
            (MediaType.VIDEO, VideoMapper),
            (MediaType.VIDEO_FRAME, VideoFrameMapper),
            (MediaType.MEDIA_ELEMENT, MediaElementMapper),
        )
        for tag, mapper in dispatch:
            if obj._type == tag:
                return mapper.forward(obj)

        raise DatumaroError(f"{obj._type} is not allowed for MediaMapper.")

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[Optional[MediaElement], int]:
        """Deserialize the media record that starts at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Optional per-media-type path prefixes, passed
                through unchanged to the selected mapper.

        Returns:
            A ``(media, new_offset)`` pair. ``media`` is ``None`` when the tag
            is ``MediaType.NONE``, in which case only the 4-byte tag is
            consumed.

        Raises:
            DatumaroError: If the tag has no mapper registered here.
        """
        (media_type,) = struct.unpack_from("<I", _bytes, offset)

        if media_type == MediaType.NONE:
            return None, offset + 4

        # The selected mapper re-reads the tag itself, so ``offset`` is
        # passed on without being advanced.
        dispatch = (
            (MediaType.IMAGE, ImageMapper),
            (MediaType.POINT_CLOUD, PointCloudMapper),
            (MediaType.VIDEO, VideoMapper),
            (MediaType.VIDEO_FRAME, VideoFrameMapper),
            (MediaType.MEDIA_ELEMENT, MediaElementMapper),
        )
        for tag, mapper in dispatch:
            if media_type == tag:
                return mapper.backward(_bytes, offset, media_path_prefix)

        raise DatumaroError(f"{media_type} is not allowed for MediaMapper.")
class MediaElementMapper(Mapper):
    """Mapper for the part of a media record shared by all media types.

    Serialized layout: a little-endian unsigned 32-bit media type tag
    followed by the media path as encoded by ``StringMapper``. Subclasses
    override ``MEDIA_TYPE`` and append their own fields after this header.
    """

    # Placeholder written by ``forward`` when the object has no ``path``
    # attribute; ``backward_dict`` turns it back into ``None``.
    MAGIC_PATH = "/NOT/A/REAL/PATH"
    # Tag that ``backward_dict`` expects at the start of a record.
    MEDIA_TYPE = MediaType.MEDIA_ELEMENT

    @classmethod
    def forward(cls, obj: MediaElement) -> bytes:
        """Serialize the media type tag and path of ``obj``.

        Args:
            obj: Media element to serialize. If it has no ``path`` attribute,
                ``MAGIC_PATH`` is written instead.

        Returns:
            The tag followed by the ``StringMapper``-encoded path.
        """
        bytes_arr = bytearray()
        bytes_arr.extend(struct.pack("<I", obj.type))
        path = getattr(obj, "path", cls.MAGIC_PATH)
        bytes_arr.extend(StringMapper.forward(path))
        return bytes(bytes_arr)

    @classmethod
    def backward_dict(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[Dict, int]:
        """Read the media type tag and path that start at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Optional mapping from media type to a path
                prefix. When given, the entry for ``cls.MEDIA_TYPE`` is
                joined in front of the stored path.

        Returns:
            A ``(fields, new_offset)`` pair where ``fields`` has the keys
            ``"type"`` and ``"path"``. ``"path"`` is ``None`` when the record
            stores the ``MAGIC_PATH`` placeholder.

        Raises:
            AssertionError: If the stored tag differs from ``cls.MEDIA_TYPE``.
            KeyError: If ``media_path_prefix`` is given, the record stores a
                real path, and the mapping has no entry for
                ``cls.MEDIA_TYPE``.
        """
        (media_type,) = struct.unpack_from("<I", _bytes, offset)
        assert media_type == cls.MEDIA_TYPE, f"Expect {cls.MEDIA_TYPE} but actual is {media_type}."
        offset += 4

        path, offset = StringMapper.backward(_bytes, offset)
        if path == cls.MAGIC_PATH:
            # The placeholder means "no path". It must never be joined with a
            # prefix: osp.join(prefix, None) raises TypeError.
            path = None
        elif media_path_prefix is not None:
            path = osp.join(media_path_prefix[cls.MEDIA_TYPE], path)

        return {"type": media_type, "path": path}, offset

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[MediaElement, int]:
        """Deserialize a plain ``MediaElement`` record.

        The stored fields are parsed only to advance the offset; the returned
        element is constructed without arguments.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Forwarded to ``backward_dict``.

        Returns:
            A ``(MediaElement(), new_offset)`` pair.
        """
        _, offset = cls.backward_dict(_bytes, offset, media_path_prefix)
        return MediaElement(), offset
class ImageMapper(MediaElementMapper):
    """Mapper for ``Image`` media.

    Appends two little-endian signed 32-bit integers (the image size) to the
    common media header.
    """

    # Size pair written by ``forward`` when ``obj.has_size`` is false;
    # ``backward`` maps it back to ``size=None``.
    MAGIC_SIZE_FOR_NONE = (-1583, -1597)
    MEDIA_TYPE = MediaType.IMAGE

    @classmethod
    def forward(cls, obj: Image) -> bytes:
        """Serialize ``obj`` as the common header plus its size.

        Args:
            obj: Image to serialize.

        Returns:
            Header bytes followed by the two size components.
        """
        if obj.has_size:
            dims = obj.size
        else:
            dims = cls.MAGIC_SIZE_FOR_NONE

        header = super().forward(obj)
        return bytes(header) + struct.pack("<ii", dims[0], dims[1])

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[Image, int]:
        """Deserialize an image record that starts at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Forwarded to ``backward_dict``.

        Returns:
            An ``(image, new_offset)`` pair. The image is built with
            ``Image.from_file``; its ``size`` argument is ``None`` when the
            record stores ``MAGIC_SIZE_FOR_NONE``.
        """
        fields, offset = cls.backward_dict(_bytes, offset, media_path_prefix)

        # unpack_from already yields the two stored values as a tuple.
        stored_size = struct.unpack_from("<ii", _bytes, offset)
        offset += 8

        if stored_size == cls.MAGIC_SIZE_FOR_NONE:
            stored_size = None

        image = Image.from_file(path=fields["path"], size=stored_size)
        return image, offset
class VideoMapper(MediaElementMapper):
    """Mapper for ``Video`` media.

    Appends three little-endian unsigned 32-bit integers (step, start frame,
    end frame) to the common media header.
    """

    # Written in place of a falsy ``_end_frame``; read back as ``None``.
    MAGIC_END_FRAME_FOR_NONE = 4294967295  # max value of unsigned int32
    MEDIA_TYPE = MediaType.VIDEO

    @classmethod
    def forward(cls, obj: Video) -> bytes:
        """Serialize ``obj`` as the common header plus its frame range.

        Args:
            obj: Video to serialize.

        Returns:
            Header bytes followed by step, start frame and end frame.
        """
        # Any falsy end frame is replaced by the placeholder.
        last_frame = obj._end_frame or cls.MAGIC_END_FRAME_FOR_NONE

        header = super().forward(obj)
        frame_range = struct.pack("<III", obj._step, obj._start_frame, last_frame)
        return bytes(header) + frame_range

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[Video, int]:
        """Deserialize a video record that starts at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Forwarded to ``backward_dict``.

        Returns:
            A ``(video, new_offset)`` pair. ``end_frame`` is passed as
            ``None`` when the record stores ``MAGIC_END_FRAME_FOR_NONE``.
        """
        fields, offset = cls.backward_dict(_bytes, offset, media_path_prefix)

        step, first_frame, last_frame = struct.unpack_from("<III", _bytes, offset)
        offset += 12

        if last_frame == cls.MAGIC_END_FRAME_FOR_NONE:
            last_frame = None

        video = Video(
            path=fields["path"],
            step=step,
            start_frame=first_frame,
            end_frame=last_frame,
        )
        return video, offset
class VideoFrameMapper(MediaElementMapper):
    """Mapper for ``VideoFrame`` media.

    Appends one little-endian unsigned 32-bit integer (the frame index) to
    the common media header.
    """

    MEDIA_TYPE = MediaType.VIDEO_FRAME

    @classmethod
    def forward(cls, obj: VideoFrame) -> bytes:
        """Serialize ``obj`` as the common header plus its frame index.

        Args:
            obj: Video frame to serialize.

        Returns:
            Header bytes followed by ``obj.index``.
        """
        header = super().forward(obj)
        return bytes(header) + struct.pack("<I", obj.index)

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[VideoFrame, int]:
        """Deserialize a video frame record that starts at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Forwarded to ``backward_dict``.

        Returns:
            A ``(frame, new_offset)`` pair. The frame wraps a ``Video``
            constructed from the stored path alone.
        """
        fields, offset = cls.backward_dict(_bytes, offset, media_path_prefix)

        (index,) = struct.unpack_from("<I", _bytes, offset)
        offset += 4

        parent_video = Video(fields["path"])
        return VideoFrame(parent_video, index), offset
class PointCloudMapper(MediaElementMapper):
    """Mapper for ``PointCloud`` media.

    Appends a little-endian unsigned 32-bit count of extra images to the
    common media header, followed by each extra image serialized with
    ``ImageMapper``.
    """

    MEDIA_TYPE = MediaType.POINT_CLOUD

    @classmethod
    def forward(cls, obj: PointCloud) -> bytes:
        """Serialize ``obj`` as header, extra-image count and extra images.

        Args:
            obj: Point cloud to serialize.

        Returns:
            The concatenated bytes of all parts.
        """
        chunks = [
            super().forward(obj),
            struct.pack("<I", len(obj.extra_images)),
        ]
        chunks.extend(ImageMapper.forward(image) for image in obj.extra_images)
        return b"".join(chunks)

    @classmethod
    def backward(
        cls,
        _bytes: bytes,
        offset: int = 0,
        media_path_prefix: Optional[Dict[MediaType, str]] = None,
    ) -> Tuple[PointCloud, int]:
        """Deserialize a point cloud record that starts at ``offset``.

        Args:
            _bytes: Buffer holding the serialized record.
            offset: Position of the record's media type tag in ``_bytes``.
            media_path_prefix: Forwarded to ``backward_dict`` and to
                ``ImageMapper.backward`` for every extra image.

        Returns:
            A ``(point_cloud, new_offset)`` pair built with
            ``PointCloud.from_file``.
        """
        fields, offset = cls.backward_dict(_bytes, offset, media_path_prefix)

        (image_count,) = struct.unpack_from("<I", _bytes, offset)
        offset += 4

        # Each image record advances the offset for the next one.
        images = []
        remaining = image_count
        while remaining > 0:
            image, offset = ImageMapper.backward(_bytes, offset, media_path_prefix)
            images.append(image)
            remaining -= 1

        point_cloud = PointCloud.from_file(path=fields["path"], extra_images=images)
        return point_cloud, offset