扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容主要讲解“spark的RDD、算子、持久化算子分别是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark的RDD、算子、持久化算子分别是什么”吧!
创新互联主要从事成都网站制作、做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务大石桥,十多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18980820575
一:RDD的介绍
RDD(Resilient Distributed Dateset),弹性分布式数据集。
RDD的五大特性:
1.RDD是由一系列的partition组成的。
2.函数是作用在每一个partition(split)上的。
3.RDD之间有一系列的依赖关系。
4.分区器是作用在K,V格式的RDD上。
5.RDD提供一系列最佳的计算位置。
注意:
textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
RDD实际上不存储数据,这里方便理解,暂时理解为存储数据。
什么是K,V格式的RDD?
如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的。RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。
RDD的创建方式
XX.parallelize() XX.makeRDD()
二、算子
Transformations转换算子有如下:
filter 过滤符合条件的记录数,true保留,false过滤掉 map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素 特点:输入一条,输出一条数据 flatMap 先map后flat。与map类似,每个输入项可以映射为0到多个输出项 sample 随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样 reduceByKey 将相同的Key根据相应的逻辑进行处理 sortByKey/sortBy 作用在K,V格式的RDD上,对key进行升序或者降序排序 join,leftOuterJoin,rightOuterJoin,fullOuterJoin 作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W)),join后的分区数与父RDD分区数多的那一个相同。 union 合并两个数据集。两个数据集的类型要一致,返回新的RDD的分区数是合并RDD分区数的总和。 intersection 取两个数据集的交集,返回新的RDD与父RDD分区多的一致 subtract 取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。 mapPartitions 与map类似,遍历的单位是每个partition上的数据。 distinct 实际内部使用的是map+reduceByKey+map,就是去重的意思 cogroup 当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable,Iterable )),子RDD的分区与父RDD多的一致 mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值。 repartition 增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle) coalesce 用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。true为产生shuffle,false不产生shuffle。默认是false。 如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true) groupByKey 作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable )。 zip 将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同 zipWithIndex 该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
Action行动算子有如下:
count 返回数据集中的元素数。会在结果计算完成后回收到Driver端 take(n) 返回一个包含数据集前n个元素的集合 first first=take(1),返回数据集中的第一个元素 foreach 循环遍历数据集中的每个元素,运行相应的逻辑 collect 将计算结果回收到Driver端
转换算子和行动算子的区别:
大多数情况下转换算子返回的内心是RDD类型,行动算子返回的类型大多常常是普通类型,当不知道算子是属于那个类型算子的时候,可以用这个推测
最后一种叫做:控制算子,分别是cache、persist、checkpoint,cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系
三、案列
案列1:cache
请求数据:words数据有约1700万条记录,文件大小约200m大小
结果:
没用做持久化算子处理:56995 毫秒 cache: 274 毫秒
scala代码
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() //默认将RDD的数据持久化到内存中。cache是懒执行,使用persist时候,与cache搭配使用,效率要高点 //不能这样写:rdd.cache().count()返回的不是持久化的RDD,而是一个数值了 result.count() println("没用做持久化算子处理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //默认将RDD的数据持久化到内存中。cache是懒执行 result.cache() //cache和persist都是懒执行,必须有一个action类算子触发执行 result.count() println("cache: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") }
案列2: persist
数据:words数据有约1700万条记录,文件大小约200m大小
结果:
没用做持久化算子处理:55350毫秒 persist: 312 毫秒
scala代码
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() result.count() println("没用做持久化算子处理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //默认将RDD的数据持久化到内存中。cache是懒执行,使用persist时候,与cache搭配使用,效率要高点 //不能这样写:rdd.persist().count()返回的不是持久化的RDD,而是一个数值了 //persist()内部就是调用cache() result.persist() //1cache和persist都是懒执行,必须有一个action类算子触发执行。 result.count() println("persist: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") } }
案列3: checkpoint
数据:words数据有约1700万条记录,文件大小约200m大小
结果:
没用做持久化算子处理:55575 毫秒 checkpoint: 55851 毫秒 同时会在指定的输出路径中多出持久化到硬盘的数据文件 由于要持久化到硬盘中,速度要慢很多 可以与result.cache()搭配使用 搭配使用后的运行结果 checkpoint: 54621 毫秒 比之前的少了1000多毫秒,result使用cache后,以前处理的数据都放到缓存中去了,所以要稍微快一点
scala代码
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) //设置checkpoint输出路径 context.setCheckpointDir("./checkpoint_file") //数据来源 val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() result.count() println("没用做持久化算子处理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //不能这样:rdd.checkpoint().count() result.checkpoint() /** 一个action类原子就是会启动一个job 1.当RDD的job执行完毕后,会从finalRDD从后往前回溯。 2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。 3.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步 */ //result.cache() result.count() println("checkpoint: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") }
总结:
cache和persist的注意事项: 1.cache和persist都是懒执行,必须有一个action类算子触发执行。 2.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。 3.cache和persist算子后不能立即紧跟action算子。 4.cache和persist算子持久化的数据当applilcation执行完成之后会被清除。 错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。 checkpoint checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。 checkpoint 的执行原理: 1.当RDD的job执行完毕后,会从finalRDD从后往前回溯。 2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。 3.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步
到此,相信大家对“spark的RDD、算子、持久化算子分别是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流