Datalake Ingestion
Getting started
Install the library with pip:
pip install datalake-ingestion
A basic script for collecting a single file looks like this:
from datalake_ingestion.collect import Collector, validate_config
from datalake import Datalake
import yaml
dlk = Datalake()
# Get the bucket name and file path from the alias
bucket, path, _ = dlk.resolve_path("landing", "path/to/my_file.csv")
# Load a collector config file; config_file is an open file object
# (or a YAML string) that you provide
collect_config = yaml.safe_load(config_file)
validate_config(collect_config)
# Run the collector
collector = Collector(dlk, collect_config)
collector.process(dlk.get_storage(bucket), path)
An AWS S3 event-driven collector would look like this:
from datalake_ingestion.collect import Collector, validate_config
from datalake import Datalake
from datalake.provider.aws import StorageEvents
import yaml
# Load a collector config file; config_file is an open file object
# (or a YAML string) that you provide
collect_config = yaml.safe_load(config_file)
validate_config(collect_config)
# Run a collector in an SQS consumer
collector = Collector(Datalake(), collect_config)
collect = StorageEvents("sqs-for-s3-events", collector)
collect.daemon()
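For context on what an event-driven collector reacts to, the sketch below decodes an S3 event notification as it arrives in an SQS message body and extracts the bucket name and object key. It is only an illustration of the standard S3 notification payload, not the implementation of StorageEvents, whose internals are not shown here. The function name extract_s3_objects and the bucket name are hypothetical.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(sqs_body):
    """Return a list of (bucket, key) pairs from an S3 event notification body.

    Hypothetical helper for illustration; not part of datalake-ingestion.
    """
    event = json.loads(sqs_body)
    objects = []
    # S3 test events ("s3:TestEvent") carry no "Records" key, so default to empty.
    for record in event.get("Records", []):
        s3 = record["s3"]
        bucket = s3["bucket"]["name"]
        # Object keys are URL-encoded in notifications (spaces arrive as "+").
        key = unquote_plus(s3["object"]["key"])
        objects.append((bucket, key))
    return objects


# Sample body with a made-up bucket name and key.
sample_body = json.dumps({
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-landing-bucket"},
                "object": {"key": "path/to/my+file.csv"},
            },
        }
    ]
})

print(extract_s3_objects(sample_body))
```

Each (bucket, key) pair corresponds to the storage and path arguments passed to collector.process in the single-file example above.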