扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
RDD
全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:
松原网站制作公司哪家好,找创新互联公司!从网页设计、网站建设、微信开发、APP开发、自适应网站建设等网站项目制作,到程序开发,运营维护。创新互联公司2013年至今到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联公司。
RDD[T]
抽象类的部分相关代码如下:
// 由子类实现以计算给定分区
def compute(split: Partition, context: TaskContext): Iterator[T]
// 获取所有分区
protected def getPartitions: Array[Partition]
// 获取所有依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 获取优先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// 分区器 由子类重写以指定它们的分区方式
@transient val partitioner: Option[Partitioner] = None
RDD 有两种创建方式,分别介绍如下:
这里使用 spark-shell
进行测试,启动命令如下:
spark-shell --master local[4]
启动 spark-shell
后,程序会自动创建应用上下文,相当于执行了下面的 Scala 语句:
val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
val sc = new SparkContext(conf)
由现有集合创建 RDD,你可以在创建时指定其分区个数,如果没有指定,则采用程序所分配到的 CPU 的核心数:
val data = Array(1, 2, 3, 4, 5)
// 由现有集合创建 RDD,默认分区数为程序所分配到的 CPU 的核心数
val dataRDD = sc.parallelize(data)
// 查看分区数
dataRDD.getNumPartitions
// 明确指定分区数
val dataRDD = sc.parallelize(data,2)
引用外部存储系统中的数据集,例如本地文件系统,HDFS,HBase 或支持 Hadoop InputFormat 的任何数据源。
val fileRDD = sc.textFile("/usr/file/emp.txt")
// 获取第一行文本
fileRDD.take(1)
使用外部存储系统时需要注意以下两点:
两者都可以用来读取外部文件,但是返回格式是不同的:
RDD[String]
,返回的是就是文件内容,RDD 中每一个元素对应一行数据;RDD[(String, String)]
,元组中第一个参数是文件路径,第二个参数是文件内容;def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}
RDD 支持两种类型的操作:transformations(转换,从现有数据集创建新数据集)和 actions(在数据集上运行计算后将值返回到驱动程序)。RDD 中的所有转换操作都是惰性的,它们只是记住这些转换操作,但不会立即执行,只有遇到 action 操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。
val list = List(1, 2, 3)
// map 是一个 transformations 操作,而 foreach 是一个 actions 操作
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出: 10 20 30
Spark 速度非常快的一个原因是 RDD 支持缓存。成功缓存后,如果之后的操作使用到了该数据集,则直接从缓存中获取。虽然缓存也有丢失的风险,但是由于 RDD 之间的依赖关系,如果某个分区的缓存数据丢失,只需要重新计算该分区即可。
Spark 支持多种缓存级别 :
Storage Level (存储级别) | Meaning(含义) |
---|---|
MEMORY_ONLY | 默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
MEMORY_AND_DISK | 将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在需要使用这些分区时从磁盘读取。 |
MEMORY_ONLY_SER | 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增加 CPU 的计算负担。仅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER | 类似于 MEMORY_ONLY_SER ,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。仅支持 Java 和 Scala。 |
DISK_ONLY | 只在磁盘上缓存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 , etc | 与上面的对应级别功能相同,但是会为每个分区在集群中的两个节点上建立副本。 |
OFF_HEAP | 与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。这需要启用堆外内存。 |
启动堆外内存需要配置两个参数:
- spark.memory.offHeap.enabled:是否开启堆外内存,默认值为 false,需要设置为 true;
- spark.memory.offHeap.size: 堆外内存空间的大小,默认值为 0,需要设置为正值。
缓存数据的方法有两个:persist
和 cache
。cache
内部调用的也是 persist
,它是 persist
的特殊化形式,等价于 persist(StorageLevel.MEMORY_ONLY)
。示例如下:
// 所有存储级别均定义在 StorageLevel 对象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()
Spark 会自动监视每个节点上的缓存使用情况,并按照最近最少使用(LRU)的规则删除旧数据分区。当然,你也可以使用 RDD.unpersist()
方法进行手动删除。
在 Spark 中,一个任务对应一个分区,通常不会跨分区操作数据。但如果遇到 reduceByKey
等操作,Spark 必须从所有分区读取数据,并查找所有键的所有值,然后汇总在一起以计算每个键的最终结果 ,这称为 Shuffle
。
Shuffle 是一项昂贵的操作,因为它通常会跨节点操作数据,这会涉及磁盘 I/O,网络 I/O,和数据序列化。某些 Shuffle 操作还会消耗大量的堆内存,因为它们使用堆内存来临时存储需要网络传输的数据。Shuffle 还会在磁盘上生成大量中间文件,从 Spark 1.3 开始,这些文件将被保留,直到相应的 RDD 不再使用并进行垃圾回收,这样做是为了避免在计算时重复创建 Shuffle 文件。如果应用程序长期保留对这些 RDD 的引用,则垃圾回收可能在很长一段时间后才会发生,这意味着长时间运行的 Spark 作业可能会占用大量磁盘空间,通常可以使用 spark.local.dir
参数来指定这些临时文件的存储目录。
由于 Shuffle 操作对性能的影响比较大,所以需要特别注意使用,以下操作都会导致 Shuffle:
repartition
和 coalesce
;groupByKey
和 reduceByKey
,但 countByKey
除外;cogroup
和 join
。RDD 和它的父 RDD(s) 之间的依赖关系分为两种不同的类型:
如下图,每一个方框表示一个 RDD,带有颜色的矩形表示分区:
区分这两种依赖是非常有用的:
RDD(s) 及其之间的依赖关系组成了 DAG(有向无环图),DAG 定义了这些 RDD(s) 之间的 Lineage(血统) 关系,通过血统关系,如果一个 RDD 的部分或者全部计算结果丢失了,也可以重新进行计算。那么 Spark 是如何根据 DAG 来生成计算任务呢?主要是根据依赖关系的不同将 DAG 划分为不同的计算阶段 (Stage):
更多大数据系列文章可以参见 GitHub 开源项目: 大数据入门指南
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流