线程池运用实例——一次错误的多线程程序设计以及修复过程

简介:

写在前面的话 

写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。

1 背景

有这么一个需求,我们的系统(后面简称:A系统)需要在后台执行一个报表导出任务,在这个任务的执行过程中需要通过CORBA调用其他系统(后面简称:B系统)的一个(也有可能是多个)接口去查询报表,待结果返回后,将这些结果写入Excel。这个需求是不是很简单?套用网上一些FutureTask或者线程池的例子一两小时就能搞定这个需求。当时我也是这样认为的,可谁想,这是一个巨大的坑….

2 初始设计

用过CORBA的同学会知道,如同数据库连接一样,CORBA的连接数也是是有限的,如果一个接口调用的时间过长,就会长时间占用CORBA有限的连接数,当这种长时间的同步调用过多时就会造成整个系统CORBA调用的阻塞,进而造成系统停止响应。由于查询操作很耗时,为了避免这种情况的发生,这个接口被设计成了一个异步接口。任务的执行流程就会是这样:任务开始执行,接着调用这个接口并且通过CORBA向B系统订阅一个事件,然后任务进入等待状态,当B系统执行完成后,会向A系统发送一个事件告知执行的结果,任务收到事件后重新开始执行直到结束,如图:

既然说到了事件,那么很自然而然的就想到了使用回调的方式去响应事件,并且为了避免事件超时(也就是长时间没有接收到事件)导致任务长时间等待,我还使用了一个定时的任务去检查任务的状态。所以我的程序看起来就像这样:

IEventFuture.java

public interface IEventFuture {
    void onEventReceived(Event event);
}

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final int INITIALIZED = 0;
    private static final int RUNNING = 1;
    private static final int COMPLETED = 2;
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();
    private volatile int state = INITIALIZED;

    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        this.state = RUNNING;
        try {
            systemBSer.doQuery();
            subscribeEvent();
            startTaskTimeoutMonitorTask();
            Future future = createEventFuture();
            future.get();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.state = COMPLETED;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private Future createEventFuture() {
        FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (state != COMPLETED) {

                }
                return null;
            }
        });

        new Thread(listenFuture).start();
        return listenFuture;
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
        // do something on task timeout.
        //   ....
        // end

        // set task to completed to end task.
        this.state = COMPLETED;
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

3 升级改进

由于做这个需求的关系,我开始阅读一些关于JAVA多线程编程的一下教程,在阅读到关于闭锁的内容时,我突然灵光一现,这玩意不正好可以代替我那个丑陋的使用循环来让任务进入等待状态的实现么?然后我的程序就变成了这样:

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();

    private CountDownLatch endGate = new CountDownLatch(1);
    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();
            endGate.await();
            startTaskTimeoutMonitorTask();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.endGate.countDown();
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end

// set task to completed to end task.
        this.endGate.countDown();
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

4 问题浮现

正在我为我使用高大上的闭锁代替循环沾沾自喜的时候,测试大爷告诉我,任务经常莫名其妙的失败,并且日志中没有任何异常。开始,这让我觉得很不可思议,因为我已经在call()方法处处理了所有的异常,任务失败时至少也应该有个日志啥的吧。这个问题一直困扰着我,直到有一天分析日志我突然发现任务执行的工作线程(也就是call()方法所在的线程)和接收到事件后的回调并不是同一个线程。这就意味着在查询到报表结果后,所有写Excel,分发结果等等的操作都是在事件回调的线程中执行的,那么一旦这里发生异常原来call()中的catch块自然无法捕获,然后异常就被莫名其妙的吞掉了。好吧,我承认我之前对线程池也就了解点皮毛,对多线程也仅仅是有个概念,想当然的认为在线程池中可以Hold住任务的一切,包括响应这个任务在执行过程中创建的其他线程运行时发生的异常。而且更严重的是按照原来的实现,只有当整个任务执行完成(包括写完Excel)后,才会释放那个闭锁,所以一旦事件回调发生异常,那么整个任务都无法终止。在线程池中发生一个任务永远无法终止的后果,你懂的。

5 重新设计

痛定思痛,我决定重新梳理这个任务的流程。这个需求的难点就是在如何监听并响应B系统给我们发送的事件,实际上,这是一个很经典的生产者–消费者问题,而阻塞队列正好是解决这类问题的利器。重新设计的事件响应流程就变成:当B系统发送事件的时候,事件回调线程会往阻塞队列里面填充一个事件。在另一方面,任务调用完B系统的查询接口后,就开始从阻塞队列中取事件,当事件队列为空的时候,取事件的线程(也就是线程池执行任务的工作线程)会被阻塞。并且,阻塞队列的取操作可以设置超时时间,所以当取到的事件对象为空时,就意味着事件超时了,这样就省去了使用定时任务定时检查任务状态的工作。重新设计的程序是这样的:

EventProxy.java

public class EventProxy implements IEventFuture {
    private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;

    @Override
    public void onEventReceived(Event event) {
        eventQueue.offer(event);
    }

    public Event getEvent() throws InterruptedException {
        return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
    }
}

ExportRptTask.java

public class ExportRptTask3 implements Callable<Void> {

    private SystemBSer systemBSer = new SystemBSer();
    private EventProxy eventProxy = new EventProxy();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();

            Event event = eventProxy.getEvent();
            if (event != null) {
                processEvent(event);
            } else {
                onTaskTimeout();
            }
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
        }
        return null;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(eventProxy);
    }

    private void processEvent(Event event) {
// do something on receive event.
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

6 总结

相信各位并发编程的大牛们能在一瞬间就可以把我的程序(包括改进后的)批得体无完肤,不过我还是想分享下我在这个过程中的收获。

  • 在动手写程序前,请先理解你的需求,特别是要注意用已有的模型去识别问题,在本例中,我就是没有识别响应事件的流程其实是个生产者–消费者问题导致了后面的错误
  • 请充分的了解你需要使用的技术和工具。比如,使用线程池你就要了解线程池的工作原理,这样你才能正确的使用这些技术。做技术切忌想当然。
  • 在使用线程池时,重要的操作尽量放在任务的主线程中执行(也就是call()/run()方法所在的线程),否则线程池本身难以对任务进行控制。
  • 如果一定要在任务中再创建新的线程,请确保任务主线程是任务最后退出的线程。切忌不要使用外部线程直接调用任务类的方法,在本例中我就犯了这样的错误。
  • 文章转自 并发编程网-ifeve.com
目录
相关文章
|
11天前
|
Java 程序员 数据库
Java线程池让使用线程变得更加高效
使用一个线程需要经过创建、运行、销毁三大步骤,如果业务系统每个线程都要经历这个过程,那会带来过多不必要的资源消耗。线程池就是为了解决这个问题而生,需要时就从池中拿取,使用完毕就放回去,池化思想通过复用对象大大提高了系统的性能。线程池、数据库连接池、对象池等都采用了池化技术,下面我们就来学习下线程池的核心知识、面试重点~
51 5
Java线程池让使用线程变得更加高效
|
1天前
|
存储 安全 Java
【探索Linux】P.21(多线程 | 线程同步 | 条件变量 | 线程安全)
【探索Linux】P.21(多线程 | 线程同步 | 条件变量 | 线程安全)
5 0
|
1天前
|
算法 安全 Linux
【探索Linux】P.20(多线程 | 线程互斥 | 互斥锁 | 死锁 | 资源饥饿)
【探索Linux】P.20(多线程 | 线程互斥 | 互斥锁 | 死锁 | 资源饥饿)
4 0
|
1天前
|
存储 安全 Linux
【探索Linux】P.19(多线程 | 线程的概念 | 线程控制 | 分离线程)
【探索Linux】P.19(多线程 | 线程的概念 | 线程控制 | 分离线程)
4 0
|
1天前
|
消息中间件 监控 前端开发
面试官:核心线程数为0时,线程池如何执行?
线程池是 Java 中用于提升程序执行效率的主要手段,也是并发编程中的核心实现技术,并且它也被广泛的应用在日常项目的开发之中。那问题来了,如果把线程池中的核心线程数设置为 0 时,线程池是如何执行的? 要回答这个问题,我们首先要了解在正常情况下,线程池的执行流程,也就是说当有一个任务来了之后,线程池是如何运行的? ## 1.线程池的执行流程 正常情况下(核心线程数不为 0 的情况下)线程池的执行流程如下: 1. **判断核心线程数**:先判断当前工作线程数是否大于核心线程数,如果结果为 false,则新建线程并执行任务。 2. **判断任务队列**:如果大于核心线程数,则判断任务队列是否
面试官:核心线程数为0时,线程池如何执行?
|
9天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
9天前
|
缓存 安全 Java
多线程--深入探究多线程的重点,难点以及常考点线程安全问题
多线程--深入探究多线程的重点,难点以及常考点线程安全问题
|
9天前
|
数据采集 安全 Java
Python的多线程,守护线程,线程安全
Python的多线程,守护线程,线程安全
|
13天前
|
监控 Java 调度
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
|
13天前
|
设计模式 Java
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现