Creating a reliable and scalable custom data ingestion pipeline platform can be a tedious process requiring a lot of manual configuration and coding. To address that, the snapblocs Data Flow platform blueprint makes it easy to create a multi-source, multi-destination ingestion pipeline on Kubernetes.
The goal of this tutorial is to show how easy it is to create data ingestion pipelines using the snapblocs Data Flow platform blueprint: pipelines can be built within an hour without writing code. The demo ingests Avro data from the Internet to AWS S3 and Elasticsearch using Kafka and StreamSets Data Collector, provisioned by the snapblocs Data Flow platform blueprint.
Related tutorial videos are also available.
During this tutorial, the following applications will be used on Kubernetes stacks provisioned by the snapblocs Data Flow platform blueprint:
AWS EKS
Elasticsearch, Logstash, FileBeat, MetricBeat, Kibana
Grafana
Kafka (Kafka brokers, Zookeeper, Kafka Connect, Kafka Connector, Schema Registry, Kafka Dashboard UI)
Kubernetes Dashboard
StreamSets Data Collector
Downloading and manually installing these applications is not required. The snapblocs Data Flow platform blueprint will install and configure them on the specified AWS environment. Access them through the links provided by snapblocs once the application stack is deployed.
Top-level overview of Data Ingestion flow
Use the first (Producer) pipeline to read Avro data from the Web and write records to a Kafka topic using the StreamSets Kafka Producer. Then use a second (Consumer) pipeline, configured with a Kafka Consumer, to drain that topic, perform a transformation, and send the data to the AWS S3 and Elasticsearch destinations.
Note
This tutorial uses StreamSets Data Collector and Kafka together only for the purpose of this demo. For other data ingestion scenarios, use either one or both, depending on requirements such as drag-and-drop pipeline design, backpressure handling, and so on.
Other use cases for the snapblocs Data Flow platform blueprint can be found here.
Prerequisites
Deploy the Data Flow platform blueprint. The instructions can be found here.
Download the preconfigured Producer and Consumer pipeline configuration JSON files provided by snapblocs, located alongside the step-by-step video.
Part 1 - Publishing to a Kafka Producer
In Part 1, set up a pipeline to send Avro data to the Kafka broker.
Access StreamSets Data Collector
Import a pipeline
On the StreamSets Data Collector Home page, click the "IMPORT PIPELINE" button
Enter a title such as "Producer Pipeline"
Select the downloaded "Producer-Pipeline.json" file to import into StreamSets Data Collector
Defining configuration
On the main StreamSets Pipelines page, select "Producer Pipeline"
All configurations have been set for this tutorial
- Select the HTTP Client processor on the canvas and browse all preconfigured settings such as:
HTTP Tab: Resource URL, Max Batch Size, etc.
Data Format Tab: Data Format, Avro Schema Location, etc.
- Select the Kafka Producer processor on the canvas and browse all preconfigured settings such as:
Kafka Tab: Broker URI, Topic (test_cc_tranx)
Data Format Tab: Data Format, Avro Schema Location, Avro Compression Codec, etc.
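For reference, the following Python sketch (using the confluent-kafka client) approximates what the preconfigured HTTP Client and Kafka Producer stages do: take a transaction record and publish it to the test_cc_tranx topic in Avro format. It is an illustration only, not part of the tutorial; the broker address, Schema Registry URL, the "transaction.avsc" file name, and the sample values are assumptions, not values taken from the deployed stack.

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Assumed addresses; the real values come from the snapblocs-provisioned stack
conf = {
    "bootstrap.servers": "kafka-broker:9092",
    "schema.registry.url": "http://schema-registry:8081",
}

# "transaction.avsc" is a hypothetical file holding an Avro schema like the one
# shown in the Reference section at the end of this tutorial
value_schema = avro.load("transaction.avsc")
producer = AvroProducer(conf, default_value_schema=value_schema)

# In the real pipeline these records come from the Resource URL on the HTTP Tab;
# a single hard-coded record stands in for that feed here
record = {
    "transaction_date": "01/01/2021",
    "card_number": "0000-0000-0000-0000",
    "card_expiry_date": "01/2025",
    "card_security_code": "0000",
    "purchase_amount": "$10.00",
    "description": "sample transaction",
}

# Publish one Avro-encoded record to the same topic the tutorial uses
producer.produce(topic="test_cc_tranx", value=record)
producer.flush()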
Part 2 - Reading from a Kafka Consumer
In Part 2, set up a pipeline that drains data from a Kafka Consumer, applies a simple transformation, and writes the output to AWS S3 and Elasticsearch. Part 2 also covers the built-in transformation stage, which masks the credit card numbers so that only the last 4 digits pass through.
Import a pipeline
On the StreamSets Data Collector Pipeline Home page, click the "+Create New Pipeline" drop-down menu and select "Import Pipeline"
Enter a title such as "Consumer Pipeline"
Select the downloaded "Consumer-Pipeline.json" file to import into StreamSets Data Collector
Defining the source
On the main StreamSets Pipelines page, select "Consumer Pipeline"
Most of the configurations have been set for this tutorial. (Provide AWS account information to store the ingested data in S3; see the details below.)
- Select the Kafka Consumer processor on the canvas and browse all preconfigured settings such as:
Kafka Tab: Broker URI, Topic (test_cc_tranx), ZooKeeper URI, Max Batch Size
Data Format Tab: Data Format, Avro Schema Location, etc.
- Select the Field Masker processor on the canvas and browse all preconfigured settings such as:
Mask Tab: Fields to Mask, Custom Mask, etc.
Data Format Tab: Data Format, Avro Schema Location, etc.
The Field Masker processor masks the credit card number so that only the last 4 digits of the card reach the data stores (a hedged sketch of the equivalent masking logic follows this list)
- Select the Amazon S3 processor on the canvas and browse all preconfigured settings such as:
Amazon S3 Tab: Provide the following:
Access Key ID (Provide AWS account Access Key ID)
Secret Access Key (Provide AWS account Secret Access Key)
Region (The AWS Region where the Data Flow stack was deployed)
Bucket (an existing AWS S3 bucket where the ingested data will be stored)
Data Format Tab: Data Format, Avro Schema, etc.
- Select the Elasticsearch processor on the canvas and browse all preconfigured settings, such as the Elasticsearch connection and index settings
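As a rough, non-authoritative counterpart to the Kafka Consumer and Field Masker stages above, the Python sketch below reads Avro records from test_cc_tranx and masks the card number down to its last 4 digits before handing the record on. The broker address, Schema Registry URL, and consumer group id are assumptions; the actual ingestion and masking are performed by the preconfigured StreamSets stages.

from confluent_kafka.avro import AvroConsumer

# Assumed addresses and group id; the real values come from the provisioned stack
consumer = AvroConsumer({
    "bootstrap.servers": "kafka-broker:9092",
    "schema.registry.url": "http://schema-registry:8081",
    "group.id": "demo-consumer",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["test_cc_tranx"])

def mask_card_number(card_number):
    # Keep only the last 4 digits, e.g. "0000-0000-0000-1234" -> "xxxx-xxxx-xxxx-1234"
    return "xxxx-xxxx-xxxx-" + card_number[-4:]

processed = 0
while processed < 10:                      # read a handful of records for the sketch
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    record = msg.value()                   # Avro-decoded dict
    record["card_number"] = mask_card_number(record["card_number"])
    print(record)                          # the real pipeline writes to S3 and Elasticsearch here
    processed += 1
consumer.close()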
Part 3 - Execute the Pipelines
Execute the Producer pipeline
Optionally, start the Producer pipeline after starting the Consumer pipeline in order to watch the data flowing from the Producer to the Consumer in real time
Click the Start icon on the Producer pipeline. The pipeline should start sending data to Kafka.
Click the Summary tab to see the progress of data sent to Kafka
- Watch the Producer Execution
Execute the Consumer pipeline
Click the Start icon on the Consumer pipeline. The pipeline should start consuming data from Kafka.
Click the Summary tab of the Kafka Consumer processor on the canvas to see the progress of data read from Kafka
- Select the Amazon S3 processor on the canvas to see the progress of the data being sent to S3
- Select the Elasticsearch processor on the canvas to see the progress of data being sent to Elasticsearch
Once all 10,000 records have been processed, stop the Execution
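As an optional cross-check outside the StreamSets UI, the approximate number of records that reached the topic can be read from Kafka's watermark offsets. The sketch below uses the confluent-kafka Python client; the broker address is an assumption.

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "kafka-broker:9092",   # assumed address
    "group.id": "offset-check",
})

# Sum (end offset - beginning offset) across all partitions of the tutorial topic
metadata = consumer.list_topics("test_cc_tranx", timeout=10)
total = 0
for partition_id in metadata.topics["test_cc_tranx"].partitions:
    low, high = consumer.get_watermark_offsets(
        TopicPartition("test_cc_tranx", partition_id), timeout=10)
    total += high - low

print("records on test_cc_tranx:", total)    # expect roughly 10,000 after a full run
consumer.close()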
Part 4 - Validate Ingested Data
Validate the Producer execution
Validate the Consumer execution
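The Summary tabs and record counts in the StreamSets UI are the primary way to validate both executions. As an optional cross-check of the Consumer output, a sketch like the following could count what reached the destinations; the bucket name, prefix, region, Elasticsearch URL, and index name are placeholders that must be replaced with the values used in the pipeline configuration.

import boto3
from elasticsearch import Elasticsearch

# Count the objects the Consumer pipeline wrote to S3 (bucket, prefix, and region are placeholders)
s3 = boto3.client("s3", region_name="us-west-2")
paginator = s3.get_paginator("list_objects_v2")
object_count = 0
for page in paginator.paginate(Bucket="my-dataflow-bucket", Prefix="tranx/"):
    object_count += len(page.get("Contents", []))
print("S3 objects written:", object_count)

# Count the documents indexed in Elasticsearch (URL and index name are placeholders)
es = Elasticsearch("http://elasticsearch:9200")
print("Elasticsearch documents:", es.count(index="tranx")["count"])   # expect 10,000 after a full run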
Reference
Avro record format:
{
"transaction_date":"dd/mm/YYYY",
"card_number":"0000-0000-0000-0000",
"card_expiry_date":"mm/YYYY",
"card_security_code":"0000",
"purchase_amount":"$00.00",
"description":"transaction description of the purchase"
}
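The tutorial does not reproduce the Avro schema itself; a plausible schema matching the fields above can be sketched and checked with the fastavro Python library. The record name and the all-string field types are assumptions.

from fastavro import parse_schema
from fastavro.validation import validate

# A plausible schema for the record layout shown above (name and types are assumptions)
schema = parse_schema({
    "type": "record",
    "name": "cc_tranx",
    "fields": [
        {"name": "transaction_date", "type": "string"},
        {"name": "card_number", "type": "string"},
        {"name": "card_expiry_date", "type": "string"},
        {"name": "card_security_code", "type": "string"},
        {"name": "purchase_amount", "type": "string"},
        {"name": "description", "type": "string"},
    ],
})

sample = {
    "transaction_date": "01/01/2021",
    "card_number": "0000-0000-0000-0000",
    "card_expiry_date": "01/2025",
    "card_security_code": "0000",
    "purchase_amount": "$00.00",
    "description": "transaction description of the purchase",
}

print(validate(sample, schema))   # prints True if the sample matches the schema; raises on mismatch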