Safdar Husain

Big Data Integration


Saf Scrapbook -- Part 2

Akka Remoting and Akka Cluster

This scrapbook focuses on the different ways Akka can be deployed on a cluster to achieve concurrency, throughput, high availability, fault tolerance, elasticity and location transparency.

To keep the clustering concepts simple, we shall run an Akka cluster on a single machine with multiple JVMs. We will write some examples that exhibit Akka's clustering properties and its deployment flavors. All examples in this scrapbook refer to a simple, trivial actor, "TestActor", which will be deployed to the Akka cluster in different flavors.

import akka.actor.{Actor, ActorLogging}

class TestActor extends Actor with ActorLogging {

	// Print the payload and this instance's path so each instance can be told apart
	override def receive: Receive = {
	case message: String =>
		println(s"I am EchoLocal and received $message")
		println(self.path)
		log.info("$$$I am EchoLocal and received message$$$ ")

	case message: Int =>
		println(s"I am EchoLocal and received $message")
		println(self.path)
		log.info("$$$I am EchoLocal and received message$$$ ")
	}
}
Example 1- A router created in the ActorSystem (here "TestRouter") starts 5 instances of "TestActor" in the same JVM as its children and supervises them. We then send 10 messages to the router. Because this is a random pool, each message goes to a randomly chosen instance, so the load is spread over the pool but not necessarily as exactly two messages per instance (a RoundRobinPool would give that even split). Each instance prints its own actor path, confirming that the instances have different identities.

import akka.actor.{ActorSystem, Props}
import akka.routing.RandomPool

val _system = ActorSystem("TestRouter")
// Pool of 5 TestActor instances behind a single router
val randomRouter = _system.actorOf(Props[TestActor].withRouter(RandomPool(5)), name = "RandomPoolActor")
Thread.sleep(5000)
1 to 10 foreach {
  i => randomRouter ! i
}
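To see why the routing strategy matters for how the 10 messages are spread, here is a minimal sketch in plain Scala with no Akka involved. The `RoutingSketch` object and the routee names are made up for illustration; the sketch only models the selection logic and is not how Akka implements its routers.

```scala
import scala.util.Random

object RoutingSketch {
  // Hypothetical stand-ins for the 5 routees; real actor paths are assigned by Akka.
  val routees: Vector[String] = Vector.tabulate(5)(n => s"routee-$n")

  // Round-robin: the k-th message goes to routee (k mod number of routees).
  def roundRobin(messages: Seq[Int]): Map[String, Int] =
    messages.zipWithIndex
      .groupBy { case (_, k) => routees(k % routees.size) }
      .map { case (routee, hits) => routee -> hits.size }

  // Random: every message independently picks any routee.
  def random(messages: Seq[Int], rng: Random): Map[String, Int] =
    messages
      .groupBy(_ => routees(rng.nextInt(routees.size)))
      .map { case (routee, hits) => routee -> hits.size }

  def main(args: Array[String]): Unit = {
    val messages = 1 to 10
    println(s"round-robin: ${roundRobin(messages)}")
    println(s"random:      ${random(messages, new Random())}")
  }
}
```

With round-robin every routee ends up with exactly two of the ten messages. With random selection the counts vary from run to run, and some routees may receive none.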



 
Example 2- This is similar to Example-1. The difference is that the router is not defined in code: its settings are read from a configuration file when the ActorSystem is created.



RouterExample {
  akka {
    actor {
      deployment {
        /RandomPoolActor {
          router = random-pool
          nr-of-instances = 5
        }
      }
    }
  }
}



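import akka.actor.{ActorSystem, Props}
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

// Load the "RouterExample" section of application.conf and let it define the router
val system = ActorSystem.create("Router", ConfigFactory.load().getConfig("RouterExample"))
val randomRouter = system.actorOf(Props[TestActor].withRouter(FromConfig()), name = "RandomPoolActor")
1 to 10 foreach { i => randomRouter ! i }
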
Example 3- The purpose of this example is to show the "high availability" and "horizontal scalability" of an Akka system. For simplicity we will create a cluster of two hosts running remote Akka systems. In our simple setup we simulate the two hosts with two JVMs listening on different ports on one machine. Our application will create a router with 10 instances of an actor on the two remote systems, so that each remote system gets 5 instances.

"system1" is configured by loading the "remote_application.conf" file (localhost, port 5150), whereas "system2" is configured on port 5151 by overriding the "tcp.port" setting of "remote_application.conf". This gives us two remote systems on the same machine, on ports 5150 and 5151.

remote_application.conf


akka {
  loglevel = "INFO"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5150
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

RemoteRouterPoolExample


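RemoteRouterPoolExample {
  akka {
    loglevel = "INFO"
    actor {
      # The router's own system needs the remote provider to deploy routees remotely
      provider = "akka.remote.RemoteActorRefProvider"
      deployment {
        /RemoteRoundRobinActor {
          router = round-robin-pool
          nr-of-instances = 10
          target {
            nodes = ["akka.tcp://RemoteSystem1@127.0.0.1:5150", "akka.tcp://RemoteSystem2@127.0.0.1:5151"]
          }
        }
      }
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }
  }
}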



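import akka.actor.{ActorSystem, Props}
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

// First remote system: settings from remote_application.conf (port 5150)
val config = ConfigFactory.load("remote_application")
val system1 = ActorSystem("RemoteSystem1", config)

// Second remote system: same settings, with the port overridden to 5151
val overrides = ConfigFactory.parseString("akka.remote.netty.tcp.port=5151")
val actualConfig = overrides.withFallback(ConfigFactory.load("remote_application"))
val system2 = ActorSystem("RemoteSystem2", actualConfig)

// Router system: deploys its routees onto the two remote systems
val system = ActorSystem.create("RemotePoolRouter", ConfigFactory.load().getConfig("RemoteRouterPoolExample"))
val remoteRoundRobinActor = system.actorOf(Props[TestActor].withRouter(FromConfig()), name = "RemoteRoundRobinActor")
1 to 10 foreach { i => remoteRoundRobinActor ! s"POP+$i" }
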
10 messages are posted to the router, which propagates them to the 10 actor instances running on the two remote systems, so each instance gets one message to process. If for some reason one remote system goes down, the messages are delivered to the instances on the other remote Akka system. This is how the setup provides "high availability".
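The failover behaviour described above can be pictured with a small model in plain Scala. This is only a sketch under simplifying assumptions: `FailoverSketch`, `Routee` and the routee names are hypothetical, the router is assumed to pick routees in strict round-robin order, and it is assumed to already know which routees are gone. In a real system, messages sent before a failure is detected can still be lost.

```scala
object FailoverSketch {
  // Hypothetical model of a routee: the remote system it lives on plus a name.
  final case class Routee(node: String, name: String)

  // Ten routees, alternating between the two remote systems (5 on each).
  val all: Vector[Routee] = Vector.tabulate(10) { n =>
    val node = if (n % 2 == 0) "RemoteSystem1" else "RemoteSystem2"
    Routee(node, s"routee-$n")
  }

  // Round-robin over the routees that are still alive;
  // returns how many messages each remote system handled.
  def deliveriesPerNode(messages: Seq[Int], alive: Vector[Routee]): Map[String, Int] = {
    require(alive.nonEmpty, "at least one routee must be alive")
    messages.indices
      .map(k => alive(k % alive.size).node)
      .groupBy(identity)
      .map { case (node, hits) => node -> hits.size }
  }

  def main(args: Array[String]): Unit = {
    val messages = 1 to 10
    // Both systems up: the messages are split between them.
    println(deliveriesPerNode(messages, all))
    // RemoteSystem2 down: every message lands on RemoteSystem1.
    println(deliveriesPerNode(messages, all.filter(_.node != "RemoteSystem2")))
  }
}
```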

Example 4- Sometimes, rather than having the router actor create its routees, it is desirable to create the routees separately and provide them to the router for its use. You can do this by passing the paths of the routees to the router’s configuration. Messages will be sent with ActorSelection to these paths.
This is similar to Example-3, except for how the actors are created on the remote systems. In Example-3 the router creates fresh actors on the remote systems, whereas in this example the router expects these actors to already exist.



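RemoteRouterGroupExample {
  akka {
    actor {
      # The router's own system needs the remote provider to reach remote routees
      provider = "akka.remote.RemoteActorRefProvider"
      deployment {
        /RemoteRoundRobinActor {
          router = round-robin-group
          nr-of-instances = 10
          routees.paths = [
            "akka.tcp://RemoteSystem1@127.0.0.1:5150/user/remote1",
            "akka.tcp://RemoteSystem2@127.0.0.1:5151/user/remote2"]
        }
      }
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }
  }
}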


import akka.actor.{ActorSystem, Props}
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

// Launch an actor system on the remote machine on port 5150 with a TestActor instance,
// external to the router application. The system name must match the one used in the
// routee path in the configuration (RemoteSystem1).
val config = ConfigFactory.load("remote_application")
val _system = ActorSystem("RemoteSystem1", config)
val remoteActor1 = _system.actorOf(Props[TestActor], name = "remote1")

// Launch another actor system on the remote machine on port 5151 with a TestActor
// instance, external to the router application
val overrides = ConfigFactory.parseString("akka.remote.netty.tcp.port=5151")
val actualConfig = overrides.withFallback(ConfigFactory.load("remote_application"))
val _system2 = ActorSystem("RemoteSystem2", actualConfig)
val remoteActor2 = _system2.actorOf(Props[TestActor], name = "remote2")

// Launch the router application and refer to the externally launched actors as routees
val system = ActorSystem.create("Router", ConfigFactory.load().getConfig("RemoteRouterGroupExample"))
val randomRouter = system.actorOf(Props[TestActor].withRouter(FromConfig()), name = "RemoteRoundRobinActor")
1 to 10 foreach { i => randomRouter ! i }
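Because a group router only looks its routees up by path, a wrong system name or port in a routee path means the messages never reach the intended actor. The sketch below, in plain Scala, splits such a path into its parts so they can be compared with how the remote systems were actually started. `ActorAddressSketch`, `RemotePath` and `pointsAt` are hypothetical helpers and not part of Akka; in real code Akka's own `ActorPath.fromString` is probably the better tool.

```scala
object ActorAddressSketch {
  // Hypothetical helper type describing the parts of a remote actor path.
  final case class RemotePath(protocol: String, system: String, host: String, port: Int, path: String)

  // protocol://system@host:port/path
  private val Pattern = """([\w.]+)://([^@/]+)@([^:/]+):(\d+)(/.*)""".r

  // Returns None when the string does not look like a remote actor path.
  def parse(s: String): Option[RemotePath] = s match {
    case Pattern(protocol, system, host, port, path) =>
      Some(RemotePath(protocol, system, host, port.toInt, path))
    case _ => None
  }

  // True when the path targets the given actor system on the given port.
  def pointsAt(s: String, system: String, port: Int): Boolean =
    parse(s).exists(p => p.system == system && p.port == port)

  def main(args: Array[String]): Unit = {
    val routee = "akka.tcp://RemoteSystem2@127.0.0.1:5151/user/remote2"
    println(parse(routee))
    println(pointsAt(routee, "RemoteSystem2", 5151))
  }
}
```

A check like `pointsAt` could be run over every entry in `routees.paths` before the router is started, which catches a mismatched port or system name early.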


Example 5- In this case we are going to create a cluster-aware router. We will start two Akka systems in two different JVMs, on ports 2551 and 2552, and then start a cluster-aware router.



ClusterAwareRouter {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
      deployment {
        /ClusterAwareActor {
          router = random-pool
          nr-of-instances = 10
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 3
            allow-local-routees = on
          }
        }
      }
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
      }
    }
    cluster {
      seed-nodes = [
        "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "akka.tcp://ClusterSystem@127.0.0.1:2552"]
      auto-down-unreachable-after = 10s
    }
  }
}

Run the code below on two different JVMs, passing the two arguments in the order shown:
> scala safCluster node-1 2551 : This runs an actor system inside a cluster. At this point there is one node, "node-1", in the cluster.
> scala safCluster node-2 2552 : This runs another actor system inside the same cluster. At this point there are two nodes in the cluster. It also starts a cluster-aware router, which creates its actor instances across the cluster (the configuration asks for 10 in total but allows at most 3 per node, so at most 6 with two nodes). It then sends 100 messages to the cluster-aware router, which propagates them randomly to the actor instances in the cluster. This exhibits the high availability, load balancing and fault tolerance properties of Akka.

safCluster.scala


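import akka.actor.{ActorSystem, Props}
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

object safCluster {
  def main(args: Array[String]): Unit = {
    // The first argument selects the node and therefore the port; 0 lets Akka pick a free port
    val port = args.headOption match {
      case Some("node-1") => 2551
      case Some("node-2") => 2552
      case _              => 0
    }
    val system = ActorSystem.create("ClusterSystem",
      ConfigFactory.parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
        withFallback(ConfigFactory.load()).getConfig("ClusterAwareRouter"))

    // Only node-2 starts the cluster-aware router and sends the messages
    if (args.headOption == Some("node-2")) {
      println("Node-2 Running")
      val randomRouter = system.actorOf(Props[TestActor].withRouter(FromConfig()), name = "ClusterAwareActor")
      1 to 100 foreach { i => randomRouter ! i }
    }
  }
}
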
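The ClusterAwareRouter configuration above sets both nr-of-instances and max-nr-of-instances-per-node, and the two limits interact. As a worked example in plain Scala, the upper bound on the number of routees is the smaller of the total and the per-node cap times the number of nodes. `ClusterRouteeSketch` and `maxRoutees` are hypothetical names; the sketch only does the arithmetic and assumes every node is allowed to host routees.

```scala
object ClusterRouteeSketch {
  // Upper bound on the routees a cluster-aware pool can create: limited by the
  // requested total and by the per-node cap multiplied by the number of nodes.
  def maxRoutees(nrOfInstances: Int, maxPerNode: Int, nodes: Int): Int =
    math.min(nrOfInstances, maxPerNode * nodes)

  def main(args: Array[String]): Unit = {
    // Values taken from the ClusterAwareRouter configuration.
    println(maxRoutees(nrOfInstances = 10, maxPerNode = 3, nodes = 1)) // 3
    println(maxRoutees(nrOfInstances = 10, maxPerNode = 3, nodes = 2)) // 6
    println(maxRoutees(nrOfInstances = 10, maxPerNode = 3, nodes = 4)) // 10
  }
}
```

So with these settings the full 10 instances are only reached once at least four nodes have joined the cluster.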
Example 6- In this case we will deploy an actor on a remote Akka system (on port 5150). The prerequisite for this example is an actor system already running remotely on port 5150. We will also obtain an ActorPath from an ActorRef and look the actor up again from that path.

remote_deployment.conf


akka {
  loglevel = "INFO"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    deployment {
      /someActor {
        remote = "akka.tcp://RemoteSystem@127.0.0.1:5150"
      }
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    log-sent-messages = on
    log-received-messages = on
  }
}



import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Start an actor system that plays the remote machine and opens remote port 5150.
// It creates an actor instance of "EchoRemoteActor" (assumed to be a simple actor like TestActor).
val config = ConfigFactory.load("remote_application")
val _system = ActorSystem("RemoteSystem", config)
val remoteActor = _system.actorOf(Props[EchoRemoteActor], name = "remote")

// Deploy a TestActor onto the remote actor system created above;
// the target comes from the /someActor entry in remote_deployment.conf.
val config1 = ConfigFactory.load("remote_deployment")
val system = ActorSystem("LocalSystem", config1)
val someActor = system.actorOf(Props[TestActor], "someActor")

// Send a message to someActor (exhibits location transparency,
// as the code doesn't know the location of the instance)
someActor ! "Heya"
println("PATH OBTAINED IS " + someActor.path)

// Get the path from the ActorRef, then look the actor up
// again from that path (the reverse action)
val referRemote = system.actorSelection(someActor.path)
referRemote ! "Message to Remote Actor "

// Look the actor up by its local path; this yields an ActorSelection
// that can be used to send a message
val getAnotherLocalInstance = system.actorSelection("/user/someActor")
getAnotherLocalInstance ! "Another message1"

// Look an actor up by its full remote path and use the selection to
// send a message
val getAnotherRemoterInstance = _system.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote")
getAnotherRemoterInstance ! "Another Message2"


Conclusion

In this second part of the Scrapbook series, we have examined the various ways Akka actors can be deployed in local, remote and cluster models. We have demonstrated high availability, fault tolerance, concurrency and throughput by deploying Akka in different flavors.

Please do let me know if this article was helpful to you or if something is not clear to you.