大数据学习之spark,一篇文章让你学会spark

大数据学习之spark

Spark简介

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Spark的Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少了磁盘数据交互,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法。

Spark的组件可以分为SparkCore快速执行引擎,支持多种语言;SparkStreaming流式计算框架;SparkGraph图形计算框架;MLbase机器学习;SparkSql使用sql处理业务。本文谈谈SparkCore,SparkSql,SparkStreaming和Spark优化相关知识。

SparkCore

Spark系统架构

1.Master ( standalone 模式):资源管理的主节点(进程)。

2.Cluster Manager :在集群上获取资源的外部服务(例如: standalone ; yarn ; mesos )。

3.Worker ( standalone 模式):资源管理的从节点(进程)或者说是是管理本机资源的进程。

4.Application :基于 Spark 的用户程序,包含 driver 程序和运行在集群上的 executor 程序,即一个完整的 spark 应用 。

5.Dirver ( program ):用来连接工作进程( worker )的程序 。

6.Executor :是在一个 worker 进程所管理的节点上为某 Application 启动的一个个进程,这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executors 。

7.Task :被发送到 executor 上的工作单元。

8.Job :包含很多任务( Task )的并行计算,和 action 算子对应。

9.Stage :一个 job 会被拆分成很多组任务,每组任务被称为 Stage (就像 MapReduce 分为MapTask 和ReduceTask 一样)。

Spark相关名词概念

Partition

由于Spark RDD 是一种分布式的数据集,数据量很大,因此要它被切分并存储在各个结点的分区当中。

Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)。

哈希分区是根据hash值进行分区,这样如果数据hash值的数目相差太大时会导致数据倾斜,为了避免造成这种情况我们可以通过范围分区(基于蓄水池初样算法)使得分区中的数据均匀。

RDD

RDD(Resilient Distributed Dataset) 弹性分布式数据集。

RDD的5大属性:

1.RDD 是由多个 partition 组成的。

2.RDD中每一个task运行在自己的partition上。

3.RDD需要依赖其他的RDD

4.分区器必需作用在 (K,V) 格式的 RDD 上。

5.RDD默认会寻找最优的计算位置(计算向数据靠拢,尽量减少数据的拉取操作)

Lineage血统:RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。

宽窄依赖和stage

窄依赖:父 RDD 和子 RDD 的 partition 之间的关系是一对一的。或者父 RDD 和子 RDD 的 partition 关系是
多对一的。不会有 shuffle 的产生。

宽依赖:父 RDD 与子 RDD 的 partition 之间的关系是一对多。会有 shuffle 的产生。

stage阶段切割规则:从后往前,遇到宽依赖就切割 stage (有n个宽依赖就有n+1个stage)。

spark算子

单文件算子:

转换算子(本质就是函数), 转换 算子是延迟执行,也叫懒加载执行(需要行动算子执行才会执行)

常见 的转换算子有filter ,map ,flatMap,sample,reduceByKey ,sortByKey / sortBy 等

行动算子, 行动算子是触发执行,一个 application 应用程序中有几个 行动算子执行,就有几个 job 运行;

count ,take(n) ,first ,foreach ,collect等;

控制算子:cache,persist,checkpoint等;

这里要聊下cache和checkpoint的异同点:他们都是做 RDD 持久化的。cache是在触发action之后,把数据写入到内存或者磁盘中,不会截断血缘关系 。checkpoint 也是在触发action之后,执行任务,单独再启动一个job,负责写入数据到hdfs中,会截断血缘关系。

多文件算子:

转换算子join,union,intersection,subtract,mapPartitions,mapPartition,distinct,cogroup等

行动算子foreachPartition等

任务执行流程

启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master 掌握了集群资源情况。此时当 我们 提交一个 Application 后,集群会根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图。每个Application中有一个或多个行动算子,一个行动算子就是一个job任务,每个任务Spark 会在 Driver 端创建两个对象DAGScheduler 和 TaskScheduler ,DAGScheduler 是任务调度的高层调度器,主要作用就是将 DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage ,然后将这些 Stage 以 TaskSet 的形式提交给 TaskScheduler ( TaskScheduler 是任务调度的低层调度器,这里 TaskSet 其实就是一个封装多个 task 任务的集合,也就是 stage 中的并行的 task 任务)。TaskSchedule 会遍历 TaskSet 集合,拿到每个 task 后会将 task 发送到 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。task 在 Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试3次。如果重试3次依然失败,那么这个 task 所在的 stage 就失败了。stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到 TaskScheduler , Stage 默认重试4次。如果重试4次以后依然失败,那么这个 job 就失败了。 job 失败了, Application 就失败了。

这里面的关系可以这样理解:

每个Application有一个或多个job,每个job中有多个stage,每个stage中有多个RDD,每个RDD中有多个分区,每个stage最后一个RDD有多少分区就有多少task。

在窄依赖stage中第一个分区到最后一个分区的计算过程称之为pipeline 管道计算模式。

Driver端所在节点的位置由任务提交方式决定,在clint模式下,在哪个节点启动进程,哪个节点就是Driver端,生产环境下不建议使用,会使Driver端在同一节点导致客户端网卡流量暴增的问题。在cluster模式下,会随机在集群一台节点启动 Driver 进程。

SparkSQL

SparkSQL的由来

SparkSQL的前身是shark,Shark 是基于 Spark 计算框架之上且兼容 Hive 语法的 SQL 执行引擎,由于底层的计算采用了Spark ,性能比 MapReduce 的 Hive 普遍快2倍以上,当数据全部加载在内存的话,将快10倍以上,因此 Shark 可以作为交互式查询应用服务来使用。 SparkSQL 产生的根本原因是其完全脱离了 Hive 的限制,能够在 scala / java 中写 SQL 语句。支持简单的 SQL 语法检查,能够在 SQL 中写 Hive 语句访问Hive 数据,并将结果取回作为 RDD 使用。

Dataset与DataFrame

可以简单的把DataFrame理解成RDD+表元数据信息,把数据文本通过构建case class样例类或StructType来实现DataFrame,进而可以通过sql语句进行RDD的操作,代替了使用scala编写代码的操作。

Dataset可以理解为DataFrame的加强版。

这里具体的DSL操作和hive中的hsql操作差不多,也就基本是以前的sql操作,这里就不展开细说了。

SparkStreaming

SparkStreaming简介

SparkStreaming是准实时数据流处理框架(默认是5秒计算一次,也可自己设置)。实时数据的来源可以是Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,在接受数据同时可以使用高级功能的复杂算子来处理流数据。最终处理后的数据可以存放在文件系统,数据库等,方便实时展现。

编程模型DStream

DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。

DStream你可以理解为对RDD的增强,Spark Streaming 计算还是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上关系。Spark Streaming 并没有直接让用户使用RDD而是自己抽象了一套DStream的概念。DStream 和 RDD 是包含的关系,你可以理解为Java里的装饰模式,也就是DStream 是对RDD的增强,但是行为表现和RDD是基本上差不多的。

DSeram的操作可以类比RDD的操作。这里要注意的是RDD是一批一批数据进行操作,而Dstrean是每隔一端时间处理数据,这里要关注一下Dstream的窗口操作,它的一个窗口会包括多个执行时间段。

SparkStreaming集成Kafka

我们把生产者的数据放入kafka中,由kafka作为中间件,SparkStreaming来消费并计算处理数据。

Kafka与Spark Streaming集成时有两种方法:旧的基于receiver的方法,新的基于direct stream的方法。

基于receiver的方法:

这里就是kafka向SparkStreaming不停地推送数据,这样的话当SparkStreaming处理不了这些数据的话,多余的数据会存储在WAL文件中,这样的话kafka的削峰就失去了意义,当大量的数据积压处理不了时,会导致SparkStreaming集群down掉。所以这种方法不适于处理大量快速产生的数据。

基于direct stream的方法:

这里就是SparkStreaming根据自己处理数据的能力去拉取Kafka中的数据,这样流程大量简化了,而且还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。

这里我们就需要自己来管理offset:

1.CheckPoint

checkpoint会记录每个批次的状态持久化到HDFS中,如果机器发生故障,或者程序故障停止,下次启动时候,仍然可以从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理

但这有个很大的弊端就是更新版本时checkpoint不能保持,这样的话会出现新旧2个checkpoint,这时无论选择哪一个checkpoint要么丢数据,要么数据重复。可以说非常的鸡肋,这只适用于版本不更新的项目。

2.Kafka Itself

这就是将kafka的offset全部交给kafka管理。

但遇到数据峰值很高的时候会导致kafka集群的磁盘io特别高,这样的话是非常不安全的。

3.Your own data store

这就是我们自己写代码管理offset,把每批次offset存储到一个外部的存储系统。

当流式项目停止后再次启动,会首先从外部存储系统读取是否记录的有偏移量,如果有的话,就读取这个偏移量,然后把偏移量集合传入到KafkaUtils.createDirectStream中进行构建InputSteam,这样的话就可以接着上次停止后的偏移量继续处理,然后每个批次中仍然的不断更新外部存储系统的偏移量,这样以来就能够无缝衔接了,无论是故障停止还是升级应用,都是透明的处理,除了相对麻烦,这无疑是可迭代和安全的方法了。

SparkStreaming反压机制

旧版本:

计算过程中出现batch processing time > batch interval的情况时,可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率。

这样的弊端是当集群处理能力高于maxRate,会造成资源浪费。

新版本:

通过反压机制(back-pressure)来动态控制数据接收速率来适配集群数据处理能力。

反压机制中的重要组件:RateController,RateEstimator,RateLimiter,令牌桶机制。

Spark 优化

资源调优

动态分配资源,给正在运行的Application分配更多的资源。

并行度调优

合理调整partition(task)的数量,减少资源的浪费进而提高spark任务的运行效率。

代码调优

RDD

避免创建重复的RDD,应该不会有人一直重复创建吧。

对多次使用的RDD持久化到内存,基于内存中的数据的操作不需要从磁盘文件中读取数据,进而提高了性能。

算子

持久化算子推荐使用checkpoint,持久化到HDFS中,砍断RDD间联系。

尽量避免使用shuffle类的算子,shuffle类算子会导致磁盘间大量的io操作。

如果一定要使用shuffle类算子尽量使用map-sid预聚合操作,combiner局部聚合,这样)可以降低shuffle write写磁盘的数据量,shuffle read拉取数据量的大小,reduce端聚合的次数。

尽量使用高性能的算子,比如使用reduceByKey替代groupByKey,mapPartition替代map,foreachPartition替代foreach。多使用repartition和coalesce算子来操作分区数量,分区数量太多会造成磁盘间大量的io操作,分区数量太少会造成task任务并行度不够,造成资源浪费。

使用广播变量

尤其是当变量大时,没必要每个Executor中都存放这个变量,我们可以把变量放在Driver中,每个Executor都可以使用到,这样大量减少了内存的使用。

使用Kryo优化序列化性能

Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

使用高性能的库fastutil

fastutil提供的集合类比我们平时使用的JDK的原生的Map、List、Set内存占用更小,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度。

数据本地化

task要计算的数据所处位置的级别依次是:

PROCESS_LOCAL:在本进程的内存中。

NODE_LOCAL:在本节点所在的磁盘上或在本节点其他进程的内存中。

NO_PREFtask:在关系型数据库中,如mysql。

RACK_LOCAL:在同机架的不同节点的磁盘或者进程的内存中.

ANY:跨机架。

我们要尽量使要计算的数据在前面的级别中(计算向数据靠拢,可以节省大量的计算资源),因此我们可以适当地增加每次发送task的等待时间。

内存调优

提高Executor总体内存的大小, 降低储存内存比例或者降低聚合内存比例。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>