Source code for datalake.interface
from abc import ABC, abstractmethod
from logging import getLogger
[docs]class IStorage(ABC): # pragma: no cover
"""The storage interface defines method for manipulating files in a cloud object/blob storage"""
@property
@abstractmethod
def name(self):
"""Returns the name of the storage"""
pass
[docs] @abstractmethod
def exists(self, key):
"""Returns ``True`` if the specified key exists, ``False`` otherwise
Args:
key (str): the path of a storage object
"""
pass
[docs] @abstractmethod
def checksum(self, key):
"""Returns the **SHA256** hash for the file at the specified path
Args:
key (str): the path of a storage object
"""
pass
[docs] @abstractmethod
def is_folder(self, key):
"""
Return ``True`` if the specified key is a folder-like object, ``False`` otherwise
Args:
key (str): the path of a storage object
"""
pass
[docs] @abstractmethod
def keys_iterator(self, prefix):
"""Returns an iterator of keys that match the specified prefix
Args:
prefix (str): a partial path of a storage object
"""
pass
[docs] @abstractmethod
def upload(self, src, dst, content_type="text/csv", encoding="utf-8", metadata={}):
"""Uploads a local file to the storage
Args:
src (str): the path of a local file
dst (str): the path of the target storage object
content_type (str): the MIME type for the file
encoding (str): the encoding for the file
metadata (dict): a map of key/value pairs to add as metadata for the uploaded file
"""
pass
[docs] @abstractmethod
def download(self, src, dst):
"""Downloads a storage file locally
Args:
src (str): the path of a storage object
dst (str): the path of the target local file
"""
pass
[docs] @abstractmethod
def copy(self, src, dst, bucket=None):
"""Copies a storage key to another key in the same storage or another
Args:
src (str): the path of a storage object to copy
dst (str): the path of the target storage object
bucket (str): if specified then the object is copied in this bucket.
If ``None`` the object is copied in the same bucket than the source object
"""
pass
[docs] @abstractmethod
def delete(self, key):
"""Removes an object from the storage
Args:
key (str): the path of a storage object"""
pass
[docs] @abstractmethod
def move(self, src, dst, bucket=None):
"""Moves a storage key to another key in the same storage or another
Args:
src (str): the path of a storage object to move
dst (str): the path of the target storage object
bucket (str): if specified then the object is moved in this bucket.
If ``None`` the object is moved in the same bucket than the source object
"""
pass
[docs] @abstractmethod
def put(self, content, dst, content_type="text/csv", encoding="utf-8", metadata={}):
"""Writes the specified content in a storage object
Args:
content (str): the content to upload
dst (str): the path of the target storage object
content_type (str): the MIME type for the file
encoding (str): the encoding for the file
metadata (dict): a map of key/value pairs to add as metadata for the uploaded file
"""
pass
[docs] @abstractmethod
def get(self, key):
"""Returns the content of the specified object
Args:
key (str): the path of a storage object
"""
pass
[docs] @abstractmethod
def stream(self, key, encoding="utf-8"):
"""Returns an iterator on each lines for the specified object
Args:
key (str): the path of a storage object
encoding (str): the encoding to use for decoding the byte stream
"""
pass
[docs] @abstractmethod
def size(self, key):
"""Return the size in bytes for the specified object
Args:
key (str): the path of a storage object
"""
pass
[docs]class IStorageEvent(ABC): # pragma: no cover
"""The storage interface defines methods for handling event notifications from buckets"""
[docs] @abstractmethod
def process(self, storage, object):
"""Callback method for handling an object creation
Args:
storage (IStorage): the concrete implementation of the Storage interface
object (str): the path of the created object
"""
pass
[docs]class ISecret(ABC): # pragma: no cover
"""The secret interface defines methods for fetching a cloud managed secret"""
@property
@abstractmethod
def plain(self):
"""Returns the secret in plain UTF-8 text"""
pass
@property
@abstractmethod
def json(self):
"""Returns the secret as a ``dict``"""
pass
[docs]class IMonitor(ABC): # pragma: no cover
"""The monitoring abstract class defines methods for sending metrics to a Time Series Database"""
[docs] def safe_push(self, measurement):
"""Same as ``push()`` but with all exceptions trapped hence not disrupting the main program"""
logger = getLogger(__name__)
try:
self.push(measurement)
except Exception as e:
logger.warning(f"An error occured whilst pushing a measurement: {str(e)}")
[docs] @abstractmethod
def push(self, measurement):
"""Sends a measurement to the TSDB backend
Args:
measurement (Measurement): the measurement to send
"""
pass