Difference between revisions of "Northwind Esper: Alerts"
From RifidiWiki
Line 1: | Line 1: | ||
− | + | This is Step 4 in the [[Northwind Tutorial|Northwind Application Tutorial]]<br> | |
− | + | Previous Step: [[Northwind Esper: Track Packages| Step 3: Esper: Track Packages]]<br> | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | This is Step 4 in the [[Northwind Tutorial|Northwind Application Tutorial]] | + | |
− | Previous Step: [[Northwind Esper: Track Packages| Step 3: Esper: Track Packages]] | + | |
Next Step: [[Northwind JMS Send | Step 5: Send Notifications Over JMS]] | Next Step: [[Northwind JMS Send | Step 5: Send Notifications Over JMS]] | ||
===What You Will Learn=== | ===What You Will Learn=== | ||
Line 15: | Line 7: | ||
At this point, we can track the items that can be seen at the various read zones in our scenario. Now we need to implement the functionality that will fire an alert when an item moves backwards (that is from the weigh station to the dock door). | At this point, we can track the items that can be seen at the various read zones in our scenario. Now we need to implement the functionality that will fire an alert when an item moves backwards (that is from the weigh station to the dock door). | ||
====The Code==== | ====The Code==== | ||
− | + | <pre> | |
/** | /** | ||
* A method that sets up business alerts | * A method that sets up business alerts | ||
Line 22: | Line 14: | ||
// 1 Create a window for alert messages | // 1 Create a window for alert messages | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "create window alerts.win:length(20) (alert_type int, tag_ID String)")); | |
// 2 create a window for item leaving the weighstation | // 2 create a window for item leaving the weighstation | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "create window weighstation_recent.std:unique(tag_ID) (tag_ID String)")); | |
// 3 Insert items into weightstation_recent once they leave weighstation | // 3 Insert items into weightstation_recent once they leave weighstation | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "insert rstream into weighstation_recent select tag_ID from weighstation")); | |
// 4 whenever an item is seen at the weighstation and then seen at the dockdoor, insert a new item into the alerts window | // 4 whenever an item is seen at the weighstation and then seen at the dockdoor, insert a new item into the alerts window | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "on pattern[every-distinct(tag.tag_ID) tag=weighstation_recent -> dockdoor(tag_ID = tag.tag_ID)] " + | |
− | + | "insert into alerts " + | |
− | + | "select 1 as alert_type, tag_ID as tag_ID " + | |
− | + | "from weighstation_recent where tag_ID = tag.tag_ID")); | |
} | } | ||
− | + | </pre> | |
The following code is an addition to the setupListeners() method that was previously defined: | The following code is an addition to the setupListeners() method that was previously defined: | ||
− | + | <pre> | |
// Create a listener to handle Alerts | // Create a listener to handle Alerts | ||
StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() { | StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() { | ||
Line 50: | Line 42: | ||
if (arrivals != null) { | if (arrivals != null) { | ||
for (EventBean bean : arrivals) { | for (EventBean bean : arrivals) { | ||
− | int alertType = (Integer) bean.get( | + | int alertType = (Integer) bean.get("alert_type"); |
− | String id = (String) bean.get( | + | String id = (String) bean.get("tag_ID"); |
switch (alertType) { | switch (alertType) { | ||
case 1: | case 1: | ||
− | System.out.println( | + | System.out.println("Package moved backwards: " + id ); |
break; | break; | ||
Line 65: | Line 57: | ||
//Create a query that is triggered on insert and remove events from Weigh Station Window | //Create a query that is triggered on insert and remove events from Weigh Station Window | ||
EPStatement queryAlert= esperService.getProvider().getEPAdministrator().createEPL( | EPStatement queryAlert= esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "select * from alerts"); | |
queryAlert.addListener(alertListener); | queryAlert.addListener(alertListener); | ||
statements.add(queryAlert); | statements.add(queryAlert); | ||
− | + | </pre> | |
====Explanation of the Code==== | ====Explanation of the Code==== | ||
# First we need to create a window that will hold alerts. | # First we need to create a window that will hold alerts. | ||
− | ## Using the [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/epl-views.html#view-win-length | + | ## Using the [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/epl-views.html#view-win-length <tt>win:length()</tt>] view, the window will only keep track of the last 20 alerts |
## This window is defined to hold two pieces of information per entry: an int that is the alert type, and a string that is the tag ID | ## This window is defined to hold two pieces of information per entry: an int that is the alert type, and a string that is the tag ID | ||
# Next we create a window that will hold tags that have departed from the weigh station | # Next we create a window that will hold tags that have departed from the weigh station | ||
− | # This statement inserts items that have left the | + | # This statement inserts items that have left the <tt>weighstation</tt> window into the <tt>weighstation_recent</tt> window |
− | # Finally we get to define the rule! This rule simply looks for the pattern of an event entering the | + | # Finally we get to define the rule! This rule simply looks for the pattern of an event entering the <tt>weighstation_recent</tt> window followed by an event entering the <tt>dockdoor</tt> window which have the same ID. Once this occurs, it makes a new entry in the <tt>alerts</tt> window. |
− | ## We use the [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/event_patterns.html#pattern-logical-everydistinct | + | ## We use the [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/event_patterns.html#pattern-logical-everydistinct <tt>every-distinct</tt>] operator to ensure that we will only match the same tag once in the pattern. |
## We also insert '1' as the alert type. We will assume that that event type corresponds to a 'tag moved backwards' alert. | ## We also insert '1' as the alert type. We will assume that that event type corresponds to a 'tag moved backwards' alert. | ||
Line 84: | Line 76: | ||
The next requirement for the Northwind application is to detect when a package appears at the weigh station but was never seen at the dock door. This is a bit different from detecting events going from the weigh station back to the dock door, because we cannot use the followed-by operator on events coming out of the dock door. It is precisely the events that were never seen at the dock door we are looking for! | The next requirement for the Northwind application is to detect when a package appears at the weigh station but was never seen at the dock door. This is a bit different from detecting events going from the weigh station back to the dock door, because we cannot use the followed-by operator on events coming out of the dock door. It is precisely the events that were never seen at the dock door we are looking for! | ||
====The code==== | ====The code==== | ||
− | Add the following statements to the | + | Add the following statements to the <tt>setUpAlerts</tt> method: |
− | + | <pre> | |
// 1 create a window for item leaving the dock door | // 1 create a window for item leaving the dock door | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "create window dockdoor_recent.std:unique(tag_ID) (tag_ID String)")); | |
// 2 Insert items into dockdoor_recent once they leave dockdoor | // 2 Insert items into dockdoor_recent once they leave dockdoor | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "insert rstream into dockdoor_recent select tag_ID from dockdoor")); | |
// 3 Any time we see a new weighstation event, check to see if it is not already in dockdoor_recent. If not, make a new alert. | // 3 Any time we see a new weighstation event, check to see if it is not already in dockdoor_recent. If not, make a new alert. | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "insert into alerts " + | |
− | + | "select 2 as alert_type, tag_ID as tag_ID " + | |
− | + | "from weighstation as w " + | |
− | + | "where not exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)")); | |
− | + | </pre> | |
In addition, you need to modify the switch statement of the alerts handler: | In addition, you need to modify the switch statement of the alerts handler: | ||
− | + | <pre> | |
switch (alertType) { | switch (alertType) { | ||
case 1: | case 1: | ||
− | System.out.println( | + | System.out.println("Package moved backwards: " + id ); |
break; | break; | ||
− | case 2: System.out.println( | + | case 2: System.out.println("Package skipped the dock door: " + id); |
break; | break; | ||
} | } | ||
− | + | </pre> | |
==== Explanation of the Code ==== | ==== Explanation of the Code ==== | ||
# Create a named window that will hold tags that have recently departed from the dock_door | # Create a named window that will hold tags that have recently departed from the dock_door | ||
− | # Next we insert the tags that have left the | + | # Next we insert the tags that have left the <tt>dockdoor</tt> window into the <tt>dockdoor_recent</tt> window. |
− | # This statement inserts a new alert into the alerts window whenever a tag arrives at the | + | # This statement inserts a new alert into the alerts window whenever a tag arrives at the <tt>weighstation</tt> window which has not been seen at the dock door. |
## We will assume that 2 is the alert type of the 'package skipped dock door' alert. | ## We will assume that 2 is the alert type of the 'package skipped dock door' alert. | ||
− | ## The statement | + | ## The statement <tt>select * from dockdoor_recent as d where d.tag_ID = w.tag_ID</tt> is a [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/epl_clauses.html#epl-subqueries <tt>subquery</tt>] that will return results (just like a normal query). By using it in conjunction with the [http://esper.codehaus.org/esper-3.2.0/doc/reference/en/html/epl_clauses.html#epl-subqueries-exists <tt>exists</tt>] condition, we can test whether or not the subquery returned any results. If it does not return results, we need to fire an alert. |
===Alert: Package is Lost!=== | ===Alert: Package is Lost!=== | ||
====The Code==== | ====The Code==== | ||
Add the following to the setUpAlerts method | Add the following to the setUpAlerts method | ||
− | + | <pre> | |
// 1 Create a new alert whenever a package departs from the dock door and is not seen at the weighstation with a given time period | // 1 Create a new alert whenever a package departs from the dock door and is not seen at the weighstation with a given time period | ||
statements.add(esperService.getProvider().getEPAdministrator().createEPL( | statements.add(esperService.getProvider().getEPAdministrator().createEPL( | ||
− | + | "on pattern[every-distinct(tag.tag_ID) tag=dockdoor_recent -> " + | |
− | + | "(timer:interval(5 min) and not weighstation(tag_ID = tag.tag_ID))] " + | |
− | + | "insert into alerts " + | |
− | + | "select 3 as alert_type, tag_ID as tag_ID " + | |
− | + | "from dockdoor_recent where (tag_ID = tag.tag_ID)")); | |
− | + | </pre> | |
In addition, you need to modify the switch statement of the alerts handler: | In addition, you need to modify the switch statement of the alerts handler: | ||
− | + | <pre> | |
switch (alertType) { | switch (alertType) { | ||
− | case 1: System.out.println( | + | case 1: System.out.println("Package moved backwards: " + id ); |
break; | break; | ||
− | case 2: System.out.println( | + | case 2: System.out.println("Package skipped the dock door: " + id); |
break; | break; | ||
− | case 3: System.out.println( | + | case 3: System.out.println("Package is lost: " + id); |
break; | break; | ||
} | } | ||
− | + | </pre> | |
====Explanation of the Code==== | ====Explanation of the Code==== |
Latest revision as of 19:42, 26 November 2010
This is Step 4 in the Northwind Application Tutorial
Previous Step: Step 3: Esper: Track Packages
Next Step: Step 5: Send Notifications Over JMS
Contents
What You Will Learn
- More about how to use Esper statements
Alert: Package Moved Backwards!
At this point, we can track the items that can be seen at the various read zones in our scenario. Now we need to implement the functionality that will fire an alert when an item moves backwards (that is from the weigh station to the dock door).
The Code
/** * A method that sets up business alerts */ private void setUpAlerts(){ // 1 Create a window for alert messages statements.add(esperService.getProvider().getEPAdministrator().createEPL( "create window alerts.win:length(20) (alert_type int, tag_ID String)")); // 2 create a window for item leaving the weighstation statements.add(esperService.getProvider().getEPAdministrator().createEPL( "create window weighstation_recent.std:unique(tag_ID) (tag_ID String)")); // 3 Insert items into weightstation_recent once they leave weighstation statements.add(esperService.getProvider().getEPAdministrator().createEPL( "insert rstream into weighstation_recent select tag_ID from weighstation")); // 4 whenever an item is seen at the weighstation and then seen at the dockdoor, insert a new item into the alerts window statements.add(esperService.getProvider().getEPAdministrator().createEPL( "on pattern[every-distinct(tag.tag_ID) tag=weighstation_recent -> dockdoor(tag_ID = tag.tag_ID)] " + "insert into alerts " + "select 1 as alert_type, tag_ID as tag_ID " + "from weighstation_recent where tag_ID = tag.tag_ID")); }
The following code is an addition to the setupListeners() method that was previously defined:
// Create a listener to handle Alerts StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() { @Override public void update(EventBean[] arrivals, EventBean[] departures, EPStatement arg2, EPServiceProvider arg3) { if (arrivals != null) { for (EventBean bean : arrivals) { int alertType = (Integer) bean.get("alert_type"); String id = (String) bean.get("tag_ID"); switch (alertType) { case 1: System.out.println("Package moved backwards: " + id ); break; } } } } }; //Create a query that is triggered on insert and remove events from Weigh Station Window EPStatement queryAlert= esperService.getProvider().getEPAdministrator().createEPL( "select * from alerts"); queryAlert.addListener(alertListener); statements.add(queryAlert);
Explanation of the Code
- First we need to create a window that will hold alerts.
- Using the win:length() view, the window will only keep track of the last 20 alerts
- This window is defined to hold two pieces of information per entry: an int that is the alert type, and a string that is the tag ID
- Next we create a window that will hold tags that have departed from the weigh station
- This statement inserts items that have left the weighstation window into the weighstation_recent window
- Finally we get to define the rule! This rule simply looks for the pattern of an event entering the weighstation_recent window followed by an event entering the dockdoor window which have the same ID. Once this occurs, it makes a new entry in the alerts window.
- We use the every-distinct operator to ensure that we will only match the same tag once in the pattern.
- We also insert '1' as the alert type. We will assume that that event type corresponds to a 'tag moved backwards' alert.
Alert: Package Skipped the Dock Door!
The next requirement for the Northwind application is to detect when a package appears at the weigh station but was never seen at the dock door. This is a bit different from detecting events going from the weigh station back to the dock door, because we cannot use the followed-by operator on events coming out of the dock door. It is precisely the events that were never seen at the dock door we are looking for!
The code
Add the following statements to the setUpAlerts method:
// 1 create a window for item leaving the dock door statements.add(esperService.getProvider().getEPAdministrator().createEPL( "create window dockdoor_recent.std:unique(tag_ID) (tag_ID String)")); // 2 Insert items into dockdoor_recent once they leave dockdoor statements.add(esperService.getProvider().getEPAdministrator().createEPL( "insert rstream into dockdoor_recent select tag_ID from dockdoor")); // 3 Any time we see a new weighstation event, check to see if it is not already in dockdoor_recent. If not, make a new alert. statements.add(esperService.getProvider().getEPAdministrator().createEPL( "insert into alerts " + "select 2 as alert_type, tag_ID as tag_ID " + "from weighstation as w " + "where not exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)"));
In addition, you need to modify the switch statement of the alerts handler:
switch (alertType) { case 1: System.out.println("Package moved backwards: " + id ); break; case 2: System.out.println("Package skipped the dock door: " + id); break; }
Explanation of the Code
- Create a named window that will hold tags that have recently departed from the dock_door
- Next we insert the tags that have left the dockdoor window into the dockdoor_recent window.
- This statement inserts a new alert into the alerts window whenever a tag arrives at the weighstation window which has not been seen at the dock door.
- We will assume that 2 is the alert type of the 'package skipped dock door' alert.
- The statement select * from dockdoor_recent as d where d.tag_ID = w.tag_ID is a subquery that will return results (just like a normal query). By using it in conjunction with the exists condition, we can test whether or not the subquery returned any results. If it does not return results, we need to fire an alert.
Alert: Package is Lost!
The Code
Add the following to the setUpAlerts method
// 1 Create a new alert whenever a package departs from the dock door and is not seen at the weighstation with a given time period statements.add(esperService.getProvider().getEPAdministrator().createEPL( "on pattern[every-distinct(tag.tag_ID) tag=dockdoor_recent -> " + "(timer:interval(5 min) and not weighstation(tag_ID = tag.tag_ID))] " + "insert into alerts " + "select 3 as alert_type, tag_ID as tag_ID " + "from dockdoor_recent where (tag_ID = tag.tag_ID)"));
In addition, you need to modify the switch statement of the alerts handler:
switch (alertType) { case 1: System.out.println("Package moved backwards: " + id ); break; case 2: System.out.println("Package skipped the dock door: " + id); break; case 3: System.out.println("Package is lost: " + id); break; }
Explanation of the Code
- The only statement we have to add to implement the required functionality is, like the 'Package cannot move backwards' alert, implemented in two parts. The first part (on pattern) looks for a sequence to happen. The second part (insert into) does something whenever the first part happens.
- We can use the followed-by operator to specify that that we should look for an event leaving a dockdoor followed by a specific time period (15 seconds in our case) and no package with a corresponding id at the weigh station.
- When this pattern triggers, we need to insert a new event into the alerts window.
- For this kind of alert, we will assume the type will be 3.