Source code for datumaro.plugins.data_formats.tabular

# Copyright (C) 2021-2023 Intel Corporation
#
# SPDX-License-Identifier: MIT

import errno
import os
import os.path as osp
from typing import Dict, List, Optional, Tuple, Type, Union

import pandas as pd

from datumaro.components.annotation import AnnotationType, Categories, Tabular, TabularCategories
from datumaro.components.dataset_base import DatasetBase, DatasetItem
from datumaro.components.errors import MediaTypeError
from datumaro.components.exporter import Exporter
from datumaro.components.importer import ImportContext, Importer
from datumaro.components.media import Table, TableDtype, TableRow
from datumaro.util.os_util import find_files

# File extensions (without the leading dot) that are recognized as tabular data.
# Only the '.csv' extension is supported.
TABULAR_EXTENSIONS = [
    "csv",
]


class TabularDataBase(DatasetBase):
    """
    Dataset base that reads tabular ('.csv') files.

    Every file becomes one subset named after the file (without extension),
    and every table row becomes one DatasetItem.
    """

    NAME = "tabular"

    def __init__(
        self,
        path: str,
        *,
        target: Optional[Dict[str, Union[str, List[str]]]] = None,
        dtype: Optional[Dict[str, Type[TableDtype]]] = None,
        ctx: Optional[ImportContext] = None,
    ) -> None:
        """
        Read and compose a tabular dataset.
        The file name of each '.csv' file is regarded as subset.

        Args:
            path (str) : Path to a tabular dataset.
                (csv file or folder contains csv files).
            target (optional, dict) : Mapping with both an "input" and an "output" key,
                each holding a column name or a list of column names.
                "output" columns are turned into annotations.
                If this is not specified (None), every column is loaded and
                no annotation is created.
            dtype (optional, dict(str,str)) : Dictionary of column name -> type str
                ('str', 'int', or 'float'). This can be used when automatic
                type inference fails.

        Raises:
            FileNotFoundError: If no tabular file is found at ``path``.
            TypeError: If ``target`` lacks "input" or "output", or an output
                column has an unsupported type.
        """
        paths: List[str] = []
        if osp.isfile(path):
            paths.append(path)
        else:
            # Do not rebind `path` while collecting files: it is reported in the
            # error below and stored in the dataset infos.
            paths.extend(find_files(path, TABULAR_EXTENSIONS))

        if not paths:
            raise FileNotFoundError(errno.ENOENT, "Can't find tabular files", path)

        super().__init__(media_type=TableRow, ctx=ctx)

        self._infos = {"path": path}
        self._items, self._categories = self._parse(paths, target, dtype)

    @staticmethod
    def _existing_columns(requested, columns) -> List[str]:
        """Return the requested column name(s) that are present in ``columns``."""
        if isinstance(requested, str):
            return [requested] if requested in columns else []
        if isinstance(requested, list):
            return [col for col in requested if col in columns]
        return []

    def _parse(
        self,
        paths: List[str],
        target: Optional[Dict[str, Union[str, List[str]]]] = None,
        dtype: Optional[Dict[str, Type[TableDtype]]] = None,
    ) -> Tuple[List[DatasetItem], Dict[AnnotationType, Categories]]:
        """
        Parse tabular files. Each file is regarded as a subset.

        Args:
            paths (list(str)) : A list of paths to tabular data files (csv files).
            target (optional, dict) : Mapping with both an "input" and an "output" key,
                each holding a column name or a list of column names.
                Names that are missing from a table are skipped for that table.
                If this is not specified (None), every column is loaded and
                no annotation is created.
            dtype (optional, dict(str,str)) : Dictionary of column name -> type str
                ('str', 'int', or 'float'). This can be used when automatic
                type inference fails.

        Returns:
            list (DatasetItem): dataset items
            dict (AnnotationType, Categories): categories info

        Raises:
            TypeError: If ``target`` lacks "input" or "output", or an output
                column has a type other than int, float, categorical or str.
        """
        items: List[DatasetItem] = []
        categories: TabularCategories = TabularCategories()

        if target is not None:
            if "input" not in target or "output" not in target:
                raise TypeError('Target should have both "input" and "output"')

        for path in paths:
            table = Table.from_csv(path, dtype=dtype)

            targets: List[str] = []
            targets_ann: List[str] = []
            if target is None:
                targets.extend(table.columns)  # add all columns
            else:
                # keep only the requested column names that exist in this table
                targets_ann = self._existing_columns(target.get("output"), table.columns)
                targets = self._existing_columns(target.get("input"), table.columns)
                targets = targets + targets_ann

            # set categories
            for target_ in targets_ann:
                _, category = categories.find(target_)
                target_dtype = table.dtype(target_)
                if target_dtype in [int, float, pd.api.types.CategoricalDtype()]:
                    # 'int' can be categorical, but we don't know this unless
                    # user gives information.
                    labels = {
                        feature
                        for feature in table.features(target_, unique=True)
                        if not pd.isna(feature)
                    }
                    if category is None:
                        categories.add(target_, target_dtype, labels)
                    else:
                        # Merge labels seen in this file into the existing category.
                        # `set.union` returns a new set, so its result was being lost.
                        # NOTE(review): assumes `category.labels` is a mutable set.
                        category.labels.update(labels)
                elif target_dtype is str:
                    if category is None:
                        categories.add(target_, target_dtype)
                else:
                    raise TypeError(
                        f"Unsupported type '{target_dtype}' for target column '{target_}'."
                    )

            # load annotations
            subset = osp.splitext(osp.basename(path))[0]
            table.select(targets)
            for row in table:  # type: TableRow
                item_id = f"{row.index}@{subset}"
                ann = [Tabular(values=row.data(targets_ann))] if targets_ann else None
                items.append(
                    DatasetItem(
                        id=item_id,
                        subset=subset,
                        media=row,
                        annotations=ann,
                    )
                )

        return items, {AnnotationType.tabular: categories}

    def categories(self):
        """Return the categories collected while parsing."""
        return self._categories

    def __iter__(self):
        """Iterate over the parsed dataset items."""
        yield from self._items
def string_to_dict(input_string):
    """
    Parse a '--target' style string into a dict of column lists.

    The string is a comma-separated list. An item of the form 'key:value'
    starts a key; only the keys "input" and "output" are kept. An item that is
    not of that form is attached to the most recent kept key, so
    'input:data,output:class,breed' gives
    {"input": ["data"], "output": ["class", "breed"]}.

    Args:
        input_string (str): Text such as 'input:date,output:class'.

    Returns:
        dict: Maps "input" and/or "output" to a list of values.

    Raises:
        ValueError: If an item has no preceding 'key:value' item to attach to.
    """
    result = {}
    seen_key = False  # whether any 'key:value' item has been read so far
    current_key = None  # last kept key; None while inside an ignored key

    for pair in input_string.split(","):
        split_pair = pair.split(":")
        if len(split_pair) == 2:
            key, value = split_pair
            seen_key = True
            if key == "input" or key == "output":
                result.setdefault(key, []).append(value)
                current_key = key
            else:
                # Ignore other keys, together with the values that follow them.
                current_key = None
        elif current_key is not None:
            result[current_key].extend(split_pair)
        elif not seen_key:
            # ValueError is used so that argparse can report it as a usage error.
            raise ValueError(
                f"'{pair}' does not follow an 'input:' or 'output:' entry in '{input_string}'"
            )

    return result
class TabularDataImporter(Importer):
    """
    Import a tabular dataset.
    Each '.csv' file is regarded as a subset.
    """

    NAME = "tabular"

    @classmethod
    def build_cmdline_parser(cls, **kwargs):
        """Extend the base command-line parser with '--target' and '--dtype'."""
        parser = super().build_cmdline_parser(**kwargs)
        parser.add_argument(
            "--target",
            type=string_to_dict,
            # Adjacent literals are concatenated, so each one ends with a space.
            help="Target column or list of target columns for each input and output. "
            "(ex. 'input:date,output:class', 'input:data,output:class,breed') (default:None) "
            "If this is not specified (None), the whole columns are regarded as a target column. "
            "In case of a dataset with no targets, give an empty list as a parameter.",
        )
        parser.add_argument(
            "--dtype",
            # 'date:str,x:int' -> {"date": "str", "x": "int"}; an entry without
            # exactly one ':' raises ValueError.
            type=lambda x: dict(entry.split(":") for entry in x.split(",")),
            help="Type information for a column. (ex. 'date:str,x:int') (default:None) "
            "This can be used when automatic type inferencing is failed",
        )
        return parser

    @classmethod
    def find_sources(cls, path):
        """
        Detect a tabular source at ``path``.

        Returns a single-element list describing ``path`` when it is a file
        with a supported extension, or a directory in which find_files()
        yields at least one tabular file. Otherwise returns an empty list.
        """
        source = [{"url": path, "format": TabularDataBase.NAME}]

        if not osp.isdir(path):
            ext = osp.splitext(path)[1][1:]  # exclude "."
            return source if ext in TABULAR_EXTENSIONS else []

        # One match is enough, so stop at the first file found.
        for _ in find_files(path, TABULAR_EXTENSIONS):  # find 1 depth only.
            return source
        return []

    @classmethod
    def get_file_extensions(cls) -> List[str]:
        """Return the supported file extensions, each with a leading dot."""
        return list({f".{ext}" for ext in TABULAR_EXTENSIONS})
class TabularDataExporter(Exporter):
    """
    Export a tabular dataset.
    This will save each subset into a '.csv' file regardless of 'save_media' value
    """

    NAME = "tabular"
    EXPORT_EXT = ".csv"
    DEFAULT_IMAGE_EXT = ".jpg"  # just to avoid assert error.

    def _apply_impl(self):
        """
        Write one '<subset name>.csv' file per subset into the save directory.

        Each row is built from the item's media data, then overlaid with the
        values of every Tabular annotation on that item.

        Raises:
            MediaTypeError: If the extractor reports a media type that is not
                a TableRow subclass.
        """
        source = self._extractor

        if source.media_type() and not issubclass(source.media_type(), TableRow):
            raise MediaTypeError("Media type is not a table.")

        # self._save_media is intentionally not checked:
        # the media (csv) file is always saved, whatever its value.
        os.makedirs(self._save_dir, exist_ok=True)

        for subset_name in source.subsets():
            subset_items = source.get_subset(subset_name)
            out_path = osp.join(self._save_dir, subset_name + self.EXPORT_EXT)

            rows: List[Dict[str, TableDtype]] = []
            for item in subset_items:
                record = item.media.data()
                tabular_anns = (a for a in item.annotations if isinstance(a, Tabular))
                for ann in tabular_anns:
                    record.update(ann.values)  # annotation values override media values
                rows.append(record)

            Table.from_list(rows).save(out_path)