Source code for datalake.helpers

import csv
import decimal
import os
import re
from tempfile import mkstemp

import pendulum
from babel.core import Locale

# pandas is an optional dependency: only DatasetReader.dataframe() requires it
try:
    import pandas

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

# pendulum format tokens used for normalized output values
STANDARD_DATE_FORMAT = "YYYY-MM-DD"
STANDARD_TIME_FORMAT = "HH:mm:ss.SSSZZ"
STANDARD_DATETIME_FORMAT = "YYYY-MM-DDTHH:mm:ss.SSSZZ"


def _to_decimal(x, lang):
    # Values that are already numbers pass through unchanged
    if isinstance(x, (int, float)):
        return x
    locale = Locale.parse(lang)
    separator = locale.number_symbols["decimal"]
    group = locale.number_symbols["group"]

    # Some locales group digits with a non-breaking or narrow space; treat any
    # whitespace group symbol as a plain space so typed input is stripped too
    if re.fullmatch(r"\s+", group) is not None:
        group = " "

    try:
        return decimal.Decimal(x.replace(group, "").replace(separator, "."))
    except decimal.InvalidOperation:
        raise ValueError(f"Unable to cast number {x} in locale '{lang}'")


def _to_date(string, format):
    return pendulum.from_format(string, format)


def cast_integer(x, lang="en_US"):
    """Cast a string as an integer according to a locale

    Args:
        x (str): a value to cast
        lang (str): the locale used to interpret the string

    Returns:
        the value cast as ``int``
    """
    return int(_to_decimal(x, lang))

def cast_float(x, lang="en_US"):
    """Cast a string as a float according to a locale

    Args:
        x (str): a value to cast
        lang (str): the locale used to interpret the string

    Returns:
        the value cast as ``float``
    """
    return float(_to_decimal(x, lang))

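# Hedged usage sketch (illustrative, not part of the module): the locale-aware
# casts accept grouped and localized numerals. The expected values below assume
# Babel ships CLDR data for the named locales:
#
#     >>> cast_integer("1.234", lang="de_DE")
#     1234
#     >>> cast_float("1 234,56", lang="fr_FR")
#     1234.56
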
def cast_date(d, formats=[STANDARD_DATE_FORMAT]):
    """Cast a string as a date according to a set of format strings

    Args:
        d (str): a value to cast
        formats (list(str)): a set of formats used to try to cast the string
            as a date. See also the `Supported formats`_

    Returns:
        the value formatted with the `ISO 8601`_ date format

    .. _Supported formats: https://pendulum.eustace.io/docs/#tokens
    .. _ISO 8601: https://www.iso.org/iso-8601-date-and-time-format.html
    """
    for f in formats:
        try:
            return _to_date(d, f).format(STANDARD_DATE_FORMAT)
        except ValueError:
            continue
    raise ValueError(f"Wrong date format '{d}'")

def cast_time(d, formats=[STANDARD_TIME_FORMAT]):
    """Cast a string as a time according to a set of format strings

    Args:
        d (str): a value to cast
        formats (list(str)): a set of formats used to try to cast the string
            as a time. See also the `Supported formats`_

    Returns:
        the value formatted with the `ISO 8601`_ time format

    .. _Supported formats: https://pendulum.eustace.io/docs/#tokens
    .. _ISO 8601: https://www.iso.org/iso-8601-date-and-time-format.html
    """
    for f in formats:
        try:
            return _to_date(d, f).format(STANDARD_TIME_FORMAT)
        except ValueError:
            continue
    raise ValueError(f"Wrong time format '{d}'")

def cast_datetime(d, formats=[STANDARD_DATETIME_FORMAT]):
    """Cast a string as a datetime according to a set of format strings

    Args:
        d (str): a value to cast
        formats (list(str)): a set of formats used to try to cast the string
            as a datetime. See also the `Supported formats`_

    Returns:
        the value formatted with the `ISO 8601`_ datetime format

    .. _Supported formats: https://pendulum.eustace.io/docs/#tokens
    .. _ISO 8601: https://www.iso.org/iso-8601-date-and-time-format.html
    """
    for f in formats:
        try:
            return _to_date(d, f).format(STANDARD_DATETIME_FORMAT)
        except ValueError:
            continue
    raise ValueError(f"Wrong datetime format '{d}'")

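# Hedged usage sketch: each cast tries the candidate formats in order and
# re-renders the first match; tokens follow pendulum's format syntax. The
# "DD/MM/YYYY" format below is an assumption for illustration:
#
#     >>> cast_date("01/04/2022", formats=["DD/MM/YYYY"])
#     '2022-04-01'
#
# cast_time() and cast_datetime() behave the same way, rendering to
# STANDARD_TIME_FORMAT and STANDARD_DATETIME_FORMAT respectively.
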
class StandardDialect(csv.Dialect):
    """CSV format according to `RFC 4180`_

    .. _RFC 4180: https://datatracker.ietf.org/doc/html/rfc4180
    """

    delimiter = ","
    quotechar = '"'
    escapechar = None
    doublequote = True
    lineterminator = "\n"  # note: LF, where RFC 4180 itself specifies CRLF
    quoting = csv.QUOTE_MINIMAL
    skipinitialspace = False
    strict = True

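# Hedged usage sketch: StandardDialect is a plain csv.Dialect subclass, so it
# can be handed to the stdlib csv module directly; the in-memory buffer here
# is only for illustration:
#
#     import csv, io
#
#     buf = io.StringIO()
#     writer = csv.writer(buf, StandardDialect)
#     writer.writerow(["id", 'value with "quotes"'])
#     # buf.getvalue() == 'id,"value with ""quotes"""\n'
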
class DatasetBuilder:
    def __init__(self, datalake, key, path=None, lang="en_US", date_formats=None, ciphered=False):
        """Creates a new CSV file according to a Datalake's standards

        Args:
            datalake (Datalake): a datalake instance
            key (str): the catalog entry identifier for which a dataset is created
            path (str): the path to a local file to create. Defaults to an auto-generated temp file
            lang (str): a locale to use for casting numbers
            date_formats (list(str)): a list of datetime formats to use for casting dates and
                times. Defaults to the ISO 8601 formats.
            ciphered (bool): Indicates whether the dataset has pseudonymized values or not.
                Defaults to ``False``

        Example:
            building a dataset with dicts::

                with DatasetBuilder(dlk, "my-entry") as dsb:
                    for i in range(10):
                        dsb.add_dict({
                            "id": i,
                            "column_s": "lorem ipsum",
                            "column_i": 1234.56,
                            "column_d": "2022-04-01",
                        })
                dlk.upload(dsb.path, "storename", "my-entry")
        """
        self._datalake = datalake
        self._catalog_key = key
        self._catalog_entry = datalake.get_entry(key)
        self._typing = [item["type"] for item in self._catalog_entry["columns"]]
        self._header = [item["name"] for item in self._catalog_entry["columns"]]
        self._managed_path = path is None
        self._path = path if not self._managed_path else self._new_temp_file()
        self._lang = lang
        self._date_formats = (
            date_formats
            if date_formats is not None
            else [STANDARD_DATE_FORMAT, STANDARD_TIME_FORMAT, STANDARD_DATETIME_FORMAT]
        )
        self._ciphered = []
        for item in self._catalog_entry["columns"]:
            if "gdpr" not in item or "pii" not in item["gdpr"]:
                self._ciphered.append(False)
            else:
                self._ciphered.append(ciphered and item["gdpr"]["pii"])
        self._row_count = 0

    def __del__(self):
        if self._managed_path:
            os.remove(self._path)

    def __enter__(self):
        self._file = open(self._path, "w", encoding="utf-8", newline="")
        self._writer = csv.writer(self._file, self._datalake.csv_dialect)
        self._writer.writerow(self._header)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._file.close()
        return False

    def _new_temp_file(self):
        (temp_file, temp_path) = mkstemp(prefix="datalake-dataset_", suffix=".csv")
        os.close(temp_file)
        return temp_path

    @property
    def path(self):
        """The path to the local file"""
        return self._path

    @property
    def row_count(self):
        """The number of rows appended in the dataset"""
        return self._row_count

    def new_dict(self):
        """Returns an empty row as ``dict``"""
        return {name: None for name in self._header}

    def _add(self, row):
        self._writer.writerow(row)
        self._row_count += 1

    def add_dict(self, row):
        """Appends a row in the dataset

        Args:
            row (dict): a row in key/value pairs
        """
        if row.keys() ^ set(self._header):
            raise ValueError(f"Row does not have the expected columns ({len(row.keys())}/{len(self._header)})")
        self.add_sequence([row[name] for name in self._header])

    def add_sequence(self, row):
        """Appends a row in the dataset

        Args:
            row (list): a row sequence of values
        """
        if len(row) != len(self._typing):
            raise ValueError(f"Row does not have the expected column count ({len(row)}/{len(self._typing)})")
        typed_row = []
        for idx, typed_value in enumerate(row):
            # Pseudonymized (ciphered) values are appended untouched
            if not self._ciphered[idx]:
                if isinstance(typed_value, str):
                    # Collapse any whitespace character to a plain space
                    typed_value = re.sub(r"\s", " ", typed_value).strip()
                if typed_value is not None and typed_value != "":
                    if self._typing[idx] == "date":
                        typed_value = cast_date(typed_value, self._date_formats)
                    elif self._typing[idx] == "time":
                        typed_value = cast_time(typed_value, self._date_formats)
                    elif self._typing[idx] == "datetime":
                        typed_value = cast_datetime(typed_value, self._date_formats)
                    elif self._typing[idx] in ("number", "decimal"):
                        typed_value = cast_float(typed_value, self._lang)
                    elif self._typing[idx] == "integer":
                        typed_value = cast_integer(typed_value, self._lang)
            typed_row.append(typed_value)
        self._add(typed_row)

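# Hedged usage sketch: rows can also be appended positionally, in catalog
# column order, with add_sequence(); the entry key, column layout, and store
# name are assumptions carried over from the docstring example:
#
#     with DatasetBuilder(dlk, "my-entry") as dsb:
#         dsb.add_sequence([1, "lorem ipsum", "1,234.56", "2022-04-01"])
#     dlk.upload(dsb.path, "storename", "my-entry")
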
class DatasetReader:
    def __init__(self, datalake, store, key, path_params=None, ciphered=False):
        """Reads a CSV dataset from a store.

        No file is downloaded and data is streamed when using the iterators.

        Args:
            datalake (Datalake): a datalake instance
            store (str): the name of the store to read the dataset from
            key (str): the catalog entry identifier for which a dataset is read
            path_params (dict): the entry path placeholders used to find a specific dataset
            ciphered (bool): Indicates whether the dataset has pseudonymized values or not.
                Defaults to ``False``

        Example:
            Count the number of rows in a dataset::

                dsr = DatasetReader(dlk, "storename", "my-entry")
                count = 0
                for item in dsr.iter_list():
                    count += 1
                print(f"Found {count} rows")
        """
        self._datalake = datalake
        self._storage, self._path = self._datalake.get_entry_path_resolved(store, key, path_params, strict=True)
        self._catalog_key = key
        self._catalog_entry = datalake.get_entry(key)
        self._typing = [item["type"] for item in self._catalog_entry["columns"]]
        self._header = [item["name"] for item in self._catalog_entry["columns"]]
        self._ciphered = []
        for item in self._catalog_entry["columns"]:
            if "gdpr" not in item or "pii" not in item["gdpr"]:
                self._ciphered.append(False)
            else:
                self._ciphered.append(ciphered and item["gdpr"]["pii"])

    def iter_list(self):
        """Returns an iterator of ``list`` for each row"""
        reader = csv.reader(self._storage.stream(self._path), self._datalake.csv_dialect)
        row_count = 0
        for row in reader:
            row_count += 1
            if row_count <= 1:
                # Skip the header row
                continue
            typed_row = []
            for idx, typed_value in enumerate(row):
                if not self._ciphered[idx]:
                    if typed_value is not None and typed_value != "":
                        if self._typing[idx] in ("number", "decimal"):
                            typed_value = cast_float(typed_value)
                        elif self._typing[idx] == "integer":
                            typed_value = cast_integer(typed_value)
                typed_row.append(typed_value)
            yield typed_row

    def iter_dict(self):
        """Returns an iterator of ``dict`` for each row"""
        for row in self.iter_list():
            yield {self._header[idx]: value for idx, value in enumerate(row)}

    def dataframe(self):
        """Returns a pandas DataFrame

        Raises:
            RuntimeError: if the pandas package is not installed
        """
        if not PANDAS_AVAILABLE:  # pragma: no cover
            raise RuntimeError("pandas package is missing")
        return pandas.DataFrame(self.iter_list(), columns=self._header)
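
# Hedged usage sketch: beyond iter_list(), rows can be streamed as dicts keyed
# by catalog column names, or materialized as a DataFrame when pandas is
# installed; "storename", "my-entry", and the "id" column are placeholders
# carried over from the docstring examples:
#
#     dsr = DatasetReader(dlk, "storename", "my-entry")
#     for row in dsr.iter_dict():
#         print(row["id"])
#     df = dsr.dataframe()  # raises RuntimeError without pandas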