一、dependency
<!-- Version properties referenced by the ActiveMQ, xbean-spring and spring-jms dependencies. -->
<properties>
    <activemq.version>5.15.4</activemq.version>
    <xbean-spring.version>4.8</xbean-spring.version>
    <spring-jms.version>5.0.7.RELEASE</spring-jms.version>
</properties>
<dependencies>
    <!-- ActiveMQ: client, Spring integration, connection pool and broker; all share ${activemq.version}. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-spring</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-broker</artifactId>
        <version>${activemq.version}</version>
    </dependency>

    <!-- xbean-spring -->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>${xbean-spring.version}</version>
    </dependency>

    <!-- Spring JMS support (explicitly versioned). -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-jms.version}</version>
    </dependency>

    <!--
        The dependencies below declare no version.
        NOTE(review): presumably the versions come from a parent POM or a dependencyManagement
        section that is not shown here; confirm they line up with ${spring-jms.version}.
    -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>

    <!--
        Logging.
        NOTE(review): slf4j-log4j12 is the SLF4J binding for Log4j 1.x, while log4j-core is Log4j 2;
        confirm which logging backend is actually intended.
    -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>
</dependencies>
二、activemq.properties
# Broker connection URL: failover transport listing three local brokers (ports 61617, 61618, 61619).
active.config.brokerURL=failover:(tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)
# Broker credentials.
# NOTE(review): plain-text sample credentials; presumably to be replaced or externalized outside a demo.
active.config.username=admin
active.config.password=admin123
# Names of the queue and topic destinations.
active.destination.queue.name=queue.test01
active.destination.topic.name=topic.test01
三、spring-activemq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Resolves the ${active.*} placeholders below from activemq.properties. -->
    <context:property-placeholder location="classpath*:activemq.properties" ignore-unresolvable="true"/>

    <!--
        Redelivery policy: initial delay of 1000 ms and at most 1 redelivery attempt.
        Background reading: http://www.kuqin.com/shuoit/20140419/339344.html
        NOTE(review): a redelivery policy normally governs redelivery to consumers;
        confirm it is really needed on this producer-side connection factory.
    -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!-- Whether the wait time grows after each failed redelivery attempt. -->
        <property name="useExponentialBackOff" value="true"/>
        <!-- Maximum number of redeliveries (ActiveMQ default is 6); set to 1 here. -->
        <property name="maximumRedeliveries" value="1"/>
        <!-- Delay before the first redelivery, in milliseconds (default is 1 second). -->
        <property name="initialRedeliveryDelay" value="1000"/>
        <!-- Back-off multiplier: each delay is the previous delay times this value (1000 ms, then 1000 * 2 ms, ...). -->
        <property name="backOffMultiplier" value="2"/>
        <!--
            Upper bound for the redelivery delay; only effective when useExponentialBackOff is true.
            Example: with a first delay of 10 ms and a multiplier of 2 the delays are 10 ms, 20 ms, 40 ms, ...
            and once the bound is exceeded every further delay equals the bound.
            NOTE(review): the bound (1000) equals initialRedeliveryDelay, so the delay presumably
            never grows; confirm this is intended.
        -->
        <property name="maximumRedeliveryDelay" value="1000"/>
    </bean>

    <!-- ActiveMQ connection factory. -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${active.config.brokerURL}"/>
        <property name="userName" value="${active.config.username}"/>
        <property name="password" value="${active.config.password}"/>
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
    </bean>

    <!-- Connection pool wrapping the ActiveMQ factory; stop() is invoked when the context closes. -->
    <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <constructor-arg name="activeMQConnectionFactory" ref="jmsFactory"/>
    </bean>

    <!-- Template used to send and receive messages. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg name="connectionFactory" ref="pooledJmsFactory"/>
    </bean>

    <!-- Destinations: one queue and one topic. -->
    <bean id="queueDest" class="org.apache.activemq.command.ActiveMQQueue" autowire="constructor">
        <constructor-arg value="${active.destination.queue.name}"/>
    </bean>
    <bean id="topicDest" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
        <constructor-arg value="${active.destination.topic.name}"/>
    </bean>

    <!-- Application producer bound to the queue. -->
    <bean id="queueProducer" class="com.demo.activemq.QueueActivemqProducer">
        <constructor-arg name="jmsTemplate" ref="jmsTemplate"/>
        <constructor-arg name="destination" ref="queueDest"/>
    </bean>

    <!-- Application producer bound to the topic. -->
    <bean id="topicProducer" class="com.demo.activemq.TopicActivemqProducer">
        <constructor-arg name="jmsTemplate" ref="jmsTemplate"/>
        <constructor-arg name="destination" ref="topicDest"/>
    </bean>
</beans>
四、spring-activemq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Resolves the ${active.*} placeholders below from activemq.properties. -->
    <context:property-placeholder location="classpath*:activemq.properties" ignore-unresolvable="true"/>

    <!-- ActiveMQ connection factory. -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${active.config.brokerURL}"/>
        <property name="userName" value="${active.config.username}"/>
        <property name="password" value="${active.config.password}"/>
    </bean>

    <!-- Connection pool wrapping the ActiveMQ factory; stop() is invoked when the context closes. -->
    <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <constructor-arg name="activeMQConnectionFactory" ref="jmsFactory"/>
    </bean>

    <!-- Template used to send and receive messages. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg name="connectionFactory" ref="pooledJmsFactory"/>
    </bean>

    <!-- Destinations: one queue and one topic. -->
    <bean id="queueDest" class="org.apache.activemq.command.ActiveMQQueue" autowire="constructor">
        <constructor-arg value="${active.destination.queue.name}"/>
    </bean>
    <bean id="topicDest" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
        <constructor-arg value="${active.destination.topic.name}"/>
    </bean>

    <!-- Business handler that receives the extracted message payloads. -->
    <bean id="QueueHandler" class="com.demo.activemq.QueueHandler"/>

    <!-- Application message listener; delegates to the business handler. -->
    <bean id="msgListener" class="com.demo.activemq.CustomerMsgListener">
        <constructor-arg name="businessHandler" ref="QueueHandler"/>
    </bean>

    <!-- Listener container that attaches the listener to the queue. -->
    <bean id="queueConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="queueDest"/>
        <property name="messageListener" ref="msgListener"/>
    </bean>

    <!-- Listener container that attaches the same listener to the topic. -->
    <bean id="topicConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="topicDest"/>
        <property name="messageListener" ref="msgListener"/>
    </bean>
</beans>
五、相关业务实现类
producer相关类:
import org.springframework.jms.core.JmsTemplate;

import javax.jms.Destination;
import java.util.Objects;

/**
 * Base class for producers that send text messages to one fixed JMS destination
 * through a {@link JmsTemplate}.
 */
public abstract class AbstractActivemqProducer {

    // Both collaborators are assigned once in the constructor and never replaced.
    private final JmsTemplate jmsTemplate;
    private final Destination destination;

    /**
     * @param jmsTemplate template used to send messages; must not be null
     * @param destination destination every message is sent to; must not be null
     * @throws NullPointerException if either argument is null (fails at wiring time
     *                              instead of at the first {@link #send(String)} call)
     */
    public AbstractActivemqProducer(JmsTemplate jmsTemplate, Destination destination) {
        this.jmsTemplate = Objects.requireNonNull(jmsTemplate, "jmsTemplate must not be null");
        this.destination = Objects.requireNonNull(destination, "destination must not be null");
    }

    /**
     * Converts {@code msg} with the template's message converter and sends it to the
     * configured destination.
     *
     * @param msg message payload
     */
    public void send(String msg) {
        jmsTemplate.convertAndSend(destination, msg);
    }
}

/** Producer wired to the queue destination in the Spring configuration. */
public class QueueActivemqProducer extends AbstractActivemqProducer {

    public QueueActivemqProducer(JmsTemplate jmsTemplate, Destination destination) {
        super(jmsTemplate, destination);
    }
}

/** Producer wired to the topic destination in the Spring configuration. */
public class TopicActivemqProducer extends AbstractActivemqProducer {

    public TopicActivemqProducer(JmsTemplate jmsTemplate, Destination destination) {
        super(jmsTemplate, destination);
    }
}
consumer相关类:
/**
 * JMS {@link MessageListener} that extracts string payloads from incoming messages
 * and forwards them to a {@link BusinessHandler}.
 *
 * <ul>
 *   <li>{@link TextMessage}: the text body is forwarded.</li>
 *   <li>{@link MapMessage}: the string values stored under {@code key01} and
 *       {@code key02} are forwarded, in that order.</li>
 *   <li>Any other message type is ignored.</li>
 * </ul>
 */
public class CustomerMsgListener implements MessageListener {
    private final BusinessHandler businessHandler;

    /**
     * @param businessHandler handler that receives every extracted payload
     */
    public CustomerMsgListener(BusinessHandler businessHandler) {
        this.businessHandler = businessHandler;
    }

    /**
     * Dispatches the payload of {@code message} to the business handler.
     *
     * @param message the message delivered by the JMS provider
     * @throws IllegalStateException if the payload cannot be read; the original
     *                               {@link JMSException} is attached as the cause
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                businessHandler.handle(((TextMessage) message).getText());
            }
            if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                businessHandler.handle(mapMessage.getString("key01"));
                businessHandler.handle(mapMessage.getString("key02"));
            }
        } catch (JMSException e) {
            // Propagate instead of printing the stack trace: onMessage cannot throw a checked
            // exception, and swallowing it hides the failure from whoever invoked the listener.
            throw new IllegalStateException("Failed to read JMS message", e);
        }
    }
}
/**
 * Callback for business code that consumes message payloads.
 */
public interface BusinessHandler {

    /**
     * Handles one message payload.
     *
     * @param msg the payload as a string
     */
    void handle(String msg);
}
/**
 * {@link BusinessHandler} that prints every payload to standard output.
 */
public class QueueHandler implements BusinessHandler {

    /**
     * Prints the payload in the form {@code msg = [payload]}.
     *
     * @param msg the payload to print
     */
    @Override
    public void handle(String msg) {
        final String line = "msg = [" + msg + "]";
        System.out.println(line);
    }
}
六、测试
/**
 * Manual smoke test for the XML-based ActiveMQ configuration.
 * {@link #main(String[])} starts the consumer side; call {@link #producerXml()} to send
 * a test message.
 */
public class XmlActivemqTest {

    public static void main(String[] args) {
        customerXml();
    }

    /**
     * Loads the producer configuration, sends one test message through the queue producer
     * and closes the context afterwards so that bean destroy methods (such as the pooled
     * connection factory's {@code stop}) run.
     */
    public static void producerXml() {
        try (ClassPathXmlApplicationContext context =
                     new ClassPathXmlApplicationContext("classpath*:spring-activemq-producer.xml")) {
            AbstractActivemqProducer queueActivemqProducer = context.getBean(QueueActivemqProducer.class);
            queueActivemqProducer.send("this is a test");
        }
    }

    /**
     * Loads the consumer configuration, which starts the listener containers defined there.
     * The context is intentionally left open so the listeners keep receiving messages;
     * a shutdown hook closes it when the JVM exits.
     */
    public static void customerXml() {
        // The configuration file is named spring-activemq-consumer.xml. With the classpath*:
        // prefix a wrong name is not reported: the context would simply start without any beans.
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath*:spring-activemq-consumer.xml");
        context.registerShutdownHook();
    }
}
参考地址:
http://activemq.apache.org/spring-support.html
http://docs.spring.io/spring/docs/2.5.x/reference/jms.html#jms-mdp
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。