Spark高级数据分析· 6LSA

简介: ![](http://img3.douban.com/lpic/s28323514.jpg) [潜在语义分析](http://www.atatech.org/articles/48374) ``` wget http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream.xml.bz2

潜在语义分析

wget http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream.xml.bz2

1 获取数据

def readFile(path: String, sc: SparkContext): RDD[String] = {
  val conf = new Configuration()
  conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
  conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
  val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable],
    classOf[Text], conf)
  rawXmls.map(p => p._2.toString)
}

//Returns a (title, content) pair
def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
  val page = new EnglishWikipediaPage()
  WikipediaPage.readPage(page, pageXml)
  if (page.isEmpty || !page.isArticle || page.isRedirect ||
      page.getTitle.contains("(disambiguation)")) {
    None
  } else {
    Some((page.getTitle, page.getContent))
  }
}

val pages = readFile("hdfs:///user/ds/Wikipedia/", sc)
   .sample(false, sampleSize, 11L)

val plainText = pages.filter(_ != null).flatMap(wikiXmlToPlainText)

2 词形归并

def createNLPPipeline(): StanfordCoreNLP = {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}

def isOnlyLetters(str: String): Boolean = {
  // While loop for high performance
  var i = 0
  while (i < str.length) {
    if (!Character.isLetter(str.charAt(i))) {
      return false
    }
    i += 1
  }
  true
}

def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP)
  : Seq[String] = {
  val doc = new Annotation(text)
  pipeline.annotate(doc)
  val lemmas = new ArrayBuffer[String]()
  val sentences = doc.get(classOf[SentencesAnnotation])
  for (sentence <- sentences.asScala;
       token <- sentence.get(classOf[TokensAnnotation]).asScala) {
    val lemma = token.get(classOf[LemmaAnnotation])
    if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
      lemmas += lemma.toLowerCase
    }
  }
  lemmas
}

val stopWords = sc.broadcast(loadStopWords("stopwords.txt")).value

val lemmatized = plainText.mapPartitions(iter => {
  val pipeline = createNLPPipeline()
  iter.map{ case(title, contents) => (title, plainTextToLemmas(contents, stopWords, pipeline))}
})

3 TF-IDF

def documentTermMatrix(docs: RDD[(String, Seq[String])], stopWords: Set[String], numTerms: Int,
     sc: SparkContext): (RDD[Vector], Map[Int, String], Map[Long, String], Map[String, Double]) = {
   val docTermFreqs = docs.mapValues(terms => {
     val termFreqsInDoc = terms.foldLeft(new HashMap[String, Int]()) {
       (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
     }
     termFreqsInDoc
   })

   docTermFreqs.cache()
   val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap()

   val docFreqs = documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
   println("Number of terms: " + docFreqs.size)
   saveDocFreqs("docfreqs.tsv", docFreqs)

   val numDocs = docIds.size

   val idfs = inverseDocumentFrequencies(docFreqs, numDocs)

   // Maps terms to their indices in the vector
   val idTerms = idfs.keys.zipWithIndex.toMap
   val termIds = idTerms.map(_.swap)

   val bIdfs = sc.broadcast(idfs).value
   val bIdTerms = sc.broadcast(idTerms).value

   val vecs = docTermFreqs.map(_._2).map(termFreqs => {
     val docTotalTerms = termFreqs.values.sum
     val termScores = termFreqs.filter {
       case (term, freq) => bIdTerms.contains(term)
     }.map{
       case (term, freq) => (bIdTerms(term), bIdfs(term) * termFreqs(term) / docTotalTerms)
     }.toSeq
     Vectors.sparse(bIdTerms.size, termScores)
   })
   (vecs, termIds, docIds, idfs)
 }

 def documentFrequencies(docTermFreqs: RDD[HashMap[String, Int]]): HashMap[String, Int] = {
   val zero = new HashMap[String, Int]()
   def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
     : HashMap[String, Int] = {
     tfs.keySet.foreach { term =>
       dfs += term -> (dfs.getOrElse(term, 0) + 1)
     }
     dfs
   }
   def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
     : HashMap[String, Int] = {
     for ((term, count) <- dfs2) {
       dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
     }
     dfs1
   }
   docTermFreqs.aggregate(zero)(merge, comb)
 }

 def documentFrequenciesDistributed(docTermFreqs: RDD[HashMap[String, Int]], numTerms: Int)
     : Array[(String, Int)] = {
   val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)
   val ordering = Ordering.by[(String, Int), Int](_._2)
   docFreqs.top(numTerms)(ordering)
 }

 def trimLeastFrequent(freqs: Map[String, Int], numToKeep: Int): Map[String, Int] = {
   freqs.toArray.sortBy(_._2).take(math.min(numToKeep, freqs.size)).toMap
 }

 def inverseDocumentFrequencies(docFreqs: Array[(String, Int)], numDocs: Int)
   : Map[String, Double] = {
   docFreqs.map{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap
 }

4 奇异值分解

termDocMatrix.cache()
val mat = new RowMatrix(termDocMatrix)
val svd = mat.computeSVD(k, computeU=true)
def topTermsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
    numTerms: Int, termIds: Map[Int, String]): Seq[Seq[(String, Double)]] = {
  val v = svd.V
  val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
  val arr = v.toArray
  for (i <- 0 until numConcepts) {
    val offs = i * v.numRows
    val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
    val sorted = termWeights.sortBy(-_._1)
    topTerms += sorted.take(numTerms).map{case (score, id) => (termIds(id), score)}
  }
  topTerms
}

def topDocsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
    numDocs: Int, docIds: Map[Long, String]): Seq[Seq[(String, Double)]] = {
  val u  = svd.U
  val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
  for (i <- 0 until numConcepts) {
    val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
    topDocs += docWeights.top(numDocs).map{case (score, id) => (docIds(id), score)}
  }
  topDocs
}

val topConceptTerms = topTermsInTopConcepts(svd, 10, 10, termIds)
val topConceptDocs = topDocsInTopConcepts(svd, 10, 10, docIds)
for ((terms, docs) <- topConceptTerms.zip(topConceptDocs)) {
  println("Concept terms: " + terms.map(_._1).mkString(", "))
  println("Concept docs: " + docs.map(_._1).mkString(", "))
  println()
}

5 相关度

import breeze.linalg.{DenseMatrix => BDenseMatrix, DenseVector => BDenseVector,
SparseVector => BSparseVector}

def topTermsForTerm(normalizedVS: BDenseMatrix[Double], termId: Int): Seq[(Double, Int)] = {
  // Look up the row in VS corresponding to the given term ID.
  val termRowVec = new BDenseVector[Double](row(normalizedVS, termId).toArray)

  // Compute scores against every term
  val termScores = (normalizedVS * termRowVec).toArray.zipWithIndex

  // Find the terms with the highest scores
  termScores.sortBy(-_._1).take(10)
}

def topDocsForDoc(normalizedUS: RowMatrix, docId: Long): Seq[(Double, Long)] = {
  // Look up the row in US corresponding to the given doc ID.
  val docRowArr = row(normalizedUS, docId)
  val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr)

  // Compute scores against every doc
  val docScores = normalizedUS.multiply(docRowVec)

  // Find the docs with the highest scores
  val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId

  // Docs can end up with NaN score if their row in U is all zeros.  Filter these out.
  allDocWeights.filter(!_._1.isNaN).top(10)
}

def topDocsForTerm(US: RowMatrix, V: Matrix, termId: Int): Seq[(Double, Long)] = {
  val termRowArr = row(V, termId).toArray
  val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)

  // Compute scores against every doc
  val docScores = US.multiply(termRowVec)

  // Find the docs with the highest scores
  val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
  allDocWeights.top(10)
}

多词项查询

def termsToQueryVector(terms: Seq[String], idTerms: Map[String, Int], idfs: Map[String, Double])
  : BSparseVector[Double] = {
  val indices = terms.map(idTerms(_)).toArray
  val values = terms.map(idfs(_)).toArray
  new BSparseVector[Double](indices, values, idTerms.size)
}

def topDocsForTermQuery(US: RowMatrix, V: Matrix, query: BSparseVector[Double])
  : Seq[(Double, Long)] = {
  val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray)
  val termRowArr = (breezeV.t * query).toArray

  val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)

  // Compute scores against every doc
  val docScores = US.multiply(termRowVec)

  // Find the docs with the highest scores
  val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
  allDocWeights.top(10)
}
目录
相关文章
|
5月前
|
SQL 分布式计算 数据可视化
Spark SQL案例【电商购买数据分析】
Spark SQL案例【电商购买数据分析】
|
7月前
|
机器学习/深度学习 数据可视化 算法
数据分析和机器学习的11个高级可视化图表介绍
可视化是一种强大的工具,用于以直观和可理解的方式传达复杂的数据模式和关系。它们在数据分析中发挥着至关重要的作用,提供了通常难以从原始数据或传统数字表示中辨别出来的见解。
118 0
|
17天前
|
消息中间件 数据挖掘 Kafka
《区块链公链数据分析简易速速上手小册》第5章:高级数据分析技术(2024 最新版)(上)
《区块链公链数据分析简易速速上手小册》第5章:高级数据分析技术(2024 最新版)(上)
45 1
|
5月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
78 0
|
5月前
|
SQL 分布式计算 数据挖掘
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
49 0
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
|
5月前
|
分布式计算 数据挖掘 关系型数据库
Spark综合练习——电影评分数据分析
Spark综合练习——电影评分数据分析
26 0
|
5月前
|
SQL 分布式计算 数据可视化
Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】
Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】
|
7月前
|
机器学习/深度学习 数据可视化 算法
数据分析和机器学习的11个高级可视化图表介绍
数据分析和机器学习的11个高级可视化图表介绍
|
7月前
|
数据挖掘 索引 Python
【100天精通Python】Day60:Python 数据分析_Pandas高级功能-数据透视表pivot_table()和数据交叉表crosstab()常用功能和操作
【100天精通Python】Day60:Python 数据分析_Pandas高级功能-数据透视表pivot_table()和数据交叉表crosstab()常用功能和操作
112 0
|
7月前
|
存储 并行计算 数据挖掘
【100天精通Python】Day59:Python 数据分析_Pandas高级功能-多层索引创建访问切片和重塑操作,pandas自定义函数和映射功能
【100天精通Python】Day59:Python 数据分析_Pandas高级功能-多层索引创建访问切片和重塑操作,pandas自定义函数和映射功能
71 2