ActiveMQ与spring集成实例之运用音讯监听器ITeye - 千亿集团

ActiveMQ与spring集成实例之运用音讯监听器ITeye

2019-01-11 19:14:09 | 作者: 巧蕊 | 标签: 音讯,监听器,运用 | 浏览: 2094

        在EJB国际里,JMS音讯最常用的功用之一是用于完成音讯驱动Bean(MDB)。Spring供给了一个办法来创立音讯驱动的POJO(MDP),而且不会把用户绑定在某个EJB容器上。

        通常用音讯监听器容器从JMS音讯行列接纳音讯并驱动被打针进来的MDP。音讯监听器容器担任音讯接纳的多线程处理并分发到各MDP中。一个音讯侦听容器是MDP和音讯供给者之间的一个中介,用来处理音讯接纳的注册,业务办理的参加,资源获取和开释,反常转化等等。这使得运用开发人员能够专心于开发和接纳音讯(或许的呼应)相关的(杂乱)业务逻辑,把和JMS根底结构有关的样板化的部分托付给结构处理。 

        Spring供给了三种 AbstractMessageListenerContainer 的子类,每种各有其特色。

1.SimpleMessageListenerContainer

        这个音讯侦听容器是三种中最简略的。它在启动时创立固定数量的JMS session并在容器的整个生命周期中运用它们。这个类不能动态的习惯运转时的要求或参加音讯接纳的业务处理。可是它对JMS供给者的要求也最低。它只需要简略的JMS API。

2.DefaultMessageListenerContainer

        这个音讯侦听器运用的最多。和 SimpleMessageListenerContainer 相反,这个子类能够动态习惯运转时侯的要求,也能够参加业务办理。每个收到的音讯都注册到一个XA业务中(假如运用 JtaTransactionManager 装备过),这样就能够使用XA业务语义的优势了。这个类在对JMS供给者的低要求和供给包含业务参于等的强壮功用上取得了很好的平衡。

3.ServerSessionMessageListenerContainer

        这个监听器容器使用JMS ServerSessionPool SPI动态办理JMS Session。 运用者各种音讯监听器能够获得运转时动态调优功用,可是这也要求JMS供给者支撑ServerSessionPool SPI。假如不需要运转时功能调整,请运用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。 

一.自定义音讯监听器代码

package com.bijian.activemq.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import com.bijian.activemq.entity.User;
 * 自定义音讯侦听器
public class MyMessageListener implements MessageListener {
 * @param arg0
 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
 @Override
 public void onMessage(Message message) {
 System.out.println(message.toString());
 System.out.println("MyMessageListener");
 if(message instanceof ObjectMessage){
 ObjectMessage objectMessage=(ObjectMessage) message;
 try {
 User user=(User) objectMessage.getObject();
 System.out.println(user.getUserName());
 } catch (JMSException e) {
 e.printStackTrace();
}

 

二.自定义音讯转化器代码

package com.bijian.activemq.convert;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import com.bijian.activemq.entity.User;
 * 音讯转化器
public class MyConvert implements MessageConverter {
 * @param arg0
 * @return
 * @throws JMSException
 * @throws MessageConversionException
 * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
 @Override
 public Object fromMessage(Message message) throws JMSException, MessageConversionException {
 User user=null;
 if(message instanceof ActiveMQObjectMessage){
 ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message; 
 user=(User) aMsg.getObject();
 return user;
 * @param arg0
 * @param arg1
 * @return
 * @throws JMSException
 * @throws MessageConversionException
 * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session)
 @Override
 public Message toMessage(Object object, Session session) throws JMSException,
 MessageConversionException {
 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
 msg.setObject((Serializable) object);
 return msg; 
}

 

三.音讯发送代码

package com.bijian.activemq;
import javax.jms.Destination;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import com.bijian.activemq.entity.User;
 * 发送者
public class Sender {
 public static void main(String[] args) {
 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
 JmsTemplate template = (JmsTemplate) applicationContext.getBean("jmsTemplate");
 //Destination destination = (Destination) applicationContext.getBean("destination");
 User user=new User();
 user.setUserName("张三");
 template.convertAndSend(user);
 System.out.println("成功发送了一条JMS音讯");
}

 

四.实体类user,有必要完成序列化代码

package com.bijian.activemq.entity;
import java.io.Serializable;
 * User类
public class User implements Serializable{
 private String userName;
 * @return the userName
 public String getUserName() {
 return userName;
 * @param userName the userName to set
 public void setUserName(String userName) {
 this.userName = userName;
}

 

五.装备文件

 ?xml version="1.0" encoding="UTF-8"? 
 beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
 http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
 http://www.springframework.org/schema/context 
 http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 default-autowire="byName" 
 !-- 装备connectionFactory -- 
 bean id="jmsFactory" destroy-method="stop" 
 property name="connectionFactory" 
 bean 
 property name="brokerURL" 
 value tcp://127.0.0.1:61616 /value 
 /property 
 /bean 
 /property 
 property name="maxConnections" value="100" /property 
 /bean 
 !-- Spring JMS Template -- 
 bean id="jmsTemplate" 
 property name="connectionFactory" 
 ref local="jmsFactory" / 
 /property 
 property name="defaultDestinationName" value="subject" / 
 !-- 差异它选用的形式为false是p2p,为true是订阅 -- 
 property name="pubSubDomain" value="true" / 
 property name="messageConverter" ref="myConvert" /property 
 /bean 
 !-- 发送音讯的目的地(一个行列) -- 
 bean id="destination" 
 !-- 设置音讯行列的姓名 -- 
 constructor-arg index="0" value="subject" / 
 /bean 
 !-- 音讯监听 -- 
 bean id="listenerContainer" 
 !-- 音讯监听器输出音讯的数量 -- 
 property name="concurrentConsumers" value="1" / 
 property name="connectionFactory" ref="jmsFactory" / 
 property name="destinationName" value="subject" / 
 property name="messageListener" ref="myMessageListener" / 
 property name="pubSubNoLocal" value="false" /property 
 /bean 
 !-- 音讯转化器 -- 
 bean id="myConvert" 
 /bean 
 !-- 音讯侦听器 -- 
 bean id="myMessageListener" /bean 
 !-- 
 bean id="messageReceiver" 
 property name="jmsTemplate" ref="jmsTemplate" /property 
 property name="destination" ref="destination" /property 
 /bean 
 /beans 

        完好工程见附件,如要正常运转,请到ActiveMQ官方网站http://activemq.apache.org/下载ActiveMQ包并start运转ActiveMQ程序,详见ActiveMQ入门实例。

        运转Sender的main办法,成果如下所示:

成功发送了一条JMS音讯
ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:bijian-PC-54726-1468514947217-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:bijian-PC-54726-1468514947217-1:2:1:1, destination = topic://subject, transactionId = null, expiration = 0, timestamp = 1468514947451, arrival = 0, brokerInTime = 1468514947451, brokerOutTime = 1468514947451, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6c7fa1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
MyMessageListener
张三

 

文章来历:http://firezhfox.iteye.com/blog/1754310

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表千亿集团立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章