Now lets go over to CSP (Continuous Sequential Processing)
Here is the Code Block
Now in main() we have created an instance of the callable task and started the task by passing it as an argument to the task method of DefaultPGroup.
Invoking the task() starts the Callable task in a separate thread spawned from main().We have chained handler for the return type, such that when the task finishes by returning the String "Okkkkk" the first handler gets invoked, and when the first handler successfully gets executed, the return type of the first handler is passed on to the second handler and from there onto the SyncDataflowVariable.
So after the task gets started, there is a loop which checks for whether the current thread gets interrupted or not and as we pass values from main method to SyncDataflowQueue, it gets printed from within the loop, and when no more inputs are there, we interrupt the thread and hence the task exits by returning "Okkkkk", and this is when the chained handlers gets executed.
Please note that the Handlers, Tasks all gets executed in separate thread, so the handler thread waits until we interrupt the task thread.
This is a small example of CSP.
SyncDataflow Queue
Here is the Code Block
import java.util.concurrent.Callable;
import groovy.transform.TupleConstructor;
import groovyx.gpars.dataflow.DataflowChannel
import groovyx.gpars.dataflow.DataflowQueue;
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.dataflow.SyncDataflowVariable;
import groovyx.gpars.group.DefaultPGroup
class CSPTest {
static def taskThread
static def interrupt = false
@TupleConstructor
class CallableTask1 implements Callable
{
SyncDataflowQueue channel;
public String call()
{
taskThread = Thread.currentThread()
println "In call Method...... for Thread: ${Thread.currentThread().getName()}"
while(!Thread.currentThread().isInterrupted())
{
try{
sleep(2000)
println "fetching value...."
//println "The channel is: $channel"
println channel.getVal().toUpperCase()
println "Value Fetched...."
sleep(2000)
if(interrupt)
{
Thread.currentThread().interrupt()
}
}
catch(InterruptedException e){
e.printStackTrace()
}
"****** Interrupting, Hence EndingLoop ********"
}
"Okkkkk"
}
}
static main(args)
{
SyncDataflowQueue queue = new SyncDataflowQueue()
SyncDataflowVariable var = new SyncDataflowVariable()
def ok = new CSPTest()
def c = new CallableTask1(ok,queue)
def group = new DefaultPGroup()
def j = group.task(c)
j.then {
println "The result is $it"
it
}.then {
println "I am Hererrrrr"
var << it
}
println "Sending First Value"
queue << "First value"
sleep(3000)
println "Sending Second Value"
queue << "Second value"
sleep(5000)
println "Sending Third Value"
queue << "Third value"
println "************ The callable Thread name is: ${taskThread.getName()}"
//taskThread.interrupt()
CSPTest.interrupt = true;
println "IsInterrupted: ${taskThread.isInterrupted() }"
println "The Result is: ${var.val}"
}
}
Here we have created a Callable Task, returning String, having a SyncDataflowQueue as one of its parameters as defined by @TupleConstructor.Now in main() we have created an instance of the callable task and started the task by passing it as an argument to the task method of DefaultPGroup.
Invoking the task() starts the Callable task in a separate thread spawned from main().We have chained handler for the return type, such that when the task finishes by returning the String "Okkkkk" the first handler gets invoked, and when the first handler successfully gets executed, the return type of the first handler is passed on to the second handler and from there onto the SyncDataflowVariable.
So after the task gets started, there is a loop which checks for whether the current thread gets interrupted or not and as we pass values from main method to SyncDataflowQueue, it gets printed from within the loop, and when no more inputs are there, we interrupt the thread and hence the task exits by returning "Okkkkk", and this is when the chained handlers gets executed.
Please note that the Handlers, Tasks all gets executed in separate thread, so the handler thread waits until we interrupt the task thread.
This is a small example of CSP.
SyncDataflow Queue








