I'm playing around with the GPars library while working to improve the scalability of a matching system. I'd like to be able to query the database and then immediately issue the next query while the previous results are being processed concurrently. The bottleneck is reading from the database, so I would like to keep the database busy full time and process the results asynchronously as they become available. I realise I may have some fundamental misunderstandings about how the actor framework works, and I'd be happy to be corrected!
In pseudo code I'm trying to do the following:
Define two actors, one for running selects against the database and another for processing the records.
- queryActor queries the database and sends the results to processorActor
- queryActor immediately queries the database again without waiting for processorActor to finish
I could probably achieve the simple use case without using actors but my end goal is to have an actor pool that is always working on new queries with potentially different datasources in order to increase the throughput of the system in general.
The processing actor will always be much faster than the database query, so in future I would like to query multiple replicas concurrently.
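import static groovyx.gpars.actor.Actors.actor

// matchedRecords, matchedRecordSet, batchNum, loopLimiter, perBatch, dao, targetName,
// whereClause, targetuidFrom and targetuidTo are assumed to be defined elsewhere in the script.
def processor = actor {
    loop {
        react { querySet ->
            println "processing recordset"
            if (querySet instanceof Object[]) {
                MatcherDataRowProcessor matcher = new MatcherDataRowProcessor(matchedRecords, matchedRecordSet)
                matchedRecords = matcher.processRecordset(querySet)
                // Send the accumulated matches back to the sender (dbqueryer)
                reply matchedRecords
            }
            else {
                // Any message that is not an Object[] (e.g. false) acts as the stop signal
                println 'processor fed nothing, halting processor actor'
                stop()
            }
        }
    }
}

def dbqueryer = actor {
    println "dbqueryer has started"
    while (batchNum.longValue() <= loopLimiter) {
        println "hitting db"
        Object[] querySet
        def thisRuleBatch = new MatchRuleBatch(targetuidFrom, targetuidTo)
        thisRuleBatch.targetuidFrom = batchNum * perBatch - perBatch
        thisRuleBatch.targetuidTo = thisRuleBatch.targetuidFrom + perBatch
        thisRuleBatch.targetName = targetName
        thisRuleBatch.whereClause = whereClause
        querySet = dao.getRecordSet(thisRuleBatch)
        // send is asynchronous, so the next query starts without waiting for the processor
        processor.send querySet
        batchNum++
    }
    // Handles a single reply from the processor, then asks it to stop
    react { processedRecords ->
        processor.send false
    }
}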
I would suggest taking a look at Dataflow Queues in the Dataflow Concurrency section of the user guide for GPars. You may find that Dataflows provide a better/cleaner abstraction for your problem at hand. Dataflows can also be used in conjunction with actors.
I think either actors or dataflows would work in this situation, and the decision comes down to which one provides the abstraction that more closely matches what you are trying to accomplish. For me, the concepts of tasks, queues, and dataflows seem to be a closer fit terminology-wise.
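To make the queue idea concrete, here is a minimal sketch in plain Java (the asker mentions needing a Java implementation). It deliberately uses java.util.concurrent.BlockingQueue from the JDK as a stand-in for a GPars dataflow queue, so it is not the GPars API. The class QueryPipelineSketch, the fakeQuery helper (a placeholder for dao.getRecordSet) and the per-batch sum used as "processing" are all hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueryPipelineSketch {
    // Sentinel batch telling the processor that no more results are coming.
    private static final int[] POISON = new int[0];

    // Hypothetical stand-in for dao.getRecordSet(...): returns the row ids of one batch.
    static int[] fakeQuery(int batchNum, int perBatch) {
        int[] rows = new int[perBatch];
        for (int i = 0; i < perBatch; i++) {
            rows[i] = (batchNum - 1) * perBatch + i;
        }
        return rows;
    }

    // The calling thread keeps querying; a second thread processes batches as they arrive.
    // Returns one result (here: the sum of the row ids) per batch, in batch order.
    static List<Long> run(int batches, int perBatch) throws InterruptedException {
        BlockingQueue<int[]> queue = new LinkedBlockingQueue<>();
        List<Long> results = new ArrayList<>();

        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    int[] rows = queue.take();   // blocks until a batch is available
                    if (rows == POISON) break;   // stop signal from the query loop
                    long sum = 0;
                    for (int r : rows) sum += r;
                    results.add(sum);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();

        // Query loop: put() on an unbounded queue returns immediately,
        // so the next query is issued without waiting for the processor.
        for (int b = 1; b <= batches; b++) {
            queue.put(fakeQuery(b, perBatch));
        }
        queue.put(POISON);
        processor.join();                        // results are safe to read after join()
        return results;
    }
}
```

Calling QueryPipelineSketch.run(3, 2) queries three batches of two rows each and returns one sum per batch. Because processing is much faster than querying here, an unbounded queue is probably fine; if it were the other way round, a bounded queue would stop the query side from racing ahead.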
After some more research I have found that the Dataflow concurrency support in GPars is actually built on top of the Actor support. The DataflowOperatorTest in the GPars Java demo distribution (I need to do a Java implementation) seems to be a good match for what I need to do. The main thread waits for multiple stream inputs to be populated, which in my case are the parallel database queries.
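As a rough illustration of "wait for several inputs that are filled in parallel", here is a JDK-only sketch using CompletableFuture. It is not the GPars dataflow operator API and is not taken from DataflowOperatorTest; it only shows the same shape. ParallelReplicaSketch, queryReplica and the replica names are hypothetical placeholders for real data sources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelReplicaSketch {
    // Hypothetical stand-in for running one query against one replica.
    static List<String> queryReplica(String replica, int rows) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            out.add(replica + "-row" + i);
        }
        return out;
    }

    // Starts one query per replica in parallel, then waits for every input to be populated.
    static List<String> queryAll(List<String> replicas, int rowsEach) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, replicas.size()));
        try {
            List<CompletableFuture<List<String>>> pending = new ArrayList<>();
            for (String replica : replicas) {
                pending.add(CompletableFuture.supplyAsync(() -> queryReplica(replica, rowsEach), pool));
            }
            // join() blocks until that replica's result is available;
            // results are merged in the order the replicas were listed.
            List<String> merged = new ArrayList<>();
            for (CompletableFuture<List<String>> f : pending) {
                merged.addAll(f.join());
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

With two replicas, ParallelReplicaSketch.queryAll(Arrays.asList("a", "b"), 2) runs both queries at once and hands the main thread the combined rows once both have finished.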