Safdar Husain

Big Data Integration


Akka Scrap book -- Part-1

In this blog we will explore the various ways Akka Sources, Sinks, and Streams can be formulated.
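All the snippets below are sketches rather than complete programs; they assume the usual Akka Streams boilerplate is already in scope, roughly along the following lines (the system, materializer and ec names are assumptions of mine, not part of the snippets themselves):

// Assumed common setup for the snippets in this post
import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.Random

implicit val system: ActorSystem = ActorSystem("scrapbook")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = system.dispatcher // for Futures and onComplete callbacks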

  • A Source is a set of stream processing steps that has one open output. It can comprise any number of internal sources and transformations that are wired together, or it can be an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into a Reactive Streams Publisher, at least conceptually (a sketch of this follows the examples below). In other words, a Source is a partial graph with no input and exactly one output; that is, it has a SourceShape.
Different ways a source can be created are shown below:



// Create a source from an Iterable
val source = Source(List(1, 2, 3))
val source1 = Source(1 to 100000)

// Create a source from a single element
Source.single("only one element")

// an empty source
Source.empty

// repeat an element an unlimited number of times
Source.repeat("Multiple times")

// Up to eight Futures in flight at a time; Future.successful is already completed, so the work happens synchronously on the calling thread
Source(1L to 10000000L)
.mapAsync(8)(_ => Future.successful("abc123"))
.runWith(Sink.ignore)

// Up to eight Futures in flight at a time; Future(...) runs asynchronously on the execution context (non-blocking)
Source(1L to 10000000L)
.mapAsync(8)(_ => Future("abc123"))
.runWith(Sink.ignore)

// specify an iterator
val randomSource: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.continually(Random.nextInt(1000000)))

// another iterator implementation
Source.fromIterator { () =>
  new Iterator[Int] {
   override def hasNext: Boolean = true
   override def next(): Int = Random.nextInt(1000000)
  }
}

// Create a source that emits a record at a regular interval, indefinitely
case class Tick() // marker element emitted on every tick
def source11 = Source.tick(initialDelay = 3.seconds, interval = 3.seconds, Tick())

// Create a source from a Scala Stream
def numsFrom(n: Int): Stream[Int] = Stream.cons(n, numsFrom(n + 1))
lazy val naturals = numsFrom(0)
val source12 = Source(naturals)
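
The bullet above mentions that materialization turns a Source into a Reactive Streams Publisher; here is a minimal sketch of that (the publisher name is mine), using Sink.asPublisher to obtain the Publisher as the materialized value:

// Materialize a Source into a Reactive Streams Publisher via Sink.asPublisher
import org.reactivestreams.Publisher

val publisher: Publisher[Int] = Source(1 to 10).runWith(Sink.asPublisher(fanout = false))
// publisher can now be handed to any Reactive Streams compliant Subscriber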
  • A Sink is a set of stream processing stages that has exactly one input and no output. It requests and accepts data elements from upstream producers and can slow them down by applying back pressure (a sketch of reading a Sink's materialized value follows the examples below).


// Sink that folds over the stream and returns a Future of the final result as its materialized value
Sink.fold[Int, Int](0)(_ + _)
// Sink that returns a Future as its materialized value, containing the first element of the stream
Sink.head
// A Sink that consumes a stream without doing anything with the elements
Sink.ignore
// A Sink that executes a side-effecting call for every element of the stream
Sink.foreach[String](println(_))
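
To actually obtain a Sink's materialized value, here is a minimal sketch (the sumFuture name is mine) that runs a source into Sink.fold and reads the resulting Future:

// Run a source into Sink.fold; the materialized value is a Future[Int] holding the final sum
val sumFuture: Future[Int] = Source(1 to 100).runWith(Sink.fold[Int, Int](0)(_ + _))
sumFuture.foreach(sum => println(s"Sum of 1..100 is $sum")) // prints 5050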


 
  • A Flow is a processing stage which has exactly one input and one output; it connects its upstream and downstream by transforming the data elements flowing through it (a runnable sketch follows the flow examples below).


// A flow that buffers up to 10 elements and back-pressures the upstream once the buffer is full, then folds the stream
val flow = Flow[Int].buffer(10,OverflowStrategy.backpressure).fold(0)(_ + _)

// A flow that folds over the entire stream (no sliding/grouping); it only emits when the upstream completes, so on an infinite stream nothing is ever emitted
val flow1 = Flow[Int].fold(0)(_ + _)

// A flow to map data from input stream to generate output stream
val flow2 = Flow[Int].map(_ + 1)

// combine flow with sink to produce another sink
val sink = flow.to(Sink.foreach[Int](println(_)))
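
A flow only runs once both of its ends are attached; here is a minimal sketch (the incremented name is mine) plugging flow2 from above between a concrete source and sink:

// Attach a source and a sink to the flow to obtain a runnable graph, then run it
val incremented: Future[Seq[Int]] = Source(1 to 5).via(flow2).runWith(Sink.seq)
incremented.foreach(println) // Vector(2, 3, 4, 5, 6)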



 
  • A Graph is the basic building block in Akka Streams: everything is a graph that exhibits a shape (which contains in/out ports). It can combine inputs (sources), flows, or outputs (sinks), and can be partial (still exposing open inputs or outputs) or closed (a self-sufficient graph with no input and no output). A closed graph is a whole set of Outlets/FlowShapes/Inlets linked together; it forms a blueprint which can be materialized when required, and the same blueprint can be materialized multiple times, each run allocating its own resources (see the sketch after the example below).

// this is a Graph that can be run
Source.fromFuture(Future { 2 })
    .via(Flow[Int].map(_ * 2))
    .to(Sink.foreach(println))
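
A closed graph is only a blueprint until it is materialized; here is a minimal sketch (the blueprint name is mine) that runs the same blueprint twice, each run getting its own resources:

// A RunnableGraph is a reusable blueprint; each run() materializes it anew
val blueprint: RunnableGraph[NotUsed] =
  Source(1 to 3).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println))
blueprint.run() // first materialization
blueprint.run() // second, independent materialization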

// Grouping performed on the stream elements before passing them through the flow (the flow operates on grouped data)
source.grouped(1000).via(Flow[Seq[Int]].map(_.sum)).runWith(Sink.foreach[Int](println(_)))

// The source generates data at a fast rate; the flow groups 10 records at a time, sums them, and emits a stream of sums.
// Further down, throttle slows the upstream before the sums are printed in the sink
Source.tick(0 milliseconds, 10 milliseconds, ()).map(_ => 1)
      .via(Flow[Int].grouped(10)).map(_.sum)
      .throttle(elements = 1, per = 5.seconds, maximumBurst = 1, mode = ThrottleMode.shaping)
      .runWith(Sink.foreach(println))

// In this case the source itself generates groups of 10 integers before they are mapped (no separate Flow stage)
 Source.tick(100 milliseconds, 100 milliseconds, ()) .map(_ => 1)
      .grouped(10)
      .map(x => println(x.sum))
      .runWith(Sink.ignore)

// In this scenario the source waits up to 1000 milliseconds to gather 20 records; if it cannot collect 20 in that time, it emits whatever records it has.
Source.tick(100 milliseconds, 100 milliseconds, ()) .map(_ => 1)
      .groupedWithin(20, 1000 milliseconds)
      .map(x => println(x.sum))
      .runWith(Sink.ignore)

// Here the source slides a window of 5 elements (stepping by 3) over the stream and emits whatever falls in each window.
Source(1 to 1000)
      .sliding(5,3)
      .map(x => println(x.sum))
      .runWith(Sink.ignore)

// Here the flow groups records into batches of 10
Source.tick(100 milliseconds, 100 milliseconds, ()) .map(_ => 1)
      .via (Flow[Int].grouped(10)).map(x => println(x.sum))
      .runWith(Sink.ignore)

// Synchronous model: elements flow from origin to destination one by one on a single thread; there are no asynchronous boundaries.
     Source(1 to 100)
    .map { i => println(s"A: $i"); i }
    .map { i => println(s"B: $i"); i }
    .map { i => println(s"C: $i"); i }
    .runWith(Sink.ignore)

// Asynchronous model: the three .async calls introduce asynchronous boundaries, so the stages run on different threads.
Source(1 to 100)
    .map { i => println(s"A: $i"); i }.async
    .map { i => println(s"B: $i"); i }.async
    .map { i => println(s"C: $i"); i }.async
    .runWith(Sink.ignore)

// Demonstrate single thread synchronous processing
val completion = Source.single("Hello Stream World!\n")
     .map { s ⇒ println(Thread.currentThread().getName() + " " + s); s }
     .map { s ⇒ println(Thread.currentThread().getName() + " " + s); s }
     .runWith(Sink.foreach(s ⇒ println(Thread.currentThread().getName + " " + s)))

    completion.onComplete(_ => system.terminate())


// Demonstrate a processing stage (a flow performing some operation) and observe that the stages run on different threads.
def processingStage(name: String): Flow[String, String, NotUsed] =
   Flow[String].map { s ⇒
     println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
     Thread.sleep(100) // Simulate long processing *don't sleep in your real code!*
     println(name + " finished processing " + s)
     s
   }

    val completion = Source(List("Hello", "Streams", "World!"))
     .via(processingStage("A")).async
     .via(processingStage("B")).async
     .via(processingStage("C")).async
     .runWith(Sink.foreach(s ⇒ println("Got output " + s)))

    completion.onComplete(_ => system.terminate())

// Flattening a stream of collections using mapConcat
val source = Source(List(List(1, 2, 3), List(4, 5, 6)))
val sink = Sink.foreach[Int](println)

val graph = source.mapConcat(identity).to(sink)
graph.run()
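
mapConcat flattens a stream of collections; for a genuine stream of streams, flatMapConcat is the counterpart, consuming each inner Source in order. A minimal sketch:

// Flattening a stream of Sources with flatMapConcat: inner sources are consumed one after another
Source(List(Source(1 to 3), Source(4 to 6)))
  .flatMapConcat(identity)
  .runWith(Sink.foreach[Int](println)) // prints 1 through 6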


// Framing concept: a frame is emitted whenever the delimiter is found in the stream, and a new frame is started.
    val data = ByteString("""Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.
        Lorem Ipsum is simply.BYE Dummy text of the printing.And typesetting industry.""")

   Source(List(data))
    .via(Framing.delimiter(ByteString("."), Int.MaxValue))
    .map(_.utf8String)
    .runForeach(println)
    .onComplete(_ => system.terminate())


 // Explaining File IO: write the framed ByteStrings to a file
   Source(List(data)).via(Framing.delimiter(ByteString("."), maximumFrameLength = 256))
    .runWith(FileIO.toPath(new File("example.out").toPath)) 
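
FileIO.fromPath is the read-side counterpart; a minimal sketch that reads example.out back, splits it on newlines (assuming the framed text still contains line breaks), and prints each line:

// Read the file back as a stream of ByteStrings, split it into lines, and print them
FileIO.fromPath(new File("example.out").toPath)
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
  .map(_.utf8String)
  .runForeach(println)
  .onComplete(_ => system.terminate())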


 

Conclusion

In this first part of the Scrap Book series, we have examined various ways Akka Sources, Flows, Sinks, and Streams can be created. In the next part of the series we will look at different ways to work with the synchronous and asynchronous boundaries of an Akka Flow.

Please do let me know whether this article was helpful to you or if something is not clear.