Source code for datumaro.components.crypter
# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: MIT
import logging as log
from typing import Optional, Union
from cryptography.fernet import Fernet, InvalidToken
from datumaro.components.errors import DatumaroError
[docs]
class Crypter:
    """Symmetric encrypt/decrypt helper wrapping ``cryptography.fernet.Fernet``.

    The keys exchanged with callers are the raw Fernet key with
    ``KEY_PREFIX`` prepended; the prefix is stripped before the key is
    handed to ``Fernet`` and re-attached by the ``key`` property.
    """

    # Prefix (datum-) = 6 and Fernet = 44, 6 + 44 = 50
    FERNET_KEY_LEN = 50
    KEY_PREFIX = b"datum-"
    KEY_PREFIX_LEN = len(KEY_PREFIX)

    def __init__(self, key: Union[str, bytes]) -> None:
        """Build a crypter from a prefixed key.

        Args:
            key: Prefixed key, ``FERNET_KEY_LEN`` bytes long. A ``str`` is
                encoded to bytes first.

        Raises:
            DatumaroError: If the (encoded) key is not exactly
                ``FERNET_KEY_LEN`` bytes long.
        """
        if isinstance(key, str):
            key = key.encode()

        if len(key) != self.FERNET_KEY_LEN:
            # The key itself is intentionally left out of the message: an
            # almost-valid key (e.g. one with a stray trailing newline) would
            # otherwise end up verbatim in tracebacks and log files.
            raise DatumaroError(
                f"Key length should be {self.FERNET_KEY_LEN}, "
                f"but your key length is {len(key)}."
            )

        # Only the length is validated; the leading KEY_PREFIX_LEN bytes are
        # dropped without being compared against KEY_PREFIX.
        self._key = key[self.KEY_PREFIX_LEN :]
        self._fernet = Fernet(self._key)

    @property
    def key(self) -> bytes:
        """Return the stored key with ``KEY_PREFIX`` re-attached."""
        return self.KEY_PREFIX + self._key

    def decrypt(self, msg: bytes) -> bytes:
        """Decrypt ``msg`` with the wrapped Fernet instance."""
        return self._fernet.decrypt(msg)

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt ``msg`` with the wrapped Fernet instance."""
        return self._fernet.encrypt(msg)

    def handshake(self, key: bytes) -> bool:
        """Check that ``key`` is this crypter's own key, encrypted by it.

        Args:
            key: Token to decrypt and compare against ``self.key``.

        Returns:
            ``True`` when the decrypted token equals ``self.key``; ``False``
            when it differs or when decryption raises ``InvalidToken``.
        """
        try:
            return self.decrypt(key) == self.key
        except InvalidToken as e:
            log.debug(e)
            return False

    @classmethod
    def gen_key(cls, key: Optional[bytes] = None) -> bytes:
        """Generate a new prefixed key.

        Args:
            key: Optional existing key; when given, the returned key is
                guaranteed to differ from it.

        Returns:
            ``KEY_PREFIX`` followed by a freshly generated Fernet key.
        """
        _key = cls.KEY_PREFIX + Fernet.generate_key()
        # Regenerate until the new key is different from the supplied one.
        while _key == key:
            _key = cls.KEY_PREFIX + Fernet.generate_key()
        return _key

    def __eq__(self, __o: object) -> bool:
        """Two crypters are equal when they hold the same (unprefixed) key."""
        if not isinstance(__o, Crypter):
            return False
        return self._key == __o._key

    def __hash__(self) -> int:
        """Hash consistently with ``__eq__``.

        Defining ``__eq__`` alone makes Python set ``__hash__`` to ``None``,
        which would make instances unusable in sets and as dict keys.
        """
        return hash(self._key)

    @property
    def is_null_crypter(self):
        """Whether this instance compares equal to the module's ``NULL_CRYPTER``."""
        return self == NULL_CRYPTER
[docs]
class NullCrypter(Crypter):
    """Pass-through ``Crypter``: messages are returned unchanged and no key is held."""

    def __init__(self) -> None:
        """Create the crypter without a key or a Fernet instance."""
        # The parent constructor is not called here; it requires a key.
        self._fernet = None
        self._key = None

    @property
    def key(self) -> None:
        """Always ``None``; there is no key to expose."""
        return None

    def encrypt(self, msg: bytes) -> bytes:
        """Return ``msg`` as-is."""
        return msg

    def decrypt(self, msg: bytes) -> bytes:
        """Return ``msg`` as-is."""
        return msg

    def handshake(self, key: bytes) -> bool:
        """Return ``True`` only when the (pass-through) decrypted ``key`` is ``b""``."""
        plain = self.decrypt(key)
        return plain == b""
# Module-level NullCrypter instance; Crypter.is_null_crypter compares
# instances against this object.
NULL_CRYPTER = NullCrypter()