EdgeServerJMS

From RifidiWiki

Jump to: navigation, search

JMS in the Edge Server

This page describes how JMS is used in the Edge Server

A very, very brief JMS Introduction

JMS (Java Messaging Service) is a standard for a messaging API as specified by JSR 914. A JMS Provider is an implementation of the JMS specification. The idea is that there is a server whose job it is to route messages. It is up to the provider to define exactly how this server works. For example, some providers require you to start the server administratively from the command line and allow you to control it using JMX. Other providers allow you to start the server in your source code.

There are two types of clients to the server. A JMS Consumer will consume JMS Messages. A JMS Producer will send messages to the server.

There are two ways to use JMS. You can either use a point-to-point model, or you can use a publish-subscribe model. The basic difference is that with the point-to-point model, for every message produced, there is at most one consumer. With the publish-subscribe model, there can be more than one consumer. Point-to-point implementations will use a JMS Queue as a destination. Publish-subscribe implementations will use a JMS Topic. For a better explanation, see: this article.

Rifidi Edge Server uses ActiveMQ as its provider. With ActiveMQ, the server is called a Broker. Rifidi uses Spring to create and start the broker.

Overview

The Rifidi Edge Server uses JMS (ActiveMQ) to pass certain kinds of data from component to component. The Edge Server separates its usage of JMS into two logical components:

  • Internal Message Bus - This is used for tag data that is collected by sensor plugins (e.g. Commands executing on an Alien9800 session). It is implemented using an ActiveMQ broker with one Destination (specifically a Topic). Since it is only used internally, clients (both producers and consumers) connect to it using a VM connection.
  • External Notification - This is used to expose tag events and notification events to software outside the Edge Server. It is implemented using an ActiveMQ broker with two Destinations (one Topic for tag events and one for notification events). Clients to the broker (both producers and consumers) can connect to it using a VM connection. Clients external to the edge server can use a TCP connection.

There are two kinds of events used in the Rifidi Edge Server:

  • Tag Events - These events consist of tag data produced by sensors (depicted in blue below)
  • Notification Events - These are system events such as a new reader being created, a session state change, or a property on a reader changing. (depicted in red below)
Edgeserverjms.png

The image above shows commands (JMS Producers) producing tag events and sending them to the internal topic. Other components (such as ReaderFactories) produce notification events. There are two consumers of events that are put on the internal topic: Esper (which uses the events as a stream for event stream processing) and the notification plugin (which simply relays the events to the external tags topic. External Edge Server clients can then connect to the two external topics to get notification and tag events.

Components

There several JMS objects that are automatically created when the Edge Server starts up (inside jms.xml in the spring folder of the JMS bundle). These objects should be reused when applicable. The objects that should be reused are made available as OSGi services. To see the description of the JMS OSGi Services see

Edge_Server_Architecture#JMS_Services

Broker

As mentioned above, there are two brokers: an internal and and external one. You can connect to either broker within the edge server using a vm connection.

  • For the internal broker use this url: vm://internalBroker
  • For the external broker use this url: vm://externalBroker
  • To create a connection to the external broker from outside the vm, you can use this url: tcp://localhost:1099

Destination

There are three topic automatically created:

  • org.rifidi.edge.external.notifications - Used to send system messages to software outside of the edge server
  • org.rifidi.edge.external.tags - Used to send tag events to software outside of the edgeserver
  • org.rifidi.edge.internal - Used to collect tag messages from sensors

These three destinations are available as services in the OSGi registry. The bean names are:

  • externalNotificationsDest
  • externalTagsDest
  • internalDest

You can access them from another bundle as follows:

<!-- Get a reference to the internal Message Buffer -->
<osgi:reference id="internalMB" interface="javax.jms.Topic" bean-name="internalDest" />

Connection Factories

A connection factory allows JMS clients to create connections to the broker. There are two connection factories available in the OSGi service registry (one for each broker). Both connection factories use the vm transport.

  • internalBrokerConFac allows you to create connections to the internal broker
  • externalBrokerConFac allows you to create connections to the external broker

you can access them from another bundle as follows:

<!-- Get reference to connection factory to internal Broker -->
<osgi:reference id="confac" interface="javax.jms.ConnectionFactory" bean-name="internalBrokerConFac" />

Templates

A template is a class provided by spring that helps you produce messages. They are thread-safe and are intended to be used by multiple components. If used without any kind of pooling, templates will create a JMS Connection, a JMS Session, JMS Producer each time it sends a message. However, the two templates available in the OSGi registry wrap a connectionfactory in a PooledConnectionFactory to avoid this problem. These templates should usually not be used to consume messages and instead a SimpleMessageListenerContainer (see below) should be used instead. There are two templates available in the OSGi registry:

  • internalJMSTemplate. This is used to produce messages and send them to the internal broker. It wraps the internalBrokerConFac and has its default destination set to org.rifidi.edge.internal.
  • externalJMSTemplate This is used to produce messages and send them to the external broker. It wraps the externalBrokerConFac and does not have a default destination set.

You can access the templates from another bundle as follows:

<!-- Get a reference to the internal message bus -->
<osgi:reference id="internalMB" interface="org.springframework.jms.core.JmsTemplate" bean-name="internalJMSTemplate" />

Setting up a Producer

This section shows you how to set up a JMS producer in the Rifidi Edge Server

Tag Producer

Sensor plugins will need to be able have access to the internal message bus to add tag events to it. In order to do this, they need to have this statement in their spring xml file:

<osgi:reference id="internalMB" interface="org.springframework.jms.core.JmsTemplate" bean-name="internalJMSTemplate" />

Because the internalJMSTemplate already has the internal topic as its default destination, the spring xml does not need to get a destination out of the OSGi registry.

Now they can pass this bean into the objects that will use it (for a sensor this is typically the ReaderFactory).

To send a message, you might have to define a message creator that implements MessageCreator.

	/**
	 * Used to create a JMS message to send to the Queue that collects Tag Data
	 * 
	 * @author Kyle Neumeier - kyle@pramari.com
	 * 
	 */
	private class ObjectMessageCreator implements MessageCreator {

		/** Message to send */
		private ActiveMQObjectMessage objectMessage;

		/**
		 * Constructor.
		 * 
		 * @param tags
		 *            the tags to add to this message
		 */
		public ObjectMessageCreator(Set<TagReadEvent> tags) {
			super();
			objectMessage = new ActiveMQObjectMessage();

			try {
				objectMessage.setObject(new ReadCycle(tags, reader, System
						.currentTimeMillis()));
			} catch (JMSException e) {
				logger.warn("Unable to set tag event: " + e);
			}
		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see
		 * org.springframework.jms.core.MessageCreator#createMessage(javax.jms
		 * .Session)
		 */
		@Override
		public Message createMessage(Session arg0) throws JMSException {
			return objectMessage;
		}

	}

Now you can use the send method in the template to send the message to the broker.

Sending Tags to an External Source

TODO: Since there will be classloading issues if you try to use ObjectMessages when sending tags to something that is running in a different JVM, it is better to serialize the object you are trying to send first into a ByteArrayMessage. Then deserialize the message yourself when receiving it.

Setting up a Consumer

To consume messages from a topic, you will need to add the following code to the spring xml file in your bundle

	<bean id="jmsContainer"
		class="org.springframework.jms.listener.SimpleMessageListenerContainer">
		<property name="autoStartup" value="true" />
		<property name="connectionFactory" ref="confac" />
		<property name="destination" ref="internalMB" />
		<property name="messageListener" ref="myMessageListener" />
		<property name="acceptMessagesWhileStopping" value="true" />
	</bean>

	<!-- Get reference to connection factory to internal Broker -->
	<osgi:reference id="confac" interface="javax.jms.ConnectionFactory"
		bean-name="internalBrokerConFac" />

	<!-- Get a reference to the internal Message Buffer -->
	<osgi:reference id="internalMB" interface="javax.jms.Topic"
		bean-name="internalDest" />
        
        <!-- Create a message listener -->
	<bean id="myMessageListener" class="org.rifidi.edge.examplecode.JMSConsumer"/>

The only tricky part is creating the message listener. This is a class that implements the javax.jms.MessageListener interface. It has a method called onMessage() in which you need to implement the custom logic that happens when a message is recieved. For example:

/**
 * @author Kyle Neumeier - kyle@pramari.com
 * 
 */
public class JMSConsumer implements MessageListener {

	/*
	 * (non-Javadoc)
	 * 
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message arg0) {
		System.out.println("Got message!");
		//process arg0 here!!!
	}

}

Future Work

There are still a few open questions about whether or not we are using JMS and spring correctly or not:

  • From the debug output observed it appears a new connection, reciever, and session are being created every time a new message is received. This makes me believe that org.springframework.jms.listener.SimpleMessageListenerContainer might be too simplistic and may need to be wrapped in a pool of some sort.
  • Not sure if psersistance is really being disable.
  • Right now the IP address of the external broker is hardcoded as localhost:1099. It appears however, that I can still connect to it even from a different machine. We need to figure out if this is ok. Also, we need a way to specify the port from the vm arguments passed in when the edge server first starts up.

References

A few links that I found useful when reading up on JMS & spring:

Personal tools