Safdar Husain

Big Data Integration


Big Data Pipeline Part 2: Akka+Kafka+Cassandra

In Part 1 we created a "data processing pipeline" in which a data stream was generated and flowed through multiple stages before finally being printed on the screen. In this blog we will replace the printSink with a CassandraSink to store the data in a NoSQL database, introducing another component of the SMACK stack, i.e. Cassandra, into the existing pipeline.


Cassandra connectivity-

  • Application: Cassandra provides a Java driver to connect to the cluster.
  • Reactive: Cassandra itself does not provide a connector for reactive connectivity. The Alpakka project provides a connector that establishes reactive connectivity between Akka and Cassandra, and we will use it to create the CassandraSink.
To introduce Cassandra and Alpakka into our application we need to add the dependencies below to the SBT file.

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.3.2",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.15"
)

  • Install Cassandra (for simplicity we add only the localhost node to the cluster).
  • Create a TestKeySpace and a User table in it.

create keyspace TestKeySpace 
with replication = {
	'class': 'SimpleStrategy',
	'replication_factor':1
};


create table User( user_id INT, first_name TEXT, last_name TEXT, PRIMARY KEY (user_id));
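
Before wiring anything into the stream, it helps to sanity-check the keyspace and table with the plain DataStax Java driver mentioned above. The sketch below is illustrative (the CassandraCheck object and the sample row are my own); it assumes Cassandra is listening on localhost:9042.

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object CassandraCheck extends App {
  // Connect directly to the TestKeySpace created above.
  val cluster = Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build
  val session = cluster.connect("TestKeySpace")

  // Insert one row and read it back to confirm the schema is in place.
  session.execute("INSERT INTO User(user_id, first_name, last_name) VALUES (1, 'Safdar', 'Husain')")
  val rows = session.execute("SELECT * FROM User").all().asScala
  rows.foreach(r => println(s"${r.getInt("user_id")}: ${r.getString("first_name")} ${r.getString("last_name")}"))

  cluster.close()
}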


 
  • The SafAkkaKafka.scala file shall be modified to include a cassandraSink in Akka's RunnableGraph.

import akka.stream.alpakka.cassandra.scaladsl._

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Cluster.Builder
import com.datastax.driver.core.Host
import com.datastax.driver.core.Metadata
import com.datastax.driver.core.Session
import scala.collection.JavaConverters._

def main(args: Array[String]): Unit = {

  implicit val session = Cluster.builder
    .addContactPoint("127.0.0.1")
    .withPort(9042)
    .build
    .connect()

  val preparedStatement = session.prepare(
    "INSERT INTO TestKeySpace.user(user_id, first_name, last_name) VALUES (?,?,?)")

  val statementBinder = (user: User, statement: PreparedStatement) =>
    statement.bind().setInt(0, user.id).setString(1, "Hakim").setString(2, "kk")
...
...
...
...
val cassandraSink = CassandraSink[SafAkkaKafka.User](parallelism = 2, preparedStatement, statementBinder)
...
...
...
...
...
tickSource ~> mapToByteArray ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> mapFromByteArray ~> cassandraSink

The above code can be explained by dissecting it into various parts.
    • Session- A Session with the Cassandra cluster (running on 127.0.0.1) is created in implicit scope, as the Alpakka connector expects.
    • PreparedStatement- A Cassandra preparedStatement is declared with placeholder values. The driver sends this statement to the cluster for precompilation, and the cluster stores it under a unique ID.
    • Binder function- A binder function is declared to bind the preparedStatement's placeholders to the actual values received from the Akka stream.
    • CassandraSink- The CassandraSink is declared by passing the preparedStatement and the binder function.
    • Graph- The CassandraSink is used in the RunnableGraph to complete the pipeline, as sketched below.
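
Putting those parts together, here is a minimal, self-contained sketch of the Cassandra side of the pipeline. The User case class and its field names are assumptions based on Part 1, and a plain Source stands in for the Kafka stages; unlike the snippet above, the binder here binds the user's actual fields rather than fixed strings.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import com.datastax.driver.core.{Cluster, PreparedStatement, Session}

object CassandraPipelineSketch extends App {

  // Hypothetical User type; Part 1 defines the real one.
  case class User(id: Int, firstName: String, lastName: String)

  implicit val system = ActorSystem("cassandra-sink-sketch")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Session in implicit scope, as the Alpakka connector requires.
  implicit val session: Session = Cluster.builder
    .addContactPoint("127.0.0.1")
    .withPort(9042)
    .build
    .connect()

  val preparedStatement: PreparedStatement = session.prepare(
    "INSERT INTO TestKeySpace.user(user_id, first_name, last_name) VALUES (?,?,?)")

  // Bind each stream element's fields to the statement placeholders.
  val statementBinder = (user: User, stmt: PreparedStatement) =>
    stmt.bind().setInt(0, user.id).setString(1, user.firstName).setString(2, user.lastName)

  val cassandraSink = CassandraSink[User](parallelism = 2, preparedStatement, statementBinder)

  // A stand-in source; in the real pipeline the elements arrive from Kafka.
  Source(List(User(1, "Hakim", "kk"), User(2, "Safdar", "Husain")))
    .runWith(cassandraSink)
    .onComplete { _ =>
      session.getCluster.close()
      system.terminate()
    }
}

Running this should leave matching rows in TestKeySpace.user, which can be verified with a SELECT in cqlsh.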

Conclusion

In this second part of the series, we have examined how data flows through two pipelines and ends up in Cassandra. We used the Alpakka connector to achieve Reactive Streams connectivity with Cassandra.

Please do let me know if this article was helpful to you or if something is unclear.