线程池运用实例——一次错误的多线程程序设计以及修复过程

简介:

写在前面的话 

写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。

1 背景

有这么一个需求,我们的系统(后面简称:A系统)需要在后台执行一个报表导出任务,在这个任务的执行过程中需要通过CORBA调用其他系统(后面简称:B系统)的一个(也有可能是多个)接口去查询报表,待结果返回后,将这些结果写入Excel。这个需求是不是很简单?套用网上一些FutureTask或者线程池的例子一两小时就能搞定这个需求。当时我也是这样认为的,可谁想,这是一个巨大的坑….

2 初始设计

用过CORBA的同学会知道,如同数据库连接一样,CORBA的连接数也是是有限的,如果一个接口调用的时间过长,就会长时间占用CORBA有限的连接数,当这种长时间的同步调用过多时就会造成整个系统CORBA调用的阻塞,进而造成系统停止响应。由于查询操作很耗时,为了避免这种情况的发生,这个接口被设计成了一个异步接口。任务的执行流程就会是这样:任务开始执行,接着调用这个接口并且通过CORBA向B系统订阅一个事件,然后任务进入等待状态,当B系统执行完成后,会向A系统发送一个事件告知执行的结果,任务收到事件后重新开始执行直到结束,如图:

既然说到了事件,那么很自然而然的就想到了使用回调的方式去响应事件,并且为了避免事件超时(也就是长时间没有接收到事件)导致任务长时间等待,我还使用了一个定时的任务去检查任务的状态。所以我的程序看起来就像这样:

IEventFuture.java


1 public interface IEventFuture {
2     void onEventReceived(Event event);
3 }

ExportRptTask.java


01 public class ExportRptTask implements Callable<Void>, IEventFuture {
02     private static final int INITIALIZED = 0;
03     private static final int RUNNING = 1;
04     private static final int COMPLETED = 2;
05     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
06     private Date lastUpdate = new Date();
07     private volatile int state = INITIALIZED;
08  
09     private Timer timer = new Timer();
10     private SystemBSer systemBSer = new SystemBSer();
11  
12     private int eventId = -1;
13  
14     @Override
15     public Void call() throws Exception {
16         this.state = RUNNING;
17         try {
18             systemBSer.doQuery();
19             subscribeEvent();
20             startTaskTimeoutMonitorTask();
21             Future future = createEventFuture();
22             future.get();
23         } catch (Throwable t) {
24             onTaskError(t);
25         } finally {
26             EventManager.unsubscribe(this.eventId);
27             timer.cancel();
28         }
29         return null;
30     }
31  
32     @Override
33     public void onEventReceived(Event event) {
34         this.lastUpdate = new Date();
35 // start to write excel
36 // .....
37 // end to write excel
38         this.state = COMPLETED;
39     }
40  
41     private void subscribeEvent() {
42         this.eventId = EventManager.subscribe(this);
43     }
44  
45     private Future createEventFuture() {
46         FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {
47  
48             @Override
49             public Void call() throws Exception {
50                 while (state != COMPLETED) {
51  
52                 }
53                 return null;
54             }
55         });
56  
57         new Thread(listenFuture).start();
58         return listenFuture;
59     }
60  
61     private void startTaskTimeoutMonitorTask() {
62         timer.scheduleAtFixedRate(new TimerTask() {
63             @Override
64             public void run() {
65  
66                 if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
67                     onTaskTimeout();
68                 }
69             }
70         }, 0, 15 * 60 * 1000);
71     }
72  
73     private void onTaskTimeout() {
74         // do something on task timeout.
75         //   ....
76         // end
77  
78         // set task to completed to end task.
79         this.state = COMPLETED;
80     }
81  
82     private void onTaskError(Throwable t) {
83 // do something to handle error.
84     }
85 }

3 升级改进

由于做这个需求的关系,我开始阅读一些关于JAVA多线程编程的一下教程,在阅读到关于闭锁的内容时,我突然灵光一现,这玩意不正好可以代替我那个丑陋的使用循环来让任务进入等待状态的实现么?然后我的程序就变成了这样:

ExportRptTask.java


01 public class ExportRptTask implements Callable<Void>, IEventFuture {
02     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
03     private Date lastUpdate = new Date();
04  
05     private CountDownLatch endGate = new CountDownLatch(1);
06     private Timer timer = new Timer();
07     private SystemBSer systemBSer = new SystemBSer();
08  
09     private int eventId = -1;
10  
11     @Override
12     public Void call() throws Exception {
13         try {
14             systemBSer.doQuery();
15             subscribeEvent();
16             endGate.await();
17             startTaskTimeoutMonitorTask();
18         } catch (Throwable t) {
19             onTaskError(t);
20         } finally {
21             EventManager.unsubscribe(this.eventId);
22             timer.cancel();
23         }
24         return null;
25     }
26  
27     @Override
28     public void onEventReceived(Event event) {
29         this.lastUpdate = new Date();
30 // start to write excel
31 // .....
32 // end to write excel
33         this.endGate.countDown();
34     }
35  
36     private void subscribeEvent() {
37         this.eventId = EventManager.subscribe(this);
38     }
39  
40     private void startTaskTimeoutMonitorTask() {
41         timer.scheduleAtFixedRate(new TimerTask() {
42             @Override
43             public void run() {
44  
45                 if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
46                     onTaskTimeout();
47                 }
48             }
49         }, 0, 15 * 60 * 1000);
50     }
51  
52     private void onTaskTimeout() {
53 // do something on task timeout.
54 //   ....
55 // end
56  
57 // set task to completed to end task.
58         this.endGate.countDown();
59     }
60  
61     private void onTaskError(Throwable t) {
62 // do something to handle error.
63     }
64 }

4 问题浮现

正在我为我使用高大上的闭锁代替循环沾沾自喜的时候,测试大爷告诉我,任务经常莫名其妙的失败,并且日志中没有任何异常。开始,这让我觉得很不可思议,因为我已经在call()方法处处理了所有的异常,任务失败时至少也应该有个日志啥的吧。这个问题一直困扰着我,直到有一天分析日志我突然发现任务执行的工作线程(也就是call()方法所在的线程)和接收到事件后的回调并不是同一个线程。这就意味着在查询到报表结果后,所有写Excel,分发结果等等的操作都是在事件回调的线程中执行的,那么一旦这里发生异常原来call()中的catch块自然无法捕获,然后异常就被莫名其妙的吞掉了。好吧,我承认我之前对线程池也就了解点皮毛,对多线程也仅仅是有个概念,想当然的认为在线程池中可以Hold住任务的一切,包括响应这个任务在执行过程中创建的其他线程运行时发生的异常。而且更严重的是按照原来的实现,只有当整个任务执行完成(包括写完Excel)后,才会释放那个闭锁,所以一旦事件回调发生异常,那么整个任务都无法终止。在线程池中发生一个任务永远无法终止的后果,你懂的。

5 重新设计

痛定思痛,我决定重新梳理这个任务的流程。这个需求的难点就是在如何监听并响应B系统给我们发送的事件,实际上,这是一个很经典的生产者–消费者问题,而阻塞队列正好是解决这类问题的利器。重新设计的事件响应流程就变成:当B系统发送事件的时候,事件回调线程会往阻塞队列里面填充一个事件。在另一方面,任务调用完B系统的查询接口后,就开始从阻塞队列中取事件,当事件队列为空的时候,取事件的线程(也就是线程池执行任务的工作线程)会被阻塞。并且,阻塞队列的取操作可以设置超时时间,所以当取到的事件对象为空时,就意味着事件超时了,这样就省去了使用定时任务定时检查任务状态的工作。重新设计的程序是这样的:

EventProxy.java


01 public class EventProxy implements IEventFuture {
02     private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
03     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
04  
05     @Override
06     public void onEventReceived(Event event) {
07         eventQueue.offer(event);
08     }
09  
10     public Event getEvent() throws InterruptedException {
11         return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
12     }
13 }

ExportRptTask.java


01 public class ExportRptTask3 implements Callable<Void> {
02  
03     private SystemBSer systemBSer = new SystemBSer();
04     private EventProxy eventProxy = new EventProxy();
05  
06     private int eventId = -1;
07  
08     @Override
09     public Void call() throws Exception {
10         try {
11             systemBSer.doQuery();
12             subscribeEvent();
13  
14             Event event = eventProxy.getEvent();
15             if (event != null) {
16                 processEvent(event);
17             } else {
18                 onTaskTimeout();
19             }
20         } catch (Throwable t) {
21             onTaskError(t);
22         } finally {
23             EventManager.unsubscribe(this.eventId);
24         }
25         return null;
26     }
27  
28     private void subscribeEvent() {
29         this.eventId = EventManager.subscribe(eventProxy);
30     }
31  
32     private void processEvent(Event event) {
33 // do something on receive event.
34     }
35  
36     private void onTaskTimeout() {
37 // do something on task timeout.
38 //   ....
39 // end
40     }
41  
42     private void onTaskError(Throwable t) {
43 // do something to handle error.
44     }
45 }

6 总结

相信各位并发编程的大牛们能在一瞬间就可以把我的程序(包括改进后的)批得体无完肤,不过我还是想分享下我在这个过程中的收获。

  • 在动手写程序前,请先理解你的需求,特别是要注意用已有的模型去识别问题,在本例中,我就是没有识别响应事件的流程其实是个生产者–消费者问题导致了后面的错误
  • 请充分的了解你需要使用的技术和工具。比如,使用线程池你就要了解线程池的工作原理,这样你才能正确的使用这些技术。做技术切忌想当然。
  • 在使用线程池时,重要的操作尽量放在任务的主线程中执行(也就是call()/run()方法所在的线程),否则线程池本身难以对任务进行控制。
  • 如果一定要在任务中再创建新的线程,请确保任务主线程是任务最后退出的线程。切忌不要使用外部线程直接调用任务类的方法,在本例中我就犯了这样的错误。
目录
相关文章
|
7月前
|
Java
多线程学习(三)多线程开发带来的问题与解决方法
多线程学习(三)多线程开发带来的问题与解决方法
74 1
|
7月前
|
Java
多线程开发带来的问题与解决方法
多线程开发带来的问题与解决方法
53 0
|
3月前
|
缓存 Java
Java线程问题:什么是可见问题?如何解决?
Java线程问题:什么是可见问题?如何解决?
22 1
|
4月前
在程序运行过程中,线程的状态是什么?进来看看就通透了
在程序运行过程中,线程的状态是什么?进来看看就通透了
32 0
如何处理JDK线程池内线程执行异常?讲得这么通俗,别还搞不懂
本篇 《如何处理 JDK 线程池内线程执行异常》 这篇文章适合哪些小伙伴阅读呢? 适合工作中使用线程池却不知异常的处理流程,以及不知如何正确处理抛出的异常
|
Java
多线程相关面试题:并行和并发的区别、线程和进程、线程的创建方式、运行状态
多线程相关面试题:并行和并发的区别、线程和进程、线程的创建方式、运行状态
106 0
java线程的三种创建方式详细分析(全)
目录前言1. 继承Thread类2. 实现Runnable 接口3. 通过Callable和Future创建线程4. 总结 前言 关于线程这部分内容讲的比较多,可看我之前的文章 【操作系统】线程与进程的深入剖析(全) 【操作系统】守护线程和守护进程的区别 对于线程Thread的分析 也可看我之前的文章 java之Thread类详细分析(全) java之Thread类实战模板(全) 多线程中run()和start()的异同详细分析(全) 对于创建方式的汇总以及方式 可看我这篇文章的总结对比 1. 继承
java线程的三种创建方式详细分析(全)
【多线程】面试官:如何利用线程工具,防止多线程同时操作一个资源?
通过前面的学习,知道了线程的利与弊,正确的使用多线程,会尽最大的可能去压榨我们系统的资源,从而提高效率,但是如果不合理使用线程,可能会造成副作用,给系统带来更大的压力,进一步的思考,如何才能防止多线程操作一个资源?
|
安全 测试技术
操作系统实验之多线程操作之读者优先与写者优先第二版
操作系统实验之多线程操作之读者优先与写者优先第二版
操作系统实验之多线程操作之读者优先与写者优先第二版
C++多线程 并行与并发 了解进程和线程 浅显的进行传参,调用
C++多线程 并行与并发 了解进程和线程 浅显的进行传参,调用
C++多线程 并行与并发 了解进程和线程 浅显的进行传参,调用