美文网首页
Spark常见操作和数据持久化

Spark常见操作和数据持久化

作者: NEO_X | 来源:发表于2019-05-22 17:08 被阅读0次

常见transformation和action操作

在这里接触Spark中大部分常见的转化操作和行动操作。包含特定数据类型的RDD还支持一些附加操作,例如,数字类型的RDD支持统计型函数操作,而键值对形式的RDD则支持诸如根据键聚合数据的键值对操作。我们也会在后面几节中讲到如何转换RDD类型,以及各类型对应的特殊操作。

基本transformation操作

很可能会用到的两个最常用的转化操作是map()和filter()。转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果。RDD中对应元素的值。而转化操作filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回

map()函数

转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果,RDD 中对应元素的值。而转化操作 filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。map()的返回值类型不需要和输入类型一样.

map_filter.png
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)
map_squared.png

flatmap()

希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()。和map()类似,提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。flatMap()的一个简单用途是把输入的字符串切分为单词

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"
flatMap.png

flatMap() 和 map() 的区别。你可以把flatMap()看作将返回的迭代器降维展开,这样就得到了一个由各列表中的元素组成的 RDD,而不是一个由列表组成的RDD

diff_map_flatMap.png

伪集合操作

尽管RDD本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。注意,这些操作都要求操作的RDD是相同数据类型的

  • 即通常数学上定义的 交集,并集,差集,笛卡尔积等。如下表所示。
函数 目的 示例 返回值
union() 并集,生成一个包含两个RDD中所有元素的RDD rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() 交集,求两个 RDD 共同的元素的RDD rdd.intersection(other) {3}
subtract() 差集,移除一个RDD中的内容(例如移除训练数据) rdd.subtract(other) {1, 2}
cartesian() 笛卡尔积,与另一个RDD 的笛卡儿积 rdd.cartesian(other) {(1, 3), (1, 4), ...(3, 5)}
  • RDD转化操作
函数 功能 示例 返回值
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() 将函数应用于RDD中的每个元素,将返回值构成新的RDD rdd.flatMap(x => x.to(3)) {1, 2, 3, 2,3, 3, 3}
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x => x != 1) {2, 3, 3}
distinct() 去重 rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, [seed]) 对 RDD 采样,以及是否替换 rdd.sample(false, 0.5) 不确定

Action操作

可能会用到基本RDD上最常见的Action操作是reduce()。它接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。使用reduce(),可以很方便地计算出 RDD中所有元素的总和、元素的个数,以及其他类型的聚合操作

sum = rdd.reduce(lambda x, y: x + y)
函数 目的 示例 返回值
collect() 返回RDD 中的所有元素 rdd.collect() {1, 2, 3, 3}
count() RDD中的元素个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
take(num) 从RDD中返回num个元素 rdd.take(2) {1, 2}
top(num) 从RDD中返回最前面的num个元素 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSample(withReplacement, num, [seed]) 从RDD 中返回任意一些元素 rdd.takeSample(false, 1) 非确定的
reduce(func) 并行整合RDD中所有数据(例如 sum) rdd.reduce((x, y) => x + y) 9
fold(zero)(func) 和reduce()一 样,但是需要提供初始值 rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) 和reduce()相似,但是通常返回不同类型的函数 rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func)

使用 aggregate() 时,需要提供我们期待返回的类型的初始值。然后通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并

数据持久化

org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别

级别 使用的空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据
DISK_ONLY
  • scala相关代码
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
  • python相关代码
result = inputRdd.map(lambad x :x*x )
result.persist(StorageLevel.DISK_ONLY)
print result.count()

相关文章

  • Lesson 0-3 Objective-C basic

    15. 数据持久化 数据持久化方式: plist:属性列表plist 文件的常见操作 NSUserDefaults...

  • Spark常见操作和数据持久化

    常见transformation和action操作 在这里接触Spark中大部分常见的转化操作和行动操作。包含特定...

  • RDD 持久化的工作原理

    Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 ...

  • RDD持久化

    RDD持久化 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点...

  • JAVAEE框架学习——Hibernate——持久化

    Hibernate持久化类的编写规则 持久化类 Hibernate是持久层的ORM映射框架,专注于数据的持久化操作...

  • iOS 开发技术选型之数据库:SQLite vs. Core D

    持久化方案 在 iOS 开发中,数据持久化存储是一个很常见的需求。所谓持久化存储,就是将数据存到硬盘,使得应用重启...

  • spark结构化流--kafka

    我们的场景是通过spark-streaming读取kafka数据,实时持久化到hdfs上。并按day和hour分区...

  • Docker学习(13) 卷与持久化数据

    Docker学习(13) 卷与持久化数据 卷与持久化数据——简介 数据主要分为两种:持久化和非持久化。 持久化:就...

  • Redis常见面试题

    Redis常见面试题 Redis持久化机制 Redis是一个支持持久化的内存数据库,通过持久化机制把内存中的数据同...

  • Python中的文件与异常

    文件和异常 在实际开发中,常常需要对程序中的数据进行持久化操作,而实现数据持久化最直接简单的方式就是将数据保存到文...

网友评论

      本文标题:Spark常见操作和数据持久化

      本文链接:https://www.haomeiwen.com/subject/dxwuzqtx.html