Source code for datalake.telemetry

from logging import getLogger
import pendulum
from time import perf_counter_ns
from datalake.interface import IMonitor
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS


[docs]class Measurement: # pragma: no cover def __init__(self, name, start_time=None): """Represents a point of measurement consisting in a starting time, a set of measures and a set of labels Args: name (str): the name for the measurement start_time (time): the time when measurement started. Defaults to current UTC time. """ self._name = name self._start = start_time if start_time is not None else pendulum.now("UTC") self._labels = {} self._measures = {"file_count": 1} self.reset_chrono() def __str__(self): return f"Metric '{self.name}' started at {self.start_time} with labels {self.labels} and measures {self.measures}" @property def name(self): """The name of the measurement""" return self._name @property def start_time(self): """The reference starting time for the measurement""" return self._start @start_time.setter def start_time(self, start_time): self._start = start_time @property def labels(self): """The ``dict`` of labels attached with the measurment""" return self._labels @labels.setter def labels(self, labels): if not isinstance(labels, dict): raise ValueError("Labels must be a key/value map") self._labels = labels @property def measures(self): """The ``dict`` of measure values. Defaults to ``{"file_count": 1}``""" return self._measures @measures.setter def measures(self, measures): if not isinstance(measures, dict): raise ValueError("Measures must be a key/value map") self._measures = measures
[docs] def add_measure(self, key, value): """Appends a single measure Args: key (str): the measure name value (double): the measure value """ self._measures[key] = value
[docs] def add_measures(self, measures): """Appends a batch of measures Args: measures (dict): a key pair map of measure names and values """ self._measures.update(measures)
[docs] def add_label(self, key, value): """Appends a single label Args: key (str): the label name value (str): the label value """ self._labels[key] = value
[docs] def add_labels(self, labels): """Appends a batch of labels Args: labels (dict): a key pair map of label names and values """ self._labels.update(labels)
[docs] def reset_chrono(self): """Resets the counter used for evaluating elapsed time""" self._chrono = perf_counter_ns()
[docs] def read_chrono(self): """Returns the elapsed time since last reset or since initialization""" return perf_counter_ns() - self._chrono
class NoMonitor(IMonitor): # pragma: no cover def __init__(self, quiet=True, *args, **kwargs): """A quiet monitoring implementation used for disabling monitoring or for development/testing Args: quiet (bool): Whether metrics are logged or not """ self._quiet = quiet def push(self, metric): """Writes the metric in the logger if **quiet** if set to ``False``""" if not self._quiet: logger = getLogger(__name__).info(metric) class InfluxMonitor(IMonitor): # pragma: no cover """Monitoring with InfluxDB OSS 2.x""" def __init__(self, url, token, org, bucket, *args, **kwargs): self._url = url self._token = token self._org = org self._bucket = bucket def push(self, metric): with InfluxDBClient(self._url, token=self._token, org=self._org) as client: write_api = client.write_api(write_options=SYNCHRONOUS) point = Point(metric.name) point.time(metric.start_time, WritePrecision.NS) for label, value in metric.labels.items(): point.tag(label, value) for field, value in metric.measures.items(): point.field(field, value) write_api.write(self._bucket, self._org, point)