Source code for otx.cli.tools.utils.demo.images_capture

"""Images capturing module."""

# Copyright (C) 2021 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

import copy
import os
import sys

import cv2

# Taken from here:
# https://github.com/openvinotoolkit/open_model_zoo/blob/develop/demos/common/python/images_capture.py


[docs] class InvalidInput(Exception): """Exception for wrong input format.""" def __init__(self, message): super().__init__() self.message = message
[docs] class OpenError(Exception): """Exception for error opening reader.""" def __init__(self, message): super().__init__() self.message = message
[docs] class ImagesCapture: """Images capturing base class."""
[docs] def read(self): """Returns captured image.""" raise NotImplementedError
[docs] def fps(self): """Returns a frequency of getting images from source.""" raise NotImplementedError
[docs] def get_type(self): """Returns type of image capture.""" raise NotImplementedError
[docs] class ImreadWrapper(ImagesCapture): """Class for reading an image from file.""" def __init__(self, source, loop): self.loop = loop if not os.path.isfile(source): raise InvalidInput(f"Can't find the image by {source}") self.image = cv2.imread(source, cv2.IMREAD_COLOR) if self.image is None: raise OpenError(f"Can't open the image from {source}") self.can_read = True
[docs] def read(self): """Returns captured image.""" if self.loop: return copy.deepcopy(self.image) if self.can_read: self.can_read = False return copy.deepcopy(self.image) return None
[docs] def fps(self): """Returns a frequency of getting images from source.""" return 1.0
[docs] def get_type(self): """Returns type of image capture.""" return "IMAGE"
[docs] class DirReader(ImagesCapture): """Class for reading images from directory.""" def __init__(self, source, loop): self.loop = loop self.dir = source if not os.path.isdir(self.dir): raise InvalidInput(f"Can't find the dir by {source}") self.names = sorted(os.listdir(self.dir)) if not self.names: raise OpenError(f"The dir {source} is empty") self.file_id = 0 for name in self.names: filename = os.path.join(self.dir, name) image = cv2.imread(filename, cv2.IMREAD_COLOR) if image is not None: return raise OpenError(f"Can't read the first image from {source}")
[docs] def read(self): """Returns captured image.""" while self.file_id < len(self.names): filename = os.path.join(self.dir, self.names[self.file_id]) image = cv2.imread(filename, cv2.IMREAD_COLOR) self.file_id += 1 if image is not None: return image if self.loop: self.file_id = 0 while self.file_id < len(self.names): filename = os.path.join(self.dir, self.names[self.file_id]) image = cv2.imread(filename, cv2.IMREAD_COLOR) self.file_id += 1 if image is not None: return image return None
[docs] def fps(self): """Returns a frequency of getting images from source.""" return 1.0
[docs] def get_type(self): """Returns type of image capture.""" return "DIR"
[docs] class VideoCapWrapper(ImagesCapture): """Class for capturing images from video.""" def __init__(self, source, loop): self.loop = loop self.cap = cv2.VideoCapture() status = self.cap.open(source) if not status: raise InvalidInput(f"Can't open the video from {source}")
[docs] def read(self): """Returns captured image.""" status, image = self.cap.read() if not status: if not self.loop: return None self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) status, image = self.cap.read() if not status: return None return image
[docs] def fps(self): """Returns a frequency of getting images from source.""" return self.cap.get(cv2.CAP_PROP_FPS)
[docs] def get_type(self): """Returns type of image capture.""" return "VIDEO"
[docs] class CameraCapWrapper(ImagesCapture): """Class for capturing images from camera.""" def __init__(self, source, camera_resolution): self.cap = cv2.VideoCapture() try: status = self.cap.open(int(source)) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, camera_resolution[0]) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, camera_resolution[1]) self.cap.set(cv2.CAP_PROP_FPS, 30) self.cap.set(cv2.CAP_PROP_AUTOFOCUS, 1) self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) if not status: raise OpenError(f"Can't open the camera from {source}") except ValueError as ex: raise InvalidInput(f"Can't find the camera {source}") from ex
[docs] def read(self): """Returns captured image.""" status, image = self.cap.read() if not status: return None return image
[docs] def fps(self): """Returns a frequency of getting images from source.""" return self.cap.get(cv2.CAP_PROP_FPS)
[docs] def get_type(self): """Returns type of image capture.""" return "CAMERA"
[docs] def open_images_capture(source, loop, camera_resolution=(1280, 720)): """Opens images capture.""" errors = {InvalidInput: [], OpenError: []} for reader in (ImreadWrapper, DirReader, VideoCapWrapper): try: return reader(source, loop) except (InvalidInput, OpenError) as ex: errors[type(ex)].append(ex.message) try: return CameraCapWrapper(source, camera_resolution) except (InvalidInput, OpenError) as ex: errors[type(ex)].append(ex.message) if not errors[OpenError]: print(*errors[InvalidInput], file=sys.stderr, sep="\n") else: print(*errors[OpenError], file=sys.stderr, sep="\n") sys.exit(1)