ActiveMQ 使用进阶

本文写于2017年7月6日

Java消息服务回顾(JMS)

七大组件

两种模型

点对点或队列模型

  • 只有一个消费者将获得消息
  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
  • 每一个成功处理的消息都由接收者签收

发布者订阅者模型

  • 多个消费者可以获得消息
  • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

ActiveMQ是一个开源的JMS服务提供者

名词解释

  • Broker: JMS服务端,接收与保存生产者发送的消息内容,并推送给消费者使用;
  • 持久化消息:持久性属于消息的一个属性,生产者在发送消息时,可以指定消息属性为持久化消息,broker会将此消息持久化到文件或数据库之后,再推送给消费者;这样即使broker重启也不会丢失消息;JMS规范中,默认消息发送就是持久化的;
  • 非持久化消息:顾名思义消息不会在broker中持久化到文件,重启可能会产生消息丢失,非持久化消息在异步发送时,性能比持久化消息快20倍(有待考证);
  • 同步发送:生产者发送消息到broker时,需要同步等待broker的响应;如果你不开启事务、并且发送持久化消息,borker将在消息持久化完成之后,才给生产者发送确认消息;这样做的好处是,broker一旦确认收到消息,消息将不会被丢失;可以在连接工厂上设置,也可以在单个连接上设置;
  • 异步发送:生产者发送消息到broker时,不用同步等待broker的响应;异步发送效率高,可能会出现消息丢失;可以在连接工厂上设置,也可以在单个连接上设置;
  • 非持久订阅:消息模型一定是发布者/订阅者模型,只有在消费者在线的情况下,才会收到生产者发送到某个topic的消息,消费者处于离线状态时,这个时间段的消息将不会被收到,即使消费者重新在线也将无法接受到;
  • 持久订阅:消息模型一定是发布者/订阅者模型,消费者向Broker注册一个自己身份的标识(ClientID+订阅者名字),当这个消费者离线时,broker会为这个标识保存发送到某个topic的消息,当消费者重新在线时,会根据自己标识获取到离线期间的消息;

Connector Types

Client Failover

failover:(tcp://host1:61616,tcp://host2:61616)?randomize=false

与Spring整合

组件依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>3.2.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>3.2.8.RELEASE</version>
</dependence>
<dependence>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.12</version>
</dependency>

配置生产者

<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
       xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.1.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 自动装载com.aqlu包下的所有Bean -->
    <context:component-scan base-package="com.aqlu"/>

    <!-- 加载配置文件 -->
    <context:property-placeholder system-properties-mode="OVERRIDE" ignore-resource-not-found="true"
                                  location="classpath:config.properties"/>


    <!-- 定义消息的目的地址(queue)-->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="TestQueue"/><!-- queue名字 -->
    </bean>

    <!-- 定义消息的目的地址(topic) -->
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="testTopic"/><!-- topic名字 -->
    </bean>

    <!-- 定义消息的目的地址(虚拟topic)-->
    <bean id="testVirtualTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="VirtualTopic.testTopic"/><!-- topic名字,注意这里必须要带上VirtualTopic. -->
    </bean>

    <!-- 生产者连接工厂 -->
    <amq:connectionFactory id="producerConnectionFactory" brokerURL="${producer.brokerUrl}"
                           useAsyncSend="true"/>

    <!-- 配置具有session缓存给你的连接工厂 -->
    <bean id="producerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="producerConnectionFactory"/>
        <property name="sessionCacheSize" value="${producer.sessionCacheSize}"/>
    </bean>

    <!-- 简单消息转换器,能够对String、byte[]、Map、Serializable类型的消息自动转换 -->
    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

    <!--配置生产者消息发送模板-->
    <bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="producerCachingConnectionFactory"/> <!-- 指定连接工厂 -->
        <property name="explicitQosEnabled" value="true"/><!-- 是否启用Qos; 只有开启Qos时,设置的deliveryMode, priority, timeToLive才能生效 -->
        <property name="deliveryPersistent" value="true"/><!-- 设置消息是否需要持久化, 默认为PERSISTENT -->
        <property name="priority" value="4"/><!-- 设置优先级, 默认为4 越高优先级越高。kahaDB只支持3种优先级,<4,=4,>4 -->
        <property name="timeToLive" value="${producer.timeToLive}"/><!-- 消息有效时间,单位毫秒,默认是不过期;超过消息有效期的消息
将被broker“删除”;强烈建议明确设置此参数,目前我司的业务场景消息如果在一小时还未被处理的话,大部分情况都没有价值了 -->
        <property name="messageConverter" ref="simpleMessageConverter"/><!-- 指定消息转换器 -->
    </bean>
</beans>

生产者(消息发送者)代码

package com.aqlu.demo.amq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import java.io.Serializable;

/**
 * Created by aqlu on 14-6-25.
 */
@Component
public class MessageSender {

    @Autowired
    @Qualifier("producerJmsTemplate")
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("testTopic")
    private Destination destination;

    /**
     * 发送消息
     * @param message 消息对象
     */
    public void send(final Serializable message) {
        try{
            jmsTemplate.convertAndSend(destination, message);
        }catch(JmsException e){
            // ....  TODO
        }
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("application_provider.xml");
        MessageSender sender = context.getBean(MessageSender.class);
        sender.send("Hello broker!");
    }
}

配置消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
       xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.1.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 自动装载com.aqlu包下的所有Bean -->
    <context:component-scan base-package="com.aqlu"/>

    <!-- 加载配置文件 -->
    <context:property-placeholder system-properties-mode="OVERRIDE" ignore-resource-not-found="true"
                                  location="classpath:config.properties"/>

    <!-- 定义消息的目的地址(queue)-->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="TestQueue"/><!-- queue名字 -->
    </bean>

    <!-- 定义消息的目的地址(topic) -->
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="testTopic"/><!-- topic名字 -->
    </bean>

    <!-- 定义消息的目的地址(虚拟topic)-->
    <bean id="testVirtualQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="VirtualTopicConsumers.${consumer.clientId}.VirtualTopic.testTopic"/><!-- 前缀根据broker配置-->
    </bean>

    <!-- 消费者连接工厂; borkerURL: broker地址; useAsyncSend:是否使用异步发送;clientID:客户端标识;-->
    <amq:connectionFactory id="consumerConnectionFactory" brokerURL="${consumer.brokerUrl}"
                           clientID="${consumer.clientId}"/>

    <!-- 配置具有session缓存给你的连接工厂 -->
    <bean id="consumerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="consumerConnectionFactory"/>
        <property name="sessionCacheSize" value="${consumer.sessionCacheSize}"/>
    </bean>

    <!-- 配置消费者监听容器 -->
    <bean id="taskContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerConnectionFactory"/> <!-- 指定连接工厂-->
        <property name="destination" ref="testTopic"/> <!-- 目的地址,queue或者topic -->
        <property name="messageListener" ref="messageConsumer"/> <!-- 指定消息消费监听对象,实现MessageListener 接口 -->
        <property name="concurrentConsumers" value="${consumer.concurrentConsumers}"/><!-- 配置监听对象的并发数 -->
        <property name="maxConcurrentConsumers" value="${consumer.maxConcurrentConsumers}"/><!-- 配置监听对象的最大并发数 -->
        <!--<property name="clientId" value="sss"/>--> <!-- 客户端标识,不配的话会使用连接工程配置的clientID,连接工厂也没陪的话会自动删除一个 -->
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/><!-- 在使用Spring的情况下,AUTO确认模式会在
消息进入业务方法前进行回应,Client则会在之后,DUL_OK可以延迟消息回应并批量处理,但这可能导致消息重复发送。 -->
        <property name="sessionTransacted" value="false"/> <!-- 是否开启事务,默认false-->
        <property name="subscriptionDurable" value="true"/><!-- 是否持久化订阅,此属性仅在订阅topic时有效;持久化订阅时,监听对象的并发
数只能为1;在使用queue消费时,请注释掉,否则会影响启动 -->
        <property name="durableSubscriptionName" value="node1"/><!-- 持久化订阅者名称;持久化订阅开启才生效,持久化订阅时必配;在使用
queue消费时,请注释掉,否则会影响启动 -->
    </bean>

</beans >

消费者(消息接收者)代码

package com.aqlu.demo.amq;

import org.springframework.stereotype.Component;

import javax.jms.*;

/**
 * Created by aqlu on 14-6-25.
 */
@Component("messageConsumer")
public class MessageConsumer implements MessageListener {
    @Override
    public void onMessage(Message message)  {
        try {
            if (message instanceof TextMessage) {
                System.out.println(Thread.currentThread().getName() +  "接受到消息: " + ((TextMessage) message).getText());
            }else if(message instanceof ObjectMessage){
                System.out.println(Thread.currentThread().getName() +  "接受到消息: " + ((ObjectMessage) message).getObject());
            }else{
                System.out.println(Thread.currentThread().getName() + "不能识别此消息:" + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("application_consumer.xml");
    }
}

业务使用建议

消息内容

  1. 消息内容不宜过大,建议尽量使用简单文本消息,每个消息体大小不超过1kb;如有大的消息需要传递,建议使用业务设计避免开,譬如:货品状态发生变化时,消息体里面仅保存货品id以及变化后的状态,如需其他货品信息可以由业务根据货品id调接口查询;
  2. 合理配置消息有效期;

发送方式

  1. 建议所有发送方式设置为持久化发送;非持久化消息在broker重启或故障时,会有消息丢失的可能;
  2. 对消息生产者来说,异步发送会会极大的提高系统的吞吐量;ActiveMQ缺省是采用异步发送发送,但按照JMS规范,如果不开启事务且发送持久化消息时,会强制采用同步方式发送,在这种情况下,每一次发送都是同步的,而且阻塞到收到broker的应答,这个应答保证了broker已经成功地将消息持久化,而且不会丢失,但是这样作也严重地影响了性能。如果你的系统可以容忍少量的小事丢失,也可以指定在不开启事务时采用异步方式发送持久化消息;配置方式如下:
     配置BorkerURL参数: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.useAsyncSend=true
     配置连接工厂useAsyncSend属性: <amq:connectionFactory id="producerConnectionFactory" brokerURL="${producer.brokerUrl}" useAsyncSend="true"/>
    
  3. 建议消息生产者对于重要消息根据自己业务建立补发机制,针对发送消息异常情况进行补发;

消费方式

  1. 建议消费者在MessageListener的实现类中,异步处理自己的业务逻辑,以达到快消费;处理流程示例:
  2. 在生产者产能过剩且消费者处理能力很快的情况下,可以通过设置消费者预取值来提升性能;配置方式如下:
     配置所有消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.all=50
     配置所有queue消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.queuePrefetch=1000
     配置所有topic消费者的预取值: failover:(tcp://host1:61616,tcp://host2:51616)?randomize=false&jms.prefetchPolicy.topicPrefetch=1000
    
  3. 建议消费端处理接口采用幂等设计;

Consumer数量配置

MessageListenerContainer中允许定义并发的consumer数量,在Queue模式下,通过增大并发consumer数量可以提高消费能力,而且Queue中的消息只会被消费一次;但在Topic模式下,增大并发consumer并不会提升消费能力,因为在Topic模式中,每个consumer都是独立的订阅者,即每个consumer都会收到全量topic消息,通常情况下concurrentConsumersmaxConcurrentConsumers都配置为1;

模型选择

  1. 如何实现消费者在集群环境下,不重复消费Topic消息?
    1. 设置集群中的所有ConsumerClientID相同;可以在ConnectionFactory中配置,也可以在MessageListenerContainer中配置; ActiveMQ会保证同一时刻、只有唯一的ClientIDconsumer连接上Broker,直到这个consumer关闭,集群中的其他consumer才能连接;优点:生产、消费速度较均衡,broker压力小;弱点:消费者为冷备,压力集中在一个负载上;推荐使用
    2. 使用virtualtopic;生产者发送特殊的Topic消息到brokerbroker负责给topic的订阅者创建Queue,并将消息放入queue,然后消费者从Queue中消费;优点:可以配置多个消费者,消费速度快;弱点:生产速度慢,特别是同步发送持久化消息时,broker压力大;
  2. 如何实现消费者下线后,恢复上线时能接受到离线期间的Topic消息?
    1. 消费者配置持久化订阅;持久化订阅需要消费者同时满足几个条件:①指定ClientID;②concurrentConsumersmaxConcurrentConsumers都配置为1;③开启subscriptionDurable与设定durableSubscriptionName推荐使用
    2. 使用virtualtopic;

延时与定时投递

ActiveMQ提供了一种broker端消息定时调度机制,能够满足用户这样的场景:有时候不希望消息马上被broker投递出去,而是想要60秒之后发给消费者,或者我们想让消息每隔一定时间段投递一次,一共投递指定的次数;虽然ActiveMQ提供了满足这样场景的功能,但不推荐使用;原因如下:

  1. 基于目前的AMQ设计,发送持久化延时消息时可能会导致某些持久化文件无法正常清除;
  2. 额外增大了broker的负担;
  3. 在使用新的leveldb高可靠方案时,由于这些信息依然还存在单独的kahaDB中,apache已明确声明暂不支持;

如有延时与定时投递的需求场景,建议业务层使用JDK自带的DelayQueue来实现,生产者在到达投递时间后再将消息投递到broker中;


   转载规则


《ActiveMQ 使用进阶》 Angus_Lu 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
ActiveMQ 高可用配置(LevelDB) ActiveMQ 高可用配置(LevelDB)
本文写于2017年8月14日ActiveMQ的LevelDB方案是在5.9.0版本开始引入,使用zookeeper来决定当前Replicas中的Master与Slave,并不采用zookeeper来存储MQ数据;其工作原理是生产者(或消费者
2019-09-10 15:19:40
下一篇 
基于SpringBoot快速发布SOAP接口 基于SpringBoot快速发布SOAP接口
WebService、SOAP、WSDL的简单回顾WebService是什么?WebService在的概念很容易引起误解,很多初级开发人员的认知WebService接口等同于Soap接口,其实这是一种错误的认知。WebService是相对于
2019-01-02 14:22:35
  目录