Spark-SparkSql

  1. 云栖社区>
  2. 博客>
  3. 正文

Spark-SparkSql

小金子 2016-05-09 22:30:05 浏览754
展开阅读全文
  1. SparkSql
    允许spark执行sql语句,hivesql,scala的描述的基于关系的查询。其实是封装了新的RDD-SchemaRDD,由行对象组成,有一个模式描述每列的数据类型。SchemaRDD与关系型数据库的表很相似,可以通过存在的RDD/Parquet文件/Json文件/用Hive中的数据HiveSql创建。其中相关功能入口是SQLContext()及其子类。
    如HiveContext可以用HiveQL分析器查询访问HiveUDFs的能力、从Hive中读取数据。SparkSQL的数据源相关操作可以通过SchemaRDD接口来操作,可以是被当作一般的RDD,也可以注册成临时表,在上面进行sql查询。
    有两种创建SchemaRDD的方式,一是已经知道了模式,基于反射推断。二是不知道模式,采取实现接口方法,构造一个模式。
//指定模式
val schema = StructType("name age".split(',').map(fieldName=>StructField(fieldName,StringType,true)))
vak rowRdd = sc.textFile("文件地址").map(_.split(',')).map(p=>Row(p(0),p(1).trim))
val peopleSchemaRdd = sqlContext.applySchema(rowRdd ,schema )
peopleSchemaRdd.registerTable("people")

或者

  DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
 schemaPeople.registerTempTable("people");
 Parquet是柱状的格式,binnaryAsString标记sparksql将二进制文件解释成字符串。cacheMetadata打开缓存提高静态数据的查询速度。comperssion.codec是设置文件的压缩算法(snappy、gzip、lzo)。filterPushdown是该文件过滤器的pushdown优化。

对于SparkSql的性能调优可以通过缓存数据和打开一些设置选项来调优。
如cacheTable缓存柱状格式的表spark会只浏览需要的列并且自动的去压缩数据减少内存的使用以及垃圾回收的压力。uncacheTable()可以删除临时表,spark.sql.inMemoryColumarStorage.compressed 基于数据的统计信息每列自动的选择一个压缩算法,
spark.sql.inMemoryColumarStorage.batchSize柱状缓存的批数据大小,越大的数据可以提高内存的利用率和压缩效率,但是OOM是个问题啊,据说spark 2.0的钨丝计划会解决spark申请内存的管理问题。
2. 实例

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
  public static class Person implements Serializable {
    private String name;
    private int age;
    public String getName() {
      return name;
    }
    public void setName(String name) {
      this.name = name;
    }
    public int getAge() {
      return age;
    }
    public void setAge(int age) {
      this.age = age;
    }
  }

  public static void main(String[] args) throws Exception {
    /**
     * 初始化
     */
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[*]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(ctx);
    System.out.println("=== Data source: RDD ===");
    /**
     * 加载本地文件转换成Bean
     */
    JavaRDD<String> a = ctx.textFile("resources/people.txt");
    System.out.println(a.toDebugString());//rdd的(血统)其实就是RDD得的转换
    JavaRDD<Person> people = ctx.textFile("resources/people.txt").map(
      new Function<String, Person>() {
        @Override
        public Person call(String line) {
          String[] parts = line.split(",");
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          return person;
        }
      });
    //注册表 javabean形RDD即对象
    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
    schemaPeople.registerTempTable("people");
    // SQL
    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }

  /*  System.out.println("=== Data source: Parquet File ===");
    // DataFrames can be saved as parquet files, maintaining the schema information.
    schemaPeople.write().parquet("testdata/people.parquet");

    // Read in the parquet file created above.
    // Parquet files are self-describing so the schema is preserved.
    // The result of loading a parquet file is also a DataFrame.
    DataFrame parquetFile = sqlContext.read().parquet("testdata/people.parquet");

    //Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("testdata/parquetFile");
    DataFrame teenagers2 =
      sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
          return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }*/
    /**
     * 读取本地json文件
     */
    System.out.println("=== Data source: JSON Dataset ===");
    String path = "resources/people.json";
    // Because the schema of a JSON dataset is automatically inferred, to write queries,
    DataFrame peopleFromJsonFile = sqlContext.read().json(path);
    peopleFromJsonFile.printSchema();
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType
    peopleFromJsonFile.registerTempTable("people");
    DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) { return "Name: " + row.getString(0); }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }
    /**
     * 测试
     */
    List<String> jsonData = Arrays.asList(
          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
    DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());
    peopleFromJsonRDD.printSchema();
    // root
    //  |-- address: StructType
    //  |    |-- city: StringType
    //  |    |-- state: StringType
    //  |-- name: StringType
    peopleFromJsonRDD.registerTempTable("people2");
    DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
    List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0) + ", City: " + row.getString(1);
      }
    }).collect();
    for (String name: nameAndCity) {
      System.out.println(name);
    }
    ctx.stop();
  }
}

网友评论

登录后评论
0/500
评论
小金子
+ 关注