Datalake Ingestion

Getting started

Install the library with pip:

pip install datalake-ingestion

A basic script for collecting a single file looks like this:

from datalake_ingestion.collect import Collector, validate_config
from datalake import Datalake
import yaml

dlk = Datalake()
# Get the bucket name and file path from the alias
bucket, path, _ = dlk.resolve_path("landing", "path/to/my_file.csv")

# Load a collector config file (the path below is a placeholder)
with open("collect_config.yaml") as config_file:
    collect_config = yaml.safe_load(config_file)
validate_config(collect_config)

# Run the collector
collector = Collector(dlk, collect_config)
collector.process(dlk.get_storage(bucket), path)
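
The example above reads the collector config with yaml.safe_load before validating it. Below is a minimal sketch of that loading step, assuming the config is stored as a YAML file on disk. The helper name load_yaml_config, the file name, and the example_key entry are hypothetical placeholders; they are not part of datalake-ingestion and say nothing about the real config schema.

```python
import os
import tempfile

import yaml


def load_yaml_config(path):
    """Read a YAML file and return its top-level mapping as a dict."""
    with open(path, encoding="utf-8") as config_file:
        config = yaml.safe_load(config_file)
    # safe_load returns None for an empty file and may return a list or a
    # scalar; a config is expected to be a mapping, so fail early otherwise.
    if not isinstance(config, dict):
        raise ValueError(f"{path}: expected a YAML mapping at the top level")
    return config


# Demonstration with a throwaway file; the key below is made up.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "collect_config.yaml")
    with open(demo_path, "w", encoding="utf-8") as handle:
        handle.write("example_key: example_value\n")
    demo_config = load_yaml_config(demo_path)
```

The returned dict can then be passed to validate_config and Collector as in the example above.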

An AWS S3 event-driven collector would look like this:

from datalake_ingestion.collect import Collector, validate_config
from datalake import Datalake
from datalake.provider.aws import StorageEvents
import yaml

# Load a collector config file (the path below is a placeholder)
with open("collect_config.yaml") as config_file:
    collect_config = yaml.safe_load(config_file)
validate_config(collect_config)

# Run the collector as an SQS consumer
collector = Collector(Datalake(), collect_config)
collect = StorageEvents("sqs-for-s3-events", collector)
collect.daemon()