import inspect
from importlib import import_module
import requests
from datalake.helpers import StandardDialect, DatasetBuilder, DatasetReader
from datalake.interface import IMonitor
from datalake.exceptions import *
class Datalake:
    """Main class for dealing with datacatalog and storage"""

    def __init__(self, config=None):
        """Fetch the catalog configuration and set up provider services and the CSV dialect.

        Args:
            config (dict): configuration parameters in key/value pairs. The keys read
                here are ``"catalog_url"`` (default ``"http://localhost:8080"``) and
                ``"monitoring"`` (default: class ``"NoMonitor"`` with empty params).
                ``None`` (the default) means "no overrides".

        Raises:
            BadConfiguration: when ``config`` is not a dict, or when the
                ``configuration`` endpoint of the catalog cannot be fetched
        """
        # ``None`` replaces the former mutable ``{}`` default.
        if config is None:
            config = {}
        if not isinstance(config, dict):
            raise BadConfiguration("Datalake configuration must be a dict")

        datalake_config = {
            "catalog_url": "http://localhost:8080",
            "monitoring": {
                "class": "NoMonitor",
                "params": {},
            },
        }
        # Shallow merge: a caller-supplied "monitoring" value replaces the default dict as a whole.
        datalake_config.update(config)

        self._catalog_url = datalake_config["catalog_url"]
        try:
            catalog_config = self._call_catalog("configuration")
        except Exception as e:
            # Any failure (connection, HTTP status, JSON decoding) is reported as a bad catalog URL.
            raise BadConfiguration(f"catalog URL is invalid ({str(e)})") from e

        # Configure Cloud Services
        self._provider = catalog_config["provider"]
        self._service_discovery = ServiceDiscovery(self._provider, datalake_config["monitoring"])

        # Configure CSV dialect
        csv_format = catalog_config["csv_format"]
        self._dialect = StandardDialect()
        self._dialect.delimiter = csv_format["delimiter"]
        self._dialect.lineterminator = csv_format["line_break"]
        self._dialect.quotechar = csv_format["quote_char"]
        self._dialect.doublequote = csv_format["double_quote"]
        escape_char = csv_format["escape_char"]
        # The catalog uses an empty string for "no escape character".
        self._dialect.escapechar = None if escape_char == "" else escape_char

    def _call_catalog(self, endpoint, params=None):
        """GET ``<catalog_url>/<endpoint>`` and return the decoded JSON body.

        Args:
            endpoint (str): the path appended to the catalog URL
            params (dict): optional query string parameters

        Raises:
            requests.exceptions.HTTPError: when the response status is an error
                (raised by ``raise_for_status``)
        """
        # NOTE(review): no ``timeout`` is passed, so per the requests documentation
        # this call can block indefinitely on an unresponsive catalog.
        response = requests.get(f"{self._catalog_url}/{endpoint}", params=params)
        response.raise_for_status()
        return response.json()

    @property
    def provider(self):
        """Returns the configured name of the provider"""
        return self._provider

    @property
    def csv_dialect(self):
        """Returns the configured ``csv.Dialect`` instance"""
        return self._dialect

    @property
    def monitor(self):
        """Returns the concrete implementation for ``datalake.interface.IMonitor``"""
        return self._service_discovery.monitor

    def get_storage(self, bucket):
        """Get a Storage instance for the provided bucket

        Args:
            bucket (str): the name of the bucket (depending on the underlying provider)

        Returns:
            A concrete instance of ``datalake.interface.IStorage``
        """
        return self._service_discovery.get_storage(bucket)

    def get_entry(self, key):
        """Get the catalog definition for an entry key

        Args:
            key (str): the catalog key of the entry

        Returns:
            a ``dict``

        Raises:
            EntryNotFound: when the catalog answers with a 404 status
        """
        try:
            return self._call_catalog(f"catalog/entry/{key}")
        except requests.exceptions.HTTPError as http_error:
            if http_error.response.status_code == 404:
                raise EntryNotFound(f"Entry '{key}' does not exist") from http_error
            raise  # pragma: no cover

    def resolve_path(self, store, path):
        """Resolves a path in a store name to a fully qualified bucket path

        Args:
            store (str): the name of a store
            path (str): the path to resolve in the store

        Returns:
            a tuple ``(bucket, path, uri)`` with the bucket name, the full path and a fully qualified URI

        Raises:
            StoreNotFound: when the catalog answers with a 404 status
        """
        try:
            res = self._call_catalog(f"storage/{store}/{path}")
        except requests.exceptions.HTTPError as http_error:
            if http_error.response.status_code == 404:
                raise StoreNotFound(f"Store '{store}' does not exist") from http_error
            raise  # pragma: no cover
        return res["bucket"], res["path"], res["uri"]

    def identify(self, path):
        """
        Returns a tuple with the catalog entry that matches the path and a `dict` with the path's placeholders

        Args:
            path (str): the path to identify

        Raises:
            EntryNotFound: when the catalog answers with a 404 status, returns
                no match, or returns more than one match
        """
        try:
            res = self._call_catalog(f"catalog/identify/{path}")
        except requests.exceptions.HTTPError as http_error:
            if http_error.response.status_code == 404:
                raise EntryNotFound(f"No entry found for path '{path}'") from http_error
            raise  # pragma: no cover

        # An empty result would otherwise surface as an IndexError on ``res[0]``.
        if not res:
            raise EntryNotFound(f"No entry found for path '{path}'")
        if len(res) > 1:
            raise EntryNotFound(f"Multiple entry found for path '{path}'")
        match = res[0]
        return match["entry"], match["params"]

    def get_entry_path(self, key, path_params=None, strict=False):
        """
        Builds a path for the specified entry with the given parameters

        Args:
            key (str): the catalog key of the entry
            path_params (dict): values sent to the catalog as query string parameters
            strict (bool): when true, a path flagged ``is_partial`` by the catalog is rejected

        Returns:
            the ``prefix`` value returned by the catalog

        Raises:
            EntryNotFound: when the catalog answers with a 404 status
            ValueError: when ``strict`` is true and the catalog flags the path as partial
        """
        try:
            res = self._call_catalog(f"catalog/storage/{key}", path_params)
        except requests.exceptions.HTTPError as http_error:
            if http_error.response.status_code == 404:
                raise EntryNotFound(f"Entry '{key}' does not exist") from http_error
            raise  # pragma: no cover

        if strict and res["is_partial"]:
            raise ValueError(f"Missing parameters for entry '{key}'")
        return res["prefix"]

    def get_entry_path_resolved(self, store, key, path_params=None, strict=False):
        """
        Returns the resolved path for the specified entry with the given parameters and a storage for the specified store

        Returns:
            a tuple ``(storage, path)``: the storage for the resolved bucket and the resolved path
        """
        path = self.get_entry_path(key, path_params, strict)
        bucket, resolved, _ = self.resolve_path(store, path)
        return self.get_storage(bucket), resolved

    def upload(
        self, filepath, store, key, path_params=None, content_type="text/plain", encoding="utf-8", metadata=None
    ):  # pragma: no cover
        """Uploads a local file in a store as the specified catalog entry

        Args:
            filepath (str): the local file path
            store (str): the name of the store
            key (str): the catalog key for the file to upload
            path_params (dict): a map of key/value pair to fill the key path placeholders
            content_type (str): forwarded to the storage ``upload`` call
            encoding (str): forwarded to the storage ``upload`` call
            metadata (dict): forwarded to the storage ``upload`` call; ``None`` (the default)
                is replaced by a new empty dict

        Returns:
            the resolved path the file was uploaded to
        """
        # A fresh dict per call: the storage implementation may mutate what it receives.
        if metadata is None:
            metadata = {}
        storage, path = self.get_entry_path_resolved(store, key, path_params, strict=True)
        storage.upload(filepath, path, content_type, encoding, metadata)
        return path

    def download(self, store, key, filepath, path_params=None):  # pragma: no cover
        """Downloads the specified catalog entry from a store to a local file

        Args:
            store (str): the name of the store
            key (str): the catalog key for the file to download
            filepath (str): the local file path
            path_params (dict): a map of key/value pair to fill the key path placeholders
        """
        storage, path = self.get_entry_path_resolved(store, key, path_params, strict=True)
        storage.download(path, filepath)

    def list_entry_files(self, store, key, path_params=None):  # pragma: no cover
        """
        Returns the list of files in a store for the specified catalog entry

        The entry path is resolved with ``strict=False``, so a partial path is accepted
        and handed to the storage ``keys_iterator``.
        """
        storage, path = self.get_entry_path_resolved(store, key, path_params, strict=False)
        return storage.keys_iterator(path)

    def new_dataset_builder(self, key, path=None, lang="en_US", date_formats=None, ciphered=False):
        """Returns a ``DatasetBuilder`` created with this datalake and the given arguments"""
        return DatasetBuilder(self, key, path, lang, date_formats, ciphered)

    def new_dataset_reader(self, store, key, path_params=None, ciphered=False):
        """Returns a ``DatasetReader`` created with this datalake and the given arguments"""
        return DatasetReader(self, store, key, path_params, ciphered)

    def get_secret(self, name):
        """Get a Secret instance for the provider secret name

        Args:
            name (str): the name of the secret (depending on the underlying provider)

        Returns:
            A concrete instance of ``datalake.interface.ISecret``
        """
        return self._service_discovery.get_secret(name)
class ServiceDiscovery:
    """Resolves the provider module and the monitor used by a datalake."""

    # Each supported provider maps to a ``datalake.provider.<name>`` module.
    _SUPPORTED_PROVIDERS = ("aws", "gcp", "azure", "local")
    # Module searched when the monitor class is given without a module path.
    _DEFAULT_MONITOR_MODULE = "datalake.telemetry"

    def __init__(self, provider, monitoring=None, *args, **kwargs):
        """Cloud resources resolver

        Args:
            provider (str): the cloud provider (``"aws"``, ``"azure"`` or ``"gcp"``) or ``"local"``
            monitoring (dict): the monitoring implementation spec. The ``"class"`` key is
                either a bare class name (looked up in ``datalake.telemetry``) or a dotted
                ``module.ClassName`` path; the optional ``"params"`` key holds the keyword
                arguments for the class constructor. Defaults to ``NoMonitor`` without params.

        Extra positional and keyword arguments are accepted and ignored.

        Example:
            local provider with a console monitoring::

                monitoring_spec = {
                    "class": "NoMonitor",
                    "params": {
                        "quiet": False
                    }
                }
                service_discovery = ServiceDiscovery("local", monitoring_spec)

            Google cloud typical setup::

                monitoring_spec = {
                    "class": "datalake.provider.gcp.GoogleMonitor",
                    "params": {
                        "project_id": "my-google-project-id"
                    }
                }
                service_discovery = ServiceDiscovery("gcp", monitoring_spec)

        Raises:
            DatalakeError: when the provider is invalid
            BadConfiguration: when monitoring spec is invalid
        """
        self._find_cloud_provider(provider)
        if monitoring is None:
            monitoring = {"class": "NoMonitor", "params": {}}
        self._find_monitor_class(monitoring)

    def _find_cloud_provider(self, provider):
        """Import the ``datalake.provider.<provider>`` module and keep it in ``self._provider``.

        Raises:
            DatalakeError: when ``provider`` is not one of the supported names
        """
        if provider not in self._SUPPORTED_PROVIDERS:  # pragma: no cover
            raise DatalakeError(f"Invalid storage provider: {provider}")
        self._provider = import_module(f"datalake.provider.{provider}")

    def _find_monitor_class(self, config):
        """Locate, validate and instantiate the monitor described by ``config``.

        The class name is matched case-insensitively against the classes exposed by the
        target module; the first match in ``inspect.getmembers`` order is used. The
        instance is stored in ``self._monitor``.

        Args:
            config (dict): the monitoring spec with a ``"class"`` key and an optional ``"params"`` key

        Raises:
            BadConfiguration: when the spec is malformed, the module or class cannot be
                found, the class is not an ``IMonitor`` subclass, or its constructor fails
        """
        if not isinstance(config, dict):
            raise BadConfiguration("Monitoring configuration must be a dict")

        class_path = config.get("class")
        if not isinstance(class_path, str) or not class_path:
            # Previously surfaced as a bare KeyError / AttributeError.
            raise BadConfiguration("Monitoring configuration must define a 'class' name")
        params = config.get("params")
        if params is None:
            params = {}

        if "." in class_path:
            module_name, class_name = class_path.rsplit(".", 1)
        else:
            module_name, class_name = self._DEFAULT_MONITOR_MODULE, class_path

        try:
            module = import_module(module_name)
        except ModuleNotFoundError as error:
            raise BadConfiguration(f"'{module_name}' Monitor module cannot be found") from error

        wanted = class_name.lower()
        monitor_class = next(
            (cls for name, cls in inspect.getmembers(module, inspect.isclass) if name.lower() == wanted),
            None,
        )
        if monitor_class is None:
            raise BadConfiguration(f"'{class_name}' Monitor class cannot be found in module {module_name}")
        if not issubclass(monitor_class, IMonitor):
            raise BadConfiguration(f"'{class_name}' Monitor class is not a subclass for IMonitor")

        try:
            self._monitor = monitor_class(**params)
        except Exception as e:
            raise BadConfiguration(f"'{class_name}' Monitor class cannot be instanciated: {str(e)}") from e

    @property
    def monitor(self):
        """Returns a ``datalake.interface.IMonitor`` instance"""
        return self._monitor

    def get_storage(self, bucket):
        """Returns a ``datalake.interface.IStorage`` instance

        Args:
            bucket (str): the name of the bucket to fetch
        """
        return self._provider.Storage(bucket)

    def get_secret(self, name):
        """Returns a ``datalake.interface.ISecret`` instance

        Args:
            name (str): the name of the secret to fetch
        """
        return self._provider.Secret(name)