Spark Streaming Dynamic Resource Allocation

简介: The goal is to make processing time infinitely close to duration by reducing/increasing resource in spark streaming . And we also hope having a reas

Problem Statement

DRA has already been implemented since Spark 1.2 . However the existing Spark DRA on Yarn implementation does not embody the specific property of Spark Streaming.  
Spark DRA works when there are some executors being idle for  removeExecutorInterval time,then they will be removed or when there is a backlog of pending tasks waiting to be rescheduled,then new executors will be added。This mechanism works fine for Long-Time stage。 Spark Streaming is known as  micro batch processing,hence most of time the duration of batch is small. It will cause churn when removing executors at the end of the batch and adding executors at the beginning of next batch. 
 In real-time data processing, data slowly  increase or slowly down,so the adjacent batches have almost the same data to process.
The perfect result of spark streaming is processing time equal to duration. So the key concept is reducing/increasing resource until processing time infinitely close to duration .

Goals

The goal is to make processing time  infinitely close to duration by reducing/increasing resource in  spark streaming . And we also hope having a reasonable stable time  after short-time adjustment which means resource adjustment rarely  happens

Design Summary

The same as spark DRA, Spark Streaming DRA is also  disabled,but can be enabled by a new Spark conf property spark.streaming.dynamicAllocation.enabled. All related Spark Streaming DRA properties introduced will bepackaged in the spark.streaming.dynamicAllocation.* namespace

Adding Executors

The job in Spark Streaming is  time-sensitive  .  Once  there is a batch delayed it triggers Adding-Executors action. This Action  will request  the number of spark.streaming.dynamicAllocation.maxExecutors executors from Yarn immediately in  greedy way since we hope we can eliminate the delay as soon as possible ,at the same time,we also should take care  of the situation when requesting resource with multi rounds  may have resource scarcity  in shared cluster.
Removing Executors
As mentioned before, Spark Streaming,  time-sensitive , so the action removing executors  is not allowed to consume too much time of duration. So executors marked as removed will be added to pendingToRemove set so new tasks will not be launched in them ,then we acknowledge yarn to kill them asynchronously.

Algorithm of DRA

To calculate the number of executor should be removed every round,the formula is used as following
val totalRemoveExecutorNum = Math.round(
currentExecutors * (
(duration.toDouble - processDuration) / duration - reserveRate)
)
val actualShouldRemoveExecutorNumInThisRound = totalRemoveExecutorNum / releaseRounds
In this formula, we suppose processing time  has a strong relationship with executor number,however,they are not  linear relationship. To fix this, we add some new parameters like  reserveRate and releaseRounds to make sure we fit this non-linear relationship .
We also provide a  heuristic strategy to reduce executor number. This formula will be calculate in every round, so every round the number will be readjusted circularly ,and finally actualShouldRemoveExecutorNumInThisRound will converge to zero .

Implementation of DRA

New classes should be added in spark streaming module:
package org.apache.spark.streaming
private[spark] class StreamingExecutorAllocationManager(client: ExecutorAllocationClient,
                                                        duration: Long,
                                                        steamingListenerBus: StreamingListenerBus,
                                                        listenerBus: LiveListenerBus,
                                                        conf: SparkConf) extends Logging {
  allocationManager =>
......
}

private class StreamingExecutorAllocationListener extends SparkListener {
......
}

private class StreamingSchedulerListener extends StreamingListener {
.....
}
StreamingExecutorAllocationManager is initialed in StreamingContext 
Class affected in spark core module:
package org.apache.spark
private[spark] trait ExecutorAllocationClient {
   //blackList who can remove executors immediately make Yarn remove them asynchronously possible
  def addExecutorToPendingStatus(executorId: String): Unit = {}

  //cause spark streaming is initial late then spark core module,
  // we need to know how many executors we already have when 
  // spark streaming is initaled
  def executors(): List[String] = { List() }
}
SparkContext and CoarseGrainedSchedulerBackend both are affected because they are extend from ExecutorAllocationClient.
JobScheduler is also should be modified so before batch job submitted we can adjust resource.
package org.apache.spark.streaming.scheduler
private[streaming]class JobScheduler(val ssc: StreamingContext) extends Logging {

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)

      //before job submitted,we should compute resource actually required
      ssc.executorAllocationManager match {
        case Some(eam)=>  eam.run()
        case None => //do nothing
      }
   .........
    }
  }
......}

DRA Available Properties

Enable DRA:
spark.streaming.dynamicAllocation.enabled=true
Upper/Lower bound for the number of executors if dynamic allocation is enabled.
spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50
More message will be printed in log if DRA is enabled. Default is false
spark.streaming.dynamicAllocation.debug=true
Rounds(Batches) will be used to release the resource calculated in current round. Default is 5
spark.streaming.dynamicAllocation.releaseRounds=5
The number of rounds(Batches) should be remembered. We can increase this number if batch processing time is unstable . this number affects processDuration in    (duration.toDouble - processDuration) .Default is 1 
spark.streaming.dynamicAllocation.rememberBatchSize=1
DRA delays to  work when  the specific number of rounds has been submitted. Default is 10
spark.streaming.dynamicAllocation.delay.rounds=10
Make sure the resource  is more than reserveRate * current number of Executors. More useful than minExecutorNumber. Default is 0.2
spark.streaming.dynamicAllocation.reserveRate=0.2
目录
相关文章
|
5月前
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
39 0
|
7月前
|
canal 分布式计算 关系型数据库
大数据Spark Streaming实时处理Canal同步binlog数据
大数据Spark Streaming实时处理Canal同步binlog数据
114 0
|
7月前
|
消息中间件 分布式计算 Kafka
大数据Spark Streaming Queries 2
大数据Spark Streaming Queries
83 0
|
4月前
|
分布式计算 大数据 Apache
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
64 0
|
5天前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
39 0
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的容错性与高可用性
Spark Streaming的容错性与高可用性
|
3月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
3月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
3月前
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作