《Spark核心技术与高级应用》——3.1节使用Spark Shell编写程序

简介:

本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第3章,第3.1节使用Spark Shell编写程序,作者于俊 向海 代其锋 马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.1 使用Spark Shell编写程序
要学习Spark程序开发,建议首先通过spark-shell交互式学习,加深对Spark程序开发的理解。spark-shell提供了一种学习API的简单方式,以及一个能够交互式分析数据的强大工具,在Scala语言环境下(Scala运行于Java虚拟机,因此能有效使用现有的Java库)或Python语言环境下均可使用。
3.1.1 启动Spark Shell
在spark-shell中,已经创建了一个名为sc的SparkContext对象,如在4个CPU核上运行bin/spark-shell,命令如下:

./bin/spark-shell --master local[4]
如果指定Jar包路径,命令如下:
./bin/spark-shell --master local[4] --jars testcode.jar

其中,--master用来设置context将要连接并使用的资源主节点,master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一个local地址;使用--jars可以添加Jar包的路径,使用逗号分隔可以添加多个包。进一步说明,spark-shell的本质是在后台调用了spark-submit脚本来启动应用程序。
3.1.2 加载text文件
Spark创建sc之后,可以加载本地文件创建RDD,我们以加载Spark自带的本地文件README.md文件进行测试,返回一个MapPartitionsRDD文件。

scala>val textFile= sc.textFile("f?ile:///$SPARK_HOME/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

需要说明的是,加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和f?ile://)进行标识,从本地文件读取文件直接返回MapPartitionsRDD,而从HDFS读取的文件先转成HadoopRDD,然后隐式转换成MapPartitionsRDD。
上面所说的MapPartitionsRDD和HadoopRDD都是基于Spark的弹性分布式数据集(RDD)。
3.1.3 简单RDD操作
对于RDD,可以执行Transformation返回新RDD,也可以执行Action得到返回结果。下面从f?irst和count命令开始Action之旅,示例代码如下:

// 获取RDD文件textFile的第一项
scala>textFile.f?irst()
res0: String = # Apache Spark 
// 获取RDD文件textFile所有项的计数
scala>textFile.count()
res1: Long = 98
接下来通过Transformation操作,使用f?ilter命令返回一个新的RDD,即抽取文件全部条目的一个子集,返回一个新的FilteredRDD,示例代码如下:
// 抽取含有"Spark"的子集
scala>valtext Filter = textFile.f?ilter(line >line.contains("Spark"))
textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at f?ilter at <console>:23
我们可以链接多个Transformation和Action进行操作。示例代码如下:
scala>textFile.f?ilter(line =>line.contains("Spark")).count() 
res2: Long = 18

3.1.4 简单RDD操作应用
通过简单RDD操作进行组合,来实现找出文本中每行最多单词数,词频统计等。
1.找出文本中每行最多单词数
基于RDD的Transformation和Action可以用作更复杂的计算,假设想要找到文本中每行最多单词数,可以参考如下代码:

scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res3: Int = 14
在上面这段代码中,首先将textFile每一行文本中的句子使用split(" ")进行分词,并统计分词后的单词数。创建一个基于单词数的新RDD,然后针对该RDD执行Reduce操作使用(a, b) => if (a > b) a else b函数进行比较,返回最大值。
2.词频统计
从MapReduce开始,词频统计已经成为大数据处理最流行的入门程序,类似MapReduce,Spark也能很容易地实现MapReduce,示例程序如下:
// 结合f?latMap、map和reduceByKey来计算文件中每个单词的词频
scala>val wordCount= textFile.f?latMap(line =>line.split(" ")).map(word => (word,1)). reduceByKey((a, b) => a + b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = Shuff?ledRDD[7] at reduceByKey at <console>:23
// 使用collect聚合单词计数结果
scala>wordCount.collect()
res4: Array[(String, Int)] = Array((package,1), (this,1) ,...

这里,结合f?latMap、Map和reduceByKey来计算文件中每个单词的词频,并返回(string、int)类型的键值对Shuff?ledRDD(由于reduceByKey执行时需要进行Shuff?le操作,返回的是一个Shuff?le形式的RDD,Shuff?ledRDD),最后使用Collect聚合单词计数结果。
如果想让Scala的函数文本更简洁,可以使用占位符“_”,占位符可以看作表达式里需要被“填入”的“空白”,这个“空白”在每次函数被调用时,由函数的参数填入。
当每个参数在函数文本中最多出现一次的情况下,可以使用_+_扩展成带两个参数的函数文本;多个下划线指代多个参数,而不是单个参数的重复使用。第一个下划线代表第一个参数,第二个下划线代表第二个参数,依此类推。
下面通过占位符对词频统计进行优化。

scala>val wordCount=textFile.f?latMap(_.split(" ")).map(_,1)
.reduceByKey(_+_)

Spark默认是不进行排序的,如果以排序的方法进行输出,需要进行key和value互换,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。这样就完成了数据统计和排序,具体代码如下:

scala>val wordCount= inFile.f?latMap(_.split(" ")).map(_, 1).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))

上面的代码通过第一个x=>(x._2,x._1)实现key和value互换,然后通过sortByKey(false)实现降序排列,通过第二个x=>(x._2,x._1)再次实现key和value互换,最终实现排序功能。
3.1.5 RDD缓存
Spark也支持将数据集存进一个集群的内存缓存中,当数据被反复访问时,如在查询一个小而“热”数据集,或运行像PageRank的迭代算法时,是非常有用的。举一个简单的例子,缓存变量textFilter(即包含字符串“Spark”的数据集),并针对缓存计算。

scala>textFilter.cache()
res5: textFilter.type = MapPartitionsRDD[2] at f?ilter at <console>:23
scala>textFilter.count()
res6: Long = 18

通过cache缓存数据可以用于非常大的数据集,支持跨越几十或几百个节点。

相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
157 0
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
82 0
|
3月前
|
人工智能 Shell 程序员
[oeasy]python005_退出游乐场_重启游乐场_系统态shell_应用态_quit
[oeasy]python005_退出游乐场_重启游乐场_系统态shell_应用态_quit
16 0
|
4月前
|
分布式计算 监控 Java
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
50 0
|
4月前
|
分布式计算 大数据 Java
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
48 0
|
29天前
|
Shell Linux 开发者
【Shell 命令集合 文件管理】Linux 补丁文件应用命令 patch命令使用指南
【Shell 命令集合 文件管理】Linux 补丁文件应用命令 patch命令使用指南
35 0
|
1月前
|
机器学习/深度学习 分布式计算 监控
典型的Spark应用实例
典型的Spark应用实例
40 1
|
3月前
|
分布式计算 资源调度 监控
Spark应用程序的结构与驱动程序
Spark应用程序的结构与驱动程序
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
100 0