# activemq配置
# 一、基本配置
# 1. pom文件添加如下配置
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>
# 2. yml 文件添加如下内容
spring:
  activemq:
    broker-url: failover:(tcp://mqIP:端口号,tcp://mqIP:端口号)?randomize=false
    in-memory: true
    pool:
      enabled: false
    password: 密码
    user: 账号
# 二、 详细配置
package com.soeasycenter.api.config;
/**
* @author pth
* @time 2019/1/21
* @description
*/
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Queue;
import java.util.ArrayList;
/**
 * Spring JMS configuration for ActiveMQ.
 *
 * <p>Declares the default queue, the redelivery policy, the connection factory,
 * the {@link JmsTemplate} used for sending, and the listener container factory
 * (bean name {@code jmsQueueListener}) that {@code @JmsListener} methods refer to.
 *
 * @author Pth
 * @since 2019/1/21 14:09
 */
@EnableJms
@Configuration
public class ActiveMqConfig {

    /**
     * Acknowledge mode 4. This is not one of the standard JMS modes; it is the
     * value ActiveMQ defines as {@code ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE},
     * where each message is acknowledged on its own by calling
     * {@code Message.acknowledge()}.
     */
    private static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** Delivery mode 2, i.e. {@code javax.jms.DeliveryMode.PERSISTENT} (1 would be non-persistent). */
    private static final int PERSISTENT_DELIVERY = 2;

    /**
     * Default destination used by the {@link JmsTemplate} when the sender does
     * not name one.
     *
     * @return the queue named {@code "default"}
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("default");
    }

    /**
     * Redelivery policy applied to consumers created from the connection factory.
     *
     * @return a policy with exponential back-off, 3 redeliveries and a 1 second
     *         initial delay
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        // Grow the waiting time after every failed redelivery attempt.
        policy.setUseExponentialBackOff(true);
        // Maximum number of redeliveries (ActiveMQ's default is 6); limited to 3 here.
        policy.setMaximumRedeliveries(3);
        // Delay before the first redelivery, in MILLISECONDS. The previous value
        // of 1 meant 1 ms, which made retries fire almost immediately; 1000 gives
        // the 1 second interval the configuration was documented to use.
        policy.setInitialRedeliveryDelay(1000L);
        // Each subsequent delay is the previous delay multiplied by this factor
        // (1s, 2s, 4s with the settings above).
        policy.setBackOffMultiplier(2);
        // Collision avoidance (random jitter on the delay) is switched off.
        policy.setUseCollisionAvoidance(false);
        // -1 = no upper bound on the delay; only relevant when exponential
        // back-off is enabled.
        policy.setMaximumRedeliveryDelay(-1);
        return policy;
    }

    /**
     * Builds the ActiveMQ connection factory.
     *
     * @param url              broker URL, read from {@code spring.activemq.broker-url}
     * @param redeliveryPolicy redelivery policy to attach to the factory
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
        // NOTE(review): user name and password are hard-coded placeholders here;
        // consider reading them from configuration instead of source code.
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(
                        "账号",
                        "密码",
                        url);

        // Packages whose classes may be carried in ObjectMessage payloads.
        ArrayList<String> trustedPackages = new ArrayList<>();
        trustedPackages.add("com.soeasycenter.api.dto");
        trustedPackages.add("com.soeasycenter.api.entity");
        // NOTE(review): "java" trusts every class under java.*, which is a broad
        // allowance for deserialization; narrow it (e.g. java.lang, java.util)
        // if the payloads permit.
        trustedPackages.add("java");
        connectionFactory.setTrustedPackages(trustedPackages);

        connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return connectionFactory;
    }

    /**
     * Template used to send messages.
     *
     * @param activeMQConnectionFactory connection factory to send through
     * @param queue                     default destination
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(activeMQConnectionFactory);
        // JmsTemplate only applies deliveryMode/priority/timeToLive when explicit
        // QoS is enabled; without this flag the setDeliveryMode call is ignored.
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(PERSISTENT_DELIVERY);
        // Optional: a destination can also be given on each send call.
        template.setDefaultDestination(queue);
        template.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return template;
    }

    /**
     * Listener container factory for point-to-point (queue) listeners, registered
     * under the bean name {@code jmsQueueListener}.
     *
     * @param activeMQConnectionFactory connection factory the listeners consume from
     * @return the configured listener container factory
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory containerFactory =
                new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(activeMQConnectionFactory);
        // Concurrency limits "lower-upper": between 1 and 10 concurrent consumers.
        containerFactory.setConcurrency("1-10");
        // Interval between recovery (reconnect) attempts, in milliseconds.
        containerFactory.setRecoveryInterval(1000L);
        // Listeners acknowledge each message themselves via Message.acknowledge().
        containerFactory.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return containerFactory;
    }
}
# 三、 调用
# 案例
package com.soeasycenter.mq.base.customer;/**
* @author pth
* @time 2018/12/28
* @description
*/
import com.soeasycenter.api.constant.MqQueConstant;
import com.soeasycenter.api.dto.microservice.BatchDataExchangeDto;
import com.soeasycenter.api.dto.microservice.DataExchangeDto;
import com.soeasycenter.api.dto.solr.base.SolrOrderDetail;
import com.soeasycenter.api.service.microservice.Microservice;
import com.soeasycenter.api.utils.solr.SolrBeanUtil;
import com.soeasycenter.mq.base.service.BaseService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.*;
import java.io.Serializable;
/**
 * JMS consumers for the data-exchange and order-detail queues.
 *
 * <p>Every listener follows the same manual-acknowledge pattern: process the
 * payload, call {@code acknowledge()} on success, and on any failure call
 * {@code session.recover()} so the unacknowledged message is delivered again.
 * This relies on the listener container factory registered under the bean name
 * {@code jmsQueueListener}.
 *
 * @author Pth
 * @since 2018/12/28 15:24
 */
@Service
public class BaseCustomer {
    @Autowired
    Microservice microservice;
    @Autowired
    private BaseService baseService;

    /**
     * Consumes one message from {@code MqQueConstant.EXCHANGE_DATA}, copies its
     * payload into a {@link DataExchangeDto} and hands it to the microservice.
     *
     * @param objectMessage the received message
     * @param session       the JMS session the message was received on
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.EXCHANGE_DATA,containerFactory = "jmsQueueListener")
    public void dataExchange(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            System.out.println("监听EXCHANGE_DATA:" + payload);
            DataExchangeDto dataExchangeDto = new DataExchangeDto();
            BeanUtils.copyProperties(payload, dataExchangeDto);
            System.out.println(dataExchangeDto.getCoUser());
            microservice.recieveData(dataExchangeDto);
            // Acknowledge only after the payload was processed successfully.
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
            // Leave the message unacknowledged and ask for redelivery.
            session.recover();
        }
    }

    /**
     * Consumes one message from {@code MqQueConstant.BATCH_EXCHANGE_DATA}, copies
     * its payload into a {@link BatchDataExchangeDto} and hands it to the
     * microservice.
     *
     * @param objectMessage the received message
     * @param session       the JMS session the message was received on
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.BATCH_EXCHANGE_DATA,containerFactory = "jmsQueueListener")
    public void batchDataExchange(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            BatchDataExchangeDto batchDataExchangeDto = new BatchDataExchangeDto();
            BeanUtils.copyProperties(payload, batchDataExchangeDto);
            microservice.batchRecieveData(batchDataExchangeDto);
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            session.recover();
        }
    }

    /**
     * Consumes one message from {@code MqQueConstant.UPDATE_DETAIL}, converts its
     * payload into a {@link SolrOrderDetail} and passes it to the base service.
     *
     * <p>Uses the same {@code jmsQueueListener} container factory as the other
     * listeners so that the manual acknowledge / recover calls below act on an
     * individually-acknowledged session. Payload extraction and conversion are
     * inside the {@code try} so that a failure there also triggers
     * {@code session.recover()} instead of escaping with the message left
     * unacknowledged.
     *
     * @param objectMessage the received message
     * @param session       the JMS session the message was received on
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.UPDATE_DETAIL,containerFactory = "jmsQueueListener")
    public void updateDetail(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            System.out.println("监听UPDATE_DETAIL:" + payload);
            SolrOrderDetail solrDetail = new SolrOrderDetail();
            SolrBeanUtil.typeConversion(payload, solrDetail);
            baseService.updateDetail(solrDetail);
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            session.recover();
        }
    }
}
TIP
@JmsListener(destination = MqQueConstant.BATCH_EXCHANGE_DATA,containerFactory = "jmsQueueListener")
destination填队列名
containerFactory填第二步中注册进spring容器中的消息监听器连接工厂类的别名