An MXBean that implements NotificationEmitter to send notifications

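I built an MBean (MXBean) that can send JMX notifications, and this sample outlines how it works. It reports how fast the JVM's old-generation heap is growing. (jstat cannot send notifications, but it would probably get you as far as logging the same figure from a shell script.) A JConsole screenshot taken at run time appears near the end of this entry.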

The MBean (MXBean) interface

Notifications are the main topic here, so for now the interface exposes just one attribute.

package ihiroky.sample;

import javax.management.MXBean;

@MXBean
public interface HeapMonitor {
    String getHeapOldIncreaseSpeed();
}

The bean itself

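The class implements both the MBean's public interface and the NotificationEmitter interface. Extending NotificationBroadcasterSupport would also work, but I want to keep the superclass slot free, so I implement NotificationEmitter instead. In the end, though, the work is delegated to a NotificationBroadcasterSupport instance anyway. The steps for sending a notification are: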

  1. Register a NotificationListener with the NotificationBroadcasterSupport
  2. Register the MBean with the MBeanServer
  3. Build the notification
  4. Send the notification

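Alternatively, it may be fine to register the MBean with the MBeanServer first and then add the NotificationListener to the MBean through the MBeanServer. In that case getNotificationInfo() ends up being called before the NotificationBroadcasterSupport has been instantiated, so instead of delegating, the method itself has to contain the logic that builds and returns the MBeanNotificationInfo[].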
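
The alternative order described above (register the MBean first, then attach the listener through the MBeanServer) could be sketched roughly as follows. This is only an illustration under assumptions: ListenerViaServerDemo, CounterMXBean, Counter, the notification type, and the ObjectName are hypothetical names that do not appear in the original sample. getNotificationInfo() builds its result directly instead of delegating.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.management.JMException;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.MXBean;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class ListenerViaServerDemo {

    /** Hypothetical management interface (not part of the original sample). */
    @MXBean
    public interface CounterMXBean {
        long getCount();
    }

    /** Hypothetical MBean that emits one notification per increment. */
    static class Counter implements CounterMXBean, NotificationEmitter {
        static final String TYPE = "sample.counter.incremented";
        private final NotificationBroadcasterSupport broadcaster =
            new NotificationBroadcasterSupport();
        private long count;

        @Override
        public synchronized long getCount() {
            return count;
        }

        synchronized void increment(ObjectName source) {
            count++;
            broadcaster.sendNotification(new Notification(
                TYPE, source, count, System.currentTimeMillis(), "count=" + count));
        }

        @Override
        public MBeanNotificationInfo[] getNotificationInfo() {
            // Built here directly rather than delegated to the broadcaster.
            return new MBeanNotificationInfo[] {
                new MBeanNotificationInfo(new String[] {TYPE},
                    Notification.class.getName(), "Counter notification")
            };
        }

        @Override
        public void addNotificationListener(NotificationListener listener,
                NotificationFilter filter, Object handback) {
            broadcaster.addNotificationListener(listener, filter, handback);
        }

        @Override
        public void removeNotificationListener(NotificationListener listener)
                throws ListenerNotFoundException {
            broadcaster.removeNotificationListener(listener);
        }

        @Override
        public void removeNotificationListener(NotificationListener listener,
                NotificationFilter filter, Object handback)
                throws ListenerNotFoundException {
            broadcaster.removeNotificationListener(listener, filter, handback);
        }
    }

    /** Registers the MBean, then adds the listener via the MBeanServer. */
    static List<String> run() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        List<String> received = new CopyOnWriteArrayList<>();
        try {
            ObjectName name = new ObjectName("sample:type=demo,name=Counter");
            Counter counter = new Counter();
            server.registerMBean(counter, name);
            try {
                NotificationListener listener =
                    (n, handback) -> received.add(n.getMessage() + " / " + handback);
                // Listener goes through the server, addressed by ObjectName.
                server.addNotificationListener(name, listener, null, "hb");
                counter.increment(name);
                counter.increment(name);
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here the broadcaster is created eagerly as a final field, so the add/remove methods can delegate at any time; the string "hb" is passed only to show that the handback comes back to the listener unchanged.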

package ihiroky.sample;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.LinkedList;
import java.util.List;

import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;

public class HeapMonitorImpl implements Runnable, NotificationEmitter, HeapMonitor {

    private volatile boolean running;
    private volatile Thread thread;
    
    private MemoryPoolMXBean heapOldPool;
    private NotificationBroadcasterSupport broadcaster;
    // Written by the monitor thread and read by JMX threads, hence volatile.
    private volatile double increaseSpeed;
    
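    // "Tenured Gen" is the old-generation pool name under the serial collector;
    // other collectors use other names (e.g. "PS Old Gen", "G1 Old Gen").
    private static final String HEAP_OLD_NAME = "Tenured Gen";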
    private static final String NOTIFICATION_SOURCE = "ihiroky.sample:type=monitor,name=HeapMonitor";
    private static final String NOTIFICATION_TYPE   = "ihiroky.sample.monitor.heap.interval";
    private static final int    INTERVAL = 3000;
        
    public synchronized void start() {
        if (thread == null && ! running) {
            running = true;
            thread = new Thread(this, "HeapMonitor");
            thread.start();
        }
    }
    
    public synchronized void stop() {
        if (thread != null) {
            running = false;
            thread.interrupt();
        }
    }

    private void initialize() throws Exception {
        this.heapOldPool = getMemoryPoolMXBean(HEAP_OLD_NAME);
        NotificationBroadcasterSupport broadcaster = new NotificationBroadcasterSupport();
        // no filter and no handback
        broadcaster.addNotificationListener(new SystemOutNotificationListener(), null, null); /** 1 **/
        this.broadcaster = broadcaster;
        
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(this, new ObjectName(NOTIFICATION_SOURCE)); /** 2 **/
    }
    
    private MemoryPoolMXBean getMemoryPoolMXBean(String mbeanName) {
        List<MemoryPoolMXBean> memoryPoolBeans= ManagementFactory.getMemoryPoolMXBeans();
        MemoryPoolMXBean heapOldPool = null;
        for (MemoryPoolMXBean pool : memoryPoolBeans) {
            if (mbeanName.equals(pool.getName())) {
                heapOldPool = pool;
                break;
            }
        }
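        if (heapOldPool == null) {
            // Fail with a clear message instead of a later NullPointerException.
            throw new IllegalStateException("memory pool not found: " + mbeanName);
        }
        return heapOldPool;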
    }
    
    private class SystemOutNotificationListener implements NotificationListener {
        @Override
        public void handleNotification(Notification notification, Object handback) {
            System.out.println("type     : " + notification.getType());
            System.out.println("sequence : " + notification.getSequenceNumber());
            System.out.println("message  : " + notification.getMessage());
            System.out.println("data     : " + notification.getUserData());
            System.out.println("source   : " + notification.getSource());
            System.out.println("tstamp   : " + notification.getTimeStamp());
            
            System.out.println("handback : " + handback);
            System.out.println();
        }
    }
   
    @Override
    public void run() {
        try {
            initialize();
            long prevUsed = heapOldPool.getUsage().getUsed();
            long sequenceNumber = 0;
            
            while (running) {
                Thread.sleep(INTERVAL);

                MemoryUsage usage = heapOldPool.getUsage();
                long used = usage.getUsed();
                increaseSpeed = (used - prevUsed) * 1000d / INTERVAL;
                String speedStr = getHeapOldIncreaseSpeed();

                String message =
                    "heap old region growth speed : " + speedStr + " bytes/sec.";
                String source = NOTIFICATION_SOURCE;
                String type = NOTIFICATION_TYPE;
                long timestamp = System.currentTimeMillis();
                Notification notification = 
                    new Notification(type, source, sequenceNumber++, timestamp, message); /** 3 **/
                notification.setUserData(increaseSpeed);
                
                broadcaster.sendNotification(notification); /** 4 **/
                
                prevUsed = used;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            running = false;
            thread = null;
        }
    }

    @Override
    public void removeNotificationListener(NotificationListener listener,
            NotificationFilter filter, Object handback)
            throws ListenerNotFoundException {
        if (broadcaster == null) {
            throw new NullPointerException("no broadcaster found in this thread.");
        }
        broadcaster.removeNotificationListener(listener, filter, handback);
    }

    @Override
    public void addNotificationListener(NotificationListener listener,
            NotificationFilter filter, Object handback)
            throws IllegalArgumentException {
        if (broadcaster == null) {
            throw new NullPointerException("no broadcaster found in this thread.");
        }
        broadcaster.addNotificationListener(listener, filter, handback);
    }

    @Override
    public MBeanNotificationInfo[] getNotificationInfo() {
        MBeanNotificationInfo info =
            new MBeanNotificationInfo(new String[]{NOTIFICATION_TYPE},
                    "javax.management.Notification", "CtlerHeapMonitor Notification");
        return new MBeanNotificationInfo[]{info};
    }

    @Override
    public void removeNotificationListener(NotificationListener listener)
            throws ListenerNotFoundException {
        if (broadcaster == null) {
            throw new NullPointerException("no broadcaster found in this thread.");
        }
        broadcaster.removeNotificationListener(listener);
    }

    @Override
    public String getHeapOldIncreaseSpeed() {
        return String.format("%11.3f", increaseSpeed);
    }
}
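
The listing above looks the pool up by the fixed name "Tenured Gen", so it depends on which garbage collector the JVM is running. As a rough sketch of a more tolerant lookup, the helper below lists the heap pools and picks one whose name looks like an old generation. OldGenPoolFinder and its name-matching rule are assumptions made for illustration, not part of the original sample, and collectors with other naming schemes will simply yield an empty result.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class OldGenPoolFinder {

    /** Names of all heap memory pools exposed by the running JVM. */
    static List<String> heapPoolNames() {
        List<String> names = new ArrayList<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.HEAP) {
                names.add(pool.getName());
            }
        }
        return names;
    }

    /**
     * Assumption: an old-generation pool is a heap pool named "Tenured Gen"
     * or ending with "Old Gen" (e.g. "PS Old Gen", "G1 Old Gen").
     */
    static Optional<MemoryPoolMXBean> findOldGenPool() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            String name = pool.getName();
            boolean looksOld = name.equals("Tenured Gen") || name.endsWith("Old Gen");
            if (pool.getType() == MemoryType.HEAP && looksOld) {
                return Optional.of(pool);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println("heap pools: " + heapPoolNames());
        System.out.println("old gen   : "
            + findOldGenPool().map(MemoryPoolMXBean::getName).orElse("(not found)"));
    }
}
```

Running main() first is a quick way to see which pool names the current JVM actually reports before hard-coding one.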

Main function

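I wrote this in the same class as HeapMonitorImpl, but it is excerpted separately here. Nothing changes unless the heap keeps growing, so alongside HeapMonitorImpl I run a thread that consumes heap for no other purpose.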

    public static void main(String[] args) {
        // start the monitor
        HeapMonitorImpl monitor = new HeapMonitorImpl();
        monitor.start();
        
        // consume heap
        Runnable r = new Runnable() {
            public void run() {
                try {
                    LinkedList<String> list = new LinkedList<String>();
                    for (int i=0; i<10000000; i++) {
                        list.add("123456789abcdefghijklmnopqrstuvwxyz" + i);
                        Thread.sleep(1);
                    }
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        };
        Thread t = new Thread(r, "LoadThread");
        t.start();
        
        try {
            System.in.read(); // wait for input
        } catch (Exception e) {
            e.printStackTrace();
        }
        t.interrupt();
        monitor.stop();
    }

Execution result

SystemOutNotificationListener writes a log entry to standard output for each notification.

…
type     : ihiroky.sample.monitor.heap.interval
sequence : 15
message  : heap old region growth speed :  122813.333 bytes/sec.
data     : 122813.33333333333
source   : ihiroky.sample:type=monitor,name=HeapMonitor
tstamp   : 1238548597631
handback : null
…

Also, if you subscribe to the notifications from JConsole, you can inspect the contents of each Notification there.

(Image: JConsole screenshot. From ihiroky)

Summary

To make a home-made MBean send notifications:

  1. Implement NotificationEmitter in the MBean, or extend NotificationBroadcasterSupport (I have not tried the latter).
  2. Use NotificationBroadcasterSupport.sendNotification() to send the notification.
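
The second option in step 1, which this entry leaves untried, might look roughly like the sketch below. BroadcasterSubclassDemo, PingMXBean, Ping, the notification type, and the ObjectName are all hypothetical names chosen for illustration. By extending NotificationBroadcasterSupport the MBean inherits the add/remove listener methods and only has to call sendNotification().

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.JMException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.MXBean;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class BroadcasterSubclassDemo {

    /** Hypothetical management interface (not part of the original sample). */
    @MXBean
    public interface PingMXBean {
        long getSent();
    }

    /** Hypothetical MBean that inherits listener handling from its superclass. */
    static class Ping extends NotificationBroadcasterSupport implements PingMXBean {
        static final String TYPE = "sample.ping";
        private final AtomicLong sequence = new AtomicLong();

        Ping() {
            // The superclass returns this info from getNotificationInfo().
            super(new MBeanNotificationInfo(new String[] {TYPE},
                Notification.class.getName(), "Ping notification"));
        }

        @Override
        public long getSent() {
            return sequence.get();
        }

        void ping() {
            long seq = sequence.incrementAndGet();
            sendNotification(new Notification(
                TYPE, this, seq, System.currentTimeMillis(), "ping " + seq));
        }
    }

    /** Registers the MBean, sends three notifications, returns their sequence numbers. */
    static List<Long> run() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        List<Long> seen = new ArrayList<>();
        try {
            ObjectName name = new ObjectName("sample:type=demo,name=Ping");
            Ping ping = new Ping();
            server.registerMBean(ping, name);
            try {
                NotificationListener listener =
                    (n, handback) -> seen.add(n.getSequenceNumber());
                ping.addNotificationListener(listener, null, null);
                ping.ping();
                ping.ping();
                ping.ping();
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("sequence numbers: " + run());
    }
}
```

The trade-off is the one mentioned earlier in this entry: the class is shorter, but its superclass slot is taken.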

You probably won't write MBeans all that often, but when you do, it's handy to remember that notifying the outside world takes this little work.