Spark工程开发常用函数与方法(Scala语言)

简介: import org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.sql.{SaveMode, DataFrame}import scala.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import scala.collection.mutable.ArrayBuffer
import main.asiainfo.coc.tools.Configure
import org.apache.spark.sql.hive.HiveContext
import java.sql.DriverManager
import java.sql.Connection

 1 连接前台数据源 查询前台MYSQL中的数据

val DIM_COC_INDEX_INFO_DDL = s"""
CREATE TEMPORARY TABLE DIM_COC_INDEX_INFO
USING org.apache.spark.sql.jdbc
OPTIONS (
url '${mySQLUrl}',
dbtable 'DIM_COC_INDEX_INFO'
)""".stripMargin

sqlContext.sql(DIM_COC_INDEX_INFO_DDL)
val DIM_COC_INDEX_INFO = sql("SELECT * FROM DIM_COC_INDEX_INFO").cache()

  

 

2   在A表中筛选出 B表中获取的TARGET_TABLE_CODE 然后再按照DATA_SRC_CODE排序,查询出源表的集合

val sources = DIM_COC_INDEX_INFO.filter("TARGET_TABLE_CODE ='"+TARGET_TABLE_CODE+"'")
        .select("DATA_SRC_CODE").groupBy("DATA_SRC_CODE").agg(DIM_COC_INDEX_INFO("DATA_SRC_CODE")).collect

3 将表进行关联

resultIndexTableDF = resultIndexTableDF.join(SOURCE_TABLE,ALL_USERS.col(ALL_USER_JOIN_COLUMN_NAME) === SOURCE_TABLE.col(SOURCE_TABLE_JOIN_COLUMN_NAME),"left_outer")
resultIndexTableDF.dtypes.foreach(println)

4 根据条件筛选

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner")
      .join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner")
      .join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner")
      .join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner")
      .filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === CI_APPROVE_STATUS_SUCCESS_CODE
      and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2)
      and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull  //TODO   trim.length>0
      )
      and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === TABLE_DATA_CYCLE
      ).cache()

5 根据某字段对表进行排序

    val labelTargetTables = labels.groupBy("CI_MDA_SYS_TABLE.TABLE_ID","CI_MDA_SYS_TABLE.TABLE_NAME").agg(labels("CI_MDA_SYS_TABLE.TABLE_ID"),labels("CI_MDA_SYS_TABLE.TABLE_NAME")).collect

6 创建parquet格式的表 可使用schema.生成到指定的schema.

        sqlContext.sql("create table "+labelTargetTableName+" stored as parquet as select * from default."+labelTargetTableNameJson)

7 保存数据格式,可以指定生成的格式

 resultLabelTable.saveAsTable(tableName = labelTargetTableName, source="parquet", mode=SaveMode.Overwrite)

8 根据筛选查询出相应数据,由于cache方法并不属于action操作,接下来的操作需要这一步所执行的数据信息,所以这里使用collect方法,再执行遍历方法

      val r0000Labels = labelInThisTargetTable.filter("COUNT_RULES_CODE = 'R_00000'").select("CI_LABEL_INFO.LABEL_ID","COLUMN_NAME").collect
for(r0000Label <- r0000Labels){
   ........
}

  

 

  

  

目录
相关文章
|
1月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
66 11
|
7月前
|
SQL 分布式计算 Java
Apache IoTDB开发系统整合之Spark IoTDB Connecter
以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。
108 0
|
2月前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
107 2
|
2月前
|
分布式计算 数据处理 Scala
Spark 集群和 Scala 编程语言的关系
Spark 集群和 Scala 编程语言的关系
29 0
|
3月前
|
分布式计算 Java Scala
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
107 0
|
6月前
|
安全 Java 编译器
Scala语言入门:初学者的基础语法指南
作为一种在Java虚拟机(JVM)上运行的静态类型编程语言,Scala结合了面向对象和函数式编程的特性,使它既有强大的表达力又具备优秀的型态控制
27 0
|
10月前
|
缓存 分布式计算 Spark
Spark RDD开发
开发步骤
55 0
|
11月前
|
分布式计算 Ubuntu Java
使用IntelliJ Idea开发Spark Streaming流应用程序
使用IntelliJ Idea开发Spark Streaming流应用程序