开发者社区> 问答> 正文

sparkstreaming mapWithState状态保存问题

sparkstreaming mapWithState状态保存问题,我们该如何保证sparkstreaming在任务重启后可以保证我们的可以从checkpoint中读取到之前的状态,我发现我做不到。期望得到帮助。下面是代码

package com.nuc.online

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

object WordCount3 {

def main(args: Array[String]): Unit = {

// 本地模式运行,便于测试
val spark = SparkSession.builder()
  .appName(this.getClass.getName)
  .master("local[2]")
  .getOrCreate()

val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
ssc.checkpoint("./check")

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

val lines = ssc.socketTextStream("hadoop102", 9999)
lines.checkpoint(Duration(6000))
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// 定义一个接收三个参数的匿名函数
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)  // 当前值加上上一个批次的该状态的值
  val output = (word, sum) // 输出单词和该单词出现的次数
  state.update(sum) // 更新当前的状态
  output // 该匿名函数的返回值为输出结果
}
// 使用 mapWithState 更新状态
val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()

ssc.start()
ssc.awaitTermination()

}

}

展开
收起
引领时尚s 2019-01-27 23:27:18 3235 0
1 条回答
写回答
取消 提交回答
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载