Simple configurable event processing application with Esper and Spring

By nailsandhammers

As I was working on the CEP application I wanted to be able to add/remove the Esper statements, or change the expression text without having to recompile, or restart the process.  I used the Spring xml configuration file and XmlBeanFactory to define the eventProcessor bean and the statements that it will be executing against the event stream.

Let’s imagine a hypothetical system that monitors a building, which generates an event (ZoneEnteredEvent, ZoneAbandonedEvent) whenever a person enters or leaves a zone (by swiping a card, RFID, face recognition, whatever). The eventProcessor definition looks like this:

<bean id="eventProcessor" class="com.nailsandhammers.espring.processor.EventProcessor">
<property name="statements">
	<list>
			<ref bean="tooMuchExposureStatement" />
			<ref bean="duplicateEntryStatement" />
		</list>
	</property>
</bean>

When the eventProcessor bean is instantiated by the application it also instantiates all objects defined in the statements list.

The statements are then defined like this:

<bean id="tooMuchExposureStatement"
class="com.nailsandhammers.espring.statement.TooMuchExposureStatement">
<property name="expressionText"
value="select * from pattern[every s=ZoneEnteredEvent -> (timer:interval(30 minutes) and not f=ZoneAbandonedEvent(s.subjectId=f.subjectId and s.zoneId=f.zoneId and s.zoneId='A'))]" />
</bean>
<bean id="duplicateEntryStatement"
class="com.nailsandhammers.espring.statement.DuplicateEntryStatement">
<property name="expressionText"
value="select * from pattern[every s=ZoneEnteredEvent -> (f=ZoneEnteredEvent(s.subjectId=f.subjectId and s.zoneId!=f.zoneId)) and not a=ZoneAbandonedEvent(s.subjectId=a.subjectId and s.zoneId=a.zoneId)]" />
</bean>

The first statement tooMuchExposureStatement ensures that nobody stays in the Zone A longer than 30 minutes (let’s say it’s dangerous there).
The second statement duplicateEntryStatement controls that somebody entered a Zone, they cannot enter another Zone without abandoning the current one. Otherwise the person is clearly in two places at the same time, or it means somebody’s identity has been compromised, or the system is not working correctly. In any case, it is something to be looked at.

The statement could be any object that implements com.espertech.esper.client.UpdateListener and of course before the eventProcessor starts accepting any kind of events, it needs to make sure the statements get registered with the Esper CEP engine. In order to make the statements expressions configurable as well I created an abstract superclass EsperStatement that contains the expressionText and the register method that adds the statement to the Esper engine:


public abstract class EsperStatement implements UpdateListener {
    private String expressionText;

    public EsperStatement() {
        super();
    }

    public void register(EPServiceProvider serviceProvider) {
        EPStatement epStmt = serviceProvider.getEPAdministrator().createEPL(
                expressionText);
        epStmt.addListener(this);
    }

...

}

When eventProcessor starts up, it needs to initialize the Esper engine (see the documentation and samples) and register the statements:

public class EventProcessor {
...
	EPServiceProvider serviceProvider;
	private void registerEsperStatements() {
		if (statements == null || statements.size() == 0)
			return;
		serviceProvider = initServiceProvider();
		for (int i = 0; i < statements.size(); i++) {
			EsperStatement stmt = (EsperStatement) statements.get(i);
			stmt.register(serviceProvider);
		}
	}
...
}

So in the end the statements and the expressions that are used to process events are configurable components, that can be added, removed and changed. With a little bit more coding it would be possible to force the eventProcessor to reload the config file and reinitialize the CEP engine with the changed statements without even restarting the application process.
Happy experimenting with CEP and Esper!

5 Responses to “Simple configurable event processing application with Esper and Spring”

  1. German Says:

    Cool stuff!
    BTW, do you see any way in which db4o and Esper could work together?
    Best!
    German

    • nailsandhammers Says:

      Hi German,
      There is no direct reference to db4o on the espertech website, even though they do support all major relational databases with their EsperHA product. But the idea of dropping event objects into an object storage feels pretty natural, actually I began using db4o for unit testing while working with Esper as I needed to store the derived events somewhere.

  2. German Says:

    Thx. That was my impression too!

    Best.

    German

  3. Laura Says:

    Maybe I don’t understand… but in which way can be removed statements at runtime?

    • nailsandhammers Says:

      One way how to do this is to trigger a refresh of the entire Spring object hierarchy using for example

      beanFactory.destroyBean("eventProcessor", eventProcessor);
      beanFactory = new XmlBeanFactory(new ClassPathResource(CONFIG_FILE));
      eventProcessor = (EventProcessor) beanFactory.getBean("eventProcessor");

      this will re-create the eventProcessor object and all associated statements, but you need to make sure that (in this case) that the Esper objects get correctly released and reinitialized as well.
      Of course this is somewhat dangerous approach, since if there is any object that holds a reference to the original eventProcessor or any of the associated objects, it’s going to bomb.

Leave a Reply