开发者社区 > 数据库 > NoSQL数据库 > 正文

spark根据df动态创建cassandra的表怎么搞?

spark根据df动态创建cassandra的表怎么搞?

展开
收起
小六码奴 2019-05-30 14:53:10 2771 0
1 条回答
写回答
取消 提交回答
  • 改一把spark-cassandra-connector 的CassandraSourceRelation.scala
    // Look up the target table in Cassandra's schema metadata; if it already
    // exists use it, otherwise create the keyspace/table on the fly from the
    // user-specified Spark schema. NOTE(review): fragment of a larger method in
    // CassandraSourceRelation.scala — connector/tableRef/userSpecifiedSchema
    // come from the enclosing scope not shown here.
    Schema.fromCassandra(connector, Some(tableRef.keyspace), Some(tableRef.table)).tables.headOption match {

    case Some(t) => t
    case None => createKeyspaceAndTable(connector, tableRef, userSpecifiedSchema)

    }

    /**
     * Creates the keyspace (SimpleStrategy, replication factor 3) and the table
     * if they do not already exist, deriving the CQL column definitions from the
     * user-specified Spark schema.
     *
     * Key-selection convention: a column whose Spark-schema comment contains
     * "_pk" becomes part of the partition key; a column whose comment contains
     * "_ck" (and not "_pk") becomes a clustering column. If no column is tagged,
     * the first column is used as the sole partition key.
     *
     * @param connector           connector used to open a Cassandra session
     * @param tableRef            keyspace/table to create
     * @param userSpecifiedSchema Spark schema the table is derived from; required
     * @throws IOException if no schema was provided, the schema has no fields,
     *                     or clustering keys were tagged without a partition key
     * @return result of executing the CREATE TABLE statement
     */
    private def createKeyspaceAndTable (connector: CassandraConnector, tableRef: TableRef, userSpecifiedSchema: Option[StructType]) = {
      // Fail loudly instead of swallowing NoSuchElementException from .get —
      // a silently skipped CREATE TABLE would surface later as a confusing
      // "table does not exist" error at write time.
      val structType: StructType = userSpecifiedSchema.getOrElse(
        throw new IOException("To create a table, fields definition need to be provided"))
      val fields = structType.fields
      if (fields.isEmpty) {
        throw new IOException("To create a table, at least one field must be provided")
      }

      val keyspace = quote(tableRef.keyspace)
      val table = quote(tableRef.table)

      // Map the Spark SQL DDL string to CQL column definitions.
      // NOTE(review): this token-level rewrite of StructType.sql is brittle —
      // it mishandles nested STRUCT types and column names that contain
      // "STRING"/":"; acceptable only for simple flat schemas. TODO confirm
      // with the callers that only flat schemas reach this path.
      val columnsDdl = structType.sql
        .replace("STRUCT<", "")
        .replace(">", "")
        .replace("`", "\"")
        .replace(":", "")
        .replace("STRING", "TEXT")

      // A comment containing both "_pk" and "_ck" counts as a partition key
      // (mirrors the original if / else-if precedence).
      def commentOf(f: StructField): String = f.getComment().getOrElse("")
      val partitionKeyColumns = fields.collect { case f if commentOf(f).contains("_pk") => f.name }
      val clusteringColumns =
        fields.collect { case f if !commentOf(f).contains("_pk") && commentOf(f).contains("_ck") => f.name }

      val primaryKeyClause =
        if (partitionKeyColumns.isEmpty && clusteringColumns.isEmpty) {
          // Nothing tagged: default to the first column as the partition key.
          s"(${fields.head.name})"
        } else if (partitionKeyColumns.isEmpty) {
          // Clustering keys without a partition key is not a valid CQL table.
          throw new IOException("Please specify partition key")
        } else if (clusteringColumns.isEmpty) {
          s"(${partitionKeyColumns.mkString(",")})"
        } else {
          s"(${partitionKeyColumns.mkString(",")}),${clusteringColumns.mkString(",")}"
        }

      val createTableDdl =
        s"CREATE TABLE IF NOT EXISTS $keyspace.$table ($columnsDdl,PRIMARY KEY ($primaryKeyClause))"

      connector.withSessionDo { session =>
        session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
        session.execute(createTableDdl)
      }
    }
    先查一把,如果没有则创建

    2019-07-17 23:36:24
    赞同 展开评论 打赏

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载