Java多线程编程简明教程(2) - ForkJoin模式

简介: Future实现了从单任务到多任务的转变,而Fork-Join模式是一种充分利用多核的模式。

Fork-Join模式

说起Fork-Join模式,我们不免联想起了Map-Reduce.它们的原理都是分治法,就是将一个大问题划分成若干个小问题,如果这些小问题之间互相不影响的话,就可以并发去执行. 最后,统一将各小问题的结果汇总起来,就是这个大问题的结果.
这个任务最适合处理像一棵树一样的问题.

ForkJoinPool

Fork-Join模式不再是只管一个后台作务,而是有多个任务并发执行. 这时我们前面学到的简单的线程池执行器的功能就显得不足了.这时候JDK 7开始为我们提供了ForkJoinPool.
ForkJoinPool不但自动计算开多大的线程池合适,而且提供了称为工作窃取算法的算法来管理这些任务. 如果有的线程空闲, ForkJoinPool会从其它线程的队列尾中窃取一个任务给空闲线程来运行.而正常的线程是从任务队列头中取任务,二者不会有冲突.

RecusiveTask

如同FutureTask一样,Fork-Join模式也有自己的Task类ForkJoinTask. 不过一般我们都是从ForkJoinTask的子类RecursiveTask来继承. 通过重载RecursiveTask类的compute方法,来实现Fork-Join的逻辑.
在compute方法里, 要实现两件事, 顾名思义, Fork-Join就是要先fork出RecursiveTask对象的子任务,然后将它们join在一起.

Fork-Join模式10分钟速成教程

我们先写个copy二叉树结构的简单任务学习一下如何利用Fork-Join框架来实现功能.

先实现一个最简单的二叉树节点,带左右孩子,一个字符串吧:

public class BinaryTree {
    public static class Node{
        public Node leftChild;
        public Node rightChild;
        public String content;
        public Node(String ct){
            content = ct;
        }
    }

然后实现一个RecursiveTask的子类,重载它的compute方法.

    public static class NodeCopyTask extends RecursiveTask<Node>{
        Node mNode;
        public NodeCopyTask(Node node){
            mNode = node;
        }
        @Override
        protected Node compute() {
            if(mNode==null)
                return null;

下面我们开始实现分叉, 对于左右子树分别fork出一个子任务. 这两个子任务又会分叉出它的的子任务,直至结束.

            NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
            taskLeft.fork();
            NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
            taskRight.fork();

fork之后, 任务就在后台开始运行了. 这时候我们开始构造我们的左右子树的父节点:

            Node node = new Node(mNode.content);

实际问题中一般不会这么简单.主线任务完成了之后,就是等待子任务交活儿,将它们组装在一起:

            node.leftChild = taskLeft.join();
            node.rightChild = taskRight.join();
            return node;
        }
    }

核心功能实现完了,下面我们写个主函数让它运行起来吧. 先构造一个被复制的对象.

    public static void main(String[] args){
        Node node = new Node("Hello,Fork-Join");
        node.leftChild = new Node("Left");
        node.rightChild = new Node("Right");

下面我们前面介绍的主角之一 - ForkJoinPool粉墨登场. 没什么复杂的设置,直接new一个就好:

        ForkJoinPool forkJoinPool = new ForkJoinPool();

ForkJoinPool有了之后, 再创建一个我们的RecursiveTask的对象, 然后调用ForkJoinPool的submit方法将其提交, 这又是一个Future模式了. 最后我们通过这个FutureTask的get方法获取结果就一切OK了.

        NodeCopyTask task = new NodeCopyTask(node);
        Future<Node> future = forkJoinPool.submit(task);
        try {
            Node node2 = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

整理一下上面的步骤:

  1. 实现一个RecursiveTask的子类,重载compute方法实现fork-join逻辑
  2. 合理划分任务,调用递归的RecursiveTask子类,fork出每个子任务
  3. 通过join方法获取子任务的值,并将它们组合到一起
  4. 构造ForkJoinPool线程池
  5. 创建第一步的子类的对象,通过Future模式,提交到ForkJoinPool线程中运行
  6. 获取Future的值,即可得到Fork-Join的结果.

总结一下,把刚才拆散的代码整合在一起:

public class BinaryTree {
    public static class Node{
        public Node leftChild;
        public Node rightChild;
        public String content;
        public Node(String ct){
            content = ct;
        }
    }

    public static class NodeCopyTask extends RecursiveTask<Node>{
        Node mNode;
        public NodeCopyTask(Node node){
            mNode = node;
        }
        @Override
        protected Node compute() {
            if(mNode==null)
                return null;

            NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
            taskLeft.fork();
            NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
            taskRight.fork();

            Node node = new Node(mNode.content);
            node.leftChild = taskLeft.join();
            node.rightChild = taskRight.join();
            return node;
        }
    }

    public static void main(String[] args){
        //TODO: construct a real tree
        Node node = new Node("Hello,Fork-Join");
        node.leftChild = new Node("Left");
        node.rightChild = new Node("Right");

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        NodeCopyTask task = new NodeCopyTask(node);
        Future<Node> future = forkJoinPool.submit(task);
        try {
            Node nodeNew = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

注意事项

  1. 现在这个阶段,暂时先不要共享内存,通过复制成不变的对象传递给子任务.返回值也创建新对象,当然可以使用对象池等技术.
  2. 暂时不要使用跨任务的容器,我们还没有经过相关的训练,时机还不成熟.
  3. 暂时不要使用其他的同步机制,我们的知识储备暂时还不够.
  4. 要注意任务中的异常会接收不到,一定在任务中处理好可能出现的异常. 否则发生了异常,在主任务中却收不到,会感到很奇怪.
  5. 注意I/O操作,建议目前阶段在Fork-Join之前将I/O操作提前做好.

尽管有一些限制,但是Fork-Join框架还是给我们带来了很大的便利. 按照Fork-Join设计好的代码,在将来计算核数增加时,会自动给我们的代码获得性能提高.

不变模式

在结束这个快餐教程之前,我们得再次强调一下内存共享的风险. 请初学的同学们一定要重视起来.目前我们还没有学习Java对象模型和容器的安全用法, 所以目前阶段最安全的就是不共享任何状态.
只读的对象是不会引起线程安全问题的.我们所有的跨任务的数据传递,暂时都只传递不变的对象.
这样的限制可能会带来一些不便和一些性能损失.但是,它是线程安全的,对于开发人员是种投入小见效快的好事情. 如果暂时还不能满意你的需求,我们会继续学习,从此开始,没有快餐式的速成教程了,我们要经过一段非常扎实的训练.

Android的特别注意事项

请大家注意,Java中的Fork-Join并没有办法处理Android的UI线程等问题, 如果需要运行在UI线程, 区分主线程和工作线程等, 还请参考上节我们分析AsyncTask中的做法, 该使用Handler的还是要用Handler. 后面我们还会详情说细节.

目录
相关文章
|
9天前
|
IDE Oracle Java
java基础教程(1)-Java概述和相关名词解释
【4月更文挑战第1天】Java是1995年Sun Microsystems发布的高级编程语言,以其跨平台特性著名。它介于编译型和解释型语言之间,通过JVM实现“一次编写,到处运行”。Java有SE、EE和ME三个版本,分别针对标准、企业及嵌入式应用。JVM是Java虚拟机,确保代码在不同平台无需重编译。JRE是运行环境,而JDK包含开发工具。要安装Java开发环境,可从Oracle官网下载JDK,设置JAVA_HOME环境变量并添加到PATH。
|
9天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
9天前
|
算法 Java 开发者
Java中的多线程编程:概念、实现与性能优化
【4月更文挑战第9天】在Java编程中,多线程是一种强大的工具,它允许开发者创建并发执行的程序,提高系统的响应性和吞吐量。本文将深入探讨Java多线程的核心概念,包括线程的生命周期、线程同步机制以及线程池的使用。接着,我们将展示如何通过继承Thread类和实现Runnable接口来创建线程,并讨论各自的优缺点。此外,文章还将介绍高级主题,如死锁的预防、避免和检测,以及如何使用并发集合和原子变量来提高多线程程序的性能和安全性。最后,我们将提供一些实用的性能优化技巧,帮助开发者编写出更高效、更稳定的多线程应用程序。
|
7天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
1天前
|
安全 Java
java多线程(一)(火车售票)
java多线程(一)(火车售票)
|
1天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
2天前
|
存储 安全 Java
Java中的容器,线程安全和线程不安全
Java中的容器,线程安全和线程不安全
9 1
|
3天前
|
存储 Java
Java基础教程(7)-Java中的面向对象和类
【4月更文挑战第7天】Java是面向对象编程(OOP)语言,强调将事务抽象成对象。面向对象与面向过程的区别在于,前者通过对象间的交互解决问题,后者按步骤顺序执行。类是对象的模板,对象是类的实例。创建类使用`class`关键字,对象通过`new`运算符动态分配内存。方法包括构造函数和一般方法,构造函数用于对象初始化,一般方法处理逻辑。方法可以有0个或多个参数,可变参数用`类型...`定义。`this`关键字用于访问当前对象的属性。
|
3天前
|
缓存 监控 Java
Java并发编程:线程池与任务调度
【4月更文挑战第16天】Java并发编程中,线程池和任务调度是核心概念,能提升系统性能和响应速度。线程池通过重用线程减少创建销毁开销,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。任务调度允许立即或延迟执行任务,具有灵活性。最佳实践包括合理配置线程池大小、避免过度使用线程、及时关闭线程池和处理异常。掌握这些能有效管理并发任务,避免性能瓶颈。

热门文章

最新文章