颠覆大数据分析之Spark弹性分布式数据集

简介:

颠覆大数据分析之Spark弹性数据集

译者:黄经业    购书

Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解。将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下。你会发现在Hadoop MR中每次迭代都会涉及HDFS的读写,而在Spark中则要简单得多。它仅需从HDFS到Spark中的分布式共享对象空间的一次读入——从HDFS文件中创建RDD。RDD可以重用,在机器学习的各个迭代中它都会驻留在内存里,这样能显著地提升性能。当检查结束条件发现迭代结束的时候,会将RDD持久化,把数据写回到HDFS中。后续章节会对Spark的内部结构进行详细介绍——包括它的设计,RDD,以及世系等等。

图2.3  Spark中进行迭代式计算的数据共享

Spark的弹性分布式数据集

RDD这个概念跟我们讨论到的Spark的动机有关——就是能让用户操作分布式系统上的Scala集合。Spark中的这个重要的集合就是RDD。RDD可以通过在其它RDD或者稳态存储中的数据(比如说,HDFS中的文件)上执行确定性操作来进行创建。创建RDD的另一种方式就是将Scala集合并行化。RDD的创建也就是Spark中的转换操作。RDD上除了转换操作,还有其它的一些操作,比如说动作(action)。像map, filter以及join这些都是常见的转换操作。RDD有意思的一点在于它可以将自己的世系或者说创建它所需的转换序列,以及它上面的动作给存储起来。这意味着Spark程序只能拥有一个RDD引用——它知道自己的世系,包括它是如何创建的,上面执行过哪些操作。世系为RDD提供了容错性——即使它丢失了,只要世系本身被持久化或者复制了,就仍能重建整个RDD。RDD的持久化以及分块可以由程序员来指定。比如说,你可以基于记录的主键来进行分块。

在RDD上可以执行许多操作。包括count,collect以及save,它们分别可以用来统计元素总数,返回记录,以及保存到磁盘或者HDFS中。世系图中存储了RDD的转换以及动作。表2.1中列举了一系列的转换及动作。

表2.1

转换 描述
Map(function f1) 把RDD中的每个元素并行地传递给f1,并返回结果的RDD
Filter(function f2) 选取出那些传递给函数f2并返回true的RDD元素
flatMap(function f3) 和map类似,但f3返回的是一个序列,它能将单个输入映射成多个输出。
Union(RDD r1) 返回RDD r1和自身的并集
Sample(flag, p, seed) 返回RDD的百分之p的随机采样(使用种子seed)
动作 描述
groupByKey(noTasks) 只能在键值对数据上进行调用——返回的数据按值进行分组。并行任务的数量通过一个参数来指定(默认是8)
reduceByKey(function f4,noTasks) 对相同key元素上应用函数f4的结果进行聚合。第二个参数是并行的任务数
Join(RDD r2, noTasks) 将RDD r2和对象自身进行连接——计算出指定key的所有可能的组合
groupWith(RDD r3, noTasks) 将RDD r3与对象自身进行连接,并按key进行分组
sortByKey(flag) 根据标记值将RDD自身按升序或降序来进行排序
动作 描述
Reduce(function f5) 使用函数f5来对RDD的所有元素进行聚合
Collect() 将RDD的所有元素作为一个数组来返回
Count() 计算RDD的元素总数
take(n) 获取RDD的第n个元素
First() 等价于take(1)
saveAsTextFile(path) 将RDD持久化成HDFS或者其它Hadoop支持的文件系统中路径为path的一个文件
saveAsSequenceFile(path) 将RDD持久化为Hadoop的一个序列文件。只能在实现了Hadoop写接口或类似接口的键值对类型的RDD上进行调用。
动作 描述
foreach(function f6) 并行地在RDD的元素上运行函数f6

下面将通过一个例子来介绍下如何在Spark环境中进行RDD的编程。这里是一个呼叫数据记录(CDR)——基于影响力分析的应用程序——通过CDR来构建用户的关系图,并识别出影响力最大的K个用户。CDR结构包括id,调用方,接收方,计划类型,呼叫类型,持续时长,时间,日期。具体做法是从HDFS中获取CDR文件,接着创建出RDD对象并过滤记录,然后再在上面执行一些操作,比如说通过查询提取出特定的字段,或者执行诸如count的聚合操作。最终写出的Spark代码如下:

val spark = new SparkContext();

Call_record_lines = spark.textFile(“HDFS://….”);

Plan_a_users = call_record_lines.filter(_.

CONTAINS(“plana”)); // RDD上的过滤操作.

Plan_a_users.cache(); // 告诉Spark运行时,如果仍有空间,就将这个RDD缓存到内存里Plan_a_users.count();

%% 呼叫数据集处理中.

 

RDD可以表示成一张图,这样跟踪RDD在不同转换/动作间的世系变化会简单一些。RDD接口由五部分信息组成,详见表2.2。

表2.2  RDD接口

信息 HadoopRDD FilteredRDD JoinedRDD
分区类型 每个HDFS块一个分区 和父RDD一致 每个reduce任务一个
依赖类型 无依赖 和父RDD是一对一的依赖 在每一个父RDD上进行shuffle
基于父RDD来计算数据集的函数 读取对应块的数据 计算父RDD并进行过滤 读取洗牌后的数据并进行连接
位置元数据(preferredLocations) 从命名节点中读取HDFS块的位置信息 无(从父RDD中获取)
分区元数据(partitioningScheme) HashPartitioner

Spark的实现

Spark是由大概20000行Scala代码写就的,核心部分大概是14000行。Spark可以运行在Mesos, Nimbus或者YARN等集群管理器之上。它使用的是未经修改的Scala解释器。当触发RDD上的一个动作时,一个被称为有向无环图(DAG)调度器的Spark组件就会去检查RDD的世系图,同时会创建各阶段的DAG。每个阶段内都只会出现窄依赖,宽依赖所需的洗牌操作就是阶段的边界。调度器在DAG的不同阶段启动任务来计算出缺失的分区,以便重构整个RDD对象。它将各阶段的任务对象提交给任务调度器(Task Scheduler, TS)。任务对象是一个独立的实体,它由代码和转换以及所需的元数据组成。调度器还负责重新提交那些输出丢失了的阶段。任务调度器使用一个被称为延迟调度(Zaharia等 2010)的调度算法来将任务分配给各个节点。如果RDD中有指定了优先区域的话,任务会被传送给这些节点,否则会被分配到那些有分区在请求内存任务的节点上。对于宽依赖而言,中间记录会在那些包含父分区的节点上生成。这样会使得错误恢复变得简单,Hadoop MR中map输出的物化也是类似的。

Spark中的Worker组件会负责接收任务对象并在一个线程池中调用它们的run方法。它将异常或者错误报告给TaskSetManager(TSM)。TSM是任务调度器管理的一个实体——每个任务集都会对应一个TSM,用于跟踪任务的执行过程。TS是按先进先出的顺序来轮询TSM集的。通过插入不同的策略或者算法,这里仍有一定的优化空间。执行器会与其它的组件进行交互,比如说块管理器(BM),通信管理器(CM),Map输出跟踪器(MOT)。块管理器是节点用于缓存RDD并接收洗牌数据的组件。它也可以看作是每个worker中只写一次的K-V存储。块管理器和通信管理器进行通信以便获取到远端的块数据。通信管理器是一个异步网络库。MOT这个组件会负责跟踪每个map任务都在哪运行并把这些信息返回给归约器——Worker会缓存这个信息。当映射器的输出丢失了的话,会使用一个“分代ID”来将这个缓存置为无效。Spark中各组件的交互如图2.4中所示。

图2.4  Spark集群中的组件

RDD的存储可以通过下面这三种方式来完成:

  1. 作为Java虚拟机中反序列化的Java对象:由于对象就在JVM内存中,这样做的性能会更佳。
  2. 作为内存中序列化的Java对象:这么表示内存的使用率会更高,但却牺牲了访问速度。
  3. 存储在磁盘上:这样做性能最差,但是如果RDD太大以至于无法存放到内存中的话就只能这么做了。

一旦内存满了,Spark的内存管理会通过最近最少使用(LRU)策略来回收RDD。然而,属于同一个RDD的分区是无法剔除的——因为通常来说,一个程序可能会在一个大的RDD上进行计算,如果将同一个RDD中的分区剔除的话则会出现系统颠簸。

世系图拥有足够的信息来重建RDD的丢失分区。然而,考虑到效率的因素(重建整个RDD可能会需要很大的计算量),检查点仍是必需的——用户可以自主控制哪个RDD作为检查点。使用了宽依赖的RDD可以使用检查点,因为在这种情况下,计算丢失的分区会需要显著的通信及计算量。而对于只拥有窄依赖的RDD而言,检查点则不太适合。 

目录
相关文章
|
19天前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
1月前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
1月前
|
存储 消息中间件 大数据
Go语言在大数据处理中的实际应用与案例分析
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理中的实际应用,通过案例分析展示了Go语言在处理大数据时的优势和实践效果。文章首先介绍了大数据处理的挑战与需求,然后详细分析了Go语言在大数据处理中的适用性和核心技术,最后通过具体案例展示了Go语言在大数据处理中的实际应用。
|
1月前
|
数据采集 运维 数据挖掘
API电商接口大数据分析与数据挖掘 (商品详情店铺)
API接口、数据分析以及数据挖掘在商品详情和店铺相关的应用中,各自扮演着重要的角色。以下是关于它们各自的功能以及如何在商品详情和店铺分析中协同工作的简要说明。
|
2月前
|
API
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
106 0
|
12天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
13天前
|
机器学习/深度学习 人工智能 安全
Azure Databricks实战:在云上轻松进行大数据分析与AI开发
【4月更文挑战第8天】Databricks在大数据分析和AI开发中表现出色,简化流程并提高效率。文中列举了三个应用场景:数据湖分析、实时流处理和AI机器学习,并阐述了Databricks的一体化平台、云原生弹性及企业级安全优势。博主认为,Databricks提升了研发效能,无缝集成Azure生态,并具有持续创新潜力,是应对大数据挑战和加速AI创新的理想工具。
37 0
|
26天前
|
机器学习/深度学习 人工智能 数据可视化
基于Python的数据可视化技术在大数据分析中的应用
传统的大数据分析往往注重数据处理和计算,然而数据可视化作为一种重要的技术手段,在大数据分析中扮演着至关重要的角色。本文将介绍如何利用Python语言中丰富的数据可视化工具,结合大数据分析,实现更直观、高效的数据展示与分析。
|
1月前
|
存储 NoSQL 大数据
新型数据库技术在大数据分析中的应用与优势探究
随着大数据时代的到来,传统数据库技术已经无法满足海量数据处理的需求。本文将探讨新型数据库技术在大数据分析中的应用情况及其所带来的优势,为读者解析数据库领域的最新发展趋势。
|
1月前
|
存储 分布式计算 大数据
现代化数据库技术——面向大数据的分布式存储系统
传统的关系型数据库在面对大规模数据处理时遇到了诸多挑战,而面向大数据的分布式存储系统应运而生。本文将深入探讨现代化数据库技术中的分布式存储系统,包括其优势、工作原理以及在大数据领域的应用。