本文写于2017年7月6日
Java消息服务回顾(JMS)
七大组件
两种模型
点对点或队列模型
- 只有一个消费者将获得消息
- 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
- 每一个成功处理的消息都由接收者签收
发布者订阅者模型
- 多个消费者可以获得消息
- 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
ActiveMQ是一个开源的JMS服务提供者
名词解释
- Broker: JMS服务端,接收与保存生产者发送的消息内容,并推送给消费者使用;
- 持久化消息:持久性属于消息的一个属性,生产者在发送消息时,可以指定消息属性为持久化消息,
broker
会将此消息持久化到文件或数据库之后,再推送给消费者;这样即使broker重启也不会丢失消息;JMS
规范中,默认消息发送就是持久化的; - 非持久化消息:顾名思义消息不会在
broker
中持久化到文件,重启可能会产生消息丢失,非持久化消息在异步发送时,性能比持久化消息快20倍(有待考证); - 同步发送:生产者发送消息到
broker
时,需要同步等待broker
的响应;如果你不开启事务、并且发送持久化消息,borker
将在消息持久化完成之后,才给生产者发送确认消息;这样做的好处是,broker
一旦确认收到消息,消息将不会被丢失;可以在连接工厂上设置,也可以在单个连接上设置; - 异步发送:生产者发送消息到
broker
时,不用同步等待broker
的响应;异步发送效率高,可能会出现消息丢失;可以在连接工厂上设置,也可以在单个连接上设置; - 非持久订阅:消息模型一定是
发布者/订阅者
模型,只有在消费者在线的情况下,才会收到生产者发送到某个topic
的消息,消费者处于离线状态时,这个时间段的消息将不会被收到,即使消费者重新在线也将无法接受到; - 持久订阅:消息模型一定是发布者/订阅者模型,消费者向
Broker
注册一个自己身份的标识(ClientID+订阅者名字
),当这个消费者离线时,broker
会为这个标识保存发送到某个topic
的消息,当消费者重新在线时,会根据自己标识获取到离线期间的消息;
Connector Types
Client Failover
failover:(tcp://host1:61616,tcp://host2:61616)?randomize=false
与Spring整合
组件依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.8.RELEASE</version>
</dependence>
<dependence>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.12</version>
</dependency>
配置生产者
<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- 自动装载com.aqlu包下的所有Bean -->
<context:component-scan base-package="com.aqlu"/>
<!-- 加载配置文件 -->
<context:property-placeholder system-properties-mode="OVERRIDE" ignore-resource-not-found="true"
location="classpath:config.properties"/>
<!-- 定义消息的目的地址(queue)-->
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TestQueue"/><!-- queue名字 -->
</bean>
<!-- 定义消息的目的地址(topic) -->
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="testTopic"/><!-- topic名字 -->
</bean>
<!-- 定义消息的目的地址(虚拟topic)-->
<bean id="testVirtualTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="VirtualTopic.testTopic"/><!-- topic名字,注意这里必须要带上VirtualTopic. -->
</bean>
<!-- 生产者连接工厂 -->
<amq:connectionFactory id="producerConnectionFactory" brokerURL="${producer.brokerUrl}"
useAsyncSend="true"/>
<!-- 配置具有session缓存给你的连接工厂 -->
<bean id="producerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="producerConnectionFactory"/>
<property name="sessionCacheSize" value="${producer.sessionCacheSize}"/>
</bean>
<!-- 简单消息转换器,能够对String、byte[]、Map、Serializable类型的消息自动转换 -->
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!--配置生产者消息发送模板-->
<bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="producerCachingConnectionFactory"/> <!-- 指定连接工厂 -->
<property name="explicitQosEnabled" value="true"/><!-- 是否启用Qos; 只有开启Qos时,设置的deliveryMode, priority, timeToLive才能生效 -->
<property name="deliveryPersistent" value="true"/><!-- 设置消息是否需要持久化, 默认为PERSISTENT -->
<property name="priority" value="4"/><!-- 设置优先级, 默认为4 越高优先级越高。kahaDB只支持3种优先级,<4,=4,>4 -->
<property name="timeToLive" value="${producer.timeToLive}"/><!-- 消息有效时间,单位毫秒,默认是不过期;超过消息有效期的消息
将被broker“删除”;强烈建议明确设置此参数,目前我司的业务场景消息如果在一小时还未被处理的话,大部分情况都没有价值了 -->
<property name="messageConverter" ref="simpleMessageConverter"/><!-- 指定消息转换器 -->
</bean>
</beans>
生产者(消息发送者)代码
package com.aqlu.demo.amq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
import java.io.Serializable;
/**
* Created by aqlu on 14-6-25.
*/
@Component
public class MessageSender {
@Autowired
@Qualifier("producerJmsTemplate")
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("testTopic")
private Destination destination;
/**
* 发送消息
* @param message 消息对象
*/
public void send(final Serializable message) {
try{
jmsTemplate.convertAndSend(destination, message);
}catch(JmsException e){
// .... TODO
}
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("application_provider.xml");
MessageSender sender = context.getBean(MessageSender.class);
sender.send("Hello broker!");
}
}
配置消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- 自动装载com.aqlu包下的所有Bean -->
<context:component-scan base-package="com.aqlu"/>
<!-- 加载配置文件 -->
<context:property-placeholder system-properties-mode="OVERRIDE" ignore-resource-not-found="true"
location="classpath:config.properties"/>
<!-- 定义消息的目的地址(queue)-->
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="TestQueue"/><!-- queue名字 -->
</bean>
<!-- 定义消息的目的地址(topic) -->
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="testTopic"/><!-- topic名字 -->
</bean>
<!-- 定义消息的目的地址(虚拟topic)-->
<bean id="testVirtualQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="VirtualTopicConsumers.${consumer.clientId}.VirtualTopic.testTopic"/><!-- 前缀根据broker配置-->
</bean>
<!-- 消费者连接工厂; borkerURL: broker地址; useAsyncSend:是否使用异步发送;clientID:客户端标识;-->
<amq:connectionFactory id="consumerConnectionFactory" brokerURL="${consumer.brokerUrl}"
clientID="${consumer.clientId}"/>
<!-- 配置具有session缓存给你的连接工厂 -->
<bean id="consumerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="consumerConnectionFactory"/>
<property name="sessionCacheSize" value="${consumer.sessionCacheSize}"/>
</bean>
<!-- 配置消费者监听容器 -->
<bean id="taskContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerConnectionFactory"/> <!-- 指定连接工厂-->
<property name="destination" ref="testTopic"/> <!-- 目的地址,queue或者topic -->
<property name="messageListener" ref="messageConsumer"/> <!-- 指定消息消费监听对象,实现MessageListener 接口 -->
<property name="concurrentConsumers" value="${consumer.concurrentConsumers}"/><!-- 配置监听对象的并发数 -->
<property name="maxConcurrentConsumers" value="${consumer.maxConcurrentConsumers}"/><!-- 配置监听对象的最大并发数 -->
<!--<property name="clientId" value="sss"/>--> <!-- 客户端标识,不配的话会使用连接工程配置的clientID,连接工厂也没陪的话会自动删除一个 -->
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/><!-- 在使用Spring的情况下,AUTO确认模式会在
消息进入业务方法前进行回应,Client则会在之后,DUL_OK可以延迟消息回应并批量处理,但这可能导致消息重复发送。 -->
<property name="sessionTransacted" value="false"/> <!-- 是否开启事务,默认false-->
<property name="subscriptionDurable" value="true"/><!-- 是否持久化订阅,此属性仅在订阅topic时有效;持久化订阅时,监听对象的并发
数只能为1;在使用queue消费时,请注释掉,否则会影响启动 -->
<property name="durableSubscriptionName" value="node1"/><!-- 持久化订阅者名称;持久化订阅开启才生效,持久化订阅时必配;在使用
queue消费时,请注释掉,否则会影响启动 -->
</bean>
</beans >
消费者(消息接收者)代码
package com.aqlu.demo.amq;
import org.springframework.stereotype.Component;
import javax.jms.*;
/**
* Created by aqlu on 14-6-25.
*/
@Component("messageConsumer")
public class MessageConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.println(Thread.currentThread().getName() + "接受到消息: " + ((TextMessage) message).getText());
}else if(message instanceof ObjectMessage){
System.out.println(Thread.currentThread().getName() + "接受到消息: " + ((ObjectMessage) message).getObject());
}else{
System.out.println(Thread.currentThread().getName() + "不能识别此消息:" + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("application_consumer.xml");
}
}
业务使用建议
消息内容
- 消息内容不宜过大,建议尽量使用简单文本消息,每个消息体大小不超过1kb;如有大的消息需要传递,建议使用业务设计避免开,譬如:货品状态发生变化时,消息体里面仅保存货品id以及变化后的状态,如需其他货品信息可以由业务根据货品id调接口查询;
- 合理配置消息有效期;
发送方式
- 建议所有发送方式设置为持久化发送;非持久化消息在
broker
重启或故障时,会有消息丢失的可能; - 对消息生产者来说,异步发送会会极大的提高系统的吞吐量;ActiveMQ缺省是采用异步发送发送,但按照JMS规范,如果不开启事务且发送持久化消息时,会强制采用同步方式发送,在这种情况下,每一次发送都是同步的,而且阻塞到收到
broker
的应答,这个应答保证了broker
已经成功地将消息持久化,而且不会丢失,但是这样作也严重地影响了性能。如果你的系统可以容忍少量的小事丢失,也可以指定在不开启事务时采用异步方式
发送持久化消息
;配置方式如下:配置BorkerURL参数: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.useAsyncSend=true 配置连接工厂useAsyncSend属性: <amq:connectionFactory id="producerConnectionFactory" brokerURL="${producer.brokerUrl}" useAsyncSend="true"/>
- 建议消息生产者对于重要消息根据自己业务建立补发机制,针对发送消息异常情况进行补发;
消费方式
- 建议消费者在
MessageListener
的实现类中,异步处理自己的业务逻辑,以达到快消费;处理流程示例: - 在生产者产能过剩且消费者处理能力很快的情况下,可以通过设置消费者预取值来提升性能;配置方式如下:
配置所有消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.all=50 配置所有queue消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.queuePrefetch=1000 配置所有topic消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.topicPrefetch=1000
- 建议消费端处理接口采用幂等设计;
Consumer数量配置
MessageListenerContainer
中允许定义并发的consumer
数量,在Queue
模式下,通过增大并发consumer
数量可以提高消费能力,而且Queue
中的消息只会被消费一次;但在Topic
模式下,增大并发consumer
并不会提升消费能力,因为在Topic
模式中,每个consumer
都是独立的订阅者,即每个consumer
都会收到全量的topic
消息,通常情况下concurrentConsumers
与maxConcurrentConsumers
都配置为1;
模型选择
- 如何实现消费者在集群环境下,不重复消费
Topic
消息?- 设置集群中的所有
Consumer
的ClientID
相同;可以在ConnectionFactory
中配置,也可以在MessageListenerContainer
中配置;ActiveMQ
会保证同一时刻、只有唯一的ClientID
的consumer
连接上Broker
,直到这个consumer
关闭,集群中的其他consumer
才能连接;优点:生产、消费速度较均衡,broker
压力小;弱点:消费者为冷备,压力集中在一个负载上;推荐使用; - 使用
virtualtopic
;生产者发送特殊的Topic
消息到broker
,broker
负责给topic
的订阅者创建Queue
,并将消息放入queue
,然后消费者从Queue
中消费;优点:可以配置多个消费者,消费速度快;弱点:生产速度慢,特别是同步发送持久化消息时,broker
压力大;
- 设置集群中的所有
- 如何实现消费者下线后,恢复上线时能接受到离线期间的
Topic
消息?- 消费者配置持久化订阅;持久化订阅需要消费者同时满足几个条件:①指定
ClientID
;②concurrentConsumers
与maxConcurrentConsumers
都配置为1;③开启subscriptionDurable
与设定durableSubscriptionName
;推荐使用; - 使用
virtualtopic;
- 消费者配置持久化订阅;持久化订阅需要消费者同时满足几个条件:①指定
延时与定时投递
ActiveMQ提供了一种broker
端消息定时调度机制,能够满足用户这样的场景:有时候不希望消息马上被broker投递出去,而是想要60秒
之后发给消费者,或者我们想让消息每隔一定时间段投递一次,一共投递指定的次数;虽然ActiveMQ提供了满足这样场景的功能,但不推荐使用;原因如下:
- 基于目前的AMQ设计,发送持久化延时消息时可能会导致某些持久化文件无法正常清除;
- 额外增大了broker的负担;
- 在使用新的leveldb高可靠方案时,由于这些信息依然还存在单独的kahaDB中,apache已明确声明暂不支持;
如有延时与定时投递的需求场景,建议业务层使用JDK自带的DelayQueue
来实现,生产者在到达投递时间后再将消息投递到broker
中;