type
status
date
slug
summary
tags
category
icon
password
ActiveMQ安装(Docker环境下)
首先在docker搜索ActiveMQ,发现webcenter/activemq 这个版本使用是最多的
然后选择版本,下载activeMQ
查看镜像,配置好activemq容器的后台端口(61616)和前台端口(8161)的映射关系,并启动它
浏览器访问,如下图,便安装完成
JMS实现消息的生产者和消费者
队列的方式
生产者:
执行后的结果:
消费者(receive()方法获取信息):
执行结果:
消费者(监听器MessageListener方式获取信息)
消费者获取消息的方式
receive() 方法:该方法是同步阻塞的,如果调用者(队列或者订阅方)没有接受到消息,就会一直等待
除此之外它有一个重载方法 receive(long timeout) ,参数表示的是 等待的超时时间(毫秒),在等待时间内没有接受到消息就自动停止
setMessageListener(MessageListener listener) 方法:该方法是异步非阻塞的,而方法参数类型是MessageListener接口,这个接口里只定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数监听器。当消息到达之后,会自动调用onMessage方法获取信息
消费者的三种情况
- 先启动生产者,生产消息,然后再启动一个消费者,是否可以正常消费信息 ?
- 答案:可以的
- 还是先启动生产者,生产消息,再分别启动两个消费者,后一个启动的消费者可以消费到消息吗 ?
- 答案:否,后一个启动的消费者是消费不到的消息,消息都被前一个启动的消息者消费完了
- 先启动两个消费者,再启动生产者生产6条消息,结果如何 ?
- 答案:消息被平分,两个消费者一人一半,分别消费3条消息
发布订阅方式
生产者:
发布订阅的方式和队列方式, 从生产者的代码可以发现,两者是一样的,唯一不同的就是,创建目的地的方法不一样,队列是createQueue(String queueName),而发布订阅是createTopic(String topicName)。同样的,消费者的代码和队列也是一样的
消费者
不过要注意的是,发布订阅的模式,要先启用订阅方,也就是消费者方,再启动发布方(生产者)。否则,就类似微信公众号一样,没有人订阅,生产方发布消息是没有人接受的到,会被丢弃。这种则是废消息。
队列和发布订阅两者对比
比 较 项 目 | Topic (发布订阅)模式 | Queue (队列)模式 |
工 作 模 式 | “订阅-发布”模式,如果当前没有订阅 者,消息将会被丢弃,如果有多个订阅 者,那么这些订阅者都会收到消息。 | 负载均衡 模式,如果当前没有消费者,消息也不会丢弃,如果有多个消费者,那么一条消息 也只会发送给其中一个消费者,并且要求消费者ack信息 |
有 无 状 态 | 无状态 | Queue 数据默认会在 mq 服务器上以文件形式 保存,比如 Active MQ 一般保存在 $AMQ_HOME\data\kr-store\data下面,也可 以配置成DB存储 |
传 递 完 整 性 | 如果没有订阅者,消息会被丢弃 | 消息不会丢弃 |
处 理 效 率 | 由于消息要按照订阅者的数量进行复 制,所以处理性能会随着订阅者的增加 而明显降低,并且还要结合不同消息协 议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算 消费者再多,性能也不会有明显降低。当然不 同消息协议的具体性能也是有差异的。 |
JMS
JMS是Java EE的一个面向消息中间件的一套规范,具体需要靠厂商的产品实现
消息的三种部分
消息头
- 在生产者方,消息的API看出可以针对每一条消息设置不同的消息头属性
- 如果不想为每一条消息单独设置消息头属性,可以通过Send()方法的重载,批处理消息的消息头属性
- 从批处理消息头属性和单独设置消息头属性两者结合来看,主要介绍五个消息头属性:
JMSDestination: 消息发送的目的地,主要是指 Queue 和 Topic
JMSDeliveryMode: 持久模式和非持久模式,可设置的参数为DeliveryMode.NON_PERSISTENT(非持久) 和 DeliveryMode.PERSISTENT(持久)
一条持久性的消息:应该被传送 “一次仅仅一次”,这就意味着如果JMS 提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递消息。
一条非持久的消息:多会传送一次,这意味着服务器出现故障,该消息将永久丢失。
JMSPriority : 消息优先级,从0 ~ 9 十个级别,0 ~ 4 是普通消息,5 ~ 9是加急消息。
JMS 不要求 MQ 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认是4 级。
JMSExpiration: 可以设置消息在一定时间后过期,默认是永不过期。消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间
值。 如果timeToLive值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
JMSMessageID : 唯一识别每个消息的标识,由MQ产生。
消息体
封装具体的消息数据,发送和接收的消息体类型必须一致对应!
五种消息体格式:
TextMessage: 普通字符串消息,包含一个String 。
MapMessage: 一个Map类型的消息,key为String类型,而值为Java的基本类型。
BytesMessage : 二进制数组消息,包含一个byte[] 。
StreamMessage : Java数据流消息,用标准流操作来顺序的填充和读取。
ObjectMessage :对象消息,包含一个可序列化的 Java 对象。
消息属性
如果需要除消息头字段以外的值,那么可以使用消息属性!
识别、去重、重点标注等操作非常有用的方法!
消息属性是以属性名和属性值对的形式指定的。可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。
以Property结尾:
消息的可靠性
持久性 队列
队列是否设置为持久,可以通过setDeliveryMode(int deliveryMode)方法设置,默认为持久
持久性 发布订阅
发布方(生产者),顺序与之前有一点不同,需要设定先持久化,在启动连接
测试代码:
订阅方(消费者)测试代码:
先启动订阅方,再启动发布方,到ActiveMQ前台的Subscribers查看
启动前情况:
启动订阅方后情况:
启动生产方 后:
订阅方停止后,会被列 离线的一栏去:
如果这时,发布者再次发布消息,订阅方重启启动是否能接收到消息
再次启动订阅方(不停止)后
小结:
第一次,一定要先启动订阅方,之后启动发布方,发布消息。这时不论订阅方在不在线,都可以接受到消息,不在线,重新启动后会接受没有收到过的消息接受下来
事务
事务偏向于生产者、签收偏向于消费者
事务只有两个值,true和false
生产者一方
当值为false时,消息会被自动发送到目的地
当值为true时,需要在调用send()方法后,调用commit()方法,手动提交
消费者一方
当值为false时,消息只会被消费一次
当值为true时,消息会被重复消费,且在目的地中,消息不会出队的,即未被消费
测试:
签收机制
签收针对的是消费者一方,分为以下四种
当事务没有开启时 : 即非事务,消息的确认情况 则以签收机制为主
当事务开始时 : 即事务情况下,消息的确认以事务为主,即是否commit的为主,不用管签收机制的情况。
注:常用的签收模式 主要以自动签收(AUTO_ACKNOWLEDGE)和手动签收(CLIENT_ACKNOWLEDGE)为主
SpringBoot结合ACtiveMQ
队列模式
1、导入maven依赖
2、编写application.properties配置文件
3、编写配置类
4、编写队列的消息生产者Produce
5、编写controller类,并访问接口
查看ActiveMQ的前台,消息已经发送到队列了
6、编写队列的消费者
删除之前入队的信息,然后重新启动 启动类,再次访问接口,并查看ActiveMQ的前台和打印台
订阅发布模式
1、导入maven依赖
2、编写application.properties配置文件
3、编写配置类
4、编写主题的消息生产者(发布方)
5、编写controller类
6、编写主题的消费者(订阅方)
7、启动主启动类,访问接口,让发布方生产消息,并查看ActiveMQ的前台和打印台
定时的订阅发布模式
让发布方每过5秒,发送一次消息
添加 @EnableScheduling 注解,发现定时任务让它后台执行
修改主题的消费者(订阅方),添加LocalDateTime.now()方法,方便在控制台查看
启动程序,查看控制台打印内容,以及(同一时间点内)查看ActiveMQ的前台
ActiveMQ传输协议
在连接activemq时,默认是用TCP连接,那Activemq是否还支持其它协议吗?
查看activemq的conf文件夹下的activemq.xml
可以看出,除了支持默认的TCP协议,还支持 amqp ,stomp,mqtt,ws等协议,而且协议对应的端口号也不一样,这就是用TCP连接时,端口号是61616的原因了。
从activemq前台也可以查看出来
如果想配置其它协议,可以查看官网的文档
配置NIO协议
从官网的文档得知,配置NIO协议,只需要修改activemq.xml文件
重启activemq(重启容器),查看activemq前台
修改代码的连接方式,并测试
查看前台
ActiveMQ持久化
持久化不同于之前的持久性,持久化是以防服务器本身挂了,重启服务器也可以恢复消息
ActiveMQ目前默认的持久化方式是KahaDB,此外还有JDBC, LevelDB等,不过它们大体上的逻辑是一样,消息中心接受到消息。会把消息备份存储起来。
KahaDB(默认)
KahaDB:这个5.4版本之后出现的默认的持久化方式, KahaDB的持久化机制是基于日志文件,索引和缓存。
从activemq.xml配置文件中可以得知
它是把数据存储在activemq的data目录下的kahadb文件夹中
在这个文件夹中会有四个文件来完成消息持久化
- db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
- db.redo 用来进行消息恢复
- db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增文 件,文件名按照数字进行编号,如 db-1.log、db-2.log、.....
- lock 文件 锁,写入当前获得kahadb读写权限的broker ,用于在集群环境下的竞争处理
KahaDB配置官网链接: http://activemq.apache.org/kahadb
LevelDB
这种文件系统是从ActiveMQ 5.8 之后引进的,它和 KahaDB 非常相似,也是基于文件的本地数据库存 储形式,但是它提供比 KahaDB更快的持久性。
但它不使用自定义 B-Tree 实现来索引预写日志,而是使用基于LevelDB的索引。
可是为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的
LeavelDB 比LeavelDB 更性能更优越,但需要基于 ZooKeeper 所以这些官方还没有定论,仍旧使用 KahaDB。
默认配置:
LevelDB配置官网链接: http://activemq.apache.org/leveldb-store
JDBC
看名字就知道和数据库有关,所以JDB的持久化就是把消息备份到数据库中
由于用docker比较麻烦,所以直接改为windows本地测试
首先下载activemq的windows版本,http://activemq.apache.org/components/classic/download/
然后下载mysql的驱动包,
把mysql驱动包丢到activemq的lib下
打开activemq的conf目录下的activemq.xml文件,配置内容:
参数解释:
dataSource: 表示的是引用的数据源名称
createTablesOnStartup: 表示启动的时候是否去创建表,一般第一次为true,之后改为false
接着由于指定了数据源,所以需要在activemq.xml配置一个数据源,不过要注意位置
然后创建好数据库,数据库名字与数据源链接的要一致
测试:
Queue
启动后,分别查看数据库和前台:
启动消费者查看数据库和前台:
Queue 小结:
在点对点类型中,当DeliveryMode设置为NON_PERSISTENCE 时,消息被保存在内存中; 当DeliveryMode设置为PERSISTENCE 时,消息被保存在Broker的相应的文件或者数据库中。而且点对点类型中消息一旦被Consumer消费就从Broker中删除
Topic
先启动订阅方然后再启动发布方,查看数据库和前台:
Topic 小结:
一定要先启动消费订阅然后再启动生产,消息会被保留在 activemq_msgs表中消费完毕也不会删除,而订阅关系在 acks 表 中。
- 作者:十十乙
- 链接:https://shishiyi.cc/article/bac843aa-de8e-4de8-b7ee-a1528bc91ae6
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章