RDD是Spark最核心的数据抽象,全称叫Resilient Distributed Datasets(弹性分布式数据集)。org.apache.spark.rdd.RDD
是一个抽象类,定义了RDD的基本操作和属性
1 | // 计算某个分区的数据,返回Iterator |
创建RDD
RDD可以有三种创建方式:
- 从存储系统中创建,例如HDFS、文件等等
- 从已经存在的RDD中创建,即使用Transform操作
- 从内存中的列表数据创建
下面是在内存中创建RDD的例子
1 | val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) |
RDD支持两类操作:
- 转换(Transform)
- 行动(Action)
当RDD执行转换操作时候,实际计算并没有被执行,只有当RDD执行行动操作时候才会触发计算任务提交,执行相应的计算操作。
首先准备RDD
1 | val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) |
转换Transform
下面是几个简单的Transform操作
1 | /* map操作 */ |
map(func)
将函数应用于 RDD 中的每个元素,返回值构成新的 RDD
flatMap(func)
类似于 map,但是每个输入项可以映射为0个输出项或更多输出项(打散)
filter(func)
将函数应用于 RDD 中的每个元素,返回 func 函数的值为true的元素形成一个新的 RDD
distinct()
去重
union(otherRDD)
并集
生成一个包含两个 RDD 中所有元素的 RDD。如果输入的 RDD 中有重复数据,union() 操作也会包含这些重复的数据.
intersection(otherRDD)
求两个 RDD 共同的元素的 RDD。 intersection() 在运行时也会去掉所有重复的元素
subtract(otherRDD)
差集
subtract 接受另一个 RDD 作为参数,返回一个由只存在第一个 RDD 中而不存在第二个 RDD 中的所有元素组成的 RDD
下面的Transform操作应用在Key-Value的RDD上。
groupByKey()
分组
根据键值对 key 进行分组。 在(K,V)键值对的数据集上调用时,返回(K,Iterable )键值对的数据集
note: 基于combineByKeyWithClassTag
1 | val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) |
note: 如果分组是为了在每个 key 上执行聚合(如求总和或平均值)则使用 reduceByKey 或 aggregateByKey 会有更好的性能。
reduceByKey(func, [numTasks])
根据key聚合
当在(K,V)键值对的数据集上调用时,返回(K,V)键值对的数据集,使用给定的reduce函数 func 聚合每个键的值,该函数类型必须是(V,V)=> V
note: 基于combineByKeyWithClassTag
1 | val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y) |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
根据key排序
在(K,V)键值对的数据集调用,其中 K 实现 Ordered 接口,按照升序或降序顺序返回按键排序的(K,V)键值对的数据集
1 | val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1) |
行动Action
下面是常见的Action操作
1 | /* count操作 */ |
reduce(func)
接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD并返回一个同样类型的新元素
1 | rddInt.reduce((x ,y) => x + y) |
collect()
将整个RDD的内容返回
take(n)
返回 RDD 中的n个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合
saveAsTextFile(path)
将数据集的元素写入到本地文件系统,HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录的文本文件(或文本文件集合)中
saveAsSequenceFile(path)
将数据集的元素写入到本地文件系统,HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径下的 Hadoop SequenceFile中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用
foreach(func)
在数据集的每个元素上运行函数 func
Key-Value RDD
Spark里创建键值对RDD只可以从内存里读取。所有从文件中读取的RDD都是一般的RDD对象,需要进行转化。
对于Pair RDD常见的聚合操作如:reduceByKey,foldByKey,groupByKey,combineByKey,这些API的定义在 PairRDDFunctions类中。
常见的Key-Value RDD转换操作:
1 | reduceByKey:合并具有相同键的值; |
常见的行动操作:
1 | countByKey:对每个键的元素进行分别计数; |
combineByKey
combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的。例如groupByKey,reduceByKey等等
1 | def combineByKey[C]( |
其中主要的前三个参数如下:
createCombiner
函数: 组合器函数,用于将RDD[K,V]中的V转换成一个新的值C1;在找到给定分区中第一次碰到的key(在RDD元素中)时被调用。此方法为这个key初始化一个累加器。mergeValue
函数:合并值函数,将一个C1类型值和一个V类型值合并成一个C2类型,输入参数为(C1,V),输出为新的C2; 当累加器已经存在的时候(也就是上面那个key的累加器)调用。mergeCombiners
函数:合并组合器函数,用于将两个C2类型值合并成一个C3类型,输入参数为(C2,C2),输出为新的C3。如果哪个key跨多个分区,该参数就会被调用。
举个例子说明计算的流程:
现在有一个Key-Value的Pair RDD,两个分区
1 | val pairStrRDD = sc.parallelize[(String, Int)](Seq(("coffee", 1), ("coffee", 2), ("tea", 3), ("coffee", 9)), 2) |
这个RDD有两个分区,第一个分区是((“Coffee”, 1), (“Coffee”, 2)),第二个分区是((“Tea”, 3), (“Coffee”,9))
然后定义combineByKey的前三个参数
1 | // 找到给定分区中第一次碰到的key(在RDD元素中)时被调用 |
在分区1中,第一次遇到key “Coffee”,createCombiner
函数被调用,为”Coffee”产生一个累加器,coffee的值为1,出现次数为1。(1, 1);第二次遇到key “Coffee”,调用函数mergeValue
函数,上个累加器的第一个元素加上这次遇到的value,上个累加器的第二个元素加上1作为次数。(1+2, 1+1)。在分区2中,同理得到两个累加器tea: (3, 1) coffee: (9, 1)
“Coffee”这个key值跨越两个分区,函数mergeCombiners
被调用,coffee: (3+9, 2+1)
所以最后得到的RDD为 coffee (12, 3), tea (3, 1)
1 | val testCombineByKeyRDD = pairStrRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) |
groupByKey
的实现
上面说过,例如reduceByKey、groupByKey等键值对RDD的转换,都是基于combineByKey的
源码中,可以看出定义了combineByKey的三个参数
- createCombiner 将原RDD中的K类型转换为Iterable[V]类型,实现为CompactBuffer
- mergeValue 将原RDD的元素追加到CompactBuffer中,即将追加操作(+=)视为合并操作
- mergeCombiners 针对每个key值所对应的Iterable[V],提供合并功能(c1 ++= c2)
groupByKey函数针对PairRddFunctions的RDD[(K, V)]按照key对value进行分组
关于Shuffle
大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作
RDD缓存
persist()
or cache()