Wadjet-MBean 5.3.0

Introduction

Wadjet-MBean supplies a set of utility MBeans based on the Wadjet-JMX module.

The following MBeans in this package can be integrated into your programs:

  • ServicePropertiesMBean
  • ManagedFifo
  • StatisticsCollectorMBean
  • ManagedThread

Using Wadjet-MBeans

The following sections describe how to use the MBeans in your programs:

Managed Service Properties

The ServicePropertiesMBean is an MBean implementation of the IServiceProperties interface that allows you to manage the configuration of a process using JMX. To make this class the IServiceProperties implementation for your process, set the service-properties entry in the services.conf file to the value com.addc.mbean.props.ServicePropertiesMBean.

The ServicePropertiesMBean allows you to mark some properties as hidden and others as writable. To do this, place a file named SPOverrides.conf on your class path. The file contains one or both of the properties hidden and writable, each holding a comma-separated list of keys, for example:

    hidden=auditfwd.jdbc.password, File.property.name1
    writable=Alarming.snmp.host, Alarming.syslog.host

Otherwise, using this class is no different from using the standard IServiceProperties implementation.
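
To make the file format concrete, the sketch below shows one way such comma-separated key lists could be read with java.util.Properties. It is not the parser used by ServicePropertiesMBean; the class OverridesSketch and its methods are hypothetical names introduced only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper: reads comma-separated key lists such as hidden= and writable=. */
final class OverridesSketch {

    private final Properties props = new Properties();

    private OverridesSketch() { }

    /** Parses the given file content in java.util.Properties format. */
    static OverridesSketch parse(String fileContent) {
        OverridesSketch result = new OverridesSketch();
        try {
            result.props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    /** Returns the trimmed keys listed under the given property, or an empty set. */
    Set<String> keys(String property) {
        Set<String> result = new LinkedHashSet<>();
        String raw = props.getProperty(property, "");
        for (String key : raw.split(",")) {
            String trimmed = key.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        OverridesSketch overrides = OverridesSketch.parse(
                "hidden=auditfwd.jdbc.password, File.property.name1\n"
              + "writable=Alarming.snmp.host, Alarming.syslog.host\n");
        System.out.println("hidden:   " + overrides.keys("hidden"));
        System.out.println("writable: " + overrides.keys("writable"));
    }
}
```

A property that is absent from the file simply yields an empty set, which matches the statement that the file may contain one or both entries.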



Managed Fifos

The ManagedFifo MBean wraps a Fifo to make it manageable. You can then change the Maximum Queue Depth and enable or disable the Fifo using JMX. To get a ManagedFifo, create a Fifo or a FifoWithThresholds, then create and register the MBean:

    Fifo fifo = new Fifo();
    ManagedFifo mfifo = new ManagedFifo(fifo);
    m_mbs.registerAbstractMBean(mfifo, "Fifo:type=Fifo");

or

    FifoWithThresholds fifo = new FifoWithThresholds();
    ManagedFifo mfifo = new ManagedFifo(fifo);
    m_mbs.registerAbstractMBean(mfifo, "Fifo:type=FifoWithThresholds");


Managed Threads

The AManagedThread class is an abstract thread that supports management over JMX. It is designed for threads that cycle, performing the same or a similar task on each pass, for example reading a database queue table. To this end, the class declares four methods that you must implement:

  • doInitialisation() where you perform any initialisation tasks, such as getting a database connection or creating a subscriber to a Topic.
  • doExecute() where you code the processing the thread should do on each cycle.
  • doCleanup() where you release any resources you obtained in the doInitialisation() method.
  • getDescription() which returns a text description of the object used in the MBeanInfo.

and some helper methods that are empty by default:

  • initialiseRun() which is invoked prior to starting the run loop. Use it for any processing needed after the thread has started and before it enters the processing loop.
  • getExtraAttributes() to get any extra MBean Attributes your implementation may want to publish over JMX.
  • getExtraOperations() to get any extra MBean Operations your implementation may want to publish over JMX.
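
The methods above suggest a template-method lifecycle. The sketch below illustrates that call order with a plain Runnable. It is not the Wadjet source: the classes CyclicWorkerSketch and RecordingWorker are hypothetical, JMX registration is reduced to a comment, and calling doCleanup() even after a failed initialisation is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a cyclic managed thread; not the Wadjet implementation. */
abstract class CyclicWorkerSketch implements Runnable {

    private volatile boolean shutdownRequested;

    protected abstract void doInitialisation() throws Exception;
    protected abstract void doExecute();
    protected abstract void doCleanup();

    /** Optional hook, empty by default, called once before the loop starts. */
    protected void initialiseRun() { }

    /** Asks the loop to stop after the current cycle. */
    public void shutdown() {
        shutdownRequested = true;
    }

    @Override
    public void run() {
        try {
            doInitialisation();
            // The real class is documented to register itself with the MBeanServer here.
            initialiseRun();
            while (!shutdownRequested) {
                doExecute();
            }
        } catch (Exception e) {
            // Assumption: any failure ends the worker.
            throw new IllegalStateException("worker failed", e);
        } finally {
            // Assumption: cleanup always runs, even if initialisation failed.
            doCleanup();
        }
    }
}

/** Demo subclass that records the call order and stops after three cycles. */
final class RecordingWorker extends CyclicWorkerSketch {

    final List<String> calls = new ArrayList<>();
    private int cycles;

    @Override protected void doInitialisation() { calls.add("doInitialisation"); }
    @Override protected void initialiseRun() { calls.add("initialiseRun"); }
    @Override protected void doCleanup() { calls.add("doCleanup"); }

    @Override protected void doExecute() {
        calls.add("doExecute");
        if (++cycles == 3) {
            shutdown();
        }
    }
}
```

Running a RecordingWorker records doInitialisation, initialiseRun, three doExecute calls, and finally doCleanup, which is the order the method descriptions imply.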

You create a managed thread by writing a class that extends AManagedThread and implements these methods. The managed thread registers itself with the MBeanServer after calling your doInitialisation() method, using the object name Domain:type=ManagedThread,name=Thread-Name, where Domain and Thread-Name are the parameters passed to the constructor.

public class AuditQueueReader extends AManagedThread 
                                implements IShutdownHook, IThresholdListener {

    private static final long RECONN_DELAY = 5000;

    private FifoWithThresholds m_fifo;
    private long m_msgCount = 0L;
    private IAuditForwarder m_forwarder;
    private long m_reconnectTime;
    private List<SecAuditEvent> m_buffer;
    private int m_priority = Thread.NORM_PRIORITY;

    private AuditForwardConfig m_config;

    /**
     * Create a new AuditQueueReader
     */
    public AuditQueueReader(AuditForwardConfig config) {
        super("AuditForwarder", "AuditQueueReader");
        m_config = config;
        m_logger = Logger.getLogger(getClass());
    }

    /**
     * @see ch.post.pf.mw.util.shutdown.ShutdownHook#atexit()
     */
    public void atexit() {
        m_logger.info("Shutdown the reader.");
        if (!isShutdownRequested() && !isTerminated()) {
            m_fifo.notifyReaders();
            shutdown();
            join();
        }
    }

    /**
     * Get the number of messages that have been processed.
     * 
     * @return the number of messages that have been processed.
     */
    public long getMessageCount() {
        return m_msgCount;
    }

    /**
     * Buffer the event in a local linked list. When the buffer reaches 100
     * items save it to the queue persistence file.
     * 
     * @param event
     *            The event to buffer.
     */
    private void bufferEvent(SecAuditEvent event) {
        m_buffer.add(event);
        if (m_buffer.size() >= 100) {
            m_logger.debug("Saving buffer to persistent store.");
            m_forwarder.save(m_buffer);
            m_buffer.clear();
        }
    }

    /**
     * @see com.addc.mbean.thread.AManagedThread#doCleanup()
     */
    protected void doCleanup() {
        m_forwarder.cleanup();
    }

    /**
     * @see com.addc.mbean.thread.AManagedThread#doExecute()
     */
    protected void doExecute() {
        SecAuditEvent event = (SecAuditEvent) m_fifo.get(2000);
        if (event != null) {
            m_msgCount++;

            // ***************************************************************
            // Forward the message. If this fails, buffer it and save the
            // buffered messages in blocks.
            // ***************************************************************

            if (m_reconnectTime > System.currentTimeMillis()) {
                bufferEvent(event);
            } else {
                if (!m_forwarder.forward(m_buffer, event)) {
                    m_reconnectTime = System.currentTimeMillis() + RECONN_DELAY;
                    bufferEvent(event);
                }
            }

        }
    }

    /**
     * @see com.addc.mbean.thread.AManagedThread#doInitialisation()
     */
    protected void doInitialisation() throws Exception {
        m_buffer = new LinkedList<SecAuditEvent>();
        m_fifo = m_config.getFifo();
        m_forwarder = m_config.getAuditForwarder();
    }

    /**
     * @see com.addc.mbean.thread.AManagedThread#getDescription()
     */
    protected String getDescription() {
        return "Audit Queue Reader";
    }

    /**
     * @see com.addc.util.queue.IThresholdListener#threshholdPassed(
     *           com.addc.util.queue.ThresholdEvent)
     */
    public void threshholdPassed(ThresholdEvent event) {
        if (event == ThresholdEvent.FILLING) {
            m_priority++;
            m_logger.warn("Increasing Reader priority to " + m_priority);
        } else {
            m_priority--;
            m_logger.warn("Decreasing Reader priority to " + m_priority);
        }
        setPriority(m_priority);
    }

    /**
     * @see com.addc.mbean.thread.AManagedThread#getExtraAttributes()
     */
    protected MBeanAttributeInfo[] getExtraAttributes() {
        MBeanAttributeInfo[] mbaa = new MBeanAttributeInfo[1];
        mbaa[0] = new MBeanAttributeInfo("MessageCount", 
                long.class.getName(),
                "The message count",
                true,
                false,
                false); 
        return mbaa;
    }
    
}     

This example is taken from the AuditForward Server.
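
The example above publishes an extra attribute but no extra operations. As a complement, the sketch below shows how operation metadata can be built with the standard javax.management classes. It assumes that getExtraOperations() returns an MBeanOperationInfo[] by analogy with getExtraAttributes(); the class ExtraOperationsSketch and the operation name resetMessageCount are hypothetical, and how AManagedThread dispatches an invocation of such an operation is not covered here.

```java
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;

/** Hypothetical helper that builds operation metadata with the standard JMX classes. */
final class ExtraOperationsSketch {

    private ExtraOperationsSketch() { }

    /** Describes one operation, resetMessageCount(), that takes no parameters. */
    static MBeanOperationInfo[] extraOperations() {
        MBeanOperationInfo reset = new MBeanOperationInfo(
                "resetMessageCount",             // operation name (hypothetical)
                "Reset the message count to 0",  // description shown in a JMX console
                new MBeanParameterInfo[0],       // the operation takes no parameters
                void.class.getName(),            // return type, the string "void"
                MBeanOperationInfo.ACTION);      // the operation changes state
        return new MBeanOperationInfo[] { reset };
    }
}
```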



Managed Statistics

The StatisticsCollectorMBean supplies an MBean version of the IStatistics interface. To use these MBeans, set the statistics-service key in the services.conf file to com.addc.mbean.statistics.StatisticsCollectorMBean. The StatisticsServiceFactory will then return instances of this class.

The StatisticsCollectorMBean registers itself with the MBeanServer during initialisation in the Statistics domain, so its object names begin with Statistics: .

WARNING If you use a lot of statistics in your process, an unmanageable number of MBeans may be created.
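
One way to keep an eye on this is to count the MBeans registered under a domain. The sketch below does so with the standard ObjectName pattern syntax and MBeanServer.queryNames. It assumes the process registers its MBeans with the platform MBeanServer, which may not hold for your setup; the class DomainCountSketch is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper that counts the MBeans registered under one JMX domain. */
final class DomainCountSketch {

    private DomainCountSketch() { }

    /** Returns how many MBeans on the given server have object names in the domain. */
    static int countInDomain(MBeanServer server, String domain) {
        try {
            // The pattern "domain:*" matches every object name in that domain.
            ObjectName pattern = new ObjectName(domain + ":*");
            return server.queryNames(pattern, null).size();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid domain: " + domain, e);
        }
    }

    public static void main(String[] args) {
        // Assumption: the statistics MBeans live on the platform MBeanServer.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println("Statistics MBeans: " + countInDomain(server, "Statistics"));
    }
}
```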

