前言
本文将详细描述Spark的整体架构,读者需具备一定的Spark基础知识,至少了解Spark的RDD和DAG概念。
Spark 架构图
术语说明:
Driver的主要功能如下:
根据官网,Spark应用在Yarn上启动有两种模式:在集群模式下,Spark driver在应用主进程(Application Master)中运行;在客户端模式下,driver在客户端进程中运行。
yarn-cluster模式下
在yarn-cluster模式下,客户端将用户程序提交到Spark集群后便与集群断开连接,客户端仅负责提交任务。在这种模式下,AM和driver是同一实体,但官网描述为driver运行在AM内部,可以理解为AM包含了driver的功能,类似于driver运行在AM中。此时,AM既能向资源管理器申请并分配资源,又能完成driver的RDD划分和任务提交等工作。
Executor是Spark任务(task)的执行单元,运行在worker节点上,但不等于worker,实际上它是一组计算资源(CPU核心、内存)的集合。一个worker节点上的内存和CPU由多个Executor共享。
spark.executor.instances = spark.max.cores / spark.executor.cores
集群中Executor的数量由spark.max.cores和spark.executor.cores共同决定。其中,spark.cores.max是指Spark程序需要的总核心数,spark.executor.cores是指每个Executor需要的核心数。
指定并行任务数量的参数:
spark.default.parallelism=1000
参数说明:该参数用于设置每个stage的默认任务数量。这个参数非常重要,如果未设置,可能直接影响Spark作业的性能。
参数调优建议:Spark作业的默认任务数量设置为500到1000个较为合适。许多用户常犯的错误是不设置此参数,导致Spark根据底层HDFS的block数量自动设置任务数量,默认是一个HDFS block对应一个任务。通常,Spark默认设置的数量偏少(如几十个任务),如果任务数量过少,会导致之前设置的Executor参数无效。假设Executor进程有多个,内存和CPU资源充足,但任务只有一个或十个,那么90%的Executor进程可能没有任务执行,资源浪费严重。
Spark官网建议的设置原则是,将该参数设置为num-executors * executor-cores的2到3倍较为合适。例如,Executor的总CPU核心数量为300个,设置1000个任务是合理的,可以充分利用Spark集群的资源。
DAG调度器(DAG: 有向无环图)
Executor:进程——运行在工作节点上,负责运行Task。
Task:Executor的工作单元,也称为任务。
Job:用户提交的作业,包含多个Task。
Stage:是Job的基本调用单元,Job根据宽窄依赖划分为不同的Stage,一个Stage中包含一个或多个相同类型的Task。
一个Application由一个Driver和多个Job构成,一个Job由多个Stage构成,一个Stage由多个没有Shuffle关系的Task组成。
JVM堆空间下Spark的内存分配
任何Spark进程都是一个JVM进程,因此可以配置其堆大小(-Xmx和-Xms)。但进程如何使用堆内存和为何需要它?以下是JVM堆空间下Spark的内存分配情况:
默认情况下,Spark进程的堆空间为512MB。为了安全考虑并避免OOM,Spark只允许使用90%的堆空间,Spark使用spark.storage.safetyFraction配置该值(默认是0.9)。作为一个内存计算工具,Spark可以在内存中存储数据。通过阅读http://0x0fff.com/spark-misconceptions/,会发现Spark不是真正的内存工具,它只是将内存用作LRU缓存,因此大量内存被用来缓存正在计算的数据,这部分占用安全堆的60%,Spark使用spark.storage.memoryFraction控制该值。如果想知道Spark中能缓存多少数据,可以统计所有Executor的堆大小,乘以safeFraction和memoryFraction,默认是54%,这就是Spark可用于缓存数据的堆大小。
该部分介绍shuffle的内存使用情况,它通过堆大小 spark.shuffle.safetyFraction spark.shuffle.memoryFraction计算。spark.shuffle.safetyFraction的默认值是0.8,spark.shuffle.memoryFraction的默认值是0.2,因此最多只能使用堆空间的16%用于shuffle。关于如何使用这块内存,参考https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala。然而,Spark通常使用这块内存用于shuffle中的其他任务,当执行shuffle时,有时需要对数据进行排序,需要缓冲排序后的数据(注意不能改变LRU缓冲中的数据,因为后面可能需要重用),这需要大量的RAM存储排序后的数据块。如果没有足够的内存用于排序,可以参考外排的实现,一块一块地排序,然后最终合并。
最后要讲到的一块内存是"unroll",该块内存用于unroll,计算如下:spark.storage.unrollFraction spark.storage.memoryFraction spark.storage.safetyFraction。当我们需要在内存中展开数据块时使用它。为什么需要展开?因为Spark允许以序列化和非序列化两种方式存储数据,序列化后的数据无法直接使用,因此使用时必须展开。这部分内存占用缓存的内存,所以如果需要内存用于展开数据时,如果此时内存不够,Spark LRU缓存中的数据会被删除一些块。
YARN模式下的JVM堆内存
现在应该清楚了解Spark如何使用JVM中的堆内存了,现在切换到集群模式,当启动一个Spark集群时,如何看待它?以下是YARN模式下的架构:
当在YARN集群上运行时,YARN的ResourceMananger用于管理集群资源,每个节点上的NodeManager用于控制节点上的资源。从YARN的角度来看,每个节点被视为可分配的资源池。当向ResourceManager请求资源时,它返回一些NodeManager信息,这些NodeManager将为你提供执行容器,每个执行容器就是满足请求的堆大小的JVM进程,JVM进程的位置由ResourceMananger管理,不能自己控制。如果一个节点有64GB的内存被YARN管理(通过yarn.nodemanager.resource.memory-mb配置),当请求10个4GB内存的Executor时,这些Executor可能运行在同一个节点上。
在YARN上启动Spark集群时,可以指定:
执行器的数量(-num-executors 或 spark.executor.instances),每个执行器使用的内存(-executor-memory 或 spark.executor.memory),每个执行器使用的CPU核心数(-executor-cores 或 spark.executor.cores),每个任务执行使用的核心数(spark.task.cpus),驱动程序应用使用的内存(-driver-memory 和 spark.driver.memory)
当在集群上执行应用时,作业会被切分成stages,每个stage切分成tasks,每个task单独调度。可以将Executor的JVM进程看作任务执行池,每个Executor有:
spark.executor.cores / spark.task.cpus
个执行槽。例子:集群有12个节点运行YARN的NodeManager,每个节点有64GB内存和32个CPU核心,每个节点可以启动2个Executor,每个Executor使用26GB内存,剩余内存用于系统和其他服务,每个Executor有12个CPU核心用于执行任务,这样整个集群有:
12 machines * 2 executors per machine * 12 cores per executor / 1 core = 288
个任务执行槽,这意味着Spark集群可以同时运行288个任务,整个集群用于缓存数据的内存有:
0.9 * spark.storage.safetyFraction * 0.6 * spark.storage.memoryFraction * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB.
到目前为止,我们已经了解了Spark如何使用JVM的内存以及集群上的执行槽是什么,但还没有讨论任务的一些细节,这将在另一篇文章中详细说明。基本上,任务是Spark的一个工作单元,作为Executor的JVM进程中的一个线程执行,这也是Spark作业启动时间快的原因,在JVM中启动一个线程比启动一个单独的JVM进程快(在Hadoop中执行MapReduce应用会启动多个JVM进程)。
Spark抽象:partition
Spark处理的所有数据都会被切分成partition,一个partition是什么以及如何确定?partition的大小完全依赖于数据源。Spark中大部分用于读取数据的方法都可以指定生成的RDD中的partition数量。当从HDFS上读取一个文件时,会使用Hadoop的InputFormat来处理,默认情况下InputFormat返回的每个InputSplit会映射到RDD中的一个partition。大部分存储在HDFS上的文件,每个数据块会生成一个InputSplit,每个数据块大小为64MB或128MB。因为HDFS上的数据块边界是按字节计算的(64MB一个块),但在处理时,它又要按记录进行切分。对于文本文件来说,切分的字符是换行符;对于sequence文件来说,是块结束。如果是压缩文件,整个文件都被压缩了,不能按行进行切分,整个文件只有一个InputSplit,这样Spark中也会只有一个partition,在处理时需要手动进行repartition。
Hive on Spark调优:参数配置样例
set hive.execution.engine=spark; set spark.executor.memory=4g; set spark.executor.cores=2; set spark.executor.instances=40; set spark.serializer=org.apache.spark.serializer.KryoSerializer;
之前在Hive on Spark上运行100GB的数据量需要跑十几个小时,查看CPU和内存监控,发现POWER_TEST阶段(依次执行30个查询)CPU只用了百分之十几,也就是没有充分利用整个集群的性能,导致运行速度很慢。因此,调整参数以使整个集群发挥最大性能显得尤为重要。
Spark作业运行原理
详细原理见上图。我们使用spark-submit提交一个Spark作业后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU核心。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU核心。
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
task的执行速度是与每个Executor进程的CPU核心数量直接相关的。一个CPU核心同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU核心数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
以上就是图文详解 Spark 总体架构 [禅与计算机程序设计艺术]的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号