Simple configurable event processing application with Esper and Spring

January 15, 2009

While working on a CEP application, I wanted to be able to add or remove Esper statements, or change their expression text, without recompiling (and, ideally, without restarting the process). I used a Spring XML configuration file and XmlBeanFactory to define the eventProcessor bean and the statements that it executes against the event stream.

Let’s imagine a hypothetical building-monitoring system that generates an event (ZoneEnteredEvent or ZoneAbandonedEvent) whenever a person enters or leaves a zone (by swiping a card, RFID, face recognition, whatever). The eventProcessor definition looks like this:

<bean id="eventProcessor" class="com.nailsandhammers.espring.processor.EventProcessor">
    <property name="statements">
        <list>
            <ref bean="tooMuchExposureStatement" />
            <ref bean="duplicateEntryStatement" />
        </list>
    </property>
</bean>

When Spring instantiates the eventProcessor bean, it also instantiates every bean referenced in the statements list.

The statements are then defined like this:

<bean id="tooMuchExposureStatement"
      class="com.nailsandhammers.espring.statement.TooMuchExposureStatement">
    <property name="expressionText"
        value="select * from pattern[every s=ZoneEnteredEvent(zoneId='A') -> (timer:interval(30 minutes) and not f=ZoneAbandonedEvent(s.subjectId=f.subjectId and s.zoneId=f.zoneId))]" />
</bean>
<bean id="duplicateEntryStatement"
      class="com.nailsandhammers.espring.statement.DuplicateEntryStatement">
    <property name="expressionText"
        value="select * from pattern[every s=ZoneEnteredEvent -> (f=ZoneEnteredEvent(s.subjectId=f.subjectId and s.zoneId!=f.zoneId)) and not a=ZoneAbandonedEvent(s.subjectId=a.subjectId and s.zoneId=a.zoneId)]" />
</bean>

The first statement, tooMuchExposureStatement, fires when somebody stays in Zone A for longer than 30 minutes (let’s say it’s dangerous there).
The second statement, duplicateEntryStatement, checks that once somebody has entered a zone, they do not enter another zone without abandoning the current one. Otherwise the person is apparently in two places at the same time, which means that somebody’s identity has been compromised or that the system is not working correctly. In any case, it is something to be looked at.
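
To make the second rule concrete, here is a minimal plain-Java sketch of the condition it describes. It is not Esper code and not how the engine evaluates the pattern. The property names subjectId and zoneId come from the expressions above; the String types and the DuplicateEntryCheck class are assumptions for illustration. It is also simplified: it remembers only the most recent zone per subject.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event classes; only the property names are taken from the expressions.
class ZoneEnteredEvent {
    private final String subjectId;
    private final String zoneId;

    ZoneEnteredEvent(String subjectId, String zoneId) {
        this.subjectId = subjectId;
        this.zoneId = zoneId;
    }

    public String getSubjectId() { return subjectId; }
    public String getZoneId() { return zoneId; }
}

class ZoneAbandonedEvent {
    private final String subjectId;
    private final String zoneId;

    ZoneAbandonedEvent(String subjectId, String zoneId) {
        this.subjectId = subjectId;
        this.zoneId = zoneId;
    }

    public String getSubjectId() { return subjectId; }
    public String getZoneId() { return zoneId; }
}

// Plain-Java illustration of the duplicate-entry condition (an assumption, not the author's code).
class DuplicateEntryCheck {
    // subjectId -> zone the subject is currently believed to be in
    private final Map<String, String> currentZone = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();

    void onEntered(ZoneEnteredEvent e) {
        String open = currentZone.get(e.getSubjectId());
        // Entering a different zone while another one is still open is suspicious
        if (open != null && !open.equals(e.getZoneId())) {
            alerts.add(e.getSubjectId() + " entered " + e.getZoneId()
                    + " without abandoning " + open);
        }
        currentZone.put(e.getSubjectId(), e.getZoneId());
    }

    void onAbandoned(ZoneAbandonedEvent e) {
        // Clear the entry only if the abandoned zone is the one currently open
        currentZone.remove(e.getSubjectId(), e.getZoneId());
    }

    List<String> getAlerts() { return alerts; }
}
```

With Esper the same bookkeeping is expressed declaratively in the pattern, so none of this state has to be maintained by hand.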

A statement can be any object that implements com.espertech.esper.client.UpdateListener. Before the eventProcessor starts accepting events, it must make sure the statements are registered with the Esper CEP engine. To make the statement expressions configurable as well, I created an abstract superclass, EsperStatement, that holds the expressionText and a register method that adds the statement to the Esper engine:


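import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.UpdateListener;

public abstract class EsperStatement implements UpdateListener {
    // EPL expression text, injected from the Spring configuration
    private String expressionText;

    public EsperStatement() {
        super();
    }

    // Creates the statement in the engine and attaches this object as its listener
    public void register(EPServiceProvider serviceProvider) {
        EPStatement epStmt = serviceProvider.getEPAdministrator().createEPL(
                expressionText);
        epStmt.addListener(this);
    }

...

}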
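
The concrete statement classes are not shown in this post, so the following is only a sketch of what a listener body might look like. To keep it compilable without the Esper jar, it declares two small stand-in interfaces that mirror the Esper ones; in a real project you would drop them, import com.espertech.esper.client.EventBean and UpdateListener, and extend EsperStatement instead. Reading the tagged event through "s.subjectId" is an assumption about how the pattern result exposes its properties.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins mirroring the Esper interfaces, declared here only so the sketch is self-contained.
interface EventBean {
    Object get(String propertyExpression);
}

interface UpdateListener {
    void update(EventBean[] newEvents, EventBean[] oldEvents);
}

// Hypothetical listener body; it collects alert messages instead of acting on them.
class TooMuchExposureStatement implements UpdateListener {
    private final List<String> alerts = new ArrayList<>();

    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        if (newEvents == null) {
            return; // nothing new to report
        }
        for (EventBean event : newEvents) {
            // "s" is the tag given to ZoneEnteredEvent in the pattern expression
            alerts.add("Subject " + event.get("s.subjectId")
                    + " has stayed in zone " + event.get("s.zoneId")
                    + " for more than 30 minutes");
        }
    }

    List<String> getAlerts() { return alerts; }
}
```

A real listener would probably raise an alarm or write to a log rather than collect strings; the list just makes the behaviour easy to inspect.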

When eventProcessor starts up, it needs to initialize the Esper engine (see the documentation and samples) and register the statements:

public class EventProcessor {
...
    EPServiceProvider serviceProvider;

    // Initializes the engine and registers every configured statement with it
    private void registerEsperStatements() {
        if (statements == null || statements.isEmpty()) {
            return; // nothing configured, so the engine is not started
        }
        serviceProvider = initServiceProvider();
        for (int i = 0; i < statements.size(); i++) {
            EsperStatement stmt = (EsperStatement) statements.get(i);
            stmt.register(serviceProvider);
        }
    }
...
}

So in the end, the statements and the expressions used to process events are configurable components that can be added, removed and changed. With a little more coding it would be possible to make the eventProcessor reload the config file and reinitialize the CEP engine with the changed statements, without even restarting the application process.
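
One way the reload idea might be sketched is shown below. Everything here is hypothetical: StatementHandle stands in for whatever the engine returns when a statement is created (EPStatement in Esper), the Function stands in for the call that creates a statement from its expression text, and re-reading the Spring file is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical handle to a registered statement.
interface StatementHandle {
    void destroy();
}

// Keeps track of what is registered so that a reload can replace it wholesale.
class ReloadableStatements {
    // Maps expression text to a registered statement; supplied by the caller
    private final Function<String, StatementHandle> engine;
    private final List<StatementHandle> active = new ArrayList<>();

    ReloadableStatements(Function<String, StatementHandle> engine) {
        this.engine = engine;
    }

    // Destroys everything registered so far, then registers the new expressions
    synchronized void reload(List<String> expressions) {
        for (StatementHandle handle : active) {
            handle.destroy();
        }
        active.clear();
        for (String expression : expressions) {
            active.add(engine.apply(expression));
        }
    }

    synchronized int activeCount() {
        return active.size();
    }
}
```

Note that events arriving between the destroy and register steps would not be seen by any statement, and partially matched patterns are lost on reload; whether that matters depends on the application.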
Happy experimenting with CEP and Esper!