SpringBoot对各类框架或是插件的整合还是很友好的,就集成mq而言,相比Spring其省去了大量xml配置。自从有了SpringBoot,腰也不酸了,腿也不疼了,一口气爬上三十楼都不用休息了~

一、前置准备

  1. 官网下载并解压安装包
  2. 启动服务:服务启动方式有很多种,这里不做其它讨论。
    1
    2
    Windows:据系统位数进入bin/win64或者bin/win32目录,双击运行activemq.bat文件。
    Linux: 进入bin目录,执行命令:sh activemq start
  1. 端口占用:启动时如发现端口占用,可打开conf文件夹下的activemq.xml,找到下面这段配置修改掉对应端口。

    1
    2
    3
    4
    5
    6
    7
    8
    <transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
  2. 控制台(默认地址:http://localhost:8161;账号/密码:admin/admin)
    (1)点击下方链接后输入账号密码
    登录控制台
    (2)登录后即可看到相关菜单及相应统计
    控制台界面
    (3)Queues相关参数说明:

Name:队列名称。
Number Of Pending Messages:等待消费的消息个数。
Number Of Consumers:当前连接的消费者数目
Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减,重启mq后会清零。
Messages Dequeued:已经消费的消息数量,重启mq后会清零。

二、快速集成

  1. pom.xml配置:这里需要注意一下springboot版本不同,连接池的依赖也不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- activemq连接池 springboot2.1以上-->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>

<!-- activemq连接池 springboot2.0+及以下 -->
<!--
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
-->
  1. application.yml单机配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    spring:
    application:
    name: springboot-activemq

    # activema - start
    activemq:
    # 地址
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    pool:
    # 开启连接池
    enabled: true
    # 最大连接数
    max-connections: 20
    # 连接闲置超时时间(s)
    idle-timeout: 30
    # activema - end
  2. mq配置类:分别配置点对点模式及发布/订阅模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    /**
    * 消息配置
    * 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
    *
    * @author 丶doufu
    * @date 2019/08/15
    */
    @Configuration
    @EnableJms
    public class JmsConfig {

    /**
    * 点对点
    *
    * @author 丶doufu
    * @date 2019/08/15
    */
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(false);
    factory.setConnectionFactory(connectionFactory);
    return factory;
    }

    /**
    * 发布/订阅
    *
    * @author 丶doufu
    * @date 2019/08/15
    */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true);
    factory.setConnectionFactory(connectionFactory);
    return factory;
    }
    }

经过上面的步骤,mq所需要的环境基本就已经搭建完毕,接着我们就可以编写消息生产者/消费者等代码了。

三、消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 消息生产者服务实现类
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@Service
public class ProducerServiceImpl implements ProducerService {

@Autowired
private JmsTemplate jmsTemplate;

@Override
public void sendMsgToQueue(String destination, final String msg) {
jmsTemplate.convertAndSend(new ActiveMQQueue(destination), msg);
}

@Override
public void sendMsgToTopic(String destination, final String msg) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), msg);
}

}

四、消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.doufuplus.boot.listener;

import com.doufuplus.boot.constant.MsgConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
* 消息监听器
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@Component
public class MsgListener {

// 接收test.queue队列的消息
@JmsListener(destination = MsgConstant.QUEUE_ONE)
public void receiveQueue(String msg) {
System.out.println("[" + MsgConstant.QUEUE_ONE + "]消息:" + msg);
}


// @JmsListener如果不指定独立的containerFactory的话是只能消费queue消息
@JmsListener(destination = MsgConstant.TOPIC_QUEUE, containerFactory = MsgConstant.TOPIC_LISTENER_FACTORY)
public void receive1(String msg) {
System.out.println("[" + MsgConstant.TOPIC_QUEUE + "]消费者:receive1=" + msg);
}


@JmsListener(destination = MsgConstant.TOPIC_QUEUE, containerFactory = MsgConstant.TOPIC_LISTENER_FACTORY)
public void receive2(String msg) {
System.out.println("[" + MsgConstant.TOPIC_QUEUE + "]消费者:receive2=" + msg);
}


@JmsListener(destination = MsgConstant.TOPIC_QUEUE, containerFactory = MsgConstant.TOPIC_LISTENER_FACTORY)
public void receive3(String msg) {
System.out.println("[" + MsgConstant.TOPIC_QUEUE + "]消费者:receive3=" + msg);
}


}

五、启动测试

  1. 编写测试接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* TestController
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@RestController
public class TestController {

@Autowired
private ProducerService producerService;

/**
* 点对点
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@RequestMapping("/queue")
public Result queue(String msg) {
if (StringUtils.isBlank(msg)) {
return new Result(ResultCode.PARAM_ERROR, "msg not null.");
}
producerService.sendMsgToQueue(MsgConstant.QUEUE_ONE, msg);
return new Result(ResultCode.SUCCESS);
}


/**
* 发布/订阅
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@RequestMapping("/topic")
public Result topic(String msg) {
if (StringUtils.isBlank(msg)) {
return new Result(ResultCode.PARAM_ERROR, "msg not null.");
}
producerService.sendMsgToTopic(MsgConstant.TOPIC_QUEUE, msg);
return new Result(ResultCode.SUCCESS);
}

}
  1. 访问效果
    运行结果

六、At Last

参考资料:
第十七章:springboot 整合 activeMQ
springboot集成activemq消息队列

项目源码:GitHub (注意选择分支:mq)