摘要

公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。
调研提出了三种方案。这里是相关记录和说明。

问题

运维同事对生产环境使用的ActiveMQ做了相关监控。这个监控在某个队列出现消息积压时(实际规则更复杂一些,并且正在调整)发送短信报警。运维接到短信后会通知开发负责人。开发负责人再检查系统是否在正常监听相关队列。 
但是,从过往经验来看,只有一次消息积压是业务系统故障导致的;其它情况(没有统计到具体数据,大约五六次)都是业务系统已经不再监听该队列导致的。这使得我们的运维、开发同事半夜三更火急火燎检查问题,结果发现只需要删除那个队列就可以了。 
尤其惹发起床气的是,由于线上ActiveMQ配置了消息持久化,这种消息积压其实并不会对ActiveMQ产生多大的影响,完全可以在第二天上班后再处理。
考虑到大家的睡眠质量和夫妻感情,在JIRA中,我们调研、讨论了三个方案。

方案一:ActiveMQ自带配置

在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它可以删除“没有未处理消息、并且没有消费者的Destination”。

配置示例

这个配置比较简单,在ActiveMQ的配置文件activemq.xml中,做如下改动即可。这里示例的是对queue的配置;topic配置是类似的。

<!-- 在这里加上schedulePeriodForDestinationPurge属性。 -->
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000"
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <!-- 在这里加上gcInactiveDestinations和inactiveTimoutBeforeGC两个属性 -->
                <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
</broker>

上述示例配置的含义是:这个Broker会每隔10000ms(由schedulePeriodForDestinationPurge配置指定)扫描一次标记有“gcInactiveDestinations="true"”的Queue(由于这里配置的是queue=">",因而实际是扫描所有Queue),将其中“没有未处理消息、并且没有消费者、并且此状态已超过30000ms(由inactiveTimoutBeforeGC配置指定)”的队列删除掉。有点晕。各配置项的具体说明如下。

配置项说明

以下三个配置项中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是选填配置。

schedulePeriodForDestinationPurge

这是针对Broker的配置,用于声明“扫描闲置队列的周期”,单位为毫秒。默认值为0,意为“不扫描”。 
需要说明的是,这里只能配置扫描任务的启动周期、不能配置启动延迟。也就是说,配置好了之后,ActiveMQ服务启动时会立即扫描一次;然后再按照指定时间周期性扫描。

gcInactiveDestinations

这是针对Destination的配置,用于声明当Broker扫描闲置队列时,是否扫描这个Destination(由queue="xxxx"来指定)。默认值是false。

inactiveTimoutBeforeGC

这也是针对Destination的配置,用于声明这个Destination闲置多长时间后可以被删除。单位毫秒,默认时间60s。 
这个配置必须在gcInactiveDestinations被设置为true的情况下才会生效。

方案分析

虽然上面介绍了这么多,但实际上,从第一句话中就可以看出这个方案无法解决我们的问题。因为我们的问题是要处理“有消息积压、但没有消费者的Destination”,而这个方案只能删除“没有未处理消息、并且没有消费者的Destination”。 
除此之外,这应该算是最简单可靠的一种方案了。实际上,对大多数原生Queue来说,业务系统会同时下线其生产者与消费者。这个方案可以很好的应对这种情况。

方案二:自定义ActiveMQ插件

ActiveMQ插件(plugin),也有文档中称为拦截器(Interceptor)。二者其实是相辅相成的:配置时,我们需要一个插件;执行时,我们需要一个拦截器。 
ActiveMQ官方提供了几个插件(日志、统计、时间戳等),可以参见官方说明和开发文档。我们可以参考官方示例来自定义一个插件。

配置

ActiveMQ通过解析activemq.xml中的配置,来加载一个插件的。因此我们从配置入手,逐步搞清楚插件和拦截器是如何工作的。 
activemq.xml中的配置其实很简单,如下所示:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" advisorySupport="false">
    <!-- 用plugins标签声明这是一个插件 -->
    <plugins>
        <!-- bea的语法来自spring,xml name space的声明已经说明了这一点 -->
        <bean xmlns="http://www.springframework.org/schema/beans" id="linjunPlugin" class="net.loyintean.blog.jms.manage.PlugIn"/>
    </plugins>
    <!-- 其它配置 -->
</broker>

上述配置声明了一个插件,插件类名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。 
这个类必须包含在ActiveMQ的classpath路径下。我们可以自己打一个jar包,并把jar包放到ActiveMQ的lib路径下;也可以修改相关类路径。总之要保证ActiveMQ能够加载到这个类(及其依赖类)。 
其实按照上面的配置,并不需要为插件配置一个id。不过,插件声明还有其它方式,有些是需要使用id的。这里不多说,可以参考开发文档。 
如配置中的注释所说,声明插件所使用的<bean />标签及语法来自spring。也就是说,spring中的<property />等其它标签,这里也是支持的。不过目前还没有找到对@Autowired等注解的支持方式。

插件

由于我使用的是spring boot,只需要加上一个spring-boot-starter-activemq就可以引入所需依赖jar包了。不使用spring boot的话,需要引入activemq-broker-x.y.z.jar。 
根据ActiveMQ规范,插件必须实现BrokerPlugin接口。这个接口只有一个方法:Broker installPlugin(Broker broker) throws Exception,用于在服务启动、加载插件时,获取当前启动的borker实例,并返回一个Broker实例。 
例如,上文中声明的linjunPlugin代码如下:

public class PlugIn implements BrokerPlugin {
    /**
    * @author linjun
    * @since 2017年10月30日
    * @param b
    * @return
    * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
    */
    @Override
    public Broker installPlugin(Broker b) {
        return new RemoveDestination(b);
    }
}

似乎有些莫名其妙,但从“装饰者”的角度来理解就轻松愉快了:入参broker是原生实例(当然也可能是其它插件“装饰”过的);出参则是被我们自己的插件“装饰”过的、增强版的实例。 
一般来说,启动过程不会做太多处理;处理逻辑在我们的“装饰者”中——如上面代码里的RemoveDestination。

拦截器

如上文所说,我们需要提供的是一个“装饰”过的Broker。但是Broker是一个接口,其中有超过50个方法,用于处理Broker在服务期间的各种事件(如服务启动、创建链接、消息收发、事务提交与回滚等等)。直接实现接口未免太丑陋了。ActiveMQ也考虑到了这一点,因此给我们提供了一个适配器(其实同时也是一个装饰者):BrokerFilter。它的代码如下:

public class BrokerFilter implements Broker {
    // 被“装饰”的原生实例
    protected final Broker next;
    public BrokerFilter(Broker next) {
        this.next = next;
    }
    // 省略其它接口方法,全部都直接委托给next处理。
}

借助这个适配器,我们可以专注的处理我们关注的事件。如我们的RemoveDestination,它只需要在服务启动时注册一个定时器,按需删除无人监听的队列即可。代码如下:

public class RemoveDestination extends BrokerFilter {
    private Timer timer;
    /**
     * @param next
     */
    public RemoveDestination(Broker next) {
        super(next);
        // 声明为守护线程,避免它阻塞关闭activeMQ的进程
        this.timer = new Timer(true);
    }
    @Override
    public void start() throws Exception {
        super.start();
        // DONE linjun 2017-11-01 改为定时调度
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                RemoveDestination.this.remove();
            }
        }, 3000, 3000);
    }
    private void remove() {
        Map<ActiveMQDestination, Destination> destinationMap = this
            .getDestinationMap();
        ConnectionContext context = BrokerSupport.getConnectionContext(this);
        destinationMap.entrySet().forEach(entry -> {
            Destination destination = entry.getValue();
            // 无人监听了
            // DONE linjun 2017-11-01 只处理queue,不处理topic
            if (destination.getDestinationStatistics().getConsumers()
                .getCount() == 0) {
                ActiveMQDestination activeMQDestination = entry.getKey();
                if (activeMQDestination.isQueue()) {
                    try {
                        this.removeDestination(context,
                            activeMQDestination, 1);
                    } catch (Exception e) {
                        // 示例代码,不要喷我直接打印堆栈
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

除了BrokerFilter这个针对Broker事件做拦截、装饰的类之外,也有针对Destination的DestinationFilter,不赘述。

特别说明

无论是BrokerFilter还是DestinationFilter,在重写父类的某个方法时,要注意调用super中的对应方法。如RemoveDestination类在覆盖start()方法时,调用了super.start()方法。 
这两个类中的每一个方法,都对应Broker或Destination的一个事件的“处理栈”。如果不调用父类方法,很可能会导致一些基础的、或关键的代码没有执行到,进而出现异常。因此,如果不是非常确定“执行到这里时必须中断当前事件”,否则一定要调用super相应方法。

方案分析

上面的代码是示例用,还可以进一步完善。但是这个方案是可以满足需求的。
不过,这个方案存在一项风险:当我们删除一个Destination时,其中所有未消费的消息也会随之被删除,即使这些消息已经做了持久化。如果有某个业务系统长时间出现故障、无法连上ActiveMQ,而ActiveMQ在此期间删除了它监听的Destination及其中消息……这个风险概率虽然小,但是影响太大。慎重起见,放弃方案二。

方案三:调整报警脚本

方案三属于运维的范畴。如JIRA中所讨论的,这个问题真正的“痛点”,并不是废弃队列,而是非紧急情况却在半夜报警。因此,由运维同事修改一下脚本,调整“没有消费者”这种问题的监控报警时间就可以了。

小结

最后选定的是方案三。方案一不能满足需求;方案二的风险较大。方案三直击痛点,干脆利落。
这件事也启示我们:做事情之前先想清楚目标,谋定而后动。