1. activemq是什么
ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS1.1规范(Java Message Service),是消息驱动中间件软件(MOM)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。
2. 是什么MOM
- 中间件的分类
- 基于远程过程调用 (Remote Procedure Call, RPC)的中间件。
- 基于对象请求代理 (Object Request Broker, ORB) 的中间件。
- 面向消息的中间件或基于 MOM 的中间件。
2. 什么是JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
-
JMS 消息传送模式
JMS消息传送模式
- 客户端 A、C 和 D之间的消息传送说明了点对点模式(P2P)。客户端使用此模式向队列目的地发送一条消息,只有一个接收者能够从该目的地获得该消息。访问该目的地的其他任何接收者都不能获得该消息。
- 客户端 B、E 和 F之间的消息传送说明了发布/订阅模式(publish-subscribe)。客户端使用此广播模式向主题目的地发送一条消息,任意数量的使用方订户都可以从该目的地检索此消息。每个订户都获得此消息的一个副本。
3. JMS实现--ActiveMQ
ActiveMQ的消息传递模式
- P2P (点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。
- Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
代码案例
//创建session会话
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//创建一个消息队列 session.createQueue("jms.test.topic")--P2P模式
Destination destination = session.createTopic("jms.test.topic");
//创建消息生产者
MessageProducer producer = session.createProducer(destination);
//消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < messageNum; i++) {
producer.send(session.createTextMessage("Message Producer:" + i));
}
//提交会话
session.commit();
//创建session会话
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//创建一个消息队列 session.createQueue("jms.test.topic")--P2P模式
Destination destination = session.createTopic("jms.test.topic");
//创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (message != null){
System.out.println("Message Consumer:"+message.getText());
}else {
break;
}
}
session.commit();
4. springboot集成activemq
5. activemq消息持久化
消息持久化是保证消息不丢失的重要方式。ActiveMQ提供了以下三种的消息存储方式:
- 基于KahaDB的消息存储方式,这种方式是现在的默认存储方式。它提供了容量的提升和恢复能力。
- 基于JDBC的消息存储方式-数据存储于数据库中。
- Memory 消息存储-基于内存的消息存储。
- KahaDB
KahaDB是目前默认的存储方式,这种方式的消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。KahaDB存储配置的配置在conf/activemq.xml,如下:
<broker brokerName="broker" ... >
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="100mb"/>
</persistenceAdapter>
...
</broker>
日志存储路径为:\data\kahadb
存储目录
从上图可以看出,该目录下共有四个文件:
(1) db.data
它是消息的索引文件。本质上是B-Tree的实现,使用B-Tree作为索引指向db-.log里面存储的消息。
(2)db.redo
该文件主要用来进行消息恢复。
(3)db-.log
该文件用来存储消息的内容。对于一个消息而言,不仅仅有消息本身的数据(message data),而且还有(Destinations、订阅关系、事务...等方面的信息)data log以日志形式存储消息,而且新的数据总是以APPEND的方式追加到日志文件末尾。因此,消息的存储是很快的。比如,对于持久化消息,Producer把消息发送给Broker,Broker先把消息存储到磁盘中(enableJournalDiskSyncs配置选项),然后再向Producer返回Acknowledge。Append方式在一定程度上减少了Broker向Producer返回Acknowledge的时间。
(4) lock
主要是用来存放一些锁的信息。
另外,一些KahaDB的配置选项如下:
1)indexWriteBatchSize: 默认值1000,当Metadata Cache中更新的索引到达了1000时,才同步到磁盘上的Metadata Store中。不是每次更新都写磁盘,而是批量更新写磁盘。
2)indexCacheSize: 默认值10000,(number of index pages cached in memory),在内存中最多分配多个页面来缓存index。缓存的index越多,命中的概率就越大,检索的效率就越高。
3)journalMaxFileLength: 默认值32MB,当存储的消息达到32MB时,新建一个新文件来保存消息。
4)enableJournalDiskSyncs: 默认值true,默认采用同步写磁盘,即消息先存储到磁盘中再向Producer返回ACK。
5)cleanupInterval: 默认值30000ms,当消息被消息者成功消费之后,Broker就可以将消息删除了。
6)checkpointInterval: 默认值5s,每隔5s将内存中的Index(Metadata Cache)更新到磁盘的Index文件中(Metadata Store)。
7)director: KahaDB存放的路径,默认值activemq-data。
- JDBC消息存储
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<!--配置数据库连接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=UTF-8" />
<property name="username" value="esb" />
<property name="password" value="123456"/>
</bean>
注意:此处需要上传数据库驱动包到/activemq/lib下。
-
重启ActiveMQ后,在数据库中就可以看到有如下几张表:
表结构
从图中可以看出,数据库中有activemq_acks、activemq_lock、activemq_msgs三张表。
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录最后消费过的消息的ID
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker。
Memory消息存储
Memory消息存储主要是存储所有的持久化的消息在内存中。使用这种方式必须注意设置你的broker所在的JVM和内存限制。
配置方式如下:
<broker ... persistent="false" ...></broker>
因是基于内存的,也就是说当服务器重启后,存储在内存中的所有消息都会丢失。













网友评论