下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

全面介绍大数据系列之并行计算引擎Spark

作者:课课家教育     来源: http://www.kokojia.com点击数:696发布时间: 2017-05-03 10:02:12

标签: 大数据Hadoop数据分析

     今天,课课家就来和大家一起探讨一下大数据系列之并行计算引擎Spark,对这方面不是和了解的小伙伴,可以参考一下,对这方面有独特见解的大声,可以交流一下哟!一定要认真阅读哦!

     Spark:

  Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。

  Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

  Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

  Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

  尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。

  Spark的性能特点:

  1.更快的速度:内存计算下,Spark 比 Hadoop 快100倍。

  内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的I/O开销

  DAG引擎,减少多次计算之间中间结果写到HDFS的开销;

  使用多线程池模型来减少task启动开销,shuffle过程中避免不必要的sort操作已经减少磁盘I/O操作;

  2.易用性:

  Spark 提供了80多个高级运算符。

  提供了丰富的API,支持java,Scala,Python和R四种语言;

  代码量比MapReduce少2~5倍;

  3.通用性:Spark 提供了大量的库,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 开发者可以在同一个应用程序中无缝组合使用这些库。

  4.支持多种资源管理器:Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器

  Spark基本原理:

  Spark Streaming:构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+),虽然比不上专门的流式数据处理软件,也可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),一部分窄依赖的RDD数据集可以从源数据重新计算达到容错处理目的。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

  Spark背景:

  1.MapReduce局限性:

全面介绍大数据系列之并行计算引擎Spark_大数据_Hadoop_数据分析_课课家教育平台

  1.仅支持Map和Reduce两种操作;

  2.处理效率低效;不适合迭代计算(如机器学习、图计算等),交互式处理(数据挖掘)和流失处理(日志分析)

  3.Map中间结果需要写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据;

  4.任务调度和启动开销大;

  5.无法充分利用内存;(与MR产生时代有关,MR出现时内存价格比较高,采用磁盘存储代价小)

  6.Map端和Reduce端均需要排序;

  7.MapReduce编程不够灵活。(比较Scala函数式编程而言)

  8.框架多样化[采用一种框架技术(Spark)同时实现批处理、流式计算、交互式计算]:

  批处理:MapReduce、Hive、Pig;

  流式计算:Storm

  交互式计算:Impala

  Spark核心概念:

  RDD:Resilient Distributed Datasets,弹性分布式数据集

RDD:Resilient Distributed Datasets,弹性分布式数据集

  分布在集群中的只读对象集合(由多个Partition 构成);

  可以存储在磁盘或内存中(多种存储级别);

  通过并行“转换”操作构造;

  失效后自动重构;

  RDD基本操作(operator)

可以存储在磁盘或内存中(多种存储级别);    通过并行“转换”操作构造;

  Transformation具体内容

  map(func) :返回一个新的分布式数据集,由每个原元素经过func函数转换后组成

  filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成

  *flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  sample(withReplacement, frac, seed) :

  根据给定的随机种子seed,随机抽样出数量为frac的数据。

  union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成

  groupByKey([numTasks]) :

  在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task

  reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。

  join(otherDataset, [numTasks]) :

  在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

  groupWith(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup

  cartesian(otherDataset) : 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。

  flatMap(func) :

  类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  Actions具体内容

  reduce(func) : 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行

  collect() : 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM

  count() : 返回数据集的元素个数

  take(n) : 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)

  first() : 返回数据集的第一个元素(类似于take(1))

  saveAsTextFile(path) : 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本

  saveAsSequenceFile(path) : 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)

  foreach(func) : 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

  在迭代之间使用内存做数据传输的并行计算框架是当前的一个研究热点。与传统的基于硬盘和网络的计算方式相比,使用内存可以减少数据传输的时间。对于数据密集类型的任务,可以将运行时间提升十几倍。在新一代框架快速发展的同时,如何充分利用相对仍然紧缺的内存资源,保证任务的运行效率,成为一个亟待解决的问题。本文基于集群计算引擎Spark,研究了并行计算集群对于内存的使用行为。通过对内存行为进行建模与分析,对内存的使用进行了决策自动化以及替换策略优化。提高了任务在资源有限情况下的运行效率,以及在不同集群环境下任务效率的稳定性。本文的贡献主要有:通过对代码的语义进行分析,实现了内存策略的自动化。即调度器可以自动识别出价值的数据集(RDD)放入缓存,避免缓存存污染的同时,也减轻了程序员的编程负担。在对代码语义分析,获得任务详细信息的基础上,对内存使用的替换策略进行了优化。主要包括RDD大小和权重的计算,操作顺序的优化重排,使用寄存器分配模型加权重信息。

  算子分类

  大致可以分为三大类算子:

  Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。

  Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。

  Action算子,这类算子会触发SparkContext提交Job作业。

Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对

 

  Spark RDD cache/persist

  Spark RDD cache

  1.允许将RDD缓存到内存中或磁盘上,以便于重用

  2.提供了多种缓存级别,以便于用户根据实际需求进行调整

允许将RDD缓存到内存中或磁盘上以便于重用    2.提供了多种缓存级别,以便于用户根据实际需求进行调整

  3.cache使用

之前用MapReduce实现过WordCount,现在我们用Scala实现下wordCount.是不是很简洁呢

  之前用MapReduce实现过WordCount,现在我们用Scala实现下wordCount.是不是很简洁呢?!

  import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String]) { if (args.length == 0) { System.err.println("Usage: SparkWordCount ") System.exit(1) } val conf = new SparkConf().setAppName("SparkWordCount") val sc = new SparkContext(conf) val file=sc.textFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/README.md") val counts=file.flatMap(line=>line.split(" ")) .map(word=>(word,1)) .reduceByKey(_+_) counts.saveAsTextFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/countReslut.txt") } }

  关于RDD的Transformation与Action的特点我们介绍下;

  1.接口定义方式不同:

  Transformation: RDD[X]–>RDD[y]

  Action:RDD[x]–>Z (Z不是一个RDD,可能是一个基本类型,数组等)

  2.惰性执行:

  Transformation:只会记录RDD转化关系,并不会触发计算

  Action:是触发程序执行(分布式)的算子。

Transformation:只会记录RDD转化关系,并不会触发计算    Action:是触发程序执行(分布式)的算子。

  程序的执行流程:

Transformation:只会记录RDD转化关系,并不会触发计算    Action:是触发程序执行(分布式)的算子。

  Spark运行模式:

  Local(本地模式):

  1.单机运行,通常用于测试;

  local:只启动一个executor

  local[k]:启动k个executor

  local[*]:启动跟cpu数目相同的executor

  2.standalone(独立模式)

  独立运行在一个集群中

Transformation:只会记录RDD转化关系,并不会触发计算    Action:是触发程序执行(分布式)的算子。

  3.Yarn/mesos

  1.运行在资源管理系统上,比如Yarn或mesos

  2.Spark On Yarn存在两种模式

  yarn-client

运行在资源管理系统上,比如Yarn或mesos    2.Spark On Yarn存在两种模式    yarn-client

  yanr-cluster

运行在资源管理系统上,比如Yarn或mesos    2.Spark On Yarn存在两种模式    yarn-client

  两种方式的区别:

Spark在企业中的应用场景基于日志数据的快速查询系统业务;    构建于Spark之上的SparkSQL ,利用其快速以及内存表等优势,承担了日志数据的即席查询工作。

  Spark在企业中的应用场景

  基于日志数据的快速查询系统业务;

  构建于Spark之上的SparkSQL ,利用其快速以及内存表等优势,承担了日志数据的即席查询工作。

  典型算法的Spark实现

  预测用户的广告点击概率;

  计算两个好友间的共同好友数;

  用于ETL的SparkSQL和DAG任务。

  数据倾斜

  数据倾斜意味着某一个或某几个Partition中的数据量特别的大,这意味着完成针对这几个Partition的计算需要耗费相当长的时间。

  如 果大量数据集中到某一个Partition,那么这个Partition在计算的时候就会成为瓶颈。图1是Spark应用程序执行并发的示意图,在 Spark中,同一个应用程序的不同Stage是串行执行的,而同一Stage中的不同Task可以并发执行,Task数目由Partition数来决 定,如果某一个Partition的数据量特别大,则相应的task完成时间会特别长,由此导致接下来的Stage无法开始,整个Job完成的时间就会非 常长。

  要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在Spark中Block使用 了ByteBuffer来存储数据,而ByteBuffer能够存储的最大数据量不超过2GB。如果某一个key有大量的数据,那么在调用cache或 persist函数时就会碰到spark-1476这个异常。

  下面列出的这些API会导致Shuffle操作,是数据倾斜可能发生的关键点所在

  1. groupByKey

  2. reduceByKey

  3. aggregateByKey

  4. sortByKey

  5. join

  6. cogroup

  7. cartesian

  8. coalesce

  9. repartition

  10. repartitionAndSortWithinPartitions

要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在Spark中Block使用 了ByteBuffer来存储数据,而ByteBuffer能够存储的最大数据量不超过2GB。如果某一个key有大量的数据,那么在调用cache或 persist函数时就会碰到spark-1476这个异常。

     结束语:以上内容,就是介绍大数据系列之并行计算引擎Spark,如果各位小伙伴想要了解更多关于这方面的知识,随时可以登录课课家教育平台浏览,欢迎你随时登录哦~

赞(26)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程