Thursday, 17 December 2015

(C)ommunicating (S)equential (P)rocesses (CSP)

Now let's move on to CSP (Communicating Sequential Processes).

Here is the code block:


import java.util.concurrent.Callable;

import groovy.transform.TupleConstructor;
import groovyx.gpars.dataflow.DataflowChannel
import groovyx.gpars.dataflow.DataflowQueue;
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.dataflow.SyncDataflowVariable;
import groovyx.gpars.group.DefaultPGroup

class CSPTest {
 
 static def taskThread
 // volatile, so the flag set by main() is reliably visible to the task thread
 static volatile boolean interrupt = false
 @TupleConstructor
 class CallableTask1 implements Callable
 {
  
 
  SyncDataflowQueue channel; 
  
  public String call()
  {
   taskThread = Thread.currentThread()
   println "In call Method...... for Thread: ${Thread.currentThread().getName()}"
   while(!Thread.currentThread().isInterrupted())
    {
     try{
     sleep(2000)
     println "fetching value...."
     //println "The channel is: $channel"
     println channel.getVal().toUpperCase()
     println "Value Fetched...."
     sleep(2000)
     
     if(interrupt)
     {
      Thread.currentThread().interrupt()
     
     }
     }
     catch(InterruptedException e){
      e.printStackTrace()
     }
     
     "****** Interrupting, Hence EndingLoop ********"
    } 
    
    "Okkkkk"    
  }
 }
 
 static main(args)
 {
  SyncDataflowQueue queue = new SyncDataflowQueue()
  SyncDataflowVariable var = new SyncDataflowVariable()
  
  def ok = new CSPTest()
  def c = new CallableTask1(ok,queue)
  
  def group = new DefaultPGroup()
  def j = group.task(c)
  
  
  j.then {
   
   println "The result is $it"
   it
   
  }.then {
  
   println "I am Hererrrrr"
   var << it
  }
  
  println "Sending First Value"
  queue << "First value"
  
  sleep(3000)
  
  println "Sending Second Value"
  queue << "Second value"
  
  sleep(5000)
  
  println "Sending Third Value"
  queue << "Third value"
  
  println "************  The callable Thread name is: ${taskThread.getName()}"
  //taskThread.interrupt()
  
  CSPTest.interrupt = true;
  
  println "IsInterrupted: ${taskThread.isInterrupted() }"
  
  println "The  Result is: ${var.val}"
  
  
 }
}


Here we have created a Callable task that returns a String and takes a SyncDataflowQueue as a constructor parameter, as defined by @TupleConstructor.

In main() we create an instance of the callable task and start it by passing it as an argument to the task() method of DefaultPGroup.

Invoking task() runs the Callable in a separate thread taken from the group's thread pool. We have chained handlers onto the returned promise, so when the task finishes by returning the String "Okkkkk", the first handler is invoked. Once the first handler completes, its return value is passed on to the second handler, which binds it to the SyncDataflowVariable.

Once the task has started, it loops until its thread is interrupted. Each value that main() writes to the SyncDataflowQueue is read and printed inside the loop. When there are no more inputs, main() sets the interrupt flag, the task interrupts its own thread, the loop ends, and the task returns "Okkkkk". That is when the chained handlers get executed.

Please note that the handlers and the task all run in separate threads, so the handlers wait until the task thread has been interrupted.
This is a small example of CSP.
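The same handshake can be reduced to a few lines. The following is only a minimal sketch, not the code above: the names group, channel, consumer and labelled are made up for illustration, the task reads a fixed number of values instead of looping until interrupted, and the @Grab coordinates assume GPars 1.2.1 is the version you want to pull in.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()
def channel = new SyncDataflowQueue()

// Consumer task: reads exactly two values from the channel, then returns them.
def consumer = group.task {
    def seen = []
    2.times { seen << channel.val.toUpperCase() }
    seen.join(',')
}

// Handler chained onto the task's promise; it runs once the task has returned.
def labelled = consumer.then { 'got: ' + it }

channel << 'first value'    // a write to a SyncDataflowQueue waits for the reader
channel << 'second value'

def result = labelled.get() // blocks until the whole chain has completed
println result
group.shutdown()
```

Because the channel is synchronous, the writer and the reader meet at every message, which is the rendezvous style that CSP is built around.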

SyncDataflow Queue
View Subhankar Paul's profile on LinkedIn

Wednesday, 16 December 2015

PARALLEL COLLECTIONS

Working with Parallel Collections:


import groovyx.gpars.GParsPool;

class ParallelCollectionTest {

 static main(args)
 {
  GParsPool.withPool{

   def x = [1,2,3,4,5,6].parallel.
   filter {it%2 ==0}.
   map {

    println "1. $it by Thread ${Thread.currentThread()}"
    it ** 2

   }.
   reduce {
    a,b ->
    println "2. $a and $b by Thread ${Thread.currentThread()}"
    
    // the reduce closure must return the combined value
    a + b
   }

   15.times {print "*"
    
   }

   println x
   
   def nums = 1..4
   //def ok = nums.makeTransparent()
   println nums 
  }
  
  30.times {
   print "="
  }
  
  println "" 
  
  def y = [[10,20],[20,30]].combinations().each {
   a, b ->
   println "a:$a"
   println "b:$b"
   
  }
  println y.class
  
  // double quotes are needed for interpolation; return the running sum for the next step
  (0..10).inject(0, {a, b -> println "$a-------$b"; a + b})
  

 }
}

In this code snippet we have processed the elements of a list in parallel with filter, map and reduce. Besides these, there are methods like eachParallel, collectParallel etc. with which we can start parallel processing on the elements of a Collection.

But there is a catch. When we need to perform several parallel operations on a collection, we should prefer filter, map and reduce: reading the parallel property converts the collection to a parallel collection only once, and all the operations are then performed on that parallel tree-like structure. Chained methods like eachParallel and collectParallel, on the other hand, create a parallel collection on each call, so there is overhead involved in creating and destroying it every time.
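To make the difference concrete, here is a small sketch that computes the same result both ways. The variable names (numbers, viaParallel, viaChained) are made up for illustration, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.GParsPool

def numbers = [1, 2, 3, 4, 5, 6]
def viaParallel
def viaChained

GParsPool.withPool {
    // One conversion to a parallel collection, then filter/map/reduce on it.
    viaParallel = numbers.parallel
            .filter { it % 2 == 0 }
            .map { it * it }
            .reduce { a, b -> a + b }

    // Each xxxParallel call builds its own parallel collection and returns
    // an ordinary collection again.
    viaChained = numbers.findAllParallel { it % 2 == 0 }
            .collectParallel { it * it }
            .sum()
}

println "parallel property: $viaParallel, chained methods: $viaChained"
```

Both variants give the same sum of squares of the even numbers; they differ only in how often the parallel structure is built.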


ACTORS with THREAD Group

So far, the GPars components we have come across, be it Actors, Dataflow Tasks, Operators, Selectors etc., have been executed in separate daemon threads (from a thread pool) spawned from the main thread.

So when the main thread completes, the other threads die with it. That is why we have always made the main thread wait for the other threads to finish, using []*.join().

To avoid this kind of problem, we can run the tasks on non-daemon threads by using a non-daemon thread pool. Let's walk through an example:

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor;
import groovyx.gpars.actor.DynamicDispatchActor;
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.group.NonDaemonPGroup;
import static groovyx.gpars.dataflow.Dataflow.*
import groovyx.gpars.dataflow.*


import java.util.concurrent.TimeUnit

class ActorsWithTreadGroup {

 static def nonDaemonGroup = new NonDaemonPGroup(4)
 static def daemonGroup = new DefaultPGroup(4)
 static DataflowVariable dataflowVariable =  new DataflowVariable()
 static DataflowVariable dataflowVariableInout =  new DataflowVariable()
 static DataflowVariable dataflowPromise = new DataflowVariable()
 
 
 class Act1 extends DefaultActor
 {
  
  Act1()
  {
   println "#### Constructor is Invoked ####"
   this.parallelGroup = nonDaemonGroup
   //this.parallelGroup = daemonGroup
  }
  
  def secretNum

  void afterStart() {

   println " ***** After Start is invoked ***** "

   secretNum = new Random().nextInt(10)

   println " ***** After Start is invoked done***** "
  }

  /**
   *  Static Message Handler
   * However Dynamic Message Handlers can be registered
   * in case of DynamicDispatchActor
   *
   */
  
  
  void act() {

   println "**** Act is Invoked **** "

   loop {

    println "**** Loop is Invoked **** "

    react {  msg ->
     
     println "Sleeping Thread...."
     TimeUnit.MILLISECONDS.sleep 10000
     println "Awakening Thread...."
     println "The Msg is $msg"
    }

    println "**** Loop is Finished **** "
   }
  }
    

 }
 
 /**
  * Dynamic Actor
  * 
  */
 
 class Act2 extends DynamicDispatchActor
 {
  
  Act2()
  {
   this.parallelGroup = nonDaemonGroup
  }
  
  /**
   *  Static Message Handler
   * However Dynamic Message Handlers can be registered
   * in case of DynamicDispatchActor, and the Dynamic Message
   * handlers takes precendence over this static ones
   *
   */
  def act()
  {
   loop{
    
    react{msg ->
     
     println "The msg in Act method is $msg"
    }
   }
  }
  
  void onMessage(def msgObject)
  {
   println "The msg in onMessage is $msgObject"
  }
 }
 
 
 static def act;
 static void actFunc()
 {
  act = daemonGroup.actor{
   
   
   loop{
    
    react { msg ->
     
     println "Sleeping Thread...."
     TimeUnit.MILLISECONDS.sleep 10000
     println "Awakening Thread...."
     println "The Msg is $msg"
    }
   }
   
   //setParallelGroup new DefaultPGroup(2)
  }
  
 }
 
 static main(args)
 {
  
 /* actFunc()
  println "The actor is: ${act.getClass()}"
  act << "I am first"
  
  TimeUnit.MILLISECONDS.sleep 2000
  
  act << "I am last"*/
  
  //act.join()
  
  /*def act1 = ActorsWithTreadGroup.Act1.newInstance(new ActorsWithTreadGroup())
  act1.start()
  
  println "===========Sending First Msg......"
  act1 << "I am first"
  
  TimeUnit.MILLISECONDS.sleep 2000
  
  println "============Sending Next Msg......"
  act1 << "I am last"*/
  
  //act1.join()
  
  /**
   * Consolidating Multiple Dynamic Message Handler (when)
   * with become
   * 
   * This message HANDLERS can take only parameter,
   * i.e. the incoming msg......
   * 
   * When we are registering multiple ones the same Type of
   * message, the last one will be in Action....
   * 
   */
  
  def act2 = ActorsWithTreadGroup.Act2.newInstance(new ActorsWithTreadGroup())
  act2.become {
   when{
   msg ->
   println " ^^^^^ Msg Received by Dynamic msg Handler 1 $msg ^^^^^ "
  }}
  
  /**
   * The below Message Handler will be in Action as this
   * is last declared
   * 
   */
  
  /*act2.when{ msg2->
   
   println " ^^^^^ Msg Received by Dynamic msg Handler 2 $msg2 ^^^^^ "
   sleep(10000)
  }
  */
  
  act2.start()
  act2 << "I am first"
  //TimeUnit.MILLISECONDS.sleep 2000
  act2 << "I am last"
  
  Promise promise1 = task{
   
   println "I am doing a Task"
   println "Waiting for input...."
   def var = dataflowVariableInout.get()
   println "Got Input"
   sleep(10000)
   var
  }
  
  Promise promise2 = task{
   
   println "For Task2"
   sleep(1000)
   println "**** Returning Data for TASK2"
   "promise 2"
  }
  
  Promise promise3 = task{
   
   println "For Task3"
   sleep(2000)
   println "**** Returning Data for TASK3"
   "promise 3"
  }
 
  promise1.whenBound{msg ->
  
  println "The Bounded Promise Data is: $msg"
  dataflowVariable << msg
  }
  
  def convert = {msg -> msg.toUpperCase()}
  
  promise1.then {msg -> println " @@@@@ The then Handler is $msg"
          "My name is Subhankar $msg"}
         .then{ convert(it) }.then{ dataflowPromise << "Heyyyy!!!!!"+it}
  
  dataflowVariableInout << "Okiesss"
  
  println dataflowVariable.get()
  
  println dataflowPromise.get()
  
  whenAllBound([promise1,promise2,promise3]) {arg1, arg2, arg3 ->
    
   println " ************  FINALLY ALL THE PROMISES HAS ARRIVED ************* \n PROMISE1: $arg1 \n PROMISE2: $arg2 \n PROMISE3: $arg3"
   
   }
   
  
  
  
 }

}

In this example we have also introduced the GPars Promise, which is much like a Future from Java concurrency, and to which we can bind handlers using whenBound, then or whenAllBound.
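Stripped of the actors, the three kinds of handlers can be sketched as below. The names p1, p2, shouted, logged and combined are invented for this illustration, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task
import static groovyx.gpars.dataflow.Dataflow.whenAllBound

def p1 = task { 'promise 1' }
def p2 = task { sleep 200; 'promise 2' }

// then: returns a new promise holding the handler's return value.
def shouted = p1.then { it.toUpperCase() }

// whenBound: a callback fired once the promise has a value.
def logged = new DataflowVariable()
p1.whenBound { logged << "bound: $it".toString() }

// whenAllBound: fires once every promise in the list has a value.
def combined = new DataflowVariable()
whenAllBound([p1, p2]) { a, b -> combined << "$a & $b".toString() }

def shoutedValue = shouted.get()
def loggedValue = logged.val
def combinedValue = combined.val
println "$shoutedValue | $loggedValue | $combinedValue"
```

As in the main example, the callbacks hand their results to DataflowVariables so that the main thread has something to wait on.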

We have initialized two Pooled Parallel Groups:


 static def nonDaemonGroup = new NonDaemonPGroup(4)
 static def daemonGroup = new DefaultPGroup(4)

and then we have used them with Actors, one at a time.
Actors which are defined as inner classes by extending either DefaultActor or DynamicDispatchActor have a parallelGroup property, so setting this property to one of the groups defined above achieves our end.
In other cases we can use the group to create an Actor directly, like:


daemonGroup.actor{.....

Apart from this there are some interesting features which I have mentioned as comments with the code snippet.
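As a compact illustration of the group-based variant, here is a sketch that creates an actor on a non-daemon group and shuts the group down afterwards. The names group, received and echo are made up, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.group.NonDaemonPGroup

def group = new NonDaemonPGroup(2)
def received = new DataflowVariable()

// The actor runs on the group's non-daemon threads.
def echo = group.actor {
    react { msg -> received << msg.toUpperCase() }
}

echo << 'hello'
def answer = received.val   // wait for the actor to handle the message
echo.join()
println answer

// Non-daemon threads keep the JVM alive, so release the pool explicitly.
group.shutdown()
```

With a non-daemon group the program no longer depends on join() to stay alive, but it does need the explicit shutdown() to be able to exit.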

Monday, 14 December 2015

DATAFLOW

When we talk about dataflow, the first thing that comes to mind is data flowing between different nodes, connecting them, and getting processed along the way (if required).

To keep all of this from getting confusing, GPars Dataflow comes to the rescue. Now let's move on to Dataflows.

Let's start with a simple demo:


import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

class DataflowTest {

 static main(args)
 {
  Dataflows data = new Dataflows()
  def z

  
  def t1 = task{
   
   println "Sum Task is waiting (Thread: ${Thread.currentThread()})"
   
   data.z = data.x+data.y
   
   println "The Summation is $data.z (Thread: ${Thread.currentThread()})" 
   
  }
  
  def t2 = task {
   println "Mul Task is waiting (Thread: ${Thread.currentThread()})"
   def a = data.z * data.x
   
   println "The Product is: $a"
   
  }
  
  def t3 = task{
   
   println "Assigning x.... (Thread: ${Thread.currentThread()})"
   
   data.x = 3
   
  }
  
  def t4 = task{
   
   println "Assigning y.... (Thread: ${Thread.currentThread()})"
   
   data.y = 7
  }
  
  println "The Main Thread is (Thread: ${Thread.currentThread()})"
  [t2.join()]
  
 }
 
}

 

Here we have defined tasks with the help of Dataflow, along with some variables held in a Dataflows object.
Whenever a task depends on another task we can use Dataflows, and the tasks get executed in other threads, just like Actors.

Here Task 2 (t2) depends on Task 1 (t1), which depends on Task 3 (t3) and Task 4 (t4).
t2 waits for t1, and t1 in turn waits for t3 and t4, to complete the whole cycle.
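The dependency chain can also be written without any printing, which makes the waiting easier to see. This is only a sketch with made-up names (df, product, answer), and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

def df = new Dataflows()

// Declared first, but it can only finish last: reading df.z and df.x blocks
// until some other task has assigned them.
def product = task { df.z * df.x }

task { df.z = df.x + df.y }   // waits for x and y
task { df.x = 3 }
task { df.y = 7 }

def answer = product.get()    // (3 + 7) * 3
println answer
```

The order in which the tasks are declared does not matter; the reads and writes on the Dataflows object decide the execution order.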
Let's go through another example:


import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

class DataFlowTest2 {

 static Dataflows data = new Dataflows()
 static Thread t1,t2
 
 static def t1()
 {
  def t1 = task{
   
   println "Sum Task is waiting (Thread: ${Thread.currentThread()})"
   
   def z = data.x+data.y
   
   println "The Summation is...... $z (Thread: ${Thread.currentThread()})" 
   
  }
 }
 
 static def t2()
 {
  task{
   println "Assignment by (Thread: ${Thread.currentThread()})"
    
  try{ 
   data.x = 100
   data.y = 30
   data.x = 1000
  }
  catch(e)
  {
   e.printStackTrace()
  }
   println "Assignment Done...."
  }
 }
 
 static main(args)
 {
  t1()
  t2()
  
  Thread.currentThread().join()
 }
 
}


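This is much the same as the previous one; the only difference is that the Dataflow tasks are defined within static methods, which main() invokes to start them. Note also the second assignment to data.x: a dataflow variable can be bound only once, so rebinding it to a different value fails with an exception, which is why the assignments are wrapped in try/catch.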
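The single-assignment behaviour is easy to check in isolation. A minimal sketch, using a plain DataflowVariable named x and a helper variable failure that are not part of the example above (the @Grab coordinates assume GPars 1.2.1):

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowVariable

def x = new DataflowVariable()
x << 100

def failure = null
try {
    x << 1000               // second bind with a different value
} catch (IllegalStateException e) {
    failure = e             // the rebind is rejected
}

println "x is still ${x.val}, rebind failed with: ${failure?.class?.simpleName}"
```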

The two important building blocks that go with Dataflow tasks are:
1) Operators
2) Selectors
Here is a simple example of a Dataflow Selector.
The concept behind a Dataflow Selector is as follows:
A Selector starts processing as soon as it finds a value in any of the channels passed to it.

Let's probe it with the help of an example:


import groovyx.gpars.dataflow.*
import static groovyx.gpars.dataflow.Dataflow.*

class DataFlowSelectTest {

 static def a = new DataflowVariable()
 static def b = new DataflowVariable()
 static def c = new DataflowVariable()
 
 static main(args)
 {
  
  
  task{
   
   println "Sleeping Task1"
   sleep 3000
   println "EndSleep Task1"
   a << 2
  }
  
  task{
   println "Sleeping Task2"
   sleep 6000
   println "EndSleep Task2"
   b << 22
 }
  
  task{
  
   println "Sleeping Task3"
   sleep 5000
   println "EndSleep Task3"
   c << 222
 }
  
  
  /**
   * Select waits for one of the Tasks to finish as there is wait
   */
  def result = select([a, b, c])
  println "The fastest result is class ${result.getClass()} ------- ${result([false,true,true]).value}"
  
  def oper = '+'
  def num1 = 11
  def num2 = 22
  
  def formula= "$num1 $oper $num2"
  println "$formula: ${new GroovyShell().evaluate(formula)}" 
  
  
 }
 
}


In the above example we have defined 3 tasks, each populating a DataflowVariable after some delay, and a Selector acting on these 3 DataflowVariables. As soon as any one of the enabled DataflowVariables is populated, the Selector fires and the result holds the value of the DataflowVariable that was populated first. Note that the list [false, true, true] passed when calling the Selector acts as a guard mask that disables the first channel (a), so here the winner is picked from b and c only.
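Without the guard mask, the behaviour boils down to the following sketch. The names slow, fast, chooser and first are invented for illustration, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

def slow = new DataflowVariable()
def fast = new DataflowVariable()

task { sleep 1000; slow << 'slow' }
task { sleep 100;  fast << 'fast' }

def chooser = select([slow, fast])
def first = chooser()        // blocks until any of the channels has a value

// The result carries both the value and the index of the winning channel.
println "value=${first.value}, index=${first.index}"
```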

Interesting isn't it!!!!!

Unlike a Selector, a Dataflow Operator works on the opposite principle.

Here, the operator will not execute until values are available on all of the specified input channels. Once they are available, the associated closure gets invoked, and we can also bind values explicitly to the output channel(s).

Let's see an example:


import static groovyx.gpars.dataflow.Dataflow.*
import groovyx.gpars.dataflow.*

public class DataFlowOperator {

 static DataflowQueue a = new DataflowQueue()
 static DataflowQueue b = new DataflowQueue()
 static DataflowQueue c = new DataflowQueue()
 static DataflowQueue d = new DataflowQueue()

 static main(args)
 {
  println "By Thread ${Thread.currentThread()}"


  try{
   
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
     println "x:$x \n y:$y \n z:$z By Thread ${Thread.currentThread()}"
     bindAllOutputValues x+y+z
    }
   
   task{

    println "In Assignment Task.....By Thread ${Thread.currentThread()}"
    a << 10
    b << 20
    c << 30
    println "End Of Assignment Task.....By Thread ${Thread.currentThread()}"

   }

   def t = task{

    println "The value of d is By Thread ${Thread.currentThread()}"
    def x=d.val
    println "The value of x is $x By Thread ${Thread.currentThread()}"
   }




   Thread.sleep(10000)

  }

  catch(e)
  {
   e.printStackTrace()
  }

  println "End of Main........ By Thread ${Thread.currentThread()}"
 }
}




Here we have defined two tasks and an operator.
The operator waits for all 3 input channels to receive data, and Task 2 waits for the output channel to be populated.
Task 1 executes and populates all 3 input channels.
Once all 3 channels are populated, the operator, which has been waiting until now, executes its closure and binds the result to the output channel. This in turn lets Task 2, which has been waiting on the output channel, proceed.

So we can say that an Operator waits until values are available on all of its input channels, whereas a Selector fires when any one of its channels has a value.
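The contrast can be put side by side in one short sketch. All channel names here (a, b, sum, c, d, any) are made up, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.selector

def a = new DataflowQueue()
def b = new DataflowQueue()
def sum = new DataflowQueue()

// Operator: fires only when BOTH a and b have a value.
def op = operator(inputs: [a, b], outputs: [sum]) { x, y ->
    bindOutput x + y
}

def c = new DataflowQueue()
def d = new DataflowQueue()
def any = new DataflowQueue()

// Selector: fires for each single value arriving on EITHER c or d.
def sel = selector(inputs: [c, d], outputs: [any]) { value ->
    bindOutput value
}

a << 10
b << 20
c << 'only c'                 // d never receives anything

def opResult = sum.val
def selResult = any.val
println "operator: $opResult, selector: $selResult"

[op, sel]*.terminate()
```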

Let's move on to another example:

import java.util.concurrent.TimeUnit;

import groovyx.gpars.dataflow.DataflowQueue;
import static groovyx.gpars.dataflow.Dataflow.*
import groovyx.gpars.dataflow.*
import groovyx.gpars.dataflow.operator.DataflowEventListener;
import groovyx.gpars.dataflow.operator.DataflowProcessor;

class DataFlowOperatorTest {

 static DataflowQueue a = new DataflowQueue()
 static DataflowQueue b = new DataflowQueue()
 static DataflowQueue c = new DataflowQueue()

 /**
  * When we will be using the Broadcast then 
  * we need to comment the Queue Block and open up 
  * the Broadcast block.
  * 
  *  Cause we have used the same DataFlow entities in both
  *  Operators and Selectors so if we use Queues, the Operators
  *  will be in effect, the Selectors will not be in action, as in
  *  case of Queues it can be used with either Operator or Selector.
  *  In order to use for both Operators and Selectors Broadcasts
  *  should be used, so that they can be registered with multiple
  *  operational entities.....
  * 
  */
 
 static DataflowBroadcast d = new DataflowBroadcast()
 static DataflowBroadcast e = new DataflowBroadcast()
 
 /*static DataflowQueue d = new DataflowQueue()
 static DataflowQueue e = new DataflowQueue()*/
 
 
 static DataflowQueue highVolumeQueue = new DataflowQueue()
 
 
 static def listnrs = new DataflowEventsListenerImpl()
 static def ChanlListener = new DataflowChannelListenerImpl()

 static main(args)
 {
  println "By Thread ${Thread.currentThread()}"
  highVolumeQueue.getEventManager().addDataflowChannelListener(ChanlListener)

  try{
   
   /**
    * Block for Broadcast
    * 
    */
   
   def op1 = operator(inputs:[d.createReadChannel(), e.createReadChannel()]) { val1, val2 ->
    
    /**
     * The delegate inside Operators/Selectors is
     * DataflowProcessor
     * 
     */
    
    registerChannelListenersToAllInputs(ChanlListener)
    println "***** The value is of d N e is: $val1 N $val2"
   }
   
   def selector1 = selector(inputs : [d.createReadChannel(), e.createReadChannel()]) {val, idx ->
     println "%%%%%%%  The SELECTOR Val is: $val \n Index: $idx \n"
   }
   
   /**
    * Within Selectors we can specify outputs too just like normal Operators.
    * For the sake of simplicity we have omitted that.
    * 
    */
   
   /**
    * End of Block for Broadcast
    */
   
   
   
   /**
    * Block for Queue
    *
    */
   
   /*def op1 = operator(inputs:[d, e]) { val1, val2 ->
    println "***** The value is of d N e is: $val1 N $val2"
   }
   
   def selector1 = selector(inputs : [d, e]) {dVal, eVal ->
     println "%%%%%%%  The SELECTOR dVal is: $dVal \n eVal: $eVal \n"
   }*/

   /**
    * End of Block for Queue
    */
   
   
   /*def op2 = operator(inputs:[e]) { val ->
    
    println "The value is of e: $val"
    
   }*/
   
   def op3 = operator(inputs:[highVolumeQueue]) { highVal ->
    
    println "The High vOlume DATA: $highVal"
    
   }
   
    operator(inputs: [a, b, c], outputs: [d,e,highVolumeQueue], listeners: [listnrs]) {x, y, z ->
     println "x:$x \n y:$y \n z:$z \n By Thread ${Thread.currentThread().isDaemon()}"
     
     println "===== Binding First Output ====="
     bindOutput 1, x+y+z
     
     TimeUnit.MILLISECONDS.sleep(5000)
     
     println "===== Binding Second Output ====="
     bindOutput 0, x+z
     
     if(x+2*y+z >= 80 )
      bindOutput(2,fireCustomEvent(x+2*y+z))
     else
      bindAllOutputs x+2*y+z
    }
   
   task{

    println "In Assignment Task.....By Thread ${Thread.currentThread()}"
    a << 10
    b << 20
    TimeUnit.MILLISECONDS.sleep(6000)
    c << 30
    println "End Of Assignment Task.....By Thread ${Thread.currentThread()}"

   }

   task {
    println "Doing some Stupidity in the mean time..... :-) :-)"
   }


   [op1,op3]*.join()
   
   /**
    * Termination of the Operators/Selectors can be done with
    * terminate(), terminateAfterNextRun()
    * These methods call the Life Cycle Event Methods of the registered Life Cycle Event Handlers of the
    * corresponding Selectors/Operators.  
    * 
    * Apart from these we can also use PoisonPill Control Message,
    * which needs to be passed to the inputs of the Operators/Selectors
    * i.e. Channels/Broadcasts/Variable etc and Event Methods of the registered Life Cycle Event Handlers
    * gets invoked. 
    * 
    */

   

  }

  catch(e)
  {
   e.printStackTrace()
  }

  println "End of Main........ By Thread ${Thread.currentThread()}"
 }
}

class DataflowEventsListenerImpl implements DataflowEventListener {
 
  public void registered(DataflowProcessor processor) {
   // TODO Auto-generated method stub
   println "******* Registered  ...."
   
  }
 
  public void afterStart(DataflowProcessor processor) {
   // TODO Auto-generated method stub
   println "******* afterStart  ...."
   
  }
 
  public void afterStop(DataflowProcessor processor) {
   // TODO Auto-generated method stub
   println "******* afterStop  ...."
   
  }
 
  public boolean onException(DataflowProcessor processor, Throwable e) {
   // TODO Auto-generated method stub
   println "******* onException1  .... ${e.printStackTrace()}"
   return false;
  }
 
  public Object messageArrived(DataflowProcessor processor,
    DataflowReadChannel<Object> channel, int index, Object message) {
   // TODO Auto-generated method stub
    println "******* messageArrived  .... $message for Channel $channel at Index $index"
   return message;
  }
 
  public Object controlMessageArrived(DataflowProcessor processor,
    DataflowReadChannel<Object>  channel, int index, Object message) {
   // TODO Auto-generated method stub
    println "******* controlMessageArrived  .... $message for Channel $channel at Index $index"
   return message;
  }
 
  public Object messageSentOut(DataflowProcessor processor,
    DataflowWriteChannel<Object>  channel, int index, Object message) {
   // TODO Auto-generated method stub
    println "******* messageSentOut  .... $message for Channel $channel at Index $index"
   return message;
  }
 
  public List<Object> beforeRun(DataflowProcessor processor,
    List<Object>  messages) {
    println "******* beforeRun  .... messages $messages"
   // TODO Auto-generated method stub
   return messages;
  }
 
  public void afterRun(DataflowProcessor processor, List<Object>  messages) {
   // TODO Auto-generated method stub
   println "******* afterRun  .... messages $messages"
   
  }
 
  public Object customEvent(DataflowProcessor processor, Object data) {
   // TODO Auto-generated method stub
   println "******* customEvent  .... for Data: $data"
   return 'I am Firing Custom Event';
  }
 
 }


 class DataflowChannelListenerImpl implements DataflowChannelListener
 {

  public void onMessage(Object message) {
   // TODO Auto-generated method stub
   println "@@@@@@@ Channel Listener invoked with Input"
  }
  
 }  
 


In this code snippet we have introduced a listener for Operators, which responds to the Operator's lifecycle phases and events, and another listener for a DataflowQueue.

The callback methods within the listeners get executed at the start/end of the Operator's various lifecycle phases, and their names are self-explanatory.

Unlike the Selector example, which used DataflowVariable, here we have used DataflowBroadcast and DataflowQueue.

There is a task populating the DataflowQueues a, b and c with some delay. As soon as all of them are populated, an operator acts on the values and binds results manually to different output channels (addressed by their index) based on some condition.

There are other Selectors and Operators on those output channels, which act accordingly. The main objective of this example is to demonstrate the usage of listeners.
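The comment in the code about using Broadcasts when the same data feeds both an Operator and a Selector can be illustrated on its own. In this sketch the names broadcast, reader1 and reader2 are made up and two plain reads stand in for the two consumers; the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.dataflow.DataflowBroadcast

def broadcast = new DataflowBroadcast()

// Every consumer gets its own read channel, created before the write.
def reader1 = broadcast.createReadChannel()
def reader2 = broadcast.createReadChannel()

broadcast << 'tick'

// Unlike a queue, where a value is consumed by exactly one reader,
// each read channel of a broadcast sees the value.
def seenBy1 = reader1.val
def seenBy2 = reader2.val
println "$seenBy1 / $seenBy2"
```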


ACTORS

HEY!!! I am back with Actors, one of the interesting features of GPars.
I will start with the Actor itself.
An Actor is an entity that lets only one thread at a time run its body, so the messages sent to it are processed one by one, irrespective of which thread sent them!!!!
Let's start with a simple example.


import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

import java.util.concurrent.TimeUnit


class ActorTest {

 static def msg11="Default"
 static def startTime;
 
 /**
 * An Actor is initiated as Actors.actor{...
 *
 */
 
 static def actor = Actors.actor{
  
  println " Actor Initialized by:${Thread.currentThread().getName()}"
  println "The metaclass is: ${delegate.getClass()}"
  
  /**
   * The Delegate inside Actors.actor Closure is Default Actor
   * 
   */
  
  loop{
   
   println "Inside Actor Loop :${Thread.currentThread().getName()}"
   
   react(20000) {msg2 ->println "The Msg-3 is $msg2 :${Thread.currentThread().getName()}" 
    
    
    
    if (msg2 == Actor.TIMEOUT) {
     //continue conversation
     println "Thank you for $msg2 I am terminating Myself as Time Difference is ${new Date().getTime() - startTime}"
     terminate()
    }
  
    
     msg2 = msg2+msg2
     msg11 = msg2 + ' Subha '
    //reply "Received!!!!"
    //println "Sleeping Start.....The Current Thread is (Actor-33):${Thread.currentThread().getName()}"
    
    Thread.sleep(8000)
    
    msg11 += ' Paul '
    
    //println "Sleeping End....."
    }
  
  
  println "Exiting Actor Loop :${Thread.currentThread().getName()}"
  
 }
  
 }
 
 static main (args)
 {
  startTime=new Date().getTime()
 
  TimeUnit.MILLISECONDS.sleep(4000)
    
  15.times {print "*"  
  }
  
  println ''
  
  println "The Main Thread.... Before Sending  1: ${Thread.currentThread().getName()}"
  
  def a =  actor.send "1"
  
  println "The Main Thread.... Before Sending  11: ${Thread.currentThread().getName()}"
  
  def b =  actor.send "11"
  
  15.times {print "*"
  }
  println ''
  
  println "The Main Thread.... Before Sending  111: ${Thread.currentThread().getName()}"
  
  TimeUnit.MILLISECONDS.sleep(15000)
  
  def c =  actor.send "111"
  
  println "..... Sleep Finished...."
  
  
  
  
  //Thread.sleep(10000)
  [actor]*.join()
  
  println "The msg is $msg11"
 }
 
 
 
 
}


The main purpose of the above code block is to show that only one thread at a time can enter the critical area inside the Actor.

We have sent messages to the Actor from the main thread three times, and all of them are processed sequentially by the Actor in a separate thread. The threads that execute the Actor's work are daemon threads, so the main thread could quit before they are done. To keep the program running (as we have introduced delays here), we have joined the actor at the end:


[actor]*.join()

This makes the main thread wait for the Actor(s) to finish. But as we have introduced a loop inside the Actor, it will keep on running until we terminate it manually. To do that, we have used react() [N.B. react() is the method which reacts, or we can say responds, to the messages being sent to the Actor] with a timeout, which makes the actor wait for at most 20000 ms for another message to arrive. If no message arrives within that timeframe, Actor.TIMEOUT is delivered to the handler as the message; on seeing it the Actor terminates itself, and main can then finish.
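The timeout handling can be sketched in a much smaller actor. The names log and actor, and the 500 ms timeout, are chosen only for this illustration; the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

def log = Collections.synchronizedList([])

def actor = Actors.actor {
    loop {
        // Wait at most 500 ms for the next message.
        react(500) { msg ->
            if (msg == Actor.TIMEOUT) {
                log << 'timeout'
                terminate()      // leave the loop and stop the actor
            } else {
                log << msg
            }
        }
    }
}

actor << 'a'
actor.join()                     // returns once the actor has terminated
println log
```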

We have also introduced a delay inside the Actor to show that, while one message is being processed (even if its thread is sleeping), the other messages sent from the main thread do not get a chance to be processed until the current one has finished.

Actor has another useful utility, reply, which we have commented out here. Please check the GPars documentation for further details.

We can also execute the work inside an Actor on non-daemon threads, by using a non-daemon thread pool.

The output of the above code will be:


 Actor Initialized by:Actor Thread 1
The metaclass is: class groovyx.gpars.actor.DefaultActor
Inside Actor Loop :Actor Thread 1
Exiting Actor Loop :Actor Thread 1
***************
The Main Thread.... Before Sending  1: main
The Main Thread.... Before Sending  11: main
The Msg-3 is 1 :Actor Thread 1
***************
The Main Thread.... Before Sending  111: main
Inside Actor Loop :Actor Thread 1
Exiting Actor Loop :Actor Thread 1
The Msg-3 is 11 :Actor Thread 1
..... Sleep Finished....
Inside Actor Loop :Actor Thread 1
Exiting Actor Loop :Actor Thread 1
The Msg-3 is 111 :Actor Thread 1
Inside Actor Loop :Actor Thread 1
Exiting Actor Loop :Actor Thread 1
The Msg-3 is TIMEOUT :Actor Thread 1
Thank you for TIMEOUT I am terminating Myself as Time Difference is 48077
The msg is 111111 Subha  Paul 



The above outputs are printed with some delay, which is interesting.

Let's look at another Actor example:


import groovyx.gpars.actor.DefaultActor;
import groovyx.gpars.actor.Actors;

class ActorTest2 {



 class Act1 extends DefaultActor
 {
  def secretNum

  void afterStart() {

   println " ***** After Start is invoked ***** "

   secretNum = new Random().nextInt(10)

   println " ***** After Start is invoked done***** "
  }

  void act() {

   println "**** Act is Invoked **** "

   loop {

    println "**** Loop is Invoked **** "

    react { int num ->

     println "**** Reaction Started for $num **** "

     if(num == 10){
      reply 'you win'
      terminate()
     }
     else if (num > secretNum)
      reply 'too large'
     else if (num < secretNum)
      reply 'too small'
     else{
      reply 'you win'
      println "Terminating Myself....."
      terminate()
      println "Terminated....."
     }
     println "**** Reaction Finished for $num **** "
    }

    println "**** Loop is Finished **** "
   }
  }

 }

 class Act2 extends DefaultActor
 {
  String name = 'Subha'
  Act1 server = new Act1()

  int myNum

  void afterStart() {

   println " ***** After Start is invoked for Act2 ***** "
   server.start()
   println " ***** After Start is invoked done for Act2 ***** "



  }

  void act() {
   loop {
    myNum = new Random().nextInt(10)
    server.send myNum
    react {

     println "The Sender is: $sender"

     switch (it) {
      case 'too large': println "$name: $myNum was too large"; break
      case 'too small': println "$name: $myNum was too small"; break
      case 'you win': println "$name: I won $myNum"; endActor.send server ; terminate(); break
     }
    }
   }
  }



 }

 static def  endActor = Actors.actor{

  //loop{
  react{ server1 ->

   println "%%%%%% End Actor is invoked %%%%%%"

   server1.stop()
   // terminate()

   println "%%%%%% All Terminated %%%%%%"
  }
  //}
 }


 static def testActor = Actors.actor{

  react { obj ->

   println "The Object is $obj"
  }
 }


 static def loopTestActor = Actors.actor{
  def counter =1
  loop{
   println "Entering Loop.......... $counter"
   react { obj ->
    println "The Object is $obj"
   }
   println "Ending Loop.......... ${counter++}"
  }
 }
 
 /**
  * Usually the react method accepts a single Msg
  * Now in order to make it accept more than one i/p
  * parameter, we need to nest react
  * 
  */
 
 static def calculator = Actors.actor{
  
  react{ a ->
   println "The First input is: $a Thread-${Thread.currentThread()}"
   
   react{ b ->
    println "The second input is: $b Thread-${Thread.currentThread()}"
    
    console.send(a+b)
   }
   
  }
  
 }
 
 
 static def console = Actors.actor{
  
  react{result ->
   
   println "The result of addition is: $result Thread-${Thread.currentThread()}"
  }
 }

 static main(args)
 {
  //def act1 = new  ActorTest2.Act1(new ActorTest2())
  //def act1 = ActorTest2.Act1.newInstance(new ActorTest2())


  def act2 = ActorTest2.Act2.newInstance(new ActorTest2())
  act2.start()
  [endActor]*.join()


  testActor << "Okkkiii----9"
  testActor << "Nexttt-Okkk"

  testActor.join()

  20.times {
   print "="
  }

  println ""

  loopTestActor << "Okkkiii"
  loopTestActor << "Nexttt-Okkk"

  loopTestActor.join()


  println "============End In Main Join............."


  calculator.send 5
  
  calculator.send 9
  
  calculator.join()
  
 }

}


In the previous example we initialized an Actor with Actors.actor {...}; here we have extended DefaultActor and created the Actors as inner classes.

DefaultActor provides some lifecycle methods, such as:

1) afterStart() -> Invoked after the Actor has been started explicitly using the start() method.
2) act() -> Contains the Actor's body, which reacts to the incoming messages.
The above example depicts several aspects of actors, but the significant one is how an actor replies to messages sent by another Actor.
In the above example there are Actors named Act1, Act2 and endActor.

In the main method we instantiate and start Act2, and make the main thread wait until endActor has finished execution.
After Act2 is started, its afterStart() method starts Act1.

Act2 then sends messages to Act1, which reacts to them by sending appropriate replies. Act2 processes these replies, and on a winning guess it hands Act1 to endActor, which stops it.

Another important point: if there is no loop inside an Actor, it reacts only to the first message sent to it. Besides, an Actor has a handful of built-in properties such as sender, which denotes the sender of the current message.
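The reply mechanism and the role of loop can be tried out with a tiny request/response actor. This sketch is not taken from the example above: the actor name doubler is made up, and it uses sendAndWait, which does not appear in the post, so that the main thread can collect the replies directly. The @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version, adjust as needed
import groovyx.gpars.actor.Actors

def doubler = Actors.actor {
    // Without loop, only the first message would ever be handled.
    loop {
        react { int n ->
            reply n * 2      // goes back to whoever sent the message
        }
    }
}

// sendAndWait blocks the caller until the actor replies.
def first = doubler.sendAndWait(21)
def second = doubler.sendAndWait(4)
println "$first, $second"

doubler.terminate()
```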

This, in short, is what Actors are about.
