开发者社区> 问答> 正文

在Spark Stream中保存PairRdd时出错[重复]

我试图在spark流中保存我的Pair Rdd但在最后一步保存时出错。

这是我的示例代码

def main(args: Array[String]) {

val inputPath = args(0)
val output = args(1)
val noOfHashPartitioner = args(2).toInt

println("IN Streaming ")
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration;
//hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60))
val input = ssc.textFileStream(inputPath)

val pairedRDD = input.map(row => {
  val split = row.split("\\|")
  val fileName = split(0)
  val fileContent = split(1)
  (fileName, fileContent)
})
import org.apache.hadoop.io.NullWritable
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}
//print(pairedRDD)

pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

}
我在保存时已经到了最后一步。一定在这里遗漏了什么,得到的错误就像

value partitionBy不是org.apache.spark.streaming.dstream.DStream [(String,String)]的数据

展开
收起
社区小助手 2018-12-12 13:58:30 2590 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    pairedRDD是DStream[(String, String)]不是类型RDD[(String,String)]。该方法partitionBy不适用于DStreams。

    也许看看foreachRDD应该可用DStream。

    编辑:更多的上下文解释textFileStream将在指定的路径上设置目录监视,并且只要有新文件将流内容。所以这就是流方面的来源。那是你要的吗?或者你只是想“按原样”阅读目录的内容一次?然后readTextFiles会返回一个非流容器。

    2019-07-17 23:20:08
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载