这里就不再介绍Spark了,这篇文章主要记录一下关于Spark的核心RDD
的相关操作以及以单词计数这个简单的例子,描述一下Spark的处理流程。
Spark RDD
Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集,这个存储系统可以是一个共享文件系统,比如HDFS、HBase或任意提供了Hadoop输入格式的数据来源。
RDD支持两类操作:
- 转换(Transform)
- 动作(Action)
还是不翻译的好,下面都用英文描述。Transform
:用于从已有的数据集转换产生新的数据集,Transform的操作是Lazy Evaluation
的,也就是说这条语句过后,转换并没有发生,而是在下一个Action
调用的时候才会返回结果。Action
:用于计算结果并向驱动程序返回结果。
演示一下上面两种基本操作:
1 | lines = sc.textFile("data.txt") |
第一行是有外部存储系统中创建一个RDD对象,第二行定义map操作,是一个Transform
操作,由于Lazy Evaluation
,对象lineLength
并没有立即计算得到。第三行,reduce
是一个Action
操作,这时,Spark将整个计算过程划分成许多任务在多台机器上并行执行,每台机器运行自己部分的map操作和reduce操作,最终将自己部分的运算结果返回给驱动程序。
1 | lineLength.persist() |
这一行,Spark将lineLength
对象保存在内存中,以便后面计算中使用。Spark的一个重要功能就是在将数据集持久化(或缓存)到内存中以便在多个操作中重复使用。
以上就是RDD的一些基本操作,API文档中写的都很清楚,我就不多说了。
统计一篇文档中单词的个数
首先,写一个函数,用来计算单词个数
1 | def wordCount(wordListRDD): |
使用正则表达式清理原始文本
1 | import re |
去读文件内容到RDD中
1 | import os.path |
这时候,需要把单词通过空格隔开,然后过滤掉为空的内容
1 | shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split()) |
统计出出现次数前15多的单词以及个数:
1 | top15WordAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (k, v): -v) |
输出结果为:
word | count |
---|---|
the: | 27361 |
and: | 26028 |
i: | 20681 |
to: | 19150 |
of: | 17463 |
a: | 14593 |
you: | 13615 |
my: | 12481 |
in: | 10956 |
that: | 10890 |
is: | 9134 |
not: | 8497 |
with: | 7771 |
me: | 7769 |
it: | 7678 |
Spark是用Scala写出来的,所以可想而知如果用Scala写的效率会比Python高一些,在这儿顺便贴一个Scala版写的WordCount:
1 | val wordCounts = textFile.flatMap(line => line.split(" ")) |
真是简洁,Spark真好,嘿嘿~