First of all Wishing All a Very Happy and Prosperous 2016
Now lets get into something NEW!!! and interesting....... Kanban Flow.
Now before diving into Kanban Flow, FRIENDS I would like to draw your attention towards a interesting perhaps important incident while Dealing with SyncDataflowQueue.
If we want to populate more than one SyncDataflowQueue from the same Thread, the Thread goes into a wait state. Now if you ask me "HOW", I do not have the answer.
Lets describe it with a code excerpt:
class PipelineTest {
static def defaultPGroup = new DefaultPGroup(3)//new DefaultPGroup(new ResizeablePool(false, 3))
static SyncDataflowQueue syncQueue = new SyncDataflowQueue()
/**
* Changing SyncDataflowQueue to DataFlowQueue changes the entire
* behaviour But Why????
*
*/
static SyncDataflowQueue outputQueue = new SyncDataflowQueue()
//static DataflowQueue outputQueue = new DataflowQueue()
static def result
class SyncQueueEntry implements Callable
{
DataflowQueue q2
SyncQueueEntry(DataflowQueue q2)
{
this.q2 = q2
}
public String call() throws Exception {
// TODO Auto-generated method stub
println "**** The Task Thread is ${Thread.currentThread()} ******"
Dataflow.task{
q2 << "okkkkkkies and Onnneeeessssss"
}
" !!!!!!!!!!! Inter Data !!!!!!!!!"
}
}
/* static def t1 = Dataflow.task{
println "**** The Task Thread is ${Thread.currentThread().getName()} ******"
outputQueue << "Another Sync Queue"
}*/
static def upperCase = { msg -> msg.toUpperCase()
}
static def append1 = {msg ->
"Subhankar IN Action $msg"
}
static def save = {text ->
//Just pretending to be saving the text to disk, database or whatever
'Saving ' + text
}
static main(args)
{
println "**** The Main Thread is ${Thread.currentThread().getName()} ******"
Promise jk
//def jk
result = syncQueue.chainWith(defaultPGroup,upperCase).chainWith(defaultPGroup,append1).chainWith(defaultPGroup,save) /*into outputQueue*/
def pipeLineTest = new PipelineTest()
SyncQueueEntry syncQueue3 = new SyncQueueEntry(pipeLineTest,outputQueue)
def group = new NonDaemonPGroup(3)
println "Sleeping Thread.........."
sleep(3000)
println "Waking Thread......."
syncQueue << "Groovy"
syncQueue << "Grails"
//sleep(6000)
/**
* Makes the main Thread goes into wait state
* as more than One SyncDataflowQueue are being
* populated from same Thread i.e. Main Thread
*/
//outputQueue << "Data Manually entered in OutputQueueee"
/**
* But when SyncDataflowQueue is being populated from another
* Thread
*
*/
Dataflow.task
{
outputQueue << "Data Manually entered in OutputQueueee"
}
println "The OUTPUTQUEUE Result is: ${outputQueue.val}"
println "The syncQueue Result1 is: ${result.val}"
println "The syncQueue Result2 is: ${result.val}"
try{
syncQueue << "cloud"
println "The syncQueue Result3 is: ${result.val}"
/**
* As the SyncDataflowQueue in the Callable Task is Populated
* through a separate Thread, so the Promise can be
* accessed it is not going to get stuck after shutting
* down the Thread group
*
* If we want to do before shutting down the Thread Group,
* We need to populate SyncDataflowQueue in the callable Task
* in a Separate Thread
*
*/
jk = group.task(syncQueue3)
}
catch(e)
{
e.printStackTrace()
}
/*jk.then{
println "The CALLABLE result is $it"
}*/
println "The promise is: ${jk.get()}"
syncQueue << "Heroku"
println "The syncQueue Result4 is: ${result.val}"
println "The OutputQueue Result is: ${outputQueue.val}"
group.shutdown()
//println "The promise is: ${jk.get()}"
println "****End The Main Thread is ${Thread.currentThread().getName()} ******"
}
}
Here we have used chainWith.We can refer to chainWith as a kind of Handler.
So when data is being populated in a SyncDataflowQueue, then the registered Handler for the SyncDataflowQueue (chainWith) gets invoked and returns another SyncDataflowQueue (result) which we have accessed here to get the resultant value from the Handler.
Now from the same main Thread when we are trying to populate outputQueue i.e. another SyncDataflowQueue, the main thread will get stuck.
So we have used another thread with Dataflow.task{...., to populate outputQueue from another thread, the same thing happens when we initiate the callble task with the help of ThreadGroup.
Please refer to the comments as provided in code snippets block.
KANBAN FLOW
No comments:
Post a Comment