Connect Kafka to S3

I already wrote about the Kafka Connect HDFS Connector to connect Kafka to HDFS. Up next: offloading data from Kafka to S3, for those of you who don’t want to run an HDFS cluster.

The Kafka Connect HDFS Connector should in theory be able to connect to S3 as well (via the S3 Hadoop file system), but it soon became clear that it doesn’t work out of the box. The main problem is that the connector keeps a write-ahead log (WAL) co-located with the data on S3, and unlike regular HDFS, the S3 Hadoop file system doesn’t support appending to existing files.

Below you’ll read about the steps involved in getting it to work, but please keep in mind that I’m still in a test phase. If you’re up for it, do test it yourself and let me know your findings.

Fingers crossed & good luck!


First, familiarize yourself with kafka-connect-hdfs. Install the Confluent Platform (referred to as $CONFLUENT_HOME) and Hadoop (referred to as $HADOOP_HOME).
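
For the commands below I’ll assume both locations are exported as environment variables; the paths are just examples, adjust them to your own install:

# example paths, adjust to your own installation
export CONFLUENT_HOME=/opt/confluent
export HADOOP_HOME=/opt/hadoop-2.7.1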

After that, clone and build this kafka-connect-hdfs fork. To do so, you’ll first have to clone and build (mvn install) some dependencies: schema-registry, common and rest-utils. I’ll refer to the kafka-connect-hdfs fork directory as $CONNECT_FORK.
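
A sketch of what that might look like, assuming the dependencies are the Confluent projects on GitHub and substituting the URL of the fork linked above:

# build the dependencies first (common and rest-utils before schema-registry), then the fork
git clone https://github.com/confluentinc/common.git && (cd common && mvn install -DskipTests)
git clone https://github.com/confluentinc/rest-utils.git && (cd rest-utils && mvn install -DskipTests)
git clone https://github.com/confluentinc/schema-registry.git && (cd schema-registry && mvn install -DskipTests)
git clone <url-of-the-kafka-connect-hdfs-fork> kafka-connect-hdfs-fork
(cd kafka-connect-hdfs-fork && mvn install -DskipTests)
export CONNECT_FORK=$PWD/kafka-connect-hdfs-fork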

Install the kafka-connect-hdfs fork (replacing the stock connector) and install the S3 libraries as a separate connector directory kafka-connect-s3 (the jars are part of the Hadoop installation):

rm -R $CONFLUENT_HOME/share/java/kafka-connect-hdfs
cp -R $CONNECT_FORK/target/kafka-connect-hdfs-2.1.0-SNAPSHOT-package/share/java/kafka-connect-hdfs $CONFLUENT_HOME/share/java
mkdir $CONFLUENT_HOME/share/java/kafka-connect-s3
cp $HADOOP_HOME/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar $HADOOP_HOME/share/hadoop/tools/lib/jets3t-0.9.0.jar $CONFLUENT_HOME/share/java/kafka-connect-s3/

Create a Hadoop configuration folder somewhere (referred to as $HADOOP_CONF). In it, create a file core-site.xml with the following content (substitute your AWS keys and optionally adjust the endpoint):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>dfs.support.append</name>
    <value>false</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3-eu-west-1.amazonaws.com</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>your-access-key</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>your-secret-key</value>
  </property>
</configuration>
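
With the configuration in place, a quick smoke test of the S3 connectivity can’t hurt. The sketch below assumes $HADOOP_CONF is exported as the folder you just created and that the kafka-connect bucket created further down already exists; on a stock Hadoop 2.7.1 install the S3A classes live in the tools directory, so they may need to be added to the classpath. The last two commands simply demonstrate the append limitation mentioned earlier.

# the S3A file system classes ship in the tools directory and aren't on the default classpath
export HADOOP_CLASSPATH="$HADOOP_HOME/share/hadoop/tools/lib/*"

# list the bucket using the new configuration folder
hadoop --config $HADOOP_CONF fs -ls s3a://kafka-connect/

# optional: see for yourself that S3A rejects appends
echo "hello" > /tmp/append-test.txt
hadoop --config $HADOOP_CONF fs -put /tmp/append-test.txt s3a://kafka-connect/append-test.txt
hadoop --config $HADOOP_CONF fs -appendToFile /tmp/append-test.txt s3a://kafka-connect/append-test.txt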

Edit $CONFLUENT_HOME/etc/kafka-connect-hdfs/quickstart-hdfs.properties so it looks like this (substitute the actual paths for $HADOOP_HOME and $HADOOP_CONF, since the properties file won’t expand environment variables):

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_topic
hdfs.url=s3a://kafka-connect
flush.size=3
hive.integration=false
hadoop.home=$HADOOP_HOME
hadoop.conf.dir=$HADOOP_CONF
logs.dir=log

To test, first start the Confluent Platform components (ZooKeeper, Kafka and the schema registry). We’ll use the default connection settings; this assumes a default Confluent install on localhost. Create a topic test_topic and generate some data you’ll want to offload to S3. Also create an S3 bucket named kafka-connect.
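
For completeness, a sketch of that setup, assuming the stock quickstart configuration files shipped with the Confluent Platform and the AWS CLI for creating the bucket. Run the first three commands from $CONFLUENT_HOME, each in its own terminal; the console producer reads one JSON record per line, e.g. {"f1": "hello"}.

# start ZooKeeper, Kafka and the schema registry (each in its own terminal)
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
./bin/kafka-server-start etc/kafka/server.properties
./bin/schema-registry-start etc/schema-registry/schema-registry.properties

# create the topic and produce a few Avro test records
./bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

# create the S3 bucket (the region should match fs.s3a.endpoint from core-site.xml)
aws s3 mb s3://kafka-connect --region eu-west-1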

We’re now ready to start the Kafka-to-S3 connector. In $CONFLUENT_HOME, execute

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

and verify output is created in your S3 bucket.
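
One way to verify is with the AWS CLI; the topics/ prefix below is the connector’s default topics.dir, so adjust it if you changed that setting:

aws s3 ls s3://kafka-connect/topics/test_topic/ --recursive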