Spark学习七 ——总体流程分析_4spark任务执行过程有哪些?a.基于rdds构建dagb.dag scheduler将dag切分-程序员宅基地

Spark总体流程简述

在这里插入图片描述1.构建DAG(调用RDD上的方法)在driver段
2.DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中生成的Task以TaskSet的形式给TaskScheduler,在driver段
3.TaskScheduler调度Task(根据资源情况将Task调度到相应的Executor中),在driver段
4.Executor接收Task,然后将Task丢入到线程池中执行,executor中

常见术语

Application:表示你的应用程序

Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext

Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。

Cluter Manager:指的是在集群上获取资源的外部服务。目前有三种类型

Standalone : spark原生的资源管理,由Master负责资源的分配

Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架

Hadoop Yarn: 主要是指Yarn中的ResourceManager

Worker: 集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点

Task: 被送到某个Executor上的工作单元,但hadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责

Job: 包含多个Task组成的并行计算,往往由Spark Action触发生成, 一个Application中往往会产生多个Job

Stage: 每个Job会被拆分成多组Task, 作为一个TaskSet, 其名称为Stage,Stage的划分和调度是有DAGScheduler来负责的,Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方

DAGScheduler: 根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法,如下图
在这里插入图片描述TASKSedulter: 将TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。下图展示了TaskScheduler的作用
在这里插入图片描述

构建DAG

什么是DAG
  • DAG:有向无环图

  • DAG 是一组顶点和边的组合。顶点代表了 RDD, 边代表了对 RDD 的一系列操作。

  • DAG的数量由触发action次数决定

  • DAG是开始于通过SparkContext创建的RDD,结束于触发Action,调用run Job,就是一个完整的DAG就形成了。一旦触发Action就形成了一个完整的DAG。

  • 一个RDD只是描述了数据计算过程中的一个环节,而DGA由一到多个RDD组成,描述了数据计算过程中的所有环节(过程)

DAG 解决了什么问题

DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程),即描述RDD被转化的过程

DAG 的出现主要是为了解决 Hadoop MapReduce 框架的局限性。那么 MapReduce 有什么局限性呢?

主要有两个:

  • 每个 MapReduce 操作都是相互独立的,HADOOP不知道接下来会有哪些Map Reduce。
  • 每一步的输出结果,都会持久化到硬盘或者 HDFS 上。

当以上两个特点结合之后,我们就可以想象,如果在某些迭代的场景下,MapReduce 框架会对硬盘和 HDFS 的读写造成大量浪费。

而且每一步都是堵塞在上一步中,所以当我们处理复杂计算时,会需要很长时间,但是数据量却不大。

所以 Spark 中引入了 DAG,它可以优化计算计划,比如减少 shuffle 数据。

切分STAGE

DAGScheduler: 根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),将其进行切分,根据shullfe(宽依赖),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法

为什么要切分Stage?

一个复杂的业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)如果有shuffle,那么就意味着前面阶段产生的结果后,才能执行下一个阶段,下一个阶段的计算要依赖上一个阶段的数据。在同一个Stage中,会有多个算子,可以合并在一起,我们称其为pipeline(流水线:严格按照流程、顺序执行)

由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

shuffle过程可能需要完成以下过程
  • 重新进行数据分区

  • 数据传输

  • 数据压缩

  • 磁盘I/O

RDD依赖关系和宽窄依赖

RDD依赖关系,也就是有依赖的RDD之间的关系,比如RDD1------->RDD2(RDD1生成RDD2),RDD2依赖于RDD1。这里的生成也就是RDDtransformation操作

张泽立

  • 窄依赖(也叫narrow依赖)

父分区只有一个箭头,发往一个分区,子分区的数据来自部分分区

从父RDD角度看:一个父RDD只被一个子RDD分区使用。父RDD的每个分区最多只能被一个Child RDD的一个分区使用

从子RDD角度看:依赖上级RDD的部分分区 精确知道依赖的上级RDD分区,会选择和自己在同一节点的上级RDD分区,没有网络IO开销,高效。如map,flatmap,filter

  • 宽依赖(也叫shuffle依赖/wide依赖)

父分区只有多个箭头,发往多个分区,子分区的数据来自全部分区

从父RDD角度看:一个父RDD被多个子RDD分区使用。父RDD的每个分区可以被多个Child RDD分区依赖

从子RDD角度看:依赖上级RDD的所有分区 无法精确定位依赖的上级RDD分区,相当于依赖所有分区(例如reduceByKey) 计算就涉及到节点间网络传输

根本区分方式:是根据分区器是否一样(分区数量一样和分区规则一样)
或者说是否发生 shuffle(洗牌)

Spark之所以将依赖分为narrow和 shuffle:

(1) narrow dependencies可以支持在同一个集群Executor上,以pipeline管道形式顺序执行多条命令,例如在执行了map后,紧接着执行filter。分区内的计算收敛,不需要依赖所有分区的数据,可以并行地在不同节点进行计算。所以它的失败恢复也更有效,因为它只需要重新计算丢失的parent partition即可,

(2)shuffle dependencies 则需要所有的父分区都是可用的,必须等RDD的parent partition数据全部ready之后才能开始计算,可能还需要调用类似MapReduce之类的操作进行跨节点传递。从失败恢复的角度看,shuffle dependencies 牵涉RDD各级的多个parent partition。

步骤详解:

1.从代码构建DAG图

Spark的计算发生在RDD的Action操作,而对Action之前的所有Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。
Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。

2.将DAG划分为Stage核心算法,通过DAGSchedule
Application多个job多个Stage:

Spark Application中可以因为不同的Action触发众多的job,一个Application中可以有很多的job,每个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

划分依据:

Stage划分的依据就是宽依赖,何时产生宽依赖(shuffle),reduceByKey, groupByKey等算子,会导致宽依赖的产生。

核心算法:
  1. 从后往前回溯/反向解析,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。

  2. Spark内核会从触发Action操作的那个RDD开始从后往前推,

  3. 首先会为最后一个RDD创建一个stage,

  4. 然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD。

  5. 然后依次类推,继续倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。

3.提交Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler通过TaskScheduler接口提交任务集。
这个任务集最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个任务集的生命周期,
对于DAGScheduler来说,提交调度阶段的工作到此就完成了。
而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的任务到对应的Executor节点上进行运算。

在这里插入图片描述

spark总体流程

在这里插入图片描述在这里插入图片描述在这里插入图片描述

总体流程详解

在这里插入图片描述

  1. 构建Spark Application的运行环境
  val conf = new SparkConf();
conf.setAppName("test01")
conf.setMaster("local")
val sc = new SparkContext(conf)
  1. 启动SparkContext,SparkContext(Driver)向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动与master建立连接

  2. master和worker进行RPC通信,让worker启动executor

  3. 各个worker启动executor,然后和executor进行通信,汇报自己的状态(具体流程参考之前的博客)

  4. RDD触发 Action后,会根据最后这个RDD从后往前推断依赖关系,遇到 shuffler就切分 Stage,会递归切分,递归的出口是某个RDD没有父RDD了(上文有算法详情)

/指定以后从哪里读取数据创建RDD(弹性分布式数据集)
    val lines: RDD[String] = sc.textFile("hdfs://node-4:9000/wc1", 1)
     //分区数量
    partitionsNum=lines.partitions.length
    
    //切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))
    
    //将单词和一组合
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    
    //按key进行聚合
    val reduced:RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
   
    //排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    
    //将结果保存到HDFS中,触发action
    reduced.saveAsTextFile(args(1))
    
    //释放资源
    sc.stop()
  1. DAGSchedulert切分完 Stage后,先提交前西的Stage,执行完后在提交后面的Stage.Spge会生产Task,一个 Stage会生产很多业务逻辑相同的Task,后将以TaskSet形式传给Taskscheduler,然后 TaskSheduler将Task序列化,根据资源情况,发送给各个Executor
  2. Executor接收到Task后,先将Task反序列化,然后将Task用一个实现了 Runnable接口的实现类装起来,然后将该包装类丢入到线程池,然后包装类的run方法就会被执行,进而调用Task的计算逻辑
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/heartless_killer/article/details/104754800

智能推荐

Spring学习02:面向切面编程(AOP)_maoritian-程序员宅基地

文章浏览阅读4.3k次,点赞10次,收藏71次。Spring学习02:面向切面编程[AOP]代码冗余与装饰器模式代码冗余现象解决代码冗余的思路: 装饰模式和动态代理动态代理的写法使用动态代理解决代码冗余现象使用SpringAOP解决代码冗余AOP相关术语使用XML配置AOP使用XML配置AOP的步骤切入点表达式环绕通知使用注解配置AOP半注解配置AOP用于声明切面的注解用于声明通知的注解用于指定切入点表达式的注解纯注解配置AOP代码冗余与装饰..._maoritian

部署虚拟机到云服务器上的方法详解-程序员宅基地

文章浏览阅读1.4k次。云服务器上可以部署虚拟机 内容精选换一换默认部署在VPC下的应用可以调用API。如果域名解析失败,则参考配置内网DNS,在当前终端节点上配置DNS服务器。配置完成后,部署在VPC下的应用可以调用API。配置DNS需要配置“/etc”目录下的resolv.conf文件,指定DNS服务器的IP地址。内网DNS服务器的IP地址与您所位于的区域相关,您可通过表1获取内网DNS服务器的I华为云VPC是用户在..._虚拟机如何映射到弹性云服务器上

sql 以log形式输出_sqlserver审计日志输出为log格式-程序员宅基地

文章浏览阅读592次。mybatis-config.xml 文件下加上下面这段配置信息就可以了:<settings> <setting name="logImpl" value="STDOUT_LOGGING" /> </setting>=------------------------------------------------------分隔符--------------..._sqlserver审计日志输出为log格式

国外部分音乐人工智能/音乐科技研究机构科研项目简介_斯坦福大学 ccrma 音乐和声学计算机研究中心-程序员宅基地

文章浏览阅读3.9k次,点赞5次,收藏17次。本文对国外部分音乐人工智能/音乐科技科研机构的科研项目与教学课程设置作简要介绍,包括英国伦敦玛丽女王大学(Queen Mary)的数字音乐中心(C4DM)、西班牙巴塞罗那庞培法布拉大学(UPF)的音乐技术研究组(MTG)、美国斯坦福大学的音乐与声学计算机研究中心(CCRMA)以及法国的声学/音乐协调研究所(IRCAM)。_斯坦福大学 ccrma 音乐和声学计算机研究中心

Eclipse插件(RCP)项目搭建_eclipse rcp-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏11次。介绍最新的Eclipse RCP 开发工具创建RCP项目的过程。_eclipse rcp

GC算法与分类(Java虚拟机)_gc淘汰机制-程序员宅基地

文章浏览阅读243次。一 . GC算法总结 1.引用计数(淘汰) 2.标记-清除 3.标记-压缩 4.复制算法 -新生代二 . 理解分代思想: 1.依据对象的存活周期进行分类,短命对象为新生代,长命对象归为老年代. 2.根据不同年代的特点,选取合适的收集算法 - 少量对象存活,适合复制算法 - 大量对象存活,适合标记清理或者标记压缩所有的算法,都..._gc淘汰机制

随便推点

imx226_IMX226CQJ-海思网络摄像芯片-程序员宅基地

文章浏览阅读994次。产品详情IMX226CQJ-海思网络摄像芯片广州上诚视视源信息科技有限公司联系人:陈先生QQ号:1695823546手机:13189717928 (威信同号)海思芯片: www.sxj3.com联系地址:深圳市福田区华强北华强电子市场海思系列HI3516DRBCV100、HI3520DRQCV200、HI3516CRBCV200、HI3518CRBCV100、HI3520DRQCV300、HI35..._imx226参数

52单片机中断_52单片机外部中断-程序员宅基地

文章浏览阅读4.6k次,点赞4次,收藏13次。一 中断的概念CPU在处理某一事件A时,发生了另一事件B请求CPU迅速去处理(中断发生);CPU暂时中断当前的工作,转去处理事件B(中断响应和中断服务);待CPU将事件B处理完毕后,再回到原来事件A被中断的地方继续处理事件A(中断返回),这一过程被称为中断。概念可能不好理解,举个例子:假如你是个老师,你在上课,突然电话响了,一看是你老婆打过来的,你不敢不接,于是你只好暂停讲课接听电话,挂完电话,你继续讲课。这个过程中实际发生了一次中断。二 中断的响应条件1.中断源有中断请求;2.此中断_52单片机外部中断

redis数据丢失及解决-程序员宅基地

文章浏览阅读105次。Redis的数据回写机制Redis的数据回写机制分同步和异步两种,同步回写即SAVE命令,主进程直接向磁盘回写数据。在数据大的情况下会导致系统假死很长时间,所以一般不是推荐的。异步回写即BGSAVE命令,主进程fork后,复制自身并通过这个新的进程回写磁盘,回写结束后新进程自行关闭。由于这样做不需要主进程阻塞,系统不会假死,一般默认会采用这个方法。个人感觉方法2采用fork主..._redis set get丢失

图表点编辑数据无反应_word插入图表无法编辑数据-程序员宅基地

文章浏览阅读5.7k次。Word2007文档中的图表功能相对于Word2003的图表工具Microsoft Graph而言,应用更灵活,功能更强大。要想充分发挥图表功能,用户应当在Word2007文档中创建图表,而不是在Word97~Word2003兼容文档中使用图表功能。在Word2007文档中创建图表的步骤如下所述:第1步,打开Word2007文档窗口,切换到“插入”功能区。在“插图”分组中单击“图表”按钮,如图20..._word点编辑数据无反应

BP神经网络公式推导-程序员宅基地

文章浏览阅读2k次。一、m-p神经元模型:神经网络是由许多个神经云所构成,而一个单独的神经元的结构如上图所示。若一个神经元有n个输入xxx,每一个输入都要配有一个权值www,神经元的总输入为:∑i=1nwixi\sum_{i=1}^n w_ix_ii=1∑n​wi​xi​生物的神经元需要收到一定程度的刺激才可被激活,所以用θ\thetaθ表示神经元的阈值,当总输入高于此值,该神经元才可被激活。神经元的输出yyy取决于激发函数y=f(x)y=f(x)y=f(x),神经元总输入减去阈值得到的值:∑i=1nwixi−θ\su_bp神经网络公式

用python制作“简易”比心表情包-程序员宅基地

文章浏览阅读4k次,点赞9次,收藏34次。# 比心表情包import cv2import numpy as npimport matplotlib.pyplot as plt# 创建一幅白色的图片img = np.zeros((500, 500, 3), np.uint8) + 255# 画头部(用圆来代替)cv2.circle(img, (100, 200), 20, (0, 0, 255), 1)# 画眼睛(用...