Apache Kafka installation and integration with Databricks platform

© Mariusz Rafało

This document specifies basic Apache Kafka configuration and consumer/producer implementation. It does not cover setting up an AWS machine or AWS VPC network configuration; however, references to tutorials covering these issues are provided below.

Setting up AWS EC2 machine

  1. Set up AWS EC2 machine on Amazon Linux OS. Select Amazon Linux 2 AMI 2.0.20181024 x86_64 HVM gp2.
  2. Configure ssh access to your machine via private key. Run an ssh client (preferably a multi-tabbed client, such as MTPuTTY or CMDER).
  3. In general, at least a t2.medium server is required, as Kafka needs at least 4 GB of RAM. It is, however, possible to run Kafka on a t2.micro instance, but this requires changes in the zookeeper and kafka executable scripts. To do so, adjust the JVM heap size by editing kafka-server-start.sh and zookeeper-server-start.sh (after completing the Kafka installation). Change the section:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

to:

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

The -Xmx parameter specifies the maximum heap size and the -Xms parameter the minimum heap size. The configuration above sets them to 256 MB and 128 MB, respectively.
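A quick way to apply the change without opening an editor is sketched below; it assumes Kafka is unpacked in ~/kafka, as in the installation steps that follow:

# Replace the heap settings in both start scripts, then verify the result
sed -i 's/KAFKA_HEAP_OPTS=.*/KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"/' \
    ~/kafka/bin/kafka-server-start.sh ~/kafka/bin/zookeeper-server-start.sh
grep KAFKA_HEAP_OPTS ~/kafka/bin/kafka-server-start.sh ~/kafka/bin/zookeeper-server-start.sh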

  4. Assign an Elastic IP to your machine. You can find information about Elastic IPs here.

  5. Configure security groups. You need to configure the network in order to open port 9092 (Kafka's default port). You can find instructions here.

  6. Install Java JDK using this tutorial.

  7. Open port 9092 on the server firewall:

sudo iptables -A INPUT -i eth0 -p tcp --dport 9092 -j ACCEPT
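Note that rules added this way are not persistent across reboots. To check that the rule is active, list the INPUT chain:

sudo iptables -L INPUT -n | grep 9092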

Kafka installation

  1. Create user kafka that will be used to host and run the server:
sudo useradd kafka -m
  2. Add user kafka to the wheel group, so that it has sufficient privileges to install packages and dependencies:
sudo usermod -aG wheel kafka
  3. Log in as user kafka:
sudo su kafka
  4. Select the appropriate Kafka version from here and download it:
cd ~
wget "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz"
  5. Create folder kafka and unpack the downloaded archive into it:
mkdir kafka
cd kafka
tar -xvzf ~/kafka_2.11-2.1.1.tgz --strip 1
  6. Configure the network in the config/server.properties file. This configuration is needed because we need to send and read Kafka messages from outside the AWS network:
vim ~/kafka/config/server.properties

Add the following line at the end of the file:

listeners=PLAINTEXT://xx.xx.xx.xx:9092

Where xx.xx.xx.xx is the Public DNS (IPv4) assigned to the EC2 machine (the full DNS name, not the IP address, e.g.: ec2-15-127-209-111.eu-central-1.compute.amazonaws.com).

  7. Log out from user kafka using the exit command.

  8. Configure the zookeeper service

    1. Create file /etc/init.d/zookeeper (a minimal sketch of such a script is shown after this list). Assign permissions, so that user kafka has access to the file:
    cd /etc/init.d/
    sudo vim zookeeper
    sudo chmod u=rwx,g=rx,o=rx zookeeper
    2. Create the zookeeper log folder:
    sudo mkdir /var/log/zookeeper
    sudo chown -R kafka /var/log/zookeeper/
    3. Assign folder permissions:
    sudo chmod -R u=rwx,g=rx,o=rx /var/log/zookeeper
  9. Configure the kafka service

    1. Create file /etc/init.d/kafka (analogous to the zookeeper script sketched below). Assign permissions, so that user kafka has access to the file:
    cd /etc/init.d/
    sudo vim kafka
    sudo chmod u=rwx,g=rx,o=rx kafka
    2. Create the kafka log folder:
    sudo mkdir /var/log/kafka
    sudo chown -R kafka /var/log/kafka/
    3. Assign folder permissions:
    sudo chmod -R u=rwx,g=rx,o=rx /var/log/kafka
  10. Start the zookeeper service:

sudo service zookeeper start
sudo service zookeeper status

Expected output:

zookeeper is running (pid 2646)

  11. Start the kafka service:

sudo service kafka start
sudo service kafka status

Expected output:

kafka is running (pid 2649)
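The Kafka distribution does not ship these init scripts, so they have to be written by hand. Below is a minimal sketch of /etc/init.d/zookeeper, assuming Kafka is unpacked in /home/kafka/kafka and logs go to /var/log/zookeeper (both as configured above); treat it as a starting point, not a production-ready script:

#!/bin/bash
# /etc/init.d/zookeeper -- minimal init script sketch.
# Paths assume the installation steps above.

KAFKA_HOME=/home/kafka/kafka
LOG=/var/log/zookeeper/zookeeper.log

case "$1" in
  start)
    echo "Starting zookeeper..."
    sudo -u kafka nohup $KAFKA_HOME/bin/zookeeper-server-start.sh \
        $KAFKA_HOME/config/zookeeper.properties > $LOG 2>&1 &
    ;;
  stop)
    echo "Stopping zookeeper..."
    sudo -u kafka $KAFKA_HOME/bin/zookeeper-server-stop.sh
    ;;
  status)
    pid=$(pgrep -f zookeeper.properties)
    if [ -n "$pid" ]; then
      echo "zookeeper is running (pid $pid)"
    else
      echo "zookeeper is not running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|status}"
    exit 1
    ;;
esac

The /etc/init.d/kafka script is analogous: use kafka-server-start.sh with config/server.properties, kafka-server-stop.sh, and the /var/log/kafka folder instead.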

Basic terms

Apache Kafka uses some common terminology. It is useful to define the key concepts:

Topic: a feed channel to which messages are broadcast

Producer: a process that sends messages to a specific topic

Consumer: a process that subscribes to a specific topic and retrieves messages

Partition: a subdivision of a topic; topics are split into partitions for scalability

Broker: a single node of the physical Kafka cluster

Kafka configuration

  1. Create a Kafka topic called openTopic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic openTopic
  2. Verify topic creation by listing all Kafka topics:
bin/kafka-topics.sh --zookeeper localhost:2181 --list

In the list, you should see the previously created topic, named openTopic:

__consumer_offsets
openTopic
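To inspect the partition count and replication factor of the new topic, kafka-topics.sh also provides a --describe switch:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic openTopic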

Test Kafka environment locally

To test the overall Kafka configuration and installation, follow these steps:

Log in to the AWS server via SSH twice, in two windows or tabs. In each, switch to the kafka user:

sudo su kafka

In the first window run a simple Kafka producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic openTopic

This script reads input from the keyboard, so whatever you type here is sent to the Kafka topic openTopic.

In the second window run a Kafka consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic openTopic --from-beginning

Now, every message you type in the producer window should be visible in the consumer window.
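For a quick non-interactive check, you can also pipe a single message into the console producer (a convenience sketch, not part of the original walkthrough):

echo "test message" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic openTopic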

Python Kafka producer

You can also use a producer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification: Databricks Runtime Version 4.3 (includes Apache Spark 2.3.1, Scala 2.11), Python version 3.

To send and receive messages from the Kafka cluster in Python, you need to use the kafka-python library. Note that the previous library, kafka, is now deprecated and may not work with new versions of Kafka.
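On Databricks, the library is typically attached via the cluster's Libraries tab; from a plain Python environment the equivalent is a pip install:

pip install kafka-python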

The simple producer below sends the message lorem ipsum dolor to topic openTopic. Note that kafka-python expects message values as bytes, hence the encode call:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'])
msg = "lorem ipsum dolor"
# send() expects bytes; get() blocks until the broker confirms delivery
producer.send('openTopic', msg.encode('utf-8')).get(timeout=30)

Where xx.xx.xx.xx is the Public DNS (IPv4) assigned to the EC2 machine.

The producer below opens the file police-department-incidents.csv and sends it to Kafka at a rate of one record every 5 seconds.

import sys, csv
import threading, logging, time
import datetime
from kafka import KafkaProducer

policeData = csv.reader(open("police-department-incidents.csv"), delimiter=",")
kafkaTopic = 'openTopic'
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'])

for IncidntNum, Category, Descript, DayOfWeek, Date, Time, PdDistrict, \
        Resolution, Address, X, Y, Location, PdId in policeData:
    if IncidntNum != 'IncidntNum':  # skip the CSV header row
        # Prepend a timestamp and join the selected columns with semicolons
        record = ";".join([datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                           IncidntNum, Category, Descript, DayOfWeek,
                           Date, Time, X, Y])
        producer.send(kafkaTopic, record.encode('utf-8')).get(timeout=30)
        time.sleep(5)

Where xx.xx.xx.xx is the Public DNS (IPv4) assigned to the EC2 machine.

Python Kafka consumer

You can also use a consumer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification:

Databricks Runtime Version 4.3 (includes Apache Spark 2.3.1, Scala 2.11), Python version 2.

The simple consumer below receives messages from topic openTopic, pausing 4 seconds after each message.

from kafka import KafkaConsumer
import sys
import time

consumer = KafkaConsumer('openTopic',
                         bootstrap_servers=['xx.xx.xx.xx:9092'])

# message.value holds the raw message bytes
for message in consumer:
    print(message.value)
    time.sleep(4)

Where xx.xx.xx.xx is the Public DNS (IPv4) assigned to the EC2 machine.

Troubleshooting

Below are common issues that may occur during Kafka setup and execution. Note that the solutions to these issues are described above.

Error Cannot allocate memory while starting the Kafka server

Possible cause: the JVM tries to allocate more heap memory than is available on the machine. You need to change the KAFKA_HEAP_OPTS parameter in the kafka-server-start.sh file.

Local Kafka producer does not deliver messages to Kafka

Possible cause: Kafka port 9092 is not open.

Remote Kafka producer does not deliver messages to Kafka

Possible cause: assuming that Kafka port 9092 is open, check the Kafka configuration file server.properties for the listeners specification.
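A quick way to verify that the broker is reachable from the remote machine, assuming the nc utility is available there, is to probe the port directly:

# Replace xx.xx.xx.xx with the EC2 machine's public DNS
nc -vz xx.xx.xx.xx 9092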

Some producers deliver data to Kafka, others do not

Possible cause: assuming that:

  1. Kafka port 9092 is open,
  2. the configuration of listeners is correct,

the cause may lie in the Kafka timeout configuration. You may need to call .get(timeout=30) explicitly on the result of producer.send(), as in the Python examples above.