SpringでJMS

コネクション、例外ハンドリングがまったく出てこないのが特徴的。

メッセージ送る側。

/**
 * Sends JMS messages to a single named destination through Spring's
 * {@code JmsTemplate}.
 *
 * <p>Both collaborators are injected through setters: a
 * {@code ConnectionFactory} (wrapped in a new {@code JmsTemplate}) and the
 * destination name. No connection or session handling appears in this class;
 * every send is delegated to the template.
 */
public class JmsQueueSender {

    /** Template built from the injected connection factory; used for every send. */
    private JmsTemplate jmsTemplate;

    /** Name of the destination that all messages are sent to. */
    private String destination;

    /**
     * Creates the {@code JmsTemplate} used by this sender.
     *
     * @param cf connection factory handed to the new template
     */
    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    /**
     * Sets the destination name used by all send methods.
     *
     * @param destination destination name passed to the template on each send
     */
    public void setDestination(String destination) {
        this.destination = destination;
    }

    /**
     * Sends a text message whose body is {@code "hello world "} followed by
     * the given string.
     *
     * @param msg text appended to the fixed greeting prefix
     */
    public void simpleSend(final String msg) {
        jmsTemplate.send(destination, new MessageCreator() {
            @Override public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello world " + msg);
            }
        });
    }

    /**
     * Sends a map payload via {@code convertAndSend}, then decorates the
     * converted message with an int property {@code "id"} and a JMS
     * correlation ID before it goes out.
     *
     * <p>NOTE(review): the method name presumably means "conversion" (message
     * conversion); it is left unchanged so existing callers keep working.
     *
     * @param hoge value stored under the map key {@code "hoge"}
     * @param i    value stored (boxed) under the map key {@code "int"}
     */
    public void sendWithConversation(String hoge, int i) {
        Map<String,Object> map = new HashMap<String,Object>();
        map.put("hoge", hoge);
        // Integer.valueOf instead of the deprecated Integer(int) constructor.
        map.put("int", Integer.valueOf(i));
        jmsTemplate.convertAndSend(destination, map, new MessagePostProcessor() {
            @Override public Message postProcessMessage(Message message) throws JMSException {
                message.setIntProperty("id", 111);
                message.setJMSCorrelationID("123-4567");
                return message;
            }
        });
    }
}

メッセージを受け取る側。とりあえずテキストとMapだけ。
MessageListenerAdapterが呼び出すデフォルトのハンドラメソッド名はhandleMessageで、変更も可能。

/**
 * Message delegate that buffers received text and map payloads so that they
 * can be polled later through the {@code MessageResponse} accessors.
 *
 * <p>The {@code handleMessage} overloads are invoked by the listener
 * infrastructure, while {@link #getText()} and {@link #getMap()} are polled by
 * the consumer of the results. Because those calls may come from different
 * threads, the buffers are concurrent queues.
 */
public class DefaultMessageDelegate implements MessageResponse {

    private static final Logger logger = Logger.getLogger(DefaultMessageDelegate.class);

    /** Received text payloads, in arrival order. */
    private final Queue<String> textQueue;

    /** Received map payloads, in arrival order. */
    private final Queue<Map<String,Object>> mapQueue;

    /** Creates a delegate with empty buffers. */
    public DefaultMessageDelegate() {
        // Fully-qualified so the file needs no additional import.
        textQueue = new java.util.concurrent.ConcurrentLinkedQueue<String>();
        mapQueue = new java.util.concurrent.ConcurrentLinkedQueue<Map<String,Object>>();
    }

    /**
     * Removes and returns the oldest buffered text payload (provisional
     * accessor; implements {@code MessageResponse}).
     *
     * @return the oldest text payload, or {@code null} if none is buffered
     */
    public String getText() { // MessageResponse
        return textQueue.poll();
    }

    /**
     * Removes and returns the oldest buffered map payload (provisional
     * accessor; implements {@code MessageResponse}).
     *
     * @return the oldest map payload, or {@code null} if none is buffered
     */
    public Map<String,Object> getMap() { // MessageResponse
        return mapQueue.poll();
    }

    /**
     * Handles a text payload by buffering it.
     *
     * <p>Author's note: a handler may also return a value (it is sent to the
     * destination named in the JMS Reply-To header), and the parameter may
     * also be a {@code Message}.
     *
     * @param message text payload; {@code null} is logged but not buffered
     */
    public void handleMessage(String message) {
        if (logger.isDebugEnabled()) {
            logger.debug("[handleMessage(String)] " + message);
        }
        if (message == null) {
            // The concurrent queue rejects null, and a buffered null would be
            // indistinguishable from "nothing buffered" for pollers anyway.
            return;
        }
        textQueue.add(message);
    }

    /**
     * Handles a map payload by buffering it.
     *
     * @param message map payload; {@code null} is logged but not buffered
     */
    public void handleMessage(Map<String,Object> message) {
        if (logger.isDebugEnabled()) {
            logger.debug("[handleMessage(Map)] " + message);
        }
        if (message == null) {
            // Same reasoning as the String overload.
            return;
        }
        mapQueue.add(message);
    }
}

bean定義。

    <!-- ActiveMQ connection factory; the broker URL is passed as the constructor argument. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg>
            <value>tcp://localhost:61616</value>
        </constructor-arg>
    </bean>
    
    <!-- Sending side: gets the connection factory and the destination name "TestQueue". -->
    <bean id="sender" class="examples.jms.JmsQueueSender" >
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" value="TestQueue" />
    </bean>

    <!-- Listener bean that is only referenced by the commented-out property in jmsContainer below. -->
    <bean id="messageListener" class="examples.jms.ExampleListener" />
    
    <!-- Plain delegate object that receives the messages through the adapter. -->
    <bean id="delegate" class="examples.jms.DefaultMessageDelegate" />
    
    <!-- Adapter wrapping the delegate; by default it calls the delegate's handleMessage method. -->
    <bean id="listenerAdaptor" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="delegate" />
    </bean>
    
    <!-- Listener container: consumes from "TestQueue" and hands messages to the configured listener. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="TestQueue" />
        <!-- Alternative wiring (disabled): deliver to the messageListener bean instead of the adapter.
        <property name="messageListener" ref="messageListener" />
         -->
         <property name="messageListener" ref="listenerAdaptor" />
         <!-- Enables the container's sessionTransacted flag. -->
         <property name="sessionTransacted" value="true"/>
    </bean>

裏でActiveMQを動かして、テスト。

    /** Spring context loaded from {@code jms.xml}; assigned in {@link #before()}. */
    private ApplicationContext context;

    /**
     * Test fixture setup: builds a fresh application context from the
     * classpath resource {@code jms.xml}.
     */
    @Before
    public void before() {
        this.context = new ClassPathXmlApplicationContext("jms.xml");
    }
    
    /**
     * Round-trip test: sends a text message and a map message through the
     * {@code sender} bean and checks that the {@code delegate} bean receives
     * both with the expected content.
     *
     * <p>Delivery is asynchronous, so the test polls the delegate. Each wait
     * is bounded by a timeout so that a missing broker or a lost message
     * fails the test instead of hanging it forever.
     *
     * @throws Exception if the polling sleep is interrupted or sending fails
     */
    @Test public void testJmsQueueSender() throws Exception {
        final long timeoutMillis = 10000L;
        final long pollIntervalMillis = 500L;

        JmsQueueSender sender = (JmsQueueSender)context.getBean("sender");
        // Alternative when the container is wired to the messageListener bean:
        // MessageResponse response = (MessageResponse)context.getBean("messageListener");
        MessageResponse response = (MessageResponse)context.getBean("delegate");

        sender.simpleSend("from spring");
        long deadline = System.currentTimeMillis() + timeoutMillis;
        String message = response.getText();
        while (message == null && System.currentTimeMillis() < deadline) {
            Thread.sleep(pollIntervalMillis);
            message = response.getText();
        }
        if (message == null) {
            throw new AssertionError("no text message received within " + timeoutMillis + " ms");
        }
        assertEquals("hello world from spring", message);

        sender.sendWithConversation("hoge value", Integer.MAX_VALUE);
        deadline = System.currentTimeMillis() + timeoutMillis;
        Map<String,Object> map = response.getMap();
        while (map == null && System.currentTimeMillis() < deadline) {
            Thread.sleep(pollIntervalMillis);
            map = response.getMap();
        }
        if (map == null) {
            throw new AssertionError("no map message received within " + timeoutMillis + " ms");
        }
        assertEquals("hoge value", map.get("hoge"));
        assertEquals(Integer.MAX_VALUE, map.get("int"));
    }