Source code for datumaro.plugins.data_formats.arrow.format
# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: MIT
import struct
import pyarrow as pa
from datumaro.components.dataset_base import IDataset
from datumaro.errors import DatasetImportError
from datumaro.plugins.data_formats.datumaro.exporter import JsonWriter
from datumaro.plugins.data_formats.datumaro_binary.mapper.common import DictMapper
[docs]
class DatumaroArrow:
SIGNATURE = "signature:datumaro_arrow"
VERSION = "2.0"
MP_TIMEOUT = 300.0 # 5 minutes
ID_FIELD = "id"
SUBSET_FIELD = "subset"
MEDIA_FIELD = "media"
IMAGE_FIELD = pa.struct(
[
pa.field("has_bytes", pa.bool_()),
pa.field("bytes", pa.binary()),
pa.field("path", pa.string()),
pa.field("size", pa.list_(pa.uint16(), 2)),
]
)
POINT_CLOUD_FIELD = pa.struct(
[
pa.field("has_bytes", pa.bool_()),
pa.field("bytes", pa.binary()),
pa.field("path", pa.string()),
pa.field("extra_images", pa.list_(pa.field("image", IMAGE_FIELD))),
]
)
MEDIA_FIELD = pa.struct(
[
pa.field("type", pa.uint8()),
pa.field("image", IMAGE_FIELD),
pa.field("point_cloud", POINT_CLOUD_FIELD),
]
)
SCHEMA = pa.schema(
[
pa.field(ID_FIELD, pa.string()),
pa.field(SUBSET_FIELD, pa.string()),
pa.field("media", MEDIA_FIELD),
pa.field("annotations", pa.binary()),
pa.field("attributes", pa.binary()),
]
)
[docs]
@classmethod
def check_signature(cls, signature: str):
if signature != cls.SIGNATURE:
raise DatasetImportError(
f"Input signature={signature} is not aligned with the ground truth signature={cls.SIGNATURE}"
)
[docs]
@classmethod
def check_schema(cls, schema: pa.lib.Schema):
if not cls.SCHEMA.equals(schema):
raise DatasetImportError(
f"input schema is not aligned with the ground truth schema.\n"
f"input schema: \n{schema.to_string()}\n"
f"ground truth schema: \n{cls.SCHEMA.to_string()}\n"
)
[docs]
@classmethod
def check_version(cls, version: str):
if version != cls.VERSION:
raise DatasetImportError(
f"Input version={version} is not aligned with the current data format version={cls.VERSION}"
)