Northwind JMS Send

From RifidiWiki

Jump to: navigation, search

This is Step 5 in the Northwind Application Tutorial
Previous Step: Step 4: Esper: Alerts
Next Step: Step 6: Create the Web Application

What You Will Learn

  • Importing the JMS services provided by Rifidi Edge Server
  • Exporting packages from a bundle
  • Sending out events over JMS

Once you have the esper queries defined, debugged, and working, it's time to figure out exactly how you want to react to the various events. You might want to log the events to a database. You may want to expose the events to a .Net application via web services. Or you might want to integrate with an ERP system such as SAP. For the Northwind application, we want to send out notifications over Java Messaging Service (JMS) for a web application (which we will build soon) to display.


Create Notification Classes

Creating an API Bundle
For this example, we are creating the notification classes in the same bundle that our application is in in order to simplify the tutorial. This means that our web application will have to have a direct dependency on the application bundle, so that the web bundle has access to the notification classes. A cleaner solution would be to extract out the notification classes into a separate bundle so that both the application bundle and the web bundle would depend on the API bundle.

The first in using JMS to alert another component about the RFID events that we are capturing in our Esper listeners is to create a few Java classes that define the kind of events we want to send out. In particular we will need:

  • ItemArrivalNotification that is sent out when an item has arrived at a zone (either dock door or weigh station).
  • ItemDepartureNotification that is sent out when an item has departed from a zone.
  • AlertNotification that is sent when one of the three exception cases happens.

In addition, we will define two enums:

  • EZone that defines the two zones we have in our system.
  • EAlertType that defines the three alert types that we have.

There are three steps:

  1. Create a new package. Right-click on the com.northwind.rfid.shipping package and select new->package. Name the package com.northwind.rfid.shipping.notifications.
  2. Create the classes (see below)
  3. Export the com.northwind.rfid.shipping.notifications package in the Manifest. This allows other OSGi bundles to see this package. To do this, open up the Manifest.MF file. Click on the Runtime tab. In the 'Exported Packages' section, click Add. Choose the com.northwind.rfid.shipping.notifications package. Save.

Notification Classes

ItemArrivalNotification

package com.northwind.rfid.shipping.notifications;

import java.io.Serializable;

/**
 * This class defines an Item Arrival Event
 * @author Kyle Neumeier - kyle@pramari.com
 *
 */
public class ItemArrivalNotification implements Serializable{
	/** Default Serial ID */
	private static final long serialVersionUID = 1L;
	/** The zone that this tag arrived at*/
	private final EZone zone;
	/** The tag that arrived */
	private final String tag_Id;

	/**
	 * @param zone
	 *            The zone that this tag arrived at
	 * @param tag_id
	 *            The ID of the tag that arrived
	 */
	public ItemArrivalNotification(EZone zone, String tag_id) {
		this.zone = zone;
		this.tag_Id = tag_id;
	}

	/**
	 * @return the zone
	 */
	public EZone getZone() {
		return zone;
	}

	/**
	 * @return the tag_id
	 */
	public String getTag_Id() {
		return tag_Id;
	}
}

ItemDepartureNotification

package com.northwind.rfid.shipping.notifications;

import java.io.Serializable;

/**
 * This class defines an Item Departure Event
 * 
 * @author Kyle Neuemeier - kyle@pramari.com
 * 
 */
public class ItemDepartureNotification implements Serializable {
	/** Default Serial ID */
	private static final long serialVersionUID = 1L;
	/** The zone that this tag departed from */
	private final EZone zone;
	/** The tag that departed */
	private final String tag_Id;

	/**
	 * @param zone
	 *            The zone that this tag departed from
	 * @param tag_Id
	 *            The ID of the tag that departed
	 */
	public ItemDepartureNotification(EZone zone, String tag_Id) {
		this.zone = zone;
		this.tag_Id = tag_Id;
	}

	/**
	 * @return the zone
	 */
	public EZone getZone() {
		return zone;
	}

	/**
	 * @return the tag_id
	 */
	public String getTag_Id() {
		return tag_Id;
	}
}

AlertNotification

package com.northwind.rfid.shipping.notifications;

import java.io.Serializable;

/**
 * This class defines an Alert Event.
 * 
 * @author Kyle Neumeier - kyle@pramari.com
 * 
 */
public class AlertNotification implements Serializable {

	/** Default Serial ID */
	private static final long serialVersionUID = 1L;
	/** The alert message */
	private final EAlertType alert;
	/** The tag that the alert applies to */
	private final String tagID;

	/**
	 * @param alert
	 * @param tagID
	 */
	public AlertNotification(EAlertType alert, String tagID) {
		super();
		this.alert = alert;
		this.tagID = tagID;
	}

	/**
	 * @return the alert
	 */
	public EAlertType getAlert() {
		return alert;
	}

	/**
	 * @return the tagID
	 */
	public String getTag_Id() {
		return tagID;
	}

	/**
	 * @return A message that describes this alert type
	 */
	public String getMessage() {
		switch (alert) {
		case Package_is_Lost:
			return "Package is lost";
		case Package_Moved_Backwards:
			return "Package moved backwards";
		case Package_Skipped_Dock_Door:
			return "Package skipped dock door";
		default:
			return "Unknown alert";
		}
	}
}

EZone

package com.northwind.rfid.shipping.notifications;

/**
 * This enum  lists the read zones for our system.
 * @author Kyle Neumeier - kyle@pramari.com
 *
 */
public enum EZone {
	DOCK_DOOR,
	WEIGH_STATION;
}

EAlertType

package com.northwind.rfid.shipping.notifications;

/**
 * This enum lists the kinds of alerts that are defined.
 * @author Kyle Neumeier - kyle@pramari.com
 *
 */
public enum EAlertType {
	Package_Moved_Backwards,
	Package_Skipped_Dock_Door,
	Package_is_Lost;
}

Export Notification Package

In order for another bundle to be able to use the Notification classes we created, we need to export the notification package.

  1. Open the Manifest.MF.
  2. Select the Runtime Tab.
  3. In the Exported Packages section, click add.
  4. Choose the com.northwind.rfid.shipping.notifications package.

Use JMS to Send out Notifications in Application

In this step we will modify our code to use the Notification objects to send out notifications over JMS.

Add JMS Dependencies

The first step you need to do is to add the the necessary package dependencies to your bundle. Open up the Manifest.MF, select the 'Dependencies' tab. On the 'Imported Packages' section click add choose the following packages.

  • javax.jms
  • org.apache.activemq.command
  • org.springframework.beans.factory
  • org.springframework.core
  • org.springframework.jms
  • org.springframework.jms.core

Add JMS Setters

The next step is to modify ShippingApp to have a setter for the JMSTemplate and Destination objects that spring can use to inject them.

	/** Esper service */
	private volatile EsperManagementService esperService;
	/**All statements that have been defined so far*/
	private final Set<EPStatement> statements = new CopyOnWriteArraySet<EPStatement>();
	/**JMS Destination to send messages to*/
	private volatile Destination destination;
	/**JMS Template to use to send messages*/
	private volatile JmsTemplate template;

	/**
	 * Constructor
	 */
	public ShippingApp() {
		Activator.shippingApp=this;
	}

	/**
	 * Called by spring
	 * 
	 * @param esperService
	 */
	public void setEsperService(EsperManagementService esperService) {
		this.esperService = esperService;
		start();
		setUpAlerts();
		setupListeners();
	}
	
	/**
	 * Called by spring
	 * @param destination the destination to set
	 */
	public void setDestination(Destination destination) {
		this.destination = destination;
	}

	/**
	 * Called by spring
	 * @param template the template to set
	 */
	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}

Modify spring.xml to inject the JMS objects

Open up the spring.xml file. There are a few things to take care of here.

  1. Lookup the JMS template from the OSGi service registry. You will use this object to send out notifications
  2. Create a new JMS Destination (specifically a Topic desitination). This is where the notifications will be sent.
  3. Inject the template and destination into the application.
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:osgi="http://www.springframework.org/schema/osgi"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://www.springframework.org/schema/osgi 
    http://www.springframework.org/schema/osgi/spring-osgi.xsd">

	<!-- Create the  Application bean and inject dependencies-->
	<bean id="rfidapp" class="com.northwind.rfid.shipping.ShippingApp">
		<property name="esperService" ref="esperManagementService" />
		<property name="destination" ref="NorthwindTopic" />
		<property name="template" ref="jmsTemplate" />
	</bean>

	<!--Get a reference to the Esper Management Service from the OSGi Service Registry-->
	<osgi:reference id="esperManagementService"
		interface="org.rifidi.edge.core.services.esper.EsperManagementService" />

	<!-- Create a new Topic to publish Notifications to -->
	<bean id="NorthwindTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<property name="physicalName" value="com.northwind.rfid.shipping.topic" />
	</bean>

	<!-- Lookup the JMS Template from OSGi Service Registry -->
	<osgi:reference id="jmsTemplate"
		interface="org.springframework.jms.core.JmsTemplate" bean-name="externalJMSTemplate" />
</beans>

Add a send method

We can encapsulate the logic required to send out a JMS message in one method. Add this method to the application class

       /**
	 * Method to use to send out notifications over JMS. It seriallizes the
	 * message into an array of bytes, then sends those bytes out over JMS
	 * 
	 * @param notification
	 */
	private void send(final Serializable notification) {
		if (template == null || destination == null) {
			// TODO: Log error message;
			return;
		}
		try {
			template.send(destination, new MessageCreator() {
				@Override
				public Message createMessage(Session arg0) throws JMSException {
					BytesMessage message = arg0.createBytesMessage();
					ByteArrayOutputStream bos = new ByteArrayOutputStream();
					try {
						ObjectOutput out = new ObjectOutputStream(bos);
						out.writeObject(notification);
						out.close();
						message.writeBytes(bos.toByteArray());
						return message;
					} catch (IOException e) {
						throw new JMSException(e.getMessage());
					}
				}
			});
		} catch (JmsException exception) {
			// TODO: log error message
			return;
		}
	}

Finally, let's send some messages!

Edit the dockDoorListener as follows:

		//create a listener to handle Dock Door events 
		StatementAwareUpdateListener dockDoorListener = new StatementAwareUpdateListener() {
			@Override
			public void update(EventBean[] arrivals, EventBean[] departures, EPStatement arg2,
					EPServiceProvider arg3) {
				if(arrivals!=null){
					for(EventBean bean : arrivals){
						String id = (String)bean.get("tag_ID");
						send(new ItemArrivalNotification(EZone.DOCK_DOOR, id));
					}
				}
				
				if(departures!=null){
					for(EventBean bean : departures){
						String id = (String)bean.get("tag_ID");
						send(new ItemDepartureNotification(EZone.DOCK_DOOR, id));
					}
				}
				
			}
		};

Similarly edit the weightStationListener:

		StatementAwareUpdateListener weighStationListener = new StatementAwareUpdateListener() {
			@Override
			public void update(EventBean[] arrivals, EventBean[] departures, EPStatement arg2,
					EPServiceProvider arg3) {
				if(arrivals!=null){
					for(EventBean bean : arrivals){
						String id = (String)bean.get("tag_ID");
						send(new ItemArrivalNotification(EZone.WEIGH_STATION, id));
					}
				}
				
				if(departures!=null){
					for(EventBean bean : departures){
						String id = (String)bean.get("tag_ID");
						send(new ItemDepartureNotification(EZone.WEIGH_STATION, id));
					}
				}
				
			}
		};
		

Finally edit the Alert Listener:

	StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() {
			@Override
			public void update(EventBean[] arrivals, EventBean[] departures,
					EPStatement arg2, EPServiceProvider arg3) {
				if (arrivals != null) {
					for (EventBean bean : arrivals) {
						int alertType = (Integer) bean.get("alert_type");
						String id = (String) bean.get("tag_ID");
						AlertNotification alert = null;
						switch (alertType) {
						case 1:
							alert = new AlertNotification(EAlertType.Package_Moved_Backwards, id);
							break;
						case 2:
							alert = new AlertNotification(EAlertType.Package_Skipped_Dock_Door, id);
							break;
						case 3:
							alert = new AlertNotification(EAlertType.Package_is_Lost, id);
							break;

						}
						if (alert != null) {
							send(alert);
						}
					}
				}
			}
		};
		
Personal tools