# activemq配置

# 一、基本配置

# 1. pom文件添加如下配置

<!-- ActiveMQ client library (org.apache.activemq:activemq-all).
     No <version> is declared here; NOTE(review): presumably it is inherited from a
     parent POM or a dependencyManagement section. Confirm, otherwise add one. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>

# 2. yml 文件添加如下内容

spring:
    activemq:
        # Failover transport across two brokers; randomize=false keeps the listed order
        # instead of picking a broker at random.
        broker-url: failover:(tcp://mqIP:端口号,tcp://mqIP:端口号)?randomize=false
        # NOTE(review): Spring Boot documents in-memory as ignored once an explicit
        # broker-url is configured; confirm whether this line can be dropped.
        in-memory: true
        pool:
            enabled: false
        # Placeholders: replace with the broker account name and its password.
        # (The original snippet had the two placeholders swapped.)
        user: 账号
        password: 密码

# 二、 详细配置

package com.soeasycenter.api.config;
/**
 * @author pth
 * @time 2019/1/21
 * @description
 */
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Queue;
import java.util.ArrayList;

/**
 * Spring JMS configuration for ActiveMQ.
 *
 * <p>Registers the default queue, the redelivery policy, the ActiveMQ connection
 * factory, a {@link JmsTemplate} for sending, and the listener container factory
 * (bean name {@code jmsQueueListener}) used by {@code @JmsListener} methods.
 *
 * @author Pth
 * @since 2019/1/21
 */
@EnableJms
@Configuration
public class ActiveMqConfig {

    /**
     * Acknowledge mode passed to the template and the listener factory.
     * The value 4 is ActiveMQ's individual-acknowledge mode
     * ({@code org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE}), not the
     * standard JMS client-acknowledge mode (which is 2): only the message on which
     * {@code acknowledge()} is called gets acknowledged.
     */
    private static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** Broker account name, read from {@code spring.activemq.user}. */
    @Value("${spring.activemq.user}")
    private String user;

    /** Broker password, read from {@code spring.activemq.password}. */
    @Value("${spring.activemq.password}")
    private String password;

    /**
     * Default destination used by {@link #jmsTemplate} when the sender names none.
     *
     * @return the queue named {@code "default"}
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("default");
    }

    /**
     * Redelivery policy applied to consumers created from the connection factory.
     *
     * @return the configured policy
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // Grow the wait time after every failed redelivery attempt.
        redeliveryPolicy.setUseExponentialBackOff(true);
        // Maximum number of redeliveries: 3 here (the ActiveMQ default is 6).
        redeliveryPolicy.setMaximumRedeliveries(3);
        // Delay before the first redelivery, in milliseconds.
        // NOTE(review): 1 ms with a multiplier of 2 gives a 1, 2, 4 ms back-off, which is
        // effectively no wait. The ActiveMQ default is 1000 ms; confirm 1 ms is intended.
        redeliveryPolicy.setInitialRedeliveryDelay(1);
        // Each subsequent delay is the previous delay multiplied by this factor.
        redeliveryPolicy.setBackOffMultiplier(2);
        // Collision avoidance (randomised delays) is switched off.
        redeliveryPolicy.setUseCollisionAvoidance(false);
        // -1 means no upper bound on the delay; only relevant with exponential back-off.
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    /**
     * Builds the ActiveMQ connection factory.
     *
     * <p>Credentials come from {@code spring.activemq.user} / {@code spring.activemq.password}
     * instead of being hard-coded, so they stay in sync with the application configuration.
     *
     * @param url broker URL taken from {@code spring.activemq.broker-url}
     * @param redeliveryPolicy policy applied to redelivered messages
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory(
            @Value("${spring.activemq.broker-url}") String url,
            RedeliveryPolicy redeliveryPolicy) {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(user, password, url);

        // Packages whose classes may be deserialized from ObjectMessage payloads.
        ArrayList<String> trustedPackages = new ArrayList<>();
        trustedPackages.add("com.soeasycenter.api.dto");
        trustedPackages.add("com.soeasycenter.api.entity");
        // NOTE(review): "java" trusts every java.* class for deserialization, which is very
        // broad; consider narrowing it to the packages actually sent (e.g. java.lang, java.util).
        trustedPackages.add("java");
        connectionFactory.setTrustedPackages(trustedPackages);

        connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return connectionFactory;
    }

    /**
     * Template used to send messages.
     *
     * @param activeMQConnectionFactory connection factory to send through
     * @param queue default destination; callers may still pass an explicit one when sending
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
        jmsTemplate.setDefaultDestination(queue);
        // JmsTemplate only applies its delivery-mode setting when explicit QoS is enabled;
        // without this flag the persistent setting below would be silently ignored.
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return jmsTemplate;
    }

    /**
     * Listener container factory for queue (point-to-point) listeners. Reference it from
     * {@code @JmsListener(containerFactory = "jmsQueueListener")}.
     *
     * @param activeMQConnectionFactory connection factory the listeners consume from
     * @return the configured listener container factory
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
            ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        // Between 1 and 10 concurrent consumers per listener.
        factory.setConcurrency("1-10");
        // Wait 1000 ms between connection recovery attempts.
        factory.setRecoveryInterval(1000L);
        // Listeners acknowledge each message themselves via Message.acknowledge().
        factory.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return factory;
    }

}

# 三、 调用

# 案例


package com.soeasycenter.mq.base.customer;/**
 * @author pth
 * @time 2018/12/28
 * @description
 */

import com.soeasycenter.api.constant.MqQueConstant;
import com.soeasycenter.api.dto.microservice.BatchDataExchangeDto;
import com.soeasycenter.api.dto.microservice.DataExchangeDto;
import com.soeasycenter.api.dto.solr.base.SolrOrderDetail;
import com.soeasycenter.api.service.microservice.Microservice;
import com.soeasycenter.api.utils.solr.SolrBeanUtil;
import com.soeasycenter.mq.base.service.BaseService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

/**
 * JMS consumers for the data-exchange and detail-update queues.
 *
 * <p>Every listener runs on the {@code jmsQueueListener} container factory and follows the
 * same pattern: read the payload, hand it to a service, acknowledge the message on success,
 * and call {@link Session#recover()} on any failure so the message is redelivered.
 *
 * @author Pth
 * @since 2018/12/28
 */
@Service
public class BaseCustomer {
    @Autowired
    Microservice microservice;

    @Autowired
    private BaseService baseService;


    /**
     * Consumes a single data-exchange message and forwards it to the microservice.
     *
     * @param objectMessage incoming message whose payload is copied into a {@link DataExchangeDto}
     * @param session JMS session, used to trigger redelivery on failure
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.EXCHANGE_DATA, containerFactory = "jmsQueueListener")
    public void dataExchange(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            System.out.println("监听EXCHANGE_DATA:" + payload);
            DataExchangeDto dataExchangeDto = new DataExchangeDto();
            BeanUtils.copyProperties(payload, dataExchangeDto);
            System.out.println(dataExchangeDto.getCoUser());
            microservice.recieveData(dataExchangeDto);
            // Acknowledge only after the service call succeeded.
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
            // Ask the broker to redeliver the unacknowledged message.
            session.recover();
        }
    }

    /**
     * Consumes a batch data-exchange message and forwards it to the microservice.
     *
     * @param objectMessage incoming message whose payload is copied into a {@link BatchDataExchangeDto}
     * @param session JMS session, used to trigger redelivery on failure
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.BATCH_EXCHANGE_DATA, containerFactory = "jmsQueueListener")
    public void batchDataExchange(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            BatchDataExchangeDto batchDataExchangeDto = new BatchDataExchangeDto();
            BeanUtils.copyProperties(payload, batchDataExchangeDto);
            microservice.batchRecieveData(batchDataExchangeDto);
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            session.recover();
        }
    }

    /**
     * Consumes a detail-update message and passes the converted detail to {@link BaseService}.
     *
     * <p>Uses the same {@code jmsQueueListener} container factory as the other listeners so
     * that the manual {@code acknowledge()} / {@code recover()} calls run under the same
     * acknowledge mode. Payload extraction and conversion happen inside the {@code try}
     * so a malformed message is recovered like any other failure.
     *
     * @param objectMessage incoming message whose payload is converted into a {@link SolrOrderDetail}
     * @param session JMS session, used to trigger redelivery on failure
     * @throws JMSException if {@code session.recover()} itself fails
     */
    @JmsListener(destination = MqQueConstant.UPDATE_DETAIL, containerFactory = "jmsQueueListener")
    public void updateDetail(ObjectMessage objectMessage, Session session) throws JMSException {
        try {
            Serializable payload = objectMessage.getObject();
            System.out.println("监听UPDATE_DETAIL:" + payload);
            SolrOrderDetail solrDetail = new SolrOrderDetail();
            SolrBeanUtil.typeConversion(payload, solrDetail);
            baseService.updateDetail(solrDetail);
            objectMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            session.recover();
        }
    }


}

TIP

@JmsListener(destination = MqQueConstant.BATCH_EXCHANGE_DATA,containerFactory = "jmsQueueListener")

destination填队列名

containerFactory 填第二部分(详细配置)中注册进 Spring 容器的消息监听器容器工厂的 bean 名称(本例为 jmsQueueListener)

Last Updated: 2/12/2019, 10:37:06 AM