上文SpringBoot整合Activemq(一)快速集成中我们已经把mq快速集成部署完毕,事实上在实际生产中为了防止数据丢失,我们还需要根据业务需要来选择是否要把队列数据持久化到数据库。本文以mysql为例,记录下基本持久化步骤。

一、环境配置

  1. 首先同样的,正事之前先把依赖配好,我们需要往lib文件夹下放入两个jar包:
    mysql-connector-java
    commons-dbcp2

  2. 打开conf/activemq.xml配置文件,找到<persistenceAdapter>标签,注释掉默认的kahaDB配置,加入mysql的配置如下:

    1
    2
    3
    <persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-mq" createTablesOnStartup="true" />
    </persistenceAdapter>
  3. 接着在<broker>标签外添加mysql具体配置:这里注意一下数据库连接中的参数&需要使用转义字符:&amp;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <bean id="mysql-mq" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?characterEncoding=utf-8&amp;serverTimezone=Asia/Shanghai" />
    <property name="username" value="root" />
    <property name="password" value="root" />
    <property name="initialSize" value="5" />
    <property name="maxTotal" value="100" />
    <property name="maxIdle" value="30" />
    <property name="maxWaitMillis" value="10000" />
    <property name="minIdle" value="1" />
    </bean>

启动activemq后如看到数据库生成这三张表即表示配置成功:activemq_acks、activemq_lock、activemq_msgs

二、代码配置

代码的改动并不大,setSubscriptionDurable(true);开启持久化,同时如果为topic模式再指定clientId(可根据业务命名)即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 消息配置
* 转载请注明出处,更多技术文章欢迎大家访问我的个人博客站点:https://www.doufuplus.com
*
* @author 丶doufu
* @date 2019/08/15
*/
@Configuration
@EnableJms
public class JmsConfig {

/**
* 点对点
*
* @author 丶doufu
* @date 2019/08/15
*/
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
// 开启持久化
factory.setSubscriptionDurable(true);
return factory;
}

/**
* 发布/订阅
*
* @author 丶doufu
* @date 2019/08/15
*/
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
// 开启持久化
factory.setSubscriptionDurable(true);
// 设置clientId
factory.setClientId(MsgConstant.TOPIC_QUEUE);
return factory;
}
}

这时停掉消息监听再去发送消息,就可以看到activemq_msgs表有数据了。接着再启动消息监听你就会看到消息又被拿出来正常消费掉啦。这里我就不放图了,自己去动手试试看吧。

项目源码:GitHub (分支:mq)