Source code for datalake_ingestion.collect

from importlib import import_module
import inspect
import json
import re
from logging import getLogger
from datalake.interface import IStorageEvent
import datalake_ingestion.exceptions
import datalake_ingestion.preprocess
from datalake.telemetry import Measurement
import pendulum
from pkg_resources import resource_stream
from jsonschema import Draft7Validator


STATUS_SUCCESS = "Success"
STATUS_UNKNOWN = "Unknown"
STATUS_ERROR = "Failure"
STATUS_INFECTED = "Infected"


def validate_config(cfg):
    """
    Validates that the given configuration conforms to the schema.

    Args:
        cfg (dict): a configuration to test

    Raises:
        jsonschema.exceptions.ValidationError: when the configuration is invalid
    """
    with resource_stream("datalake_ingestion", "schemas/collect.json") as f:
        config_schema = json.load(f)
    Draft7Validator.check_schema(config_schema)
    Draft7Validator(config_schema).validate(cfg)
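
As an illustration, the snippet below builds a small configuration and runs it
through the validator. The authoritative field set lives in
``schemas/collect.json``; the entry shown here is hypothetical and only uses
keys that ``Collector.__init__`` is known to read.

from datalake_ingestion.collect import validate_config

# Hypothetical configuration entry: the field names below are inferred
# from the keys read by Collector.__init__, not from the JSON schema
sample_config = [
    {
        "pattern": r"landing/acme/sales_(\d{8})\.csv",
        "action_name": "move",              # assumed built-in preprocessor name
        "action_params": {},                # optional, defaults to {}
        "landing_folder": "landing/acme",   # optional, enables indexed lookup
        "archive": True,
        "pattern_extract": {"date": "{capture[0]}"},
    }
]

validate_config(sample_config)  # raises jsonschema.exceptions.ValidationError when invalid
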
class Collector(IStorageEvent):
    def __init__(self, datalake, collect_config):
        """
        Builds the collector that runs the main ingestion workflow.

        Args:
            datalake (datalake.Datalake): a datalake framework instance
            collect_config (list(dict)): a collect configuration
        """
        self._logger = getLogger(f"{__name__}.{__class__.__name__}")
        self._datalake = datalake
        self._monitor = datalake.monitor
        validate_config(collect_config)

        # Instantiate every concrete Preprocessor shipped with the package,
        # keyed by lowercase class name
        builtin_classes = {}
        for n, c in inspect.getmembers(datalake_ingestion.preprocess, inspect.isclass):
            if not issubclass(c, datalake_ingestion.preprocess.Preprocessor):
                continue
            if inspect.isabstract(c):
                continue
            builtin_classes[n.lower()] = c(self._datalake)

        extended_classes = {}
        self._indexed = {}
        self._unindexed = []
        for item in collect_config:
            cfg = {**item}
            cfg["pattern"] = re.compile(item["pattern"])
            action_name = cfg["action_name"]
            if action_name.lower() in builtin_classes:
                cfg["action_class"] = builtin_classes[action_name.lower()]
            elif action_name.lower() in extended_classes:
                cfg["action_class"] = extended_classes[action_name.lower()]
            else:
                # Try to import the class from its dotted path and cache it
                # in extended_classes
                action_split = action_name.split(".")
                if len(action_split) > 1:
                    class_name = action_split[-1]
                    module_name = ".".join(action_split[:-1])
                    try:
                        module = import_module(module_name)
                    except ModuleNotFoundError:
                        raise ModuleNotFoundError(f"Preprocessor module '{module_name}' cannot be found")
                    action_class = None
                    for n, c in inspect.getmembers(module, inspect.isclass):
                        if n == class_name:
                            action_class = c
                            break
                    if action_class is None:
                        raise AttributeError(f"Preprocessor class '{class_name}' cannot be found in module {module_name}")
                    if not issubclass(action_class, datalake_ingestion.preprocess.Preprocessor):
                        raise ValueError(f"'{action_name}' is not a Preprocessor subclass")
                    extended_classes[action_name.lower()] = action_class(self._datalake)
                    cfg["action_class"] = extended_classes[action_name.lower()]
                else:
                    raise ValueError(f"Preprocessor '{action_name}' cannot be found")
            if "action_params" not in cfg:
                cfg["action_params"] = {}
            # Entries with a landing folder are indexed for faster lookup
            if "landing_folder" in cfg:
                idx = cfg["landing_folder"]
                if idx not in self._indexed:
                    self._indexed[idx] = []
                self._indexed[idx].append(cfg)
            else:
                self._unindexed.append(cfg)
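
    # The dotted "action_name" branch above is the extension hook: any
    # importable Preprocessor subclass can be referenced from the collect
    # configuration. A minimal sketch (hypothetical class; the action
    # signature is inferred from the call site in process() below):
    #
    #     from datalake_ingestion.preprocess import Preprocessor
    #
    #     class NoopPreprocessor(Preprocessor):
    #         def action(self, metric, storage, path, pattern_extract,
    #                    catalog_entry, **params):
    #             # **params receives the entry's "action_params" mapping
    #             metric.add_label("noop", "yes")
    #
    # referenced from the configuration as:
    #     "action_name": "my_package.preprocessors.NoopPreprocessor"
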
    def process(self, storage, path):
        """
        Identifies the file path and runs the matching preprocessor.

        Also builds a Measurement and sends it to the telemetry backend.

        Args:
            storage (datalake.interface.IStorage): the input storage
            path (str): the file path to process
        """
        result = self.identify(path)
        process_metric = Measurement("collector")
        process_metric.add_labels({"source_bucket": storage.name, "source_path": path})
        process_metric.add_measure("file_size", storage.size(path))
        if result is None:
            # Path is not recognized by the configuration
            purgatory, target, _ = self._datalake.resolve_path("purgatory", path)
            storage.move(path, target, purgatory)
            process_metric.add_label("status", STATUS_UNKNOWN)
        else:
            # A collect configuration entry is available
            if "catalog_entry" in result:
                catalog_entry = self._datalake.get_entry(result["catalog_entry"])
                process_metric.add_labels(
                    {
                        "catalog_entry": catalog_entry["_key"],
                        "catalog_domain": catalog_entry["domain"],
                        "catalog_provider": catalog_entry["provider"],
                        "catalog_feed": catalog_entry["feed"],
                    }
                )
            else:
                catalog_entry = None
            if result["archive"]:
                archive_timestamp = pendulum.now("UTC").format("YYYYMMDD_HHmmSS")
                archive, archive_path, _ = self._datalake.resolve_path("archive", f"{path}.{archive_timestamp}")
                storage.copy(path, archive_path, archive)
                process_metric.add_label("archived", "yes")
                process_metric.add_label("archive_path", archive_path)
            else:
                process_metric.add_label("archived", "no")
            try:
                preprocessor = result["action_class"]
                process_metric.add_label("preprocessor", preprocessor.__class__.__name__)
                preprocessor.action(
                    process_metric,
                    storage,
                    path,
                    result["pattern_extract"],
                    catalog_entry,
                    **result["action_params"],
                )
                process_metric.add_label("status", STATUS_SUCCESS)
            except datalake_ingestion.exceptions.HackDetected as e:
                self._logger.warning(f"Suspicious file {path} detected: {e}")
                quarantine, resolved, _ = self._datalake.resolve_path("quarantine", path)
                storage.move(path, resolved, quarantine)
                process_metric.add_label("status", STATUS_INFECTED)
            except datalake_ingestion.exceptions.PreprocessError as e:
                self._logger.error(f"'{preprocessor.__class__.__name__}' preprocessor failed on file {path}: {e}")
                purgatory, resolved, _ = self._datalake.resolve_path("purgatory", path)
                storage.move(path, resolved, purgatory)
                process_metric.add_label("status", STATUS_ERROR)
            except Exception as e:
                self._logger.error(f"An error occurred while preprocessing {path}: {e}")
                purgatory, resolved, _ = self._datalake.resolve_path("purgatory", path)
                storage.move(path, resolved, purgatory)
                process_metric.add_label("status", STATUS_ERROR)
        process_metric.add_measure("processing_time", round(process_metric.read_chrono() // 10**6))
        self._monitor.safe_push(process_metric)
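
    # process() is normally triggered by the storage-event plumbing behind
    # IStorageEvent, but it can also be driven directly. A minimal sketch,
    # assuming a configured datalake.Datalake instance and an IStorage
    # implementation (both constructions are deployment-specific):
    #
    #     collector = Collector(datalake, collect_config)
    #     collector.process(storage, "landing/acme/sales_20240101.csv")
    #
    # The telemetry backend then receives a "collector" Measurement whose
    # "status" label is one of Success / Unknown / Failure / Infected.
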
    def identify(self, path):
        """
        Searches the collect configuration for a match with the given file path.

        Args:
            path (str): the file path to identify

        Returns:
            the configuration entry ``dict`` if an entry is found, ``None``
            otherwise. The values captured from the path are stored in the
            ``dict`` under the **pattern_extract** key
        """
        # Prefer the entries indexed by landing folder; fall back to the
        # unindexed entries when no folder matches the path
        config = self._unindexed
        for idx in self._indexed.keys():
            if idx in path:
                config = self._indexed[idx]
        for item in config:
            match = item["pattern"].search(path)
            if match is not None:
                result = {**item}
                if "pattern_extract" in result:
                    result["pattern_extract"] = {**result["pattern_extract"]}
                    for var, template in result["pattern_extract"].items():
                        result["pattern_extract"][var] = template.format(capture=match.groups())
                else:
                    result["pattern_extract"] = {}
                return result
        return None
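
The ``pattern_extract`` templating used by ``identify`` is plain ``str.format``
with the regex capture groups exposed as a tuple named ``capture``, so
templates address groups positionally. A self-contained illustration:

import re

pattern = re.compile(r"landing/acme/sales_(\d{4})(\d{2})(\d{2})\.csv")
match = pattern.search("landing/acme/sales_20240101.csv")

# Groups are addressed as capture[0], capture[1], ... in each template
template = "year={capture[0]}/month={capture[1]}/day={capture[2]}"
print(template.format(capture=match.groups()))
# -> year=2024/month=01/day=01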