Skip to content

Spring 整合 ActiveMQ

之前介绍的内容也很重要,他更灵活,他支持各种自定义功能,可以满足我们工作中复杂的需求。很多 ActiveMQ 的功能,我们要看官方文档或者博客,这些功能大多是在上面代码的基础上修改完善的。如果非要把这些功能强行整合到 Spring,就有些缘木求鱼了。我认为另一种方式整合 Spring 更好,就是将上面的类注入到 Spring 中,其他不变。这样既能保持原生的代码,又能集成到 Spring。

下面的 Spring 和 SpringBoot 整合 ActiveMQ 也重要,他给我们提供了一个模板,简化了代码,减少我们工作中遇到坑,能够满足开发中 90% 以上的功能。

添加依赖

xml
<dependencies>
    <!--  activemq  所需要的jar 包-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.3</version>
    </dependency>
    <!--  activemq 和 spring 整合的基础包 -->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.1</version>
    </dependency>
    <!-- activemq连接池 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.10</version>
    </dependency>
    <!-- spring支持jms的包 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.3.10</version>
    </dependency>
    <!-- Spring核心依赖 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.3.10</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.10</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>5.3.10</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>5.3.9</version>
    </dependency>
</dependencies>

ActiveMQ 配置文件

applicationContext.xml:其中 20-24 行代码是队列,26-29 行代码是主题,目前是队列,如果需要主题,去 36 行代码修改为主题的 beanid 即可。

xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">

    <!--  开启包的自动扫描  -->
    <context:component-scan base-package="com.eight.spring"/>
    <!--  配置生产者  -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!--  正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供      -->
            <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.199.27:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>

    <!--  这个是队列目的地,点对点的Queue  -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--    通过构造注入Queue名    -->
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!--  这个是队列目的地,  发布订阅的主题Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>

    <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--    传入连接工厂    -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--    传入目的地    -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!--    消息自动转换器    -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

</beans>

队列的生产者

java
public class SpringProduce {
    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        SpringProduce produce = context.getBean(SpringProduce.class);
        produce.jmsTemplate.send(session -> {
            TextMessage message = session.createTextMessage("spring和ActiveMQ整合");
            return message;
        });
    }
}

队列的消费者

java
public class SpringConsumer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        SpringConsumer consumer = context.getBean(SpringConsumer.class);
        String message = (String) consumer.jmsTemplate.receiveAndConvert();
        System.out.println("收到的消息:" + message);
    }
}

消费者的监听类

java
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if(message instanceof TextMessage) { 
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

image-20220730153727202

Spring Boot 整合 ActiveMQ

Queue 生产者

引入依赖

xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot整合activemq的jar包-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>2.5.5</version>
    </dependency>
</dependencies>

application.yml 内容

yml
server:
  port: 8888
spring:
  activemq:
    broker-url: tcp://192.168.199.27:61616  # url
    user: admin
    password: admin
  jms:
    pub-sub-domain: false   # 目的地是queue还是topic, false(默认) = queue    true =  topic
    
myqueue: boot-activemq-queue # 自定义消息队列名字

配置目的地的 bean,也可以把下面的方法放到启动类里

java
@Configuration
@EnableJms  // 开启适配 JMS
public class ConfigBean {

    @Value("${myqueue}")
    private String myQueue;
    
    @Bean
    public ActiveMQQueue queue(){
        return new ActiveMQQueue(myQueue);
    }
}

队列生产者代码,代码功能:每隔三秒发送一次消息

java
@Component
public class QueueProducer {
    
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Value("${myqueue}")
    private String myQueue;

    public void productMessage(){
        jmsMessagingTemplate.convertAndSend(myQueue, UUID.randomUUID().toString().substring(0,6));
    }
    
    // 定时发送,3 秒发送一次
    @Scheduled(fixedDelay = 3000)
    public void fixedDelay() {
        productMessage();
    }
    
}

主启动类,记得开启 @EnableScheduling 注解,否则上方的 @Scheduled(fixedDelay = 3000) 没有用

java
@SpringBootApplication
@EnableScheduling // 开启任务定时调度
public class ProducerMain7777 {

    public static void main(String[] args) {
        SpringApplication.run(ProducerMain7777.class, args);
    }
}

单元测试(非必须)

java
@SpringBootTest
public class QueueTest {
    @Autowired
    private QueueProducer queueProducer;
    @Test
    public void test(){
        queueProducer.productMessage();
    }
    @Test
    public void test2(){
        queueProducer.fixedDelay();
    }
}

queue 消费者

pom.xml 和 application.yml 文件和上面 Queue 生产者 的一致。

新增消费者消费信息代码,监听消息队列,指定队列名字(配置文件自定义),一旦队列名里有人发布消息,就能即刻获取

java
@Component
public class QueueConsumer {
    // 注册一个监听器。Destination 指定监听的主题
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage message) throws JMSException {
        System.out.println("收到的消息:" + message.getText());
    }
}

Topic 生产者

Queue 生产者 代码基本一样

application.yml (具体看第 9 行代码)

yml
server:
  port: 7777
spring:
  activemq:
    broker-url: tcp://192.168.199.27:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true   # 目的地是 Queue 还是 Topic,false(默认)指的是 queue,true 指的是 topic
    
mytopic: boot-activemq-topic

ConfigBean 的 ActiveMQQueue 变为 ActiveMQTopic

java
@Configuration
@EnableJms  // 适配 JMS
public class ConfigBean {

    @Value("${mytopic}")
    private String myTopic;

    @Bean
    public ActiveMQTopic topic(){
        return new ActiveMQTopic(myTopic);
    }
}

队列生产者代码

java
@Component
public class QueueProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Value("${mytopic}")
    private String myTopic;
    public void productMessage(){
        jmsMessagingTemplate.convertAndSend(myTopic, UUID.randomUUID().toString().substring(0,6));
    }
    // 定时发送,3秒发送一次
    @Scheduled(fixedDelay = 3000)
    public void fixedDelay() {
        productMessage();
    }
}

Topic 消费者

pom.xml 和 application.yml 与 Queue 消费者 一样。

消费者消费信息代码

java
@Component
public class Topic_Consummer {
    @JmsListener(destination = "${mytopic}")
    public void receive(TextMessage textMessage) throws  Exception{
        System.out.println("消费者受到订阅的主题:"+textMessage.getText());
    }
}