线程池运用实例——一次错误的多线程程序设计以及修复过程

  1. 云栖社区>
  2. 博客>
  3. 正文

线程池运用实例——一次错误的多线程程序设计以及修复过程

boxti 2017-05-02 15:44:00 浏览853

写在前面的话 

写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。

1 背景

有这么一个需求,我们的系统(后面简称:A系统)需要在后台执行一个报表导出任务,在这个任务的执行过程中需要通过CORBA调用其他系统(后面简称:B系统)的一个(也有可能是多个)接口去查询报表,待结果返回后,将这些结果写入Excel。这个需求是不是很简单?套用网上一些FutureTask或者线程池的例子一两小时就能搞定这个需求。当时我也是这样认为的,可谁想,这是一个巨大的坑….

2 初始设计

用过CORBA的同学会知道,如同数据库连接一样,CORBA的连接数也是是有限的,如果一个接口调用的时间过长,就会长时间占用CORBA有限的连接数,当这种长时间的同步调用过多时就会造成整个系统CORBA调用的阻塞,进而造成系统停止响应。由于查询操作很耗时,为了避免这种情况的发生,这个接口被设计成了一个异步接口。任务的执行流程就会是这样:任务开始执行,接着调用这个接口并且通过CORBA向B系统订阅一个事件,然后任务进入等待状态,当B系统执行完成后,会向A系统发送一个事件告知执行的结果,任务收到事件后重新开始执行直到结束,如图:

既然说到了事件,那么很自然而然的就想到了使用回调的方式去响应事件,并且为了避免事件超时(也就是长时间没有接收到事件)导致任务长时间等待,我还使用了一个定时的任务去检查任务的状态。所以我的程序看起来就像这样:

IEventFuture.java

public interface IEventFuture {
    void onEventReceived(Event event);
}

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final int INITIALIZED = 0;
    private static final int RUNNING = 1;
    private static final int COMPLETED = 2;
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();
    private volatile int state = INITIALIZED;

    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        this.state = RUNNING;
        try {
            systemBSer.doQuery();
            subscribeEvent();
            startTaskTimeoutMonitorTask();
            Future future = createEventFuture();
            future.get();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.state = COMPLETED;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private Future createEventFuture() {
        FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (state != COMPLETED) {

                }
                return null;
            }
        });

        new Thread(listenFuture).start();
        return listenFuture;
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
        // do something on task timeout.
        //   ....
        // end

        // set task to completed to end task.
        this.state = COMPLETED;
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

3 升级改进

由于做这个需求的关系,我开始阅读一些关于JAVA多线程编程的一下教程,在阅读到关于闭锁的内容时,我突然灵光一现,这玩意不正好可以代替我那个丑陋的使用循环来让任务进入等待状态的实现么?然后我的程序就变成了这样:

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();

    private CountDownLatch endGate = new CountDownLatch(1);
    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();
            endGate.await();
            startTaskTimeoutMonitorTask();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.endGate.countDown();
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end

// set task to completed to end task.
        this.endGate.countDown();
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

4 问题浮现

正在我为我使用高大上的闭锁代替循环沾沾自喜的时候,测试大爷告诉我,任务经常莫名其妙的失败,并且日志中没有任何异常。开始,这让我觉得很不可思议,因为我已经在call()方法处处理了所有的异常,任务失败时至少也应该有个日志啥的吧。这个问题一直困扰着我,直到有一天分析日志我突然发现任务执行的工作线程(也就是call()方法所在的线程)和接收到事件后的回调并不是同一个线程。这就意味着在查询到报表结果后,所有写Excel,分发结果等等的操作都是在事件回调的线程中执行的,那么一旦这里发生异常原来call()中的catch块自然无法捕获,然后异常就被莫名其妙的吞掉了。好吧,我承认我之前对线程池也就了解点皮毛,对多线程也仅仅是有个概念,想当然的认为在线程池中可以Hold住任务的一切,包括响应这个任务在执行过程中创建的其他线程运行时发生的异常。而且更严重的是按照原来的实现,只有当整个任务执行完成(包括写完Excel)后,才会释放那个闭锁,所以一旦事件回调发生异常,那么整个任务都无法终止。在线程池中发生一个任务永远无法终止的后果,你懂的。

5 重新设计

痛定思痛,我决定重新梳理这个任务的流程。这个需求的难点就是在如何监听并响应B系统给我们发送的事件,实际上,这是一个很经典的生产者–消费者问题,而阻塞队列正好是解决这类问题的利器。重新设计的事件响应流程就变成:当B系统发送事件的时候,事件回调线程会往阻塞队列里面填充一个事件。在另一方面,任务调用完B系统的查询接口后,就开始从阻塞队列中取事件,当事件队列为空的时候,取事件的线程(也就是线程池执行任务的工作线程)会被阻塞。并且,阻塞队列的取操作可以设置超时时间,所以当取到的事件对象为空时,就意味着事件超时了,这样就省去了使用定时任务定时检查任务状态的工作。重新设计的程序是这样的:

EventProxy.java

public class EventProxy implements IEventFuture {
    private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;

    @Override
    public void onEventReceived(Event event) {
        eventQueue.offer(event);
    }

    public Event getEvent() throws InterruptedException {
        return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
    }
}

ExportRptTask.java

public class ExportRptTask3 implements Callable<Void> {

    private SystemBSer systemBSer = new SystemBSer();
    private EventProxy eventProxy = new EventProxy();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();

            Event event = eventProxy.getEvent();
            if (event != null) {
                processEvent(event);
            } else {
                onTaskTimeout();
            }
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
        }
        return null;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(eventProxy);
    }

    private void processEvent(Event event) {
// do something on receive event.
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

6 总结

相信各位并发编程的大牛们能在一瞬间就可以把我的程序(包括改进后的)批得体无完肤,不过我还是想分享下我在这个过程中的收获。

  • 在动手写程序前,请先理解你的需求,特别是要注意用已有的模型去识别问题,在本例中,我就是没有识别响应事件的流程其实是个生产者–消费者问题导致了后面的错误
  • 请充分的了解你需要使用的技术和工具。比如,使用线程池你就要了解线程池的工作原理,这样你才能正确的使用这些技术。做技术切忌想当然。
  • 在使用线程池时,重要的操作尽量放在任务的主线程中执行(也就是call()/run()方法所在的线程),否则线程池本身难以对任务进行控制。
  • 如果一定要在任务中再创建新的线程,请确保任务主线程是任务最后退出的线程。切忌不要使用外部线程直接调用任务类的方法,在本例中我就犯了这样的错误。
  • 文章转自 并发编程网-ifeve.com