开发者社区> 问答> 正文

动态查询准备和执行spark

在Spark中,这个json是在数据帧(DF)中,现在我们必须导航到表(在基于cust的json中),我们必须读取第一个表块并且必须准备sql查询。例如: SELECT CUST_NAME FROM CUST WHERE CUST_ID =112

我们必须在Database&store中执行此查询,结果是json文件。

{

 "cust": "Retails",
 "tables": [
    {
         "Name":"customer",
         "table_NAME":"cust",
         "param1":"cust_id",  
         "val":"112",
         "op":"cust_name"
    },
    {
         "Name":"sales",
         "table_NAME":"sale",
         "param1":"country",  
         "val":"ind",
         "op":"monthly_sale"
     }]

}

root |-- cust: string (nullable = true)

  |-- tables: array (nullable = true) 
  | |-- element: struct (containsNull = true) 
  | | |-- Name: string (nullable = true) 
  | | |-- op: string (nullable = true) 
  | | |-- param1: string (nullable = true) 
  | | |-- table_NAME: string (nullable = true) 
  | | |-- val: string (nullable = true) 

第二块表格相同。例如: SELECT MONTHLY_SALE FROM SALE WHERE COUNTRY = 'IND'

必须在DB中执行此查询并且必须将此结果存储在上面的json文件中。

这样做的最佳方法是什么?

展开
收起
社区小助手 2018-12-05 15:15:41 1691 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    这是我实现这一目标的方式。对于这整个解决方案,我使用了spark-shell。这些是一些先决条件:

    从json-serde下载这个jar

    将zip文件解压缩到任何位置

    现在使用此命令运行spark-shell

    spark-shell --jars path/to/jars/json-serde-cdh5-shim-1.3.7.3.jar,path/to/jars/json-serde-1.3.7.3.jar,path/to/jars/json-1.3.7.3.jar
    你的Json文件:

    {
    "cust": "Retails",
    "tables": [

    {
         "Name":"customer",
         "table_NAME":"cust",
         "param1":"cust_id",  
         "val":"112",
         "op":"cust_name"
    },
    {
         "Name":"sales",
         "table_NAME":"sale",
         "param1":"country",  
         "val":"ind",
         "op":"monthly_sale"
     }]

    }
    折叠版:

    {"cust": "Retails","tables":[{"Name":"customer","table_NAME":"cust","param1":"cust_id","val":"112","op":"cust_name"},{"Name":"sales","table_NAME":"sale","param1":"country","val":"ind","op":"monthly_sale"}]}
    我把这个json放在这个/tmp/sample.json中

    现在去spark-sql部分:

    基于json模式创建表

    sql("CREATE TABLE json_table(cust string,tables array>) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'")
    现在将json数据加载到表中

    sql("LOAD DATA LOCAL INPATH '/tmp/sample.json' OVERWRITE INTO TABLE json_table")
    现在我将使用蜂巢侧视图概念横向视图

    val ans=sql("SELECT myCol FROM json_table LATERAL VIEW explode(tables) myTable as myCol").collect
    返回结果的架构:

    ans.printSchema
    root
     |-- table: struct (nullable = true)
     |    |-- Name: string (nullable = true)
     |    |-- table_NAME: string (nullable = true)
     |    |-- param1: string (nullable = true)
     |    |-- val: string (nullable = true)
     |    |-- op: string (nullable = true)

    ans.show的结果

     ans.show
     +--------------------+
     |               table|
     +--------------------+
     |[customer,cust,cu...|
     |[sales,sale,count...|
     +--------------------+

    现在我假设可以有两种类型的数据,例如cust_id是Number类型,country是String类型。我正在添加一种方法来根据它的值来识别数据类型。例如

    def isAllDigits(x: String) = x forall Character.isDigit
    注意:您可以使用自己的方式识别此信息

    7.现在基于json数据创建查询

    ans.foreach(f=>{

    val splitted_string=f.toString.split(",")
    val op=splitted_string(4).substring(0,splitted_string(4).size-2)
    val table_NAME=splitted_string(1)
    val param1 = splitted_string(2)
    val value = splitted_string(3)
    if(isAllDigits(value)){
    println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"="+value)
    }else{
    println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"='"+value+"'")
    }
    })
    这是我得到的结果:

    SELECT cust_name FROM cust WHERE cust_id=112
    SELECT monthly_sale FROM sale WHERE country='ind'

    2019-07-17 23:18:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载