SymmetricDS 2.2.5 undeploy时symmetricScheduler job线程杀不掉分析

简介: SymmetricDS的同步机制是定时周期性同步,我们项目根据业务需要,只需要客户在UI激活一次replication时候才开始同步,所以这里我们的设计是每次激活时,用symmetric自己的deploy/undeploy机制让其在启动时候自动跑replication任务。

SymmetricDS的同步机制是定时周期性同步,我们项目根据业务需要,只需要客户在UI激活一次replication时候才开始同步,所以这里我们的设计是每次激活时,用symmetric自己的deploy/undeploy机制让其在启动时候自动跑replication任务。

 

最近遇到一个问题, 在使用中,发现当数据同步时间间隔设在1小时以内时,运行一天以上会发现大量的SymmetricSchedule线程积压在JVM中没有释放,怀疑是Symmetric 2.2.5中在undeploy时释放线程有问题。

为了解决这个问题,先看下SymmetricDS中其需要的定时任务都定义在哪里。

 

在Symmetric engine中,整个symmetricDS的入口类在AbstractSymmetricEngine,在这个类的启动方法如下:

 

    public synchronized boolean start() {

        setup();
        
        if (isConfigured()) {
                  ...
                    getTriggerRouterService().syncTriggers();
                    heartbeat(false);
                    jobManager.startJobs();
                   ...
                    started = true;
                } finally {
                    starting = false;
                }
                
                return true;
            } else {
                return false;
            }
        } else {
           ...
        }
    }
 

 

可以看到其内置的job是由jobManager启动的,再看其实现类JobManager代码片段:

 

  private List<IJob> jobs;  

  public synchronized void startJobs() {
        for (IJob job : jobs) {
            if (job.isAutoStartConfigured()) {
                job.start();
            } else {
                log.info("JobNoAutoStart", job.getName());
            }
        }
    }
 可以看到是直接调用集合jobs中的job实例,该实例在SymmetricDS 2 中是由spring注入的,其spring xml配置如下:

 

 

    <bean id="jobManager" class="org.jumpmind.symmetric.job.JobManager">
        <property name="jobs">
            <list>
                <ref bean="job.routing" />
                <ref bean="job.push" />
                <ref bean="job.pull" />
                <ref bean="job.purge.outgoing" />
                <ref bean="job.purge.incoming" />
                <ref bean="job.purge.datagaps" />
                <ref bean="job.stat.flush" />
                <ref bean="job.synctriggers" />
                <ref bean="job.heartbeat" />
                <ref bean="job.watchdog" />
            </list>
        </property>
        <property name="taskScheduler" ref="symmetricScheduler" />
    </bean>
    <bean id="job.routing" parent="job.abstract" class="org.jumpmind.symmetric.job.RouterJob">
        <property name="routingService" ref="routingService" />
        <property name="autoStartParameterName" value="start.route.job" />
    </bean>
 以上就是其内置的job,支持周期性运行和定时运行,以便进行数据库同步。这些job就是在undeploy时候没有杀掉的线程,由于我们关注的是undeploy时这些job的卸载情况,回到AbstractSymmetricEngine:

 

 

private ThreadPoolTaskScheduler taskScheduler;    
public synchronized void stop() {
        ...
        jobManager.stopJobs();
        getRouterService().stop();
        started = false;
        starting = false;
    }
    public synchronized void destroy () {
        stopJobs();
    }
 回到JobManager:

 

 

    public synchronized void stopJobs() {
        for (IJob job : jobs) {
            job.stop();
        }      
    }
 

 

jobManager直接遍历每个job并调用其stop方法。

所有job都有个超类AbstractJob,这个类的stop方法如下:

 

    public boolean stop() {
        boolean success = false;
        if (this.scheduledJob != null) {
            success = this.scheduledJob.cancel(true);
            this.scheduledJob = null;
            if (success) {
                log.info("JobCancelled", jobName);
                started = false;                
            } else {
                log.warn("JobFailedToCancel", jobName);
            }
        }
        return success;
    }
 这里只是调用了线程的cancel方法,在我们实际使用中,只是cancel并没有终结掉这些线程,这是我们JVM中的thread dump:

 

 

"symmetricScheduler-14" prio=6 tid=0x49d7f800 nid=0x13b4 waiting on condition [0x4e2ef000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x1d6873b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)

 

找到这里,我们知道这是其释放线程的一个bug,所以我直接去找了一下symmetricDS的bug track里有没有解决,发现这是2.2.*中的一个bug,在2.4.0以后的版本中已经fix,其JIRA地址在:

Tomcat shutdown hangs when SymmetricDS is deployed as a war 写道
http://jira.codehaus.org/browse/SYMMETRICDS-466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel

 

这个fix主要是在JobManager中加入了调用spring的线程池的shutdow方法:

    public synchronized void destroy () {
        stopJobs();
        if (taskScheduler != null) {
            taskScheduler.shutdown();
        }
    }

 

至此,在项目中使用symmetricDS新的代码,undeploy后线程都被杀掉,重新deploy后没有多余的job线程运行。

 

我们的启动方式:

没有在配置文件中配SymmetricDS需要的.cron项,直接让其按照线程的scheduleWithFixedDelay方式运行。该任务模式的配置在symmetric.properties文件中,在symmetric源码(AbstractJob)中可以清晰的看到如何获取该配置文件的值:

this.cronExpression = parameterService.getString(jobName + ".cron", null);

 之后在该类的启动线程方法中会有判断:

    public void start() {
        if (this.scheduledJob == null) {
            log.info("JobStarting", jobName);
            if (!StringUtils.isBlank(cronExpression)) {
                this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression));
                started = true;
            } else {

                int startDelay = randomTimeSlot.getRandomValueSeededByExternalId();
                if (this.timeBetweenRunsInMs > 0) {
                    this.scheduledJob = taskScheduler.scheduleWithFixedDelay(this,
                            new Date(System.currentTimeMillis() + startDelay),
                            this.timeBetweenRunsInMs);
                    started = true;
                } else {
                    log.error("JobFailedToSchedule", jobName);
                }
            }
        }
    }

 

 

下面是正常后的JVM 线程截图:

 

 
目录
相关文章
|
1月前
|
Linux
一个进程最多可以创建多少个线程基本分析
一个进程最多可以创建多少个线程基本分析
38 1
|
3月前
|
监控 Linux 编译器
多线程死锁检测的分析与实现(linux c)-有向图的应用
在日常的软件开发中,多线程是不可避免的,使用多线程中的一大问题就是线程对锁的不合理使用造成的死锁,死锁一旦发生,将导致多线程程序响应时间长,吞吐量下降甚至宕机崩溃,那么如何检测出一个多线程程序中是否存在死锁呢?在提出解决方案之前,先对死锁产生的原因以及产生的现象做一个分析。最后在用有向环来检测多线程中是否存在死锁的问题。
56 0
|
21天前
|
存储 算法 Linux
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
24 0
|
3月前
|
运维 监控 Java
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
学习JVM需要一定的编程经验和计算机基础知识,适用于从事Java开发、系统架构设计、性能优化、研究学习等领域的专业人士和技术爱好者。
54 5
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
|
7月前
|
负载均衡 Dubbo Java
dubbo源码v2.7分析:结构、container入口及线程模型
Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架,提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现,高度可扩展能力,运行期流量调度,可视化的服务治理与运维。
65 0
|
8月前
|
数据采集 缓存 数据挖掘
GATK4标准分析流程 丨如何选择合适的线程和内存大小?数据预处理方法与注意事项
GATK4标准分析流程 丨如何选择合适的线程和内存大小?数据预处理方法与注意事项
|
10月前
|
缓存 安全 Java
JAVA Thread Dump分析线程竞争
JAVA Thread Dump分析线程竞争
57 0
|
10月前
|
Java 调度 C++
JAVA 分析线程池中的keepAliveTime参数具体实现
JAVA 分析线程池中的keepAliveTime参数具体实现
200 0