
Safdar Husain

Big Data Integration


Big Data Pipeline Part 1: Akka+Kafka

The SMACK stack (Spark, Mesos, Akka, Cassandra and Kafka) is gaining prominence in Big Data processing, and each technology under this umbrella plays a unique role in data crunching. In this first blog of the series our aim is to build a data pipeline and explore the role of each component involved in the stack. The idea behind developing this application is to explore Akka's "Reactive Streams" design, which allows it to interoperate with other reactive systems (here Kafka and Cassandra) for seamless flow of unbounded streams.

To make the task easier I will dissect the stack into three parts and cover each part in a separate blog post.

  • Part 1: This part introduces the design of the data pipeline we will implement throughout this blog series. We start with two primary components (Akka and Apache Kafka) and explore the ways these components are used to control data flow (using throttling, grouping, etc.).
  • Part 2: This part introduces another component of the SMACK stack: Cassandra is added to the pipeline developed in Part 1. The purpose of Cassandra is to store the data stream for report generation and analytics.
  • Part 3: This part concludes our application implementation with the introduction of Spark for data processing, analytics and report generation.

Application Design

A data pipeline is made up of three inherent components: Sources, Flows and Sinks.
A Source represents an unbounded data source such as tweets, logs or Internet of Things sensors, whereas a Flow represents a data transformation stage (converting data from one format to another). The Sink phase is the culmination of the data pipeline, where the transformed data is stored for subsequent processing or report generation.
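
To make these three building blocks concrete, here is a minimal, self-contained Akka Streams sketch (the object name and element values are illustrative only, not part of our pipeline):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object SourceFlowSinkSketch extends App {
  implicit val system = ActorSystem("sketch")
  implicit val materializer = ActorMaterializer()

  val source = Source(1 to 5)                      // Source: emits the elements of the stream
  val flow   = Flow[Int].map(n => s"element-$n")   // Flow: transforms each element into another format
  val sink   = Sink.foreach[String](println)       // Sink: the culmination where elements are consumed

  source.via(flow).runWith(sink)                   // wire Source ~> Flow ~> Sink and run the pipeline
}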

In this blog we will design two self-explanatory pipelines involving two technologies (Akka Streams and Apache Kafka).
Pipeline 1: For simplicity of our application design we will use Akka's "ticking source" to generate an unbounded data stream at a very fast rate. The stream will be passed through a Flow to convert each data element into Avro format, and will then go through another Flow for "Kafka object" transformation before sinking into Kafka (where it will be stored).
Pipeline 2: This pipeline will use a "Kafka consumer" as a Source to pull data from the Kafka partition where Pipeline 1 sinks its data. Subsequently, the data will be passed through an Akka Flow that unwraps the Kafka record into a byte array, and later through an Avro deserialization Flow (to regenerate the User data), before being printed on the screen. This pipeline runs much slower than Pipeline 1 (roughly 50 times slower in our setup), and that's where Kafka comes to the rescue.


Kafka Background

In production environments Kafka is mainly used as a "pipeline boundary" between multiple pipelines: it acts as a placeholder for data (preventing data loss and serving it on demand) when different pipelines execute at different processing rates. Kafka provides various flavors of connectivity with third-party systems via the APIs below.

Producer API: This API can be used by a third-party application to produce data and push it into the Kafka cluster; it executes in the client's memory/process space.
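
For instance, a bare-bones producer written directly against this API might look like the following sketch (topic name and message are illustrative; the broker address matches the one used later in this post):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object PlainProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")               // single-node broker
  props.put("key.serializer", classOf[StringSerializer].getName)
  props.put("value.serializer", classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)
  // push one record into the "test" topic from the client's own process
  producer.send(new ProducerRecord[String, String]("test", "key-1", "hello from the Producer API"))
  producer.close()
}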


Consumer API: This API can be used by a third-party application to retrieve data from the Kafka cluster; it also executes in the client's memory/process space.
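
A correspondingly minimal consumer sketch against the Consumer API (again with illustrative names), which simply polls the cluster in a loop:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object PlainConsumerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "sketch-group")
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("test"))

  while (true) {
    val records = consumer.poll(1000L)             // block up to 1 second waiting for new records
    records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
  }
}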


Connect API: This API is used to build and run reusable connectors that continuously move data between Kafka and external systems (databases, file systems, key-value stores, etc.) without writing custom producer/consumer code. Source connectors pull data from an external system into Kafka topics, while sink connectors push data from Kafka topics into an external system.


Streams API: This API retrieves an input stream from a Kafka topic, processes it to generate a result stream, and diverts the result to another Kafka topic. This code executes in the client application.
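
As a rough sketch of what this looks like (assuming the kafka-streams 1.0 Java API; application id and topic names are illustrative), here is a trivial topology that uppercases every value and diverts the result to another topic:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object StreamsApiSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  val builder = new StreamsBuilder()
  val input: KStream[String, String] = builder.stream[String, String]("input-topic")

  // process the input stream (uppercase every value) and divert the result to a result topic
  val toUpper: ValueMapper[String, String] = new ValueMapper[String, String] {
    override def apply(value: String): String = value.toUpperCase
  }
  input.mapValues[String](toUpper).to("result-topic")

  new KafkaStreams(builder.build(), props).start()
}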

Reactive Kafka: Apache Kafka doesn't provide reactive APIs out of the box, yet it would be nice to access Kafka through a reactive API. Two options exist:

  • Reactive Kafka: developed by SoftwareMill to allow seamless integration of Kafka with other reactive systems in the form of streams.
  • Akka Streams Kafka DSL: Akka provides a Kafka DSL (the akka-stream-kafka module) to enable seamless Akka-Kafka integration. We will use this DSL in our sample application; a minimal taste of it appears right after this list.
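
Here is that minimal taste of the DSL (the group id is illustrative): a Kafka topic exposed as an ordinary Akka Streams Source, so back pressure propagates from the consumer all the way to Kafka.

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import org.apache.kafka.common.serialization.StringDeserializer

object ReactiveConsumerSketch extends App {
  implicit val system = ActorSystem("reactive-kafka-sketch")
  implicit val materializer = ActorMaterializer()

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("sketch-group")

  // Consumer.plainSource is a regular Akka Streams Source; slow downstream stages
  // automatically slow down consumption from Kafka instead of overflowing memory.
  Consumer.plainSource(settings, Subscriptions.topics("test"))
    .runForeach(record => println(record.value()))
}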

Creation of the Data Pipeline

Our application uses Kafka as the glue to join two different data pipelines running at varying speeds. Pipeline 1 runs roughly 50 times faster than Pipeline 2 (it ticks every 100 ms, while the consumer is throttled to one record every 5 seconds), and Kafka acts as a placeholder for the data generated by Pipeline 1. In our application the Kafka sink of Pipeline 1 acts as the source of Pipeline 2.
We will generate a data stream, serialize it with Avro and propagate it to Kafka. The stream will cross multiple host and application boundaries while going through Avro serialization/deserialization, and will ultimately be consumed by an Akka Sink (where we shall print it; in the real world the data would be handed to Apache Spark for analysis). To run our application we need to perform a few steps:

  • Kafka needs to be installed on a single node or multiple nodes. For simplicity we shall install it on a single node and run the broker on localhost:9092.
  • Install Scala 2.12 and Akka 2.5.6.
  • An SBT build file with all package dependencies has to be set up, as shown below.


enablePlugins(JavaServerAppPackaging)

name := "AkkaKafka"

version := "1.0"

organization := "com.saf"

scalaVersion := "2.12.4"   // any Scala 2.12.x release works

libraryDependencies ++= {
  val akkaVersion = "2.5.6"
  val akkaHttpVersion = "10.0.0"
  Seq(
    "com.typesafe.akka" %% "akka-actor"           % akkaVersion,
    "com.typesafe.akka" %% "akka-http-core"       % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-http"            % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-slf4j"           % akkaVersion,
    "ch.qos.logback"    %  "logback-classic"      % "1.1.3",
    "com.typesafe.akka" %% "akka-testkit"         % akkaVersion % "test",
    "org.scalatest"     %% "scalatest"            % "3.0.1"     % "test",
    "com.typesafe.akka" %% "akka-cluster"         % akkaVersion,
    "com.typesafe.akka" %% "akka-remote"          % akkaVersion,
    "org.apache.kafka"  %% "kafka"                % "1.0.0",
    "org.apache.kafka"  %  "kafka-streams"        % "1.0.0",
    "org.apache.thrift" %  "libthrift"            % "0.10.0",
    "org.apache.avro"   %  "avro"                 % "1.7.7",
    "com.typesafe.akka" %% "akka-stream-kafka"    % "0.18"
  )
}

// Assembly settings
mainClass in assembly := Some("SafAkkaKafka")

assemblyJarName in assembly := "kafkaStream.jar"

  • A SafAkkaKafka.scala file is created with the code below in the src folder.


import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io._

import java.io.ByteArrayOutputStream
import scala.concurrent.duration._
import scala.util.Random

case class User(id: Int, name: String, email: Option[String])
object SafAkkaKafka {

  // Avro schema describing the User record we serialize into Kafka
  val schema_scheme = """{
      "namespace": "kafka.avro.test",
      "type": "record",
      "name": "user",
      "fields": [
          { "name": "id",    "type": "int" },
          { "name": "name",  "type": "string" },
          { "name": "email", "type": ["string", "null"] }
      ]
    }"""



    def main(args: Array[String]): Unit = {

        implicit val system = ActorSystem("reactive-tweets")
        implicit val ec = system.dispatcher
        implicit val materializer = ActorMaterializer()


      val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
        .withBootstrapServers("localhost:9092")

      val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

      val partition = 0
      val subscription = Subscriptions.assignment(new TopicPartition("test", partition))

      val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._

          // Pipeline 1 source: a fast ticking source (one element every 100 ms)
          val tickSource = Source.tick(100.milliseconds, 100.milliseconds, "Hello from Akka Streams using Kafka!")
          // Pipeline 2 source: the assigned Kafka topic partition
          val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
          val kafkaSink = Producer.plainSink(producerSettings)
          val printlnSink = Sink.foreach(println)

          // Serialize the User record to Avro bytes, then wrap it in a Kafka ProducerRecord
          val mapToByteArray = Flow[String].map(elem => setUser(elem))
          val mapToProducerRecord = Flow[Array[Byte]].map(elem => new ProducerRecord[Array[Byte], Array[Byte]]("test", elem))

          // Unwrap the Kafka ConsumerRecord and deserialize the Avro bytes back into a User,
          // throttling the consuming side to one record every 5 seconds to demonstrate back pressure
          val mapFromByteArray = Flow[Array[Byte]].map(elem => getUser(elem))
          val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], Array[Byte]]].map(record => record.value())
                    .throttle(elements = 1, per = 5.seconds, maximumBurst = 1, mode = ThrottleMode.Shaping)

          // Pipeline 1: tick ~> Avro serialize ~> ProducerRecord ~> Kafka
          tickSource  ~> mapToByteArray ~> mapToProducerRecord   ~> kafkaSink
          // Pipeline 2: Kafka ~> extract value ~> Avro deserialize ~> print
          kafkaSource ~> mapFromConsumerRecord  ~> mapFromByteArray ~> printlnSink

          ClosedShape
      })

      runnableGraph.run()
    }

    def setUser(name: String): Array[Byte] = {
      val rnd = Random.nextInt(100)
      val schema: Schema = new Schema.Parser().parse(schema_scheme)

      // Build a generic Avro record for the User
      val genericUser: GenericRecord = new GenericData.Record(schema)
      genericUser.put("id", rnd)
      genericUser.put("name", name)
      genericUser.put("email", s"saf@kkk$rnd")

      // Serialize the record to a byte array using the Avro binary encoding
      val writer = new GenericDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(genericUser, encoder)
      encoder.flush()
      out.close()
      out.toByteArray()
    }

    def getUser(message: Array[Byte]): Option[User] = {
      val schema: Schema = new Schema.Parser().parse(schema_scheme)
      // Deserialize the byte array back into a generic Avro record
      val reader: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
      val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
      val userData: GenericRecord = reader.read(null, decoder)
      // Rebuild the User object; a missing email field becomes None
      val user = User(
        userData.get("id").toString.toInt,
        userData.get("name").toString,
        Option(userData.get("email")).map(_.toString)
      )
      Some(user)
    }
}


 
The above code can be explained by dissecting it into various parts.
    • Stream generation in Akka - To demonstrate unbounded continuous stream generation we use the Akka Source "tickSource", which trickles a record into the stream at a fixed interval (every 100 ms).
    • Stream serialization - Avro is used to serialize the User data into byte format by defining the "mapToByteArray" flow.
    • Stream in Kafka format - The serialized byte array is wrapped into a Kafka ProducerRecord by defining "mapToProducerRecord".
    • Push data to Kafka - Finally the data is pushed into Kafka by defining "kafkaSink".
    • Kafka Source - Records are retrieved from Kafka by defining the Source "kafkaSource".
    • Retrieve serialized stream from Kafka - The serialized record (byte format) is extracted from each Kafka record by the "mapFromConsumerRecord" flow. To demonstrate the back-pressure concept we throttle this flow to 1 record per 5 seconds, while the source is pushing data at a rate of roughly 50 records per 5 seconds into the middleware (Kafka). Kafka acts as a gigantic buffer, harvesting the generated messages in its cluster of partitions while serving only the requested number to its consumer (here 1 record per 5 seconds). A standalone sketch of this throttling is shown right after this list.
    • Stream deserialization - The "mapFromByteArray" flow deserializes each record back into a User object.
    • Consume deserialized stream - The User record is processed by defining the Sink "printlnSink". In the real world this would be a Spark sink that pushes the data onward for analytics.
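
To isolate the throttling idea from the Kafka plumbing, here is a standalone sketch (object and element names are illustrative): a source ticking every 100 ms feeds a downstream throttled to one element every 5 seconds, so the excess ticks are simply never pulled.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object ThrottleSketch extends App {
  implicit val system = ActorSystem("throttle-sketch")
  implicit val materializer = ActorMaterializer()

  Source.tick(100.milliseconds, 100.milliseconds, "tick")                        // fast upstream (10 msgs/sec)
    .throttle(elements = 1, per = 5.seconds, maximumBurst = 1, mode = ThrottleMode.Shaping)
    .runForeach(msg => println(s"consumed $msg at ${System.currentTimeMillis()}"))  // slow downstream
}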

Conclusion

In this first part of the series we have examined how data in Kafka can be treated as a stream and processed by Akka using its powerful RunnableGraph concept. We haven't covered everything there is to say about streams and the various serialization/deserialization aspects yet; that will be the subject of our next discussion.

Please do let me know if this article was helpful to you or if anything is unclear.