Wadjet-JMS 5.3.0

Introduction

Wadjet-JMS supplies a set of classes for accessing JMS-conformant messaging services. The classes wrap the JMS objects and are reused within a program. For example, once you have obtained a MsgSubscriber for a given topic, any later request for a subscriber to the same topic returns the same instance until it is explicitly destroyed.
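
The reuse behaviour described above can be pictured as a keyed cache. The following is only a minimal sketch of that idea, not the Wadjet-JMS implementation: the class ReuseSketch and its methods are hypothetical, and a plain Object stands in for a MsgSubscriber.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical illustration of "same name, same instance until destroyed". */
final class ReuseSketch<T> {
    private final Map<String, T> instances = new ConcurrentHashMap<>();
    private final Function<String, T> creator;

    ReuseSketch(Function<String, T> creator) {
        this.creator = creator;
    }

    /** Creates the instance on first request and returns the cached one afterwards. */
    T get(String name) {
        return instances.computeIfAbsent(name, creator);
    }

    /** Forgets the instance so that the next get() creates a fresh one. */
    void destroy(String name) {
        instances.remove(name);
    }

    public static void main(String[] args) {
        // A plain Object stands in for a subscriber in this sketch.
        ReuseSketch<Object> subscribers = new ReuseSketch<>(topic -> new Object());
        Object first = subscribers.get("prices");
        System.out.println("reused: " + (first == subscribers.get("prices")));
        subscribers.destroy("prices");
        System.out.println("reused after destroy: " + (first == subscribers.get("prices")));
    }
}
```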

All the objects in Wadjet-JMS are implemented as MBeans. If JMX is enabled, they register themselves with the platform MBeanServer through Wadjet-JMX, using the relevant IMBeanServerHelper instance.

Wadjet-JMS also supplies a naming service that delegates to whatever implementation you have configured it to use. Together, the naming and messaging services let you easily connect to multiple MOM (Message Oriented Middleware) services, whether to build bridges or simply because you need to.

All the objects in Wadjet-JMS are thread safe, and IShutdownHooks are created internally to ensure that all JMS resources are closed and released at program exit.
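
As a rough picture of what closing resources at program exit involves, the sketch below registers a standard JVM shutdown hook that closes everything it has been given, most recently registered first. It does not use IShutdownHook, whose API is not shown here; ShutdownSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: closes registered resources when the JVM exits. */
final class ShutdownSketch {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    ShutdownSketch(boolean installHook) {
        if (installHook) {
            // Standard JVM mechanism; the hook thread runs on normal exit.
            Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "jms-shutdown"));
        }
    }

    synchronized void register(AutoCloseable resource) {
        resources.push(resource);
    }

    /** Closes in reverse registration order; one failure does not stop the rest. */
    synchronized void closeAll() {
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                System.err.println("close failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        ShutdownSketch hooks = new ShutdownSketch(true);
        hooks.register(() -> System.out.println("closing connection"));
        hooks.register(() -> System.out.println("closing session"));
        // Nothing else to do: the hook closes both resources when main returns.
    }
}
```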

XA JMS objects are not supported in this version of Wadjet-JMS.

Using Wadjet-JMS

The following sections describe how to use the classes in the Wadjet-JMS package:

Configuring Wadjet-JMS

The objects in Wadjet-JMS are loaded as services using a service locator, which also calls the initialise() method. This in turn expects to find a configuration file named jms.conf with the following properties:

| Key | Description | Default |
|---|---|---|
| numServices (1..n) | The number of different Connection Factories to initialise | 0 |
| n.name | The name of the connection factory, used to get it from the IMessagingService implementation. This can be any string but must be unique in the file | None |
| n.appname | The name of the application; this is used to generate the JMS clientId | None |
| n.url | The URL for connecting to the Naming Server to get JMS objects for this connection | None |
| n.icfClass | The fully qualified name of the InitialContextFactory implementation to use for the naming service | com.swiftmq.jndi.InitialContextFactoryImpl |
| n.retryDelay | The delay in milliseconds between connection retries for the naming service | 10000 |
| n.retryMax | The maximum number of times to try connecting before failing | 360 |
| n.nsClass | The fully qualified name of the INamingService implementation | com.addc.messaging.naming.RetryNamingService |
| n.nsPrincipal | The principal for connecting to the naming service | null |
| n.nsCredential | The credential for connecting to the naming service | null |

So, if you have two messaging services you want to connect to, your file might look something like this:

    numServices=2
    # Weblogic lookup for one of the services
    0.name=WLS
    0.appname=MyApplication
    0.icfClass=weblogic.jndi.WLInitialContextFactory
    0.url=t3://robin:7001
    0.nsPrincipal=userId
    0.nsCredential=password
    
    # SwiftMQ lookup for the other one
    1.name=SwiftMQ
    1.appname=MyApplication
    1.icfClass=com.swiftmq.jndi.InitialContextFactoryImpl
    1.url=smqp://robin:5002/timeout=10000

This configuration lets you exchange messages with both a WebLogic server and a SwiftMQ messaging service, perhaps to bridge between parts of a system that each use a different service.

This technique can also be used if you have multiple messaging servers in your architecture:

    numServices=3

    0.name=WebAppMom
    0.appname=MyApplication
    0.url=smqp://merlin:5002/timeout=10000
    
    1.name=BackEndMom
    1.appname=MyApplication
    1.url=smqp://robin:5002/timeout=10000

    2.name=FrontEndMom
    2.appname=MyApplication
    2.url=smqp://robin:6002/timeout=10000

This technique also works with other MOMs such as IBM MQSeries, FioranoMQ, SonicMQ, etc.

If you only have one connection to make, you can leave out the n. prefix on the properties and either set numServices=0 or leave it out altogether:

    # SwiftMQ lookup for messaging
    name=SwiftMQ
    appname=MyApplication
    url=smqp://robin:5002/timeout=10000
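
To make the key layout concrete, here is a sketch of how a file in this format could be read with java.util.Properties. It is not the loader that Wadjet-JMS uses internally; JmsConfSketch and its methods are hypothetical, and only the key names, the zero-based index prefix and the defaults are taken from the table and examples above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of Wadjet-JMS: resolves jms.conf keys per service. */
final class JmsConfSketch {

    /** Parses configuration text in java.util.Properties format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns "" for a single unprefixed service, otherwise "0.", "1.", ... */
    static List<String> prefixes(Properties props) {
        int count = Integer.parseInt(props.getProperty("numServices", "0").trim());
        List<String> result = new ArrayList<>();
        if (count == 0) {
            result.add("");
        } else {
            for (int i = 0; i < count; i++) {
                result.add(i + ".");
            }
        }
        return result;
    }

    /** Looks up prefix + key, falling back to the documented default. */
    static String value(Properties props, String prefix, String key, String dflt) {
        return props.getProperty(prefix + key, dflt);
    }

    public static void main(String[] args) {
        Properties props = parse("numServices=2\n"
                + "0.name=WLS\n0.url=t3://robin:7001\n"
                + "1.name=SwiftMQ\n1.url=smqp://robin:5002/timeout=10000\n");
        for (String prefix : prefixes(props)) {
            System.out.println(value(props, prefix, "name", null)
                    + " -> " + value(props, prefix, "url", null)
                    + " (retryDelay=" + value(props, prefix, "retryDelay", "10000") + ")");
        }
    }
}
```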


Initialising The Messaging Service

The IMessagingService implementation is obtained using a locator which will create a singleton instance and initialise it the first time a request is made:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();

The initialisation of the Messaging Service includes initialising the Naming Service. Once this first call has returned, the instance is ready to use.
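
The "create and initialise on first request" behaviour is the usual lazy singleton pattern. The sketch below shows one common way to write it; it is an illustration under that assumption, not the source of MessagingServiceFactory, and LocatorSketch, Service and INIT_CALLS are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a locator that lazily creates one initialised instance. */
final class LocatorSketch {
    /** Counts initialisations so the single-initialisation claim can be checked. */
    static final AtomicInteger INIT_CALLS = new AtomicInteger();

    /** Stands in for the service implementation. */
    static final class Service {
        void initialise() {
            INIT_CALLS.incrementAndGet();
        }
    }

    private static Service instance;

    /** synchronized so that concurrent first calls still create only one instance. */
    static synchronized Service getService() {
        if (instance == null) {
            Service created = new Service();
            created.initialise(); // finish initialising before publishing the instance
            instance = created;
        }
        return instance;
    }

    public static void main(String[] args) {
        Service a = getService();
        Service b = getService();
        System.out.println("same instance: " + (a == b) + ", initialised " + INIT_CALLS.get() + " time(s)");
    }
}
```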



Connecting To the MOM

Once you have an instance of IMessagingService you can start to connect to the MOMs configured in the jms.conf file. This is done using MsgConnectionFactory instances, one for each of the MOMs, which are obtained using the name defined in the configuration file:

    MsgConnectionfactory wlsFactory = messaging.getConnectionFactory("WLS");
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");

You can now get an actual connection using this factory:

    MsgConnection wlsConn = wlsFactory.getConnection("LookupName");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");

or if authentication is required:

    MsgConnection wlsConn = wlsFactory.getConnection("LookupName", userId, passwd);
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName", userId, passwd);

You are now ready to start working with Consumer and Producer objects.



Getting and Using a Consumer

Consumers are split into Topic Subscribers and Queue Readers. Topic Subscribers can also be durable. Consumers also support a listener mechanism through the IMessagehandler interface. Consumers can optionally be transactional.

The Consumers return POJOs, not JMS Messages, so if you need the correlation id or header information from the last message, the Consumers supply the necessary methods (see Getting Header Information).

You obtain Consumers using the MsgConnection object:

    MsgReceiver queueReceiver = swiftConn.getQueueReceiver(queueLookupName);

To get a transactional receiver:

    MsgReceiver queueReceiver = swiftConn.getQueueReceiver(queueLookupName, true);

To get a transactional receiver with a listener and message selector:

    List<IMessagehandler> listeners = new ArrayList<IMessagehandler>();
    listeners.add(this);
    MsgReceiver queueReceiver = swiftConn.getQueueReceiver(queueLookupName, 
                                                           listeners, 
                                                           selector, 
                                                           true);

To get a transactional receiver with a message selector:

    MsgReceiver queueReceiver = wlsConn.getQueueReceiver(queueLookupName, 
                                                         null, 
                                                         selector, 
                                                         true);

Getting a Subscriber is very similar. There is a selection of methods to choose from, which can be seen in the Java documentation. The simplest form is:

    MsgSubscriber topicSubscriber = swiftConn.getTopicSubscriber(topicLookupName);

For a transactional durable Subscriber:

    MsgSubscriber topicSubscriber = swiftConn.getTopicSubscriber(topicLookupName, 
                                                                 true, 
                                                                 durableName);

Listeners are added in exactly the same way as for the MsgReceiver.

You can now receive messages from your queue or topic, either by listening using an implementation of the IMessageHandler or by directly calling one of the receive() methods:

    public class MyClass implements IMessageHandler {

        private AbstractConsumer m_consumer;
        ...
        ...
        // Keep a reference to the consumer so header information can be read later
        public void setConsumer(AbstractConsumer consumer) {
            m_consumer = consumer;
        }
        ...
        ...
        public void handleMessage(Object message) {
            ...
        }
    }

or to read a message in a method:

    public class MyClass {

        private MsgReceiver m_queueReceiver;
        ...
        ...
        public void initialise() throws MessagingException {
            IMessagingService messaging = MessagingServiceFactory.getMessengingService();
            MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
            MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
            m_queueReceiver = swiftConn.getQueueReceiver(queueLookupName);
        }
        ...
        public MyObject getNextMyObject() throws MessagingException, ApplicationException {
            Object obj = m_queueReceiver.receive(timeout);
            if (obj instanceof MyObject) {
                ...
                ...
            }
            // Anything that is not a MyObject is treated as an error
            throw new ApplicationException("Received garbage on the queue");
        }
        ...
        ...
    }

If your Consumer is transactional, you can check the object you receive before receive() returns or handleMessage() is invoked. To do so, register an instance of ITransactionalMsgChecker, which is called after the message has been fetched and its correlation id and header information have been extracted.

This call-back allows you to throw a JMSException which will cause the message to roll back:

    public class MyClass implements ITransactionalMsgChecker {

        private MsgReceiver m_queueReceiver;
        ...
        ...
        public void initialise() throws MessagingException {
            IMessagingService messaging = MessagingServiceFactory.getMessengingService();
            MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
            MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
            // The receiver must be transactional for the checker to be able to roll back
            m_queueReceiver = swiftConn.getQueueReceiver(queueLookupName, true);
            m_queueReceiver.setTransactionalForwarder(this);
        }
        ...
        // Throwing a JMSException here rolls the message back
        public void checkMessage(Object msg) throws JMSException {
            if (!(msg instanceof MyObject)) {
                throw new JMSException("This is not what I expected");
            }
        }
        ...
        public MyObject getNextMyObject() throws MessagingException, ApplicationException {
            MyObject obj = (MyObject) m_queueReceiver.receive(timeout);
            ...
            ...
        }
        ...
        ...
    }

Note that the danger in doing this is that, because the message is returned to the queue, it will keep being read and rolled back until it expires.



Getting and Using a Producer

You get a Producer (Queue Sender or Topic Publisher) in the same way as a Consumer:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
    
    m_queueSender = swiftConn.getQueueSender(queueLookupName);
    m_queueSenderTrans = swiftConn.getQueueSender(queueLookupName, true);
    
    m_topicPublisher = swiftConn.getTopicPublisher(topicLookupName);
    m_topicPublisherTrans = swiftConn.getTopicPublisher(topicLookupName, true);

Producers can be either synchronous or asynchronous. Synchronous producers block until the message has been successfully put to the MOM queue or topic, whereas asynchronous producers put the object to send on a FIFO and send it on another thread.

By default, producers are synchronous. To set a producer to run in asynchronous mode:

    m_topicPublisher.setSynchronous(false);

Note that when running in asynchronous mode the producer cannot throw an exception if the send fails; you will only see the failures in the log4j output.
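
The difference between the two modes, and the reason asynchronous failures cannot reach the caller, can be seen in a small stand-alone sketch. This is an illustration of the general technique (a FIFO drained by a worker thread), not the Wadjet-JMS producer: AsyncProducerSketch is a hypothetical class, the Consumer passed to it stands in for the real JMS send, and System.err stands in for log4j.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of a producer with a synchronous and an asynchronous mode. */
final class AsyncProducerSketch implements AutoCloseable {
    private final Consumer<Object> transport; // stands in for the real JMS send
    private final BlockingQueue<Object> fifo = new LinkedBlockingQueue<>();
    private final Thread worker;
    private volatile boolean synchronous = true; // synchronous by default
    private volatile boolean running = true;

    AsyncProducerSketch(Consumer<Object> transport) {
        this.transport = transport;
        this.worker = new Thread(this::drain, "async-producer");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    void setSynchronous(boolean synchronous) {
        this.synchronous = synchronous;
    }

    void send(Object message) {
        if (synchronous) {
            transport.accept(message); // caller blocks and sees any exception
        } else {
            fifo.add(message); // returns at once; the worker thread sends later
        }
    }

    /** Worker loop: sends queued messages in order; failures are only logged. */
    private void drain() {
        while (running || !fifo.isEmpty()) {
            try {
                Object message = fifo.poll(50, TimeUnit.MILLISECONDS);
                if (message != null) {
                    try {
                        transport.accept(message);
                    } catch (RuntimeException e) {
                        System.err.println("async send failed: " + e.getMessage());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Stops the worker once everything already queued has been attempted. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch a failed synchronous send surfaces as an exception in the calling thread, while a failed asynchronous send is only written to the error stream.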

Sending a message is slightly different for Queue senders and Topic publishers:

    m_topicPublisher.publish(myMessageObject);
    
    m_queueSender.send(myMessageObject);

If you call send on a topic publisher or publish on a queue sender, it will throw an UnsupportedOperationException.

There are several overloads of the publish and send methods which allow you to add headers, a correlation id and a reply-to destination (queue or topic).



Getting Header Information

As the Consumer classes in Wadjet-JMS return the object that was sent and not the JMS message, you have to get the header, correlation id and reply-to information from the Consumer itself:

    public MyObject getNextMyObject() throws MessagingException, ApplicationException {
        Object obj = m_queueReceiver.receive(timeout);
        Map<String, Object> headers = m_queueReceiver.getMessageHeaders();
        String correlationId = m_queueReceiver.getJMSCorrelationID();
        Destination jmsDestination = m_queueReceiver.getReplyToDestination();
        ... 
    }

or in an IMessageHandler:

    public void handleMessage(Object message) {
        ...
        Map<String, Object> headers = m_consumer.getMessageHeaders();
        String correlationId = m_consumer.getJMSCorrelationID();
        Destination jmsDestination = m_consumer.getReplyToDestination();
        ...
    }


Using Concurrent Queues

Concurrent queues allow you to share messages from a queue between multiple consumers by dealing out the messages between them. Most MOMs lock the queue for a consumer so that no one else can get a message, as the principle behind a queue is N:1 (producers:consumers). Topics, by contrast, deliver all the messages to everyone (N:N).

This allows you to have multiple clients read messages from a queue without any of them receiving messages that are being handled by the others. Each reader opens the queue, reads a message and then closes the queue, which gives the other concurrent readers access.
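
The open, read one, close cycle can be modelled in memory to show why each message reaches exactly one reader. The sketch below is only such a model, not the ConcurrentQueueReceiver implementation: ConcurrentQueueSketch is hypothetical, an ArrayDeque stands in for the MOM queue, and a lock stands in for the exclusive access a MOM grants to one consumer at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory model of readers taking turns on one queue. */
final class ConcurrentQueueSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    // Fair lock, so waiting readers are served in arrival order.
    private final ReentrantLock exclusiveAccess = new ReentrantLock(true);

    void put(String message) {
        exclusiveAccess.lock();
        try {
            queue.add(message);
        } finally {
            exclusiveAccess.unlock();
        }
    }

    /** Open the queue, read at most one message, close it again. Null when empty. */
    String readOne() {
        exclusiveAccess.lock(); // "open": only one reader holds the queue
        try {
            return queue.poll();
        } finally {
            exclusiveAccess.unlock(); // "close": the next reader may enter
        }
    }

    /** Deals the messages out to the given number of reader threads. */
    static List<List<String>> share(List<String> messages, int readers) {
        ConcurrentQueueSketch shared = new ConcurrentQueueSketch();
        messages.forEach(shared::put);
        List<List<String>> perReader = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < readers; i++) {
            List<String> mine = new ArrayList<>(); // written by one thread only
            perReader.add(mine);
            Thread reader = new Thread(() -> {
                String message;
                while ((message = shared.readOne()) != null) {
                    mine.add(message);
                }
            });
            threads.add(reader);
            reader.start();
        }
        for (Thread reader : threads) {
            try {
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return perReader;
    }

    public static void main(String[] args) {
        List<List<String>> dealt = share(List.of("m1", "m2", "m3", "m4", "m5", "m6"), 3);
        for (int i = 0; i < dealt.size(); i++) {
            System.out.println("reader " + i + " got " + dealt.get(i));
        }
    }
}
```

How the messages are split between the readers depends on thread scheduling; the model only guarantees that no message is delivered twice.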

You get a concurrent queue receiver in the same way you get a queue receiver, and you read from it or listen on it in exactly the same way:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
    ConcurrentQueueReceiver queueReceiver = swiftConn.getConcurrentQueueReceiver(
                                                                    queueLookupName);

To get a transactional receiver:

    queueReceiver = swiftConn.getConcurrentQueueReceiver(queueLookupName, true);

To get a transactional receiver with a listener and message selector:

    List<IMessagehandler> listeners = new ArrayList<IMessagehandler>();
    listeners.add(this);
    queueReceiver = swiftConn.getConcurrentQueueReceiver(queueLookupName, 
                                                         listeners, 
                                                         selector, 
                                                         true);

To get a transactional receiver with a message selector:

    queueReceiver = wlsConn.getConcurrentQueueReceiver(queueLookupName, 
                                                       null, 
                                                       selector, 
                                                       true);


Browsing Queues

Sometimes you need to see what is in a queue while leaving the message there. This is done with a queue browser:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
    MsgBrowser browser = swiftConn.getQueueBrowser(queueLookupName, selectorString);

You can then inspect the queue. To see what messages there are waiting:

    List<Object> messages = browser.peekQueueMessages();
    ...

To check the queue size:

    int size = browser.getQueueSize();


Using Temporary Queues

Temporary queues are used when you don't need the queue or the messages in it once a process has terminated or (a bad idea) for making synchronous requests. Whatever you use them for, you have to pass the temporary queue as a reply-to destination to the other end so that it can send to the queue (see Getting Header Information).

You get a temporary queue for receiving like this:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
    MsgTQReceiver tqr = swiftConn.getTempQueueReceiver(null);

or for a transactional one with a listener:

    List<IMessagehandler> listeners = new ArrayList<IMessagehandler>();
    listeners.add(this);
    tqr = swiftConn.getTempQueueReceiver(listeners, true);

You then need to inform the senders of the temporary queue:

    topicPublisher.publish(infoMessage, tqr.getTemporaryQueue(), id, headers); 

The sender end will need to create a sender:

    IMessagingService messaging = MessagingServiceFactory.getMessengingService();
    MsgConnectionfactory swiftFactory = messaging.getConnectionFactory("SwiftMQ");
    MsgConnection swiftConn = swiftFactory.getConnection("LookupName");
    ...
    ...
    msg = topicSubscriber.receive();
    TemporaryQueue tq = (TemporaryQueue) topicSubscriber.getReplyToDestination();
    
    MsgTQSender sender = swiftConn.getTempQueueSender(tq);

or for transactional:

    MsgTQSender sender = swiftConn.getTempQueueSender(tq, true);
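
The overall flow (the receiving side creates a private queue, announces it as the reply-to destination, and the other side sends to it) can be modelled without a MOM. The sketch below is such an in-memory model, not Wadjet-JMS code: ReplyToSketch and Envelope are hypothetical, and BlockingQueues stand in for the topic and the temporary queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory model of passing a temporary queue as reply-to. */
final class ReplyToSketch {

    /** A message body plus the queue the receiver should answer on. */
    static final class Envelope {
        final Object body;
        final BlockingQueue<Object> replyTo;

        Envelope(Object body, BlockingQueue<Object> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Publishes a request with a private reply queue and waits for the answer. */
    static Object requestReply(Object request, long timeoutMillis) {
        BlockingQueue<Envelope> topic = new LinkedBlockingQueue<>();   // shared destination
        BlockingQueue<Object> tempQueue = new LinkedBlockingQueue<>(); // known only via reply-to

        // The "sender end": reads the announcement and sends to the reply-to queue.
        Thread responder = new Thread(() -> {
            try {
                Envelope received = topic.take();
                received.replyTo.put("reply to " + received.body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();

        try {
            topic.put(new Envelope(request, tempQueue));
            // Returns null if no reply arrives within the timeout.
            return tempQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(requestReply("ping", 2000));
    }
}
```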
