Streaming Data Pipelines using Kafka Connect

Maha Amer
Jun 23, 2020

Are the messages produced by one part of an application to Apache Kafka consumed only by another part of that same application, with no other applications interested in these messages?

Let’s imagine that is not the case: the data sets computed from these messages may need to be pushed into an external system, or the messages themselves may need to be pulled in from an external service.

Moving data between streaming data pipelines and external systems such as databases, key-value stores, and file systems is a challenge, so a framework that makes this process easier for common use cases is valuable.

Here comes Kafka Connect, a framework that makes it easy to ingest data from an external system such as a database into Kafka, or to push data from Kafka out to an external system.

Kafka Connect API and Kafka Connectors

Kafka Connect API is a core component of Apache Kafka, introduced in version 0.9. It provides scalable and resilient integration between Kafka and other systems.

Kafka Connect can be run as a clustered process across multiple nodes and handles all the tricky business of integration, including:

  • Scale-out of ingest and egress across nodes for greater throughput
  • Automatic restart and failover of tasks in the event of node failure
  • Automatic offset management
  • Automatic preservation of source data schema
  • Utilization of data’s schema to create target objects (e.g. Hive tables when streaming to HDFS, RDBMS tables when streaming to a database)
  • Single Message Transformations (a minimal example follows this list)
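
As a taste of the last item, here is a minimal sketch of a Single Message Transformation added to a connector’s configuration. It uses the InsertField transformation that ships with Apache Kafka; the field name and value shown are just placeholders:

transforms=insertSource
transforms.insertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertSource.static.field=data_source
transforms.insertSource.static.value=orders-db

These lines go into an individual connector’s configuration and add a literal field named data_source to every record as it passes through the connector.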

To use Kafka Connect you simply need a plugin (Connector) that integrates with the technology that you’re interested in. The Confluent Platform has several of these plugins (Connectors), including JDBC, HDFS, Elasticsearch, and S3. You can create custom plugins (Connectors).

Connectors come in two flavors:

  • Source connectors import data from another system into Kafka (e.g. JDBCSourceConnector imports data from a relational database into Kafka; a minimal configuration sketch follows this list)
  • Sink connectors export data out of Kafka (e.g. S3SinkConnector exports the contents of a Kafka topic to an S3 bucket).
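
For illustration, a minimal source connector configuration might look like the sketch below. It assumes the Confluent JDBC source connector is installed; the connection details, column name, and topic prefix are placeholders:

{
  "name": "jdbc-source-example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/inventory",
    "connection.user": "postgres",
    "connection.password": "secret",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "tasks.max": "1"
  }
}

With mode set to incrementing, the connector uses the id column to detect new rows and writes each table to a topic prefixed with jdbc-.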

Connectors do not perform any data copying themselves: their configuration describes the data to be copied, and the Connector is responsible for breaking that job into a set of tasks that can be distributed to workers.

Each of these tasks then copies its own subset of the data to or from Kafka.

Connect has a REST layer that provides a set of APIs to enable administration. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing the configuration and restarting tasks).
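
Assuming a worker is listening on the default REST port 8083, and using the hypothetical connector name from the sketch above, a few typical calls look like this:

curl -s http://localhost:8083/connectors | jq .
curl -s http://localhost:8083/connectors/jdbc-source-example/config | jq .
curl -s -X POST http://localhost:8083/connectors/jdbc-source-example/restart

The first call lists the connectors deployed on the cluster, the second returns a single connector’s configuration, and the third restarts it.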

When a connector is first submitted to the cluster, a rebalance is triggered between the Connect workers in order to distribute the load of the new connector’s tasks. A rebalance is also triggered whenever a connector increases or decreases the number of tasks it requires.

Kafka Connect Modes

Kafka Connect currently supports two modes of execution:

  • standalone (single process).
  • distributed.

In standalone mode, all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:

> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]

The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking to use with a different configuration or production deployment. The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads).
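
For reference, a minimal connect-standalone.properties along the lines of the shipped example might contain the following; the broker address and offsets file path here are assumptions for a local setup:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000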

Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:

> bin/connect-distributed.sh config/connect-distributed.properties

The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets. In particular, the following configuration parameters are critical to set before starting your cluster:

  • group.id (default connect-cluster) — a unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
  • config.storage.topic (default connect-configs) — topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic
  • offset.storage.topic (default connect-offsets) — topic to use for storing source connector offsets; this topic should have many partitions and be replicated (a sample worker configuration follows this list)
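
Putting these parameters together, a sketch of a connect-distributed.properties for a small cluster might look like this. The broker addresses are placeholders, the replication factors assume at least three brokers, and status.storage.topic is a third internal topic that distributed mode also uses for connector and task status:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.partitions=25
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3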

Install Kafka Connect in Kubernetes with Helm

To add the chart for your local client, run helm repo add:

$ helm repo add myrepo https://mahagamal.github.io/helm-kafka-connect/
"myrepo" has been added to your repositories

Then install the chart:

$ helm install --name my-connect --namespace kafka-connect myrepo/kafka-connect --set KafkaBootstrapServers=??
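
Once the release is installed, you can check that the Connect worker pods came up in the chosen namespace (the namespace here matches the command above):

$ kubectl get pods -n kafka-connect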

Steps to create a Kafka S3 sink connector with the Kafka Connect REST API

On the cloud side, first create an S3 bucket with the appropriate permissions. After setting the permissions, I just need to pick a name and a region for my S3 bucket.
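
With the AWS CLI installed, creating the bucket can be as simple as the command below; the bucket name and region are placeholders and should match whatever you choose:

$ aws s3 mb s3://my-connect-sink-bucket --region us-east-1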

For the S3 connector to authenticate successfully when it contacts S3, I need to set up AWS credentials. An easy way to do that is to add credentials to the ~/.aws/credentials file or export two environment variables:

export AWS_ACCESS_KEY_ID=AKI….

export AWS_SECRET_ACCESS_KEY=SECRET..
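
If you go with the credentials file instead, the equivalent entries in ~/.aws/credentials look roughly like this (the values are the same truncated placeholders as above):

[default]
aws_access_key_id = AKI....
aws_secret_access_key = SECRET..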

For those who need to customize authentication even further, the S3 connector accepts a provider class as a configuration property that, in turn, can be configured with additional properties using the s3.credentials.provider. prefix. For a complete list of options, read more in the S3 connector documentation.

Configure the S3 connector by inserting its properties in JSON format, and store them in a file called sink-s3-example.json:
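
The exact properties depend on your setup, but a minimal sketch of sink-s3-example.json for the Confluent S3 sink connector could look like this; the topic, bucket name, and region are placeholders:

{
  "name": "sink-s3-example",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "example-topic",
    "s3.bucket.name": "my-connect-sink-bucket",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "3"
  }
}

flush.size controls how many records are written per S3 object; a small value is convenient for testing.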

See https://github.com/MahaGamal/helm-kafka-connect for the accompanying Helm chart. Then create the connector by posting the configuration to the Kafka Connect REST API:

curl -s -H "Content-Type: application/json" -X POST -d @sink-s3-example.json http://localhost:8083/connectors/ | jq .

In Kubernetes, the same request can be issued from inside the Connect worker pod:

kubectl exec -ti $(kubectl get pod -l app=my-connect-cluster -o=jsonpath='{.items[0].metadata.name}') -- curl -X POST -H "Content-Type: application/json" -d @sink-s3-example.json http://localhost:8083/connectors
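
Either way, once the POST succeeds you can verify the connector with the status endpoint (the connector name assumes the sketch above):

curl -s http://localhost:8083/connectors/sink-s3-example/status | jq .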

Learning more

Recommended resources if you’d like to learn more about Kafka Connect:


Maha Amer

I am passionate about cloud computing development. I have been working as a Site Reliability Engineer, architecting and designing the SWVL and Capiter infrastructure systems.