What is the recommended way to create a completely custom aggregator in Mule 3.x? By completely custom, I mean one that follows my own logic rather than relying on correlation IDs, message counts, etc.
The documentation on the MuleSoft site is outdated: it says to use AbstractEventAggregator, which does not exist in 3.x:
http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio
Digging deeper, it looks like this class has been renamed to AbstractAggregator in 3.x:
http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html
However, there are no examples that show how to use this. The LoanBroker example described in the first link above actually uses a correlation aggregator (in the 2.x examples, which I assume is what the document is referring to).
At one point, there was an abstract class that had abstract methods shouldAggregate and doAggregate. This is the kind of class I would like to extend.
See TestAggregator below for an example of subclassing AbstractAggregator.
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;

import java.util.Iterator;

public class TestAggregator extends AbstractAggregator
{
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
    {
        // The callback is where the aggregation logic lives.
        return new CollectionCorrelatorCallback(muleContext, false, storePrefix)
        {
            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
            {
                // Concatenate the string form of every event in the group.
                StringBuilder buffer = new StringBuilder(128);
                try
                {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                    {
                        MuleEvent event = iterator.next();
                        try
                        {
                            buffer.append(event.transformMessageToString());
                        }
                        catch (TransformerException e)
                        {
                            throw new AggregationException(events, null, e);
                        }
                    }
                }
                catch (ObjectStoreException e)
                {
                    throw new AggregationException(events, null, e);
                }
                logger.debug("event payload is: " + buffer.toString());
                // Wrap the combined payload in a new event based on the group's collection event.
                return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}
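For the shouldAggregate / doAggregate shape the question asks about, here is a minimal plain-Java sketch of the pattern with no Mule types in it. CustomAggregator, EndMarkerAggregator and the "END" marker are hypothetical names made up for illustration and are not part of the Mule API. As far as I can tell, the corresponding hook in Mule 3.x is shouldAggregateEvents on the EventCorrelatorCallback, which you could override next to aggregateEvents in the anonymous class above, but check the Javadoc for your version before relying on that.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class, NOT part of Mule: it collects items and leaves
// both the "is the group complete?" decision and the merge step to subclasses.
abstract class CustomAggregator<T, R> {
    private final List<T> pending = new ArrayList<T>();

    // Custom completion rule, e.g. a sentinel item or any other condition.
    protected abstract boolean shouldAggregate(List<T> items);

    // Custom merge step, called only once shouldAggregate returns true.
    protected abstract R doAggregate(List<T> items);

    // Returns the aggregate when the group is complete, otherwise null.
    public synchronized R add(T item) {
        pending.add(item);
        if (!shouldAggregate(pending)) {
            return null;
        }
        R result = doAggregate(new ArrayList<T>(pending));
        pending.clear();
        return result;
    }
}

// Example rule (an assumption for illustration): release the group
// when an "END" marker arrives, and concatenate everything before it.
class EndMarkerAggregator extends CustomAggregator<String, String> {
    @Override
    protected boolean shouldAggregate(List<String> items) {
        return "END".equals(items.get(items.size() - 1));
    }

    @Override
    protected String doAggregate(List<String> items) {
        StringBuilder sb = new StringBuilder();
        for (String s : items.subList(0, items.size() - 1)) {
            sb.append(s);
        }
        return sb.toString();
    }
}
```

Feeding "a", "b" and then "END" into an EndMarkerAggregator returns null for the first two calls and the concatenated payload on the third, after which the group starts over.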