Resilient Distributed Dataset (RDD) 弹性分布式数据集
Spark中的RDD,简单来说就是所有对象的一个不可变的分布式集合。每个RDD都被分割为多个分区,这就可以在集群的不同节点上进行计算。RDD可以包含任何Python,Java,Scala对象类型,包括用户自定义类型。
2.基础
在Spark 中,所有的工作都被表达为创建新RDD,对已存在的RDD做变换(1.变换),或者对RDD调用某些操作来计算得到一个结果(2.动作)
每个Spark程序或者 shell 会话都是像这样工作:
1. 从外部数据创建一些作为输入的RDD
2. 使用类似filter()之类的变换来定义出新的RDD --变换
3. 要求Spark对需要重用的任何中间 RDD进行 persist() 注:每次你执行个动作,Spark的RDD默认会被重新计算
4. 启动类似count()和 first()的动作开始并行计算,然后 Spark 会优化并执行 --动作
注:在任何时候你都可以定义一个新的RDD,但是Spark总是以一种lazy的方式计算它们,也就是它们被第一次用于动作的时候。
说明:总是重新计算一个RDD的能力事实上就是为什么RDD 被称为“弹性”的原因。当拥有RDD数据的机
器发生故障,Spark就利用这个能力重新计算丢失的分区,这对用户来说是透明的。
3.创建RDD
1.普通数组创建出来
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("pandas","i like pandas"));
System.out.println(lines.count());
2.加载一个外部数据集
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
System.out.println(lines.count());
4.RDD操作
RDD支持两种类型的操作:变换(transformation)和动作(action)
变换:对一个RDD进行操作得到一个新的RDD,如map()和filter()
动作:是向应用程序返回值,或向存储系统导出数据的那些操作,如count()和first()
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errsRDD = lines.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String x) {
System.out.println("RDD变换计算");
return x.contains("Exception");
}
});
System.out.println(lines.count());
System.out.println(errsRDD.count());
注:RDD的变换计算是会延迟的,直到你在一个动作中用到
以上例子中我们可以先注释掉2条println,此时是不会输出"RDD变换计算"的,只有在执行action如count()时才会输出"RDD变换计算"
5.传递函数
在Java中,函数是实现了org.apache.spark.api.java包中的Spark函数接口的对象
函数名 | 方法 | 用法 |
Function<T, R> |
R call(T) |
一个输入一个输出,用于map(),filter()之类的操作 |
Function2<T1, T2, R> |
R call(T1, T2) |
两个输入一个输出,用于aggregate(),fold()之类的操作 |
FlatMapFunction<T, R> |
Iterable<R> call(T) |
一个输入零个或多个输出,用于 flagMap()之类的操作 |
注:在Java8中,你也可以用lambda来简洁的实现函数接口
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errors = lines.filter(new Contains("Exception"));
System.out.println(errors.count());
class Contains implements Function<String, Boolean> {
private static final long serialVersionUID = 1L;
private String query;
public Contains(String query) {
this.query = query;
}
@Override
public Boolean call(String v1) throws Exception {
return v1.contains(query);
}
}
6.常见的变换和动作
变换
两个最常见的变换是map()和filter(),二者的区别:返回的结果是新的值还是原有的值
map():返回结果就是变换后的每个元素构成的新RDD
filter():返回的是该RDD中仅能通过该函数的元素构成的新RDD
通过查看接口也可以知道:map(Function<T,R>),filter(Function<T,Boolean>)
如果希望每个输入元素产生多个输出元素,这个操作叫做flatMap(FlatMapFunction<T,U>)
看一个flatMap的例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("hello world","hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
System.out.println(StringUtils.join(words.collect(), ","));
结果:hello,world,hi --hello world实现了产生多个输出元素
对应map的实例:
JavaRDD<String[]> words = lines.map(new Function<String, String[]>() {
@Override
public String[] call(String v1) throws Exception {
return v1.split(" ");
}
});
RDD支持许多数学集合操作
对包含{1, 2, 3}和{3, 4, 5}的两个 RDD 进行变换
函数名 | 描述 | 结果 |
union() |
生成一个包含两个RDD中所有元素的RDD |
{1, 2, 3, 3, 4, 5} |
intersection() |
生成两个RDD中都有的元素组成的 RDD |
{3} |
subtract() |
从一个RDD中去掉另一个RDD中存在的元素 |
{1, 2} |
cartesian() |
生成两个RDD的笛卡尔积的 |
{(1, 3), (1, 4), ..., (3,5)} |
动作
以上例子中已经列出了一些动作,看看还有其他哪些常用的
collect() |
返回RDD中的所有元素 |
count() |
返回RDD中元素个数 |
countByValue() |
RDD中每个元素出现的次数 |
take(num) |
返回 RDD 中的 num个元素 |
top(num) |
返回 RDD 中前 num个元素 |
takeOrdered(num)(ording) |
返回RDD中基于给定顺序的num个元素 |
takeSample(withReplacement,num,[seed]) |
随机返回RDD中的num个元素 |
reduce(func) |
并行合并 RDD 中的元素(比如求和) |
fold(func) |
和reduce()一样,但是提供了一个初值 |
aggregate(zeroValue)(seqOp, combOp) |
类似 reduce(),但是用于返回不同的类型 |
foreach(func) |
对RDD中的每个元素应用函数func |
7.持久化-persist()
每次对RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD,为避免多次计算同一个RDD,可以缓存该数据;当Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区
Spark 有多个级别的持久策略可选择
级别 | 空间占用 | cpu | 在内存 | 在硬盘 | 描述 |
MEMORY_ONLY |
高 |
低 |
是 | 否 | |
MEMORY_ONLY_SE |
低 |
高 |
是 |
否 |
|
MEMORY_AND_DISK |
高 |
中 | 有时 |
有时 |
如果数据太多不能放在内存里,则溢出到磁盘 |
MEMORY_AND_DISK_SER |
低 |
高 |
有时 |
有时 |
如果数据太多不能放在内存里,则溢出到磁盘。内存中的数据表现为序列化。 |
DISK_ONLY |
低 |
高 |
否 |
是 |
|
如果你试图缓存太多的数据,当超出了内存,Spark会使用 LRU 缓存策略丢弃旧的分区。对于 memory-only 存储级别,Spark 会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据
被丢弃而进行过多的计算。
看一个例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer x) {
System.err.println("重新计算:" + x);
return x * x;
}
});
result.persist(StorageLevel.MEMORY_ONLY());
System.out.println(result.count());
System.out.println(result.count());
当我们注释掉result.persist...
重新计算:1
重新计算:2
2
重新计算:1
重新计算:2
2
发现2个count,RDD变换计算了2次
启用persist:
重新计算:1
重新计算:2
2
2
--以上是Leaning Spark的笔记