Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变_weixin_30703911的博客-程序员宅基地

技术标签: 大数据  

1、为什么引入Backpressure

      默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate
”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。
2、Backpressure
      Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
2.1 Streaming架构如下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)



2.2 BackPressure执行过程如下图所示:
  在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).

3、BackPressure 源码解析
3.1 RateController类体系
RatenController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:

**
 * A StreamingListener that receives batch completion     updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
  val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
  newRate.foreach { s =>
    rateLimit.set(s.toLong)
    publish(getLatestRate())
  }
}
def getLatestRate(): Long = rateLimit.get()

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
  processingEnd <- batchCompleted.batchInfo.processingEndTime
  workDelay <- batchCompleted.batchInfo.processingDelay
  waitDelay <- batchCompleted.batchInfo.schedulingDelay
  elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}

3.2 RateController的注册
JobScheduler启动时会抽取在DStreamGraph中注册的所有InputDstream中的rateController,并向ListenerBus注册监听. 此部分代码如下:

def start(): Unit = synchronized {
   if (eventLoop != null) return // scheduler has already been started

   logDebug("Starting JobScheduler")
   eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
   override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

   override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
 }
 eventLoop.start()

 // attach rate controllers of input streams to receive batch completion updates
 for {
   inputDStream <- ssc.graph.getInputStreams
   rateController <- inputDStream.rateController
 } ssc.addStreamingListener(rateController)</span>

 listenerBus.start()
 receiverTracker = new ReceiverTracker(ssc)
 inputInfoTracker = new InputInfoTracker(ssc)
 receiverTracker.start()
 jobGenerator.start()
 logInfo("Started JobScheduler")
}

3.3 BackPressure执行过程分析
BackPressure 执行过程分为BatchCompleted事件触发时机和事件处理两个过程
3.3.1 BatchCompleted触发过程
对BatchedCompleted的分析,应该从JobGenerator入手,因为BatchedCompleted是批次处理结束的标志,也就是JobGenerator产生的作业执行完成时触发的,因此进行作业执行分析。
Streaming 应用中JobGenerator每个Batch Interval都会为应用中的每个Output Stream建立一个Job, 该批次中的所有Job组成一个Job Set.使用JobScheduler的submitJobSet进行批量Job提交。此部分代码结构如下所示

 /** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)

  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

其中,sumitJobSet会创建固定数量的后台线程(具体由“spark.streaming.concurrentJobs”指定),去处理Job Set中的Job. 具体实现逻辑为:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

其中JobHandler用于执行Job及处理Job执行结果信息。当Job执行完成时会产生JobCompleted事件. JobHandler的具体逻辑如下面代码所示:

当Job执行完成时,向eventLoop发送JobCompleted事件。EventLoop事件处理器接到JobCompleted事件后将调用handleJobCompletion 来处理Job完成事件。handleJobCompletion使用Job执行信息创建StreamingListenerBatchCompleted事件并通过StreamingListenerBus向监听器发送。实现如下:

private def handleJobCompletion(job: Job, completedTime: Long) {
   val jobSet = jobSets.get(job.time)
   jobSet.handleJobCompletion(job)
   job.setEndTime(completedTime)
   listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
   logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
   if (jobSet.hasCompleted) {
     jobSets.remove(jobSet.time)
     jobGenerator.onBatchCompletion(jobSet.time)
     logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
     jobSet.totalDelay / 1000.0, jobSet.time.toString,
     jobSet.processingDelay / 1000.0
   ))
 listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
 }
 job.result match {
   case Failure(e) =>
       reportError("Error running job " + job, e)
   case _ =>
 }
}

3.3.2、BatchCompleted事件处理过程
StreamingListenerBus将事件转交给具体的StreamingListener,因此BatchCompleted将交由RateController进行处理。RateController接到BatchCompleted事件后将调用onBatchCompleted对事件进行处理。

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

onBatchCompleted会从完成的任务中抽取任务的执行延迟和调度延迟,然后用这两个参数用RateEstimator(目前存在唯一实现PIDRateEstimator,proportional-integral-derivative (PID) controller, PID控制器)估算出新的rate并发布。代码如下:

/**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
  val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
  newRate.foreach { s =>
    rateLimit.set(s.toLong)
    publish(getLatestRate())
  }
}

其中publish()由RateController的子类ReceiverRateController来定义。具体逻辑如下(ReceiverInputDStream中定义):

/**
   * A RateController that sends the new rate to receivers, via the receiver tracker.
   */
 private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
  extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}

publish的功能为新生成的rate 借助ReceiverTracker进行转发。ReceiverTracker将rate包装成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint

/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long):   Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}

ReceiverTrackerEndpoint接到消息后,其将会从receiverTrackingInfos列表中获取Receiver注册时使用的endpoint(实为ReceiverSupervisorImpl),再将rate包装成UpdateLimit发送至endpoint.其接到信息后,使用updateRate更新BlockGenerators(RateLimiter子类),来计算出一个固定的令牌间隔。

其中RateLimiter的updateRate实现如下:

/**
  * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
  * {
      {
      {spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
  *
  * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
  */
 private[receiver] def updateRate(newRate: Long): Unit =
   if (newRate > 0) {
   if (maxRateLimit > 0) {
     rateLimiter.setRate(newRate.min(maxRateLimit))
   } else {
     rateLimiter.setRate(newRate)
   }
 }

setRate的实现如下:

public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0
    && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex) {
    resync(readSafeMicros());
    double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;  //固定间隔
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }
}

到此,backpressure反压机制调整rate结束。

4.流量控制点
  当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。

  /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
      if (state == Active) {
         waitToPush()  //获取令牌
        synchronized {
          if (state == Active) {
            currentBuffer += data
          } else {
            throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
          }
        }
      } else {
        throw new SparkException(
    "Cannot add data as BlockGenerator has not been started or has been stopped")
}

其令牌投放采用令牌桶机制进行, 原理如下图所示:

令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。
  Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。


  令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。
  Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。

 

转载于:https://www.cnblogs.com/itboys/p/6486089.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_30703911/article/details/99553981

智能推荐

Enhancement of SSD by concatenating_feature maps for object detection 论文笔记_五十岁的少女的博客-程序员宅基地

background任务:通过有效的特征映射提高传统SSD的性能关键词:Rainbow concatenation​Introduction传统的SSD的特征金字塔对小目标的检测效果不是很好,而且直接通过层级进行预测,彼此的layers没有信息交互,会降低模型的准确度。​所以作者通过对SSD中的预测层级进行,pooling和反卷积,使各个层级有信息交互,使得模型的准确率上生,但是,检测速率也有一定的下降​Model整体结构Rainbow concatenation主要是通过po.

Adreno OpenGL ES 3.1 介绍(5)_weixin_38498942的博客-程序员宅基地

Adreno OpenGL ES 3.1 介绍(5)4.2.2多样本纹理–演示4.2.2.1设置多样本纹理4.2.2.2使用多样本纹理作为渲染目标4.2.2.3将多样本纹理数据传输到后台缓冲区4.2.2多样本纹理–演示该演示展示了一个旋转的线框立方体,该立方体首先被渲染为多样本纹理,然后被渗入后台缓冲区。 创建了许多不同的多样本纹理,每个纹理每个纹理像素拥有不同数量的样本。 用户可以在这些纹理之间切换,以查看视觉质量方面的差异。与前面的示例一样,此演示中使用的着色器很简单:顶点着色器将模型-视图-

curl 错误:unable to verify the first certificate 解决办法_u010912615的博客-程序员宅基地

CURL 错误:UNABLE TO VERIFY THE FIRST CERTIFICATE 解决办法我在阿里云申请的免费ssl证书到期了,又重新申请了新的免费证书,部署完毕后浏览 事情是这样的:我在阿里云申请的免费ssl证书到期了,又重新申请了新的免费证书,部署完毕后浏览器访问 https 网站正常,但...

label行间距_weixin_33670786的博客-程序员宅基地

NSMutableAttributedString * attributedString1 = [[NSMutableAttributedString alloc] initWithString:testString];NSMutableParagraphStyle * paragraphStyle1 = [[NSMutableParagraphSt...

模拟移动机器人控制(四)_nevermoredanny的博客-程序员宅基地

clc;close all;clearhold onaxis([0,200,0,200]);[X,Y] = ginput(1);C= pi/2;P= [X,Y,C];L= 4;V= 4;Pl=[X-L/2,Y];Pr=[X+L/2,Y];plot((Pl(1)+Pr(1))/2,(Pl(2)+Pr(2))/2,'*');plot(Pl(1),Pl(2),'o');plot

决策树和基于决策树的模型构建_sljwy的博客-程序员宅基地_基于决策树的模型

http://graphviz.readthedocs.io/en/stable/manual.html#installationgraphviz 安装包下载地址 https://www.graphviz.org/download/将 graphviz 的安装位置添加到系统环境变量使用pip install graphviz安装 graphviz python 包使用pip install pydotplus安装 pydotplus python 包决策树模型可视化以iris数据为例。训练一个分

随便推点

4大典型安全漏洞是怎么来的?如何解决?_simplilearn圣普伦的博客-程序员宅基地

关于防御网络攻击,最好的防御就是强有力的出击。首先,必须使用适当的工具和流程(如漏洞扫描程序和威胁检测技术)来识别潜在的漏洞和威胁。确定漏洞和威胁后,分析确定它们的优先级,按重要性顺序消除或减轻它们。

必需品、消费品与奢侈品_weixin_34194702的博客-程序员宅基地

  一般来说,消费分为三个层面,即是消费品、奢侈品、必需品。这三个层面没有明显的分界线,不同收入层次的人对这三个层面定义不同,如一个月收入在平均生活指数下的人来说,买一瓶海飞丝就是奢侈品;而对于白领阶层的人来说,这可以就是消费品;而对于月收入数十万的人来说,这可能是必需品。  但不管对谁来说,普通的毛巾,牙膏,这些都是必需品。而用纯金打造的指甲剪,对谁来说,应该都是奢侈品。  必需品是满足日常生活...

常见缓存读写策略_January丶的博客-程序员宅基地_缓存的读写策略

缓存读写策略我们都知道缓存大多数情况下是用来减轻数据库压力的。缓存读写策略就是在进行数据读/写时以何种策略读写缓存和数据库,即:读请求时先读缓存还是数据库,缓存中数据不存在怎么办,写请求时先更新数据库还是缓存,同步更新还是异步更新等一系列问题的方案。Cache Aside PatternCache Aside Pattern 译为旁路缓存模式。该模式以数据库为主,缓存为辅。主要策略如下:读请求在读请求时,先查询缓存:1. 缓存中存在,直接返回;2. 缓存中不存在,查询数据库,然后将结果写入缓存。

java语言网站无源代码二次开发_【杰表sdk(web报表二次开发包,java语言) 2.8和Eclipse哪个好用】杰表sdk(web报表二次开发包,java语言) 2.8和Eclipse对比-ZO..._谁顾的博客-程序员宅基地

杰表sdk可以用来开发基于web的报表系统,使用杰表sdk可以使杰表系统系统更好的集成到您的项目中,本开发包通过几个不同环境下的代码示例,来说明如何使用杰表API,来新建,修改,发布,浏览,打印报表。如何使用杰表提供的接口,来自定义报表元素,自定义JavaBean数据源,以及如何监听杰表服务器来生成日志。1设置开发环境2 在java swing 应用程序中使用杰表2.1...

推荐文章

热门文章

相关标签