Northwind Esper: Alerts

This is Step 4 in the Northwind Application Tutorial
Previous Step: Step 3: Esper: Track Packages
Next Step: Step 5: Send Notifications Over JMS

What You Will Learn

  • More about how to use Esper statements

Alert: Package Moved Backwards!

At this point, we can track the items seen at the various read zones in our scenario. Now we need to fire an alert when an item moves backwards, that is, from the weigh station back to the dock door.

The Code

	/**
	 * A method that sets up business alerts
	 */
	private void setUpAlerts(){
// 1		Create a window for alert messages
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"create window alerts.win:length(20) (alert_type int, tag_ID String)"));

// 2		Create a window for items leaving the weighstation
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"create window weighstation_recent.std:unique(tag_ID) (tag_ID String)"));
		
// 3		Insert items into weighstation_recent once they leave weighstation
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"insert rstream into weighstation_recent select tag_ID from weighstation"));
		
// 4		whenever an item is seen at the weighstation and then seen at the dockdoor, insert a new item into the alerts window
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"on pattern[every-distinct(tag.tag_ID) tag=weighstation_recent -> dockdoor(tag_ID = tag.tag_ID)] " +
			"insert into alerts " +
			"select 1 as alert_type, tag_ID as tag_ID " +
				"from weighstation_recent where tag_ID = tag.tag_ID"));
	}

The following code is an addition to the setupListeners() method that was previously defined:

		// Create a listener to handle Alerts
		StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() {
			@Override
			public void update(EventBean[] arrivals, EventBean[] departures,
					EPStatement arg2, EPServiceProvider arg3) {
				if (arrivals != null) {
					for (EventBean bean : arrivals) {
						int alertType = (Integer) bean.get("alert_type");
						String id = (String) bean.get("tag_ID");
						switch (alertType) {
						case 1:
							System.out.println("Package moved backwards: " + id );
							break;

						}
					}
				}
			}
		};
		
		// Create a query that is triggered on insert and remove events from the alerts window
		EPStatement queryAlert = esperService.getProvider().getEPAdministrator().createEPL(
				"select * from alerts");
		queryAlert.addListener(alertListener);
		statements.add(queryAlert);

Explanation of the Code

  1. First we need to create a window that will hold alerts.
    1. Using the win:length() view, the window will only keep track of the last 20 alerts
    2. This window is defined to hold two pieces of information per entry: an int that is the alert type, and a string that is the tag ID
  2. Next we create a window that will hold tags that have departed from the weigh station
  3. This statement inserts items that have left the weighstation window into the weighstation_recent window
  4. Finally we get to define the rule! This rule looks for an event entering the weighstation_recent window followed by an event with the same tag ID entering the dockdoor window. When that happens, it makes a new entry in the alerts window.
    1. We use the every-distinct operator to ensure that we will only match the same tag once in the pattern.
    2. We also insert '1' as the alert type. We will assume that this alert type corresponds to a 'tag moved backwards' alert.
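
To make the rule above concrete without a running Esper engine, the following sketch models the same logic in plain Java. It is only an illustration under simplifying assumptions: the class BackwardsAlertModel and its methods are hypothetical names that appear neither in the tutorial code nor in Esper, a set stands in for the weighstation_recent window, and a bounded deque stands in for the win:length(20) view on the alerts window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model of the 'moved backwards' rule; this is not Esper code.
class BackwardsAlertModel {
    static final int MAX_ALERTS = 20;      // mirrors win:length(20) on the alerts window
    static final int MOVED_BACKWARDS = 1;  // alert_type assumed by the tutorial

    // Stand-in for weighstation_recent: tag IDs that have left the weigh station.
    private final Set<String> weighstationRecent = new HashSet<>();
    // Tags that already raised an alert, so each tag fires at most once.
    private final Set<String> alreadyAlerted = new HashSet<>();
    // Stand-in for the alerts window, oldest entry first.
    private final Deque<String> alerts = new ArrayDeque<>();

    void tagLeftWeighstation(String tagId) {
        weighstationRecent.add(tagId);
    }

    void tagSeenAtDockDoor(String tagId) {
        if (weighstationRecent.contains(tagId) && alreadyAlerted.add(tagId)) {
            if (alerts.size() == MAX_ALERTS) {
                alerts.removeFirst(); // drop the oldest alert, as a length window would
            }
            alerts.addLast(MOVED_BACKWARDS + ":" + tagId);
        }
    }

    List<String> alerts() {
        return new ArrayList<>(alerts);
    }

    public static void main(String[] args) {
        BackwardsAlertModel model = new BackwardsAlertModel();
        model.tagSeenAtDockDoor("tag-1");   // normal first read at the dock door: no alert
        model.tagLeftWeighstation("tag-1");
        model.tagSeenAtDockDoor("tag-1");   // back at the dock door: alert
        model.tagSeenAtDockDoor("tag-1");   // repeated read: no second alert
        System.out.println(model.alerts()); // prints [1:tag-1]
    }
}
```

The alreadyAlerted set plays the role that every-distinct plays in the pattern: a tag that has already been matched does not produce a second alert.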


Alert: Package Skipped the Dock Door!

The next requirement for the Northwind application is to detect when a package appears at the weigh station but was never seen at the dock door. This is a bit different from detecting events going from the weigh station back to the dock door, because we cannot use the followed-by operator on events coming out of the dock door. It is precisely the packages that were never seen at the dock door that we are looking for!

The Code

Add the following statements to the setUpAlerts method:

// 1		Create a window for items leaving the dock door
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"create window dockdoor_recent.std:unique(tag_ID) (tag_ID String)"));
		
// 2		Insert items into dockdoor_recent once they leave dockdoor
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"insert rstream into dockdoor_recent select tag_ID from dockdoor"));
		
// 3		Any time we see a new weighstation event, check to see if it is not already in dockdoor_recent.  If not, make a new alert.
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"insert into alerts " +
			"select 2 as alert_type, tag_ID as tag_ID " +
			"from weighstation as w " +
			"where not exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)"));

In addition, you need to modify the switch statement of the alerts handler:

		switch (alertType) {
		case 1:
			System.out.println("Package moved backwards: " + id );
			break;
		case 2:
			System.out.println("Package skipped the dock door: " + id);
			break;

		}

Explanation of the Code

  1. Create a named window that will hold tags that have recently departed from the dock door.
  2. Next we insert the tags that have left the dockdoor window into the dockdoor_recent window.
  3. This statement inserts a new alert into the alerts window whenever a tag arrives at the weighstation window which has not been seen at the dock door.
    1. We will assume that 2 is the alert type of the 'package skipped dock door' alert.
    2. The statement select * from dockdoor_recent as d where d.tag_ID = w.tag_ID is a subquery that will return results (just like a normal query). By using it in conjunction with the exists condition, we can test whether or not the subquery returned any results. If it does not return results, we need to fire an alert.
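
The not exists check described above can be sketched in plain Java as follows. This is a simplified model rather than Esper code: the class SkippedDockDoorModel and its methods are hypothetical names introduced only for illustration, and a set stands in for the dockdoor_recent window (which keeps one entry per tag ID).

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical plain-Java model of the 'skipped the dock door' rule; this is not Esper code.
class SkippedDockDoorModel {
    static final int SKIPPED_DOCK_DOOR = 2; // alert_type assumed by the tutorial

    // Stand-in for dockdoor_recent: tag IDs that have left the dock door.
    private final Set<String> dockdoorRecent = new HashSet<>();

    void tagLeftDockDoor(String tagId) {
        dockdoorRecent.add(tagId);
    }

    // Called for each new weigh station event; returns an alert only when
    // no entry with the same tag ID exists in dockdoorRecent.
    Optional<String> tagArrivedAtWeighstation(String tagId) {
        // Mirrors: exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)
        boolean exists = dockdoorRecent.stream().anyMatch(d -> d.equals(tagId));
        if (exists) {
            return Optional.empty();
        }
        return Optional.of(SKIPPED_DOCK_DOOR + ":" + tagId);
    }

    public static void main(String[] args) {
        SkippedDockDoorModel model = new SkippedDockDoorModel();
        model.tagLeftDockDoor("tag-1");
        System.out.println(model.tagArrivedAtWeighstation("tag-1")); // prints Optional.empty
        System.out.println(model.tagArrivedAtWeighstation("tag-2")); // prints Optional[2:tag-2]
    }
}
```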

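Alert: Package is Lost!

The last requirement is to detect when a package departs from the dock door and is not seen at the weigh station within a given time period.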

The Code

Add the following to the setUpAlerts method:

// 1		Create a new alert whenever a package departs from the dock door and is not seen at the weighstation within a given time period
		statements.add(esperService.getProvider().getEPAdministrator().createEPL(
			"on pattern[every-distinct(tag.tag_ID) tag=dockdoor_recent -> " +
			"(timer:interval(5 min) and not weighstation(tag_ID = tag.tag_ID))] " +
			"insert into alerts " +
			"select 3 as alert_type, tag_ID as tag_ID " +
			"from dockdoor_recent where (tag_ID = tag.tag_ID)"));

In addition, you need to modify the switch statement of the alerts handler:

			switch (alertType) {
			case 1:	System.out.println("Package moved backwards: " + id );
				break;
			case 2: System.out.println("Package skipped the dock door: " + id);
				break;
			case 3: System.out.println("Package is lost: " + id);
				break;
			}
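
As an optional variation, the three cases of the handler could be collected in one small helper so that the messages can be checked without a running Esper engine. This is a sketch only: the class AlertMessages, the method describe, and the fallback message for unknown types are assumptions that are not part of the tutorial code.

```java
// Hypothetical helper that maps the tutorial's alert types to their messages.
class AlertMessages {
    static String describe(int alertType, String id) {
        switch (alertType) {
        case 1:
            return "Package moved backwards: " + id;
        case 2:
            return "Package skipped the dock door: " + id;
        case 3:
            return "Package is lost: " + id;
        default:
            // Assumption: the tutorial does not define behavior for other types.
            return "Unknown alert type " + alertType + ": " + id;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(3, "tag-1")); // prints Package is lost: tag-1
    }
}
```

Inside the listener, the switch statement would then reduce to a single System.out.println(AlertMessages.describe(alertType, id)) call.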

Explanation of the Code

  1. The only statement we have to add is, like the 'Package Moved Backwards' alert, implemented in two parts. The first part (on pattern) waits for a sequence of events to occur. The second part (insert into) runs whenever the first part matches.
    1. We use the followed-by operator to look for an event leaving the dock door followed by a time period (5 minutes in the code above) during which no package with the corresponding ID is seen at the weigh station.
    2. When this pattern triggers, we need to insert a new event into the alerts window.
    3. For this kind of alert, we will assume the type will be 3.
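
The timer-based pattern above can be modeled in plain Java with explicit timestamps. This is a rough sketch and not Esper code: the class LostPackageModel and its methods are hypothetical names, time is passed in by the caller in milliseconds instead of coming from Esper's timer, and the timeout constant mirrors the timer:interval(5 min) in the statement.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of the 'package is lost' rule; this is not Esper code.
class LostPackageModel {
    static final int LOST = 3;                         // alert_type assumed by the tutorial
    static final long TIMEOUT_MILLIS = 5 * 60 * 1000L; // mirrors timer:interval(5 min)

    // Tags already tracked once, so each tag is considered only one time.
    private final Set<String> started = new HashSet<>();
    // Tag ID -> time at which it left the dock door, while still awaiting a weigh station read.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    void tagLeftDockDoor(String tagId, long nowMillis) {
        if (started.add(tagId)) {
            pending.put(tagId, nowMillis);
        }
    }

    void tagSeenAtWeighstation(String tagId) {
        pending.remove(tagId); // the package arrived, so stop waiting for it
    }

    // Returns an alert for every pending tag whose timeout has elapsed by nowMillis.
    List<String> advanceTo(long nowMillis) {
        List<String> alerts = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= TIMEOUT_MILLIS) {
                alerts.add(LOST + ":" + entry.getKey());
                it.remove();
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        LostPackageModel model = new LostPackageModel();
        model.tagLeftDockDoor("tag-1", 0L);
        model.tagLeftDockDoor("tag-2", 0L);
        model.tagSeenAtWeighstation("tag-2");          // arrives in time
        System.out.println(model.advanceTo(299_999L)); // prints []
        System.out.println(model.advanceTo(300_000L)); // prints [3:tag-1]
    }
}
```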