美文网首页
Spark算子

Spark算子

作者: duval | 来源:发表于2017-03-14 12:45 被阅读0次

layout: pospost
title: Spark算子
date: 2017-02-27 16:03:52
tags: [Spark算子,Spark,大数据]
categories: "大数据"


RDD创建操作

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
  从一个Seq集合创建 RDD。
参数1:Seq集合,必须。
参数2:分区数,默认为该Application分配到的资源的CPU核数( Spark will run one task for each partition of the cluster. )

scala> val data = Array(1,2,3,4,5,6);
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num = sc.parallelize(data);
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

scala> num.collect();
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> val num1 = sc.parallelize(1 to 10);
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> num1.collect();
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> num1.partitions.size
res3: Int = 4

scala> val num2 = sc.parallelize(1 to 10 ,2);
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> num2.partitions.size
res4: Int = 2

  • makeRDD (跳过)

与parallelize一模一样

从外部存储创建RDD

  • textFile

从hdfs文件创建

scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> textFile.collect();
res6: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> textFile.count();
res7: Long = 3

从其他HDFS文件格式创建 (跳过)

  • hadoopFile
  • sequenceFile
  • objectFile
  • newAPIHadoopFile

从Hadoop接口API创建 (跳过)

  • hadoopRDD
  • newAPIHadoopRDD

RDD基本转换操作(1) –map、flatMap、distinct

数据准备

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ bin/hadoop dfs -copyFromLocal ~/test/1.txt  /user/test/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

hadoop@hzbxs-bigdata16:~/hadoop-hp2$ cat ~/test/1.txt 
Hello World
Hello Yang
Hello SCALA
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ 
  • map

将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。

//从HDFS从加载文件
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[13] at textFile at <console>:24
//打印 textFile 内容
scala> textFile.collect();
res7: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)               
// map 算子
scala> val mapdata = textFile.map(line => line.split("\\s+"));
mapdata: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:26
//打印data 内容
scala> mapdata.collect();
res8: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Yang), Array(Hello, SCALA))
  • flatMap

与map类似,但是最后会将结果合并到一个分区。

scala> val flatmapdata = textFile.flatMap(line => line.split("\\s+"));
flatmapdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26

scala> flatmapdata.collect();
res10: Array[String] = Array(Hello, World, Hello, Yang, Hello, SCALA)

  • distinct

对元素去重

scala> val flatdistincedata = textFile.flatMap(line => line.split("\\s+")).distinct();
flatdistincedata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at distinct at <console>:26

scala> flatdistincedata.collect();
res0: Array[String] = Array(Hello, SCALA, World, Yang)

RDD基本转换操作(2)–coalesce、repartition

  • coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
将RDD的分区数量重新调整为numPartitions个,如果shuffle为true,分区数量可以大于原值。

scala> var data = sc.textFile("/user/test/1.txt");
data: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[16] at textFile at <console>:24

scala> data.collect();
res9: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> data.partitions.size;
res11: Int = 2

scala> var newdata = data.coalesce(1);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[17] at coalesce at <console>:26

scala> newdata.collect();
res12: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)

scala> newdata.partitions.size;
res13: Int = 1

// shuttle 默认为true,新分区数量大于原分区值,无效
scala> var newdata = data.coalesce(3);  
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:26

scala> newdata.partitions.size;
res14: Int = 2

// shuttle 默认为false,新分区数量可以大于原分区值,有效
scala> var newdata = data.coalesce(3,true);
newdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at coalesce at <console>:26

scala> newdata.partitions.size;
res15: Int = 3

  • repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[String]): org.apache.spark.rdd.RDD[String]
相当于 coalesce 的shuttle 为true

scala> var newdata1 = data.repartition(3);
newdata1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:26

scala> newdata1.partition
partitioner   partitions

scala> newdata1.partitions.size
res16: Int = 3

RDD基本转换操作(3)–union(并集)、intersection(交集)、subtract(差集)

  • union

def union(other: RDD[T]): RDD[T]
将两个RDD 合并

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(6 to 10,2);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect();
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  • intersetion

def intersection(other: org.apache.spark.rdd.RDD[Int],numPartitions: Int): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int],partitioner: org.apache.spark.Partitioner)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
def intersection(other: org.apache.spark.rdd.RDD[Int]): org.apache.spark.rdd.RDD[Int]
求两个RDD的交集,可以指定返回的RDD的分区数或者指定分区函数。不去重

scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(3 to 7,1);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> rdd1.intersection(rdd2).collect();
res28: Array[Int] = Array(4, 3, 5)
scala> var rdd3 = rdd1.intersection(rdd2,4);
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[50] at intersection at <console>:28
scala> rdd3.partitions.size
res31: Int = 4
  • substract

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
返回在RDD中出现,并且不在otherRDD中出现的元素,不去重

scala> var rdd1 = sc.makeRDD(Seq(1,1,2,2,3,3));
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(Seq(3,3,4,4,5,5));
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at makeRDD at <console>:24

scala> rdd1.subtract(rdd2).collect();
res32: Array[Int] = Array(1, 1, 2, 2)

RDD基本转换操作(4)-mapPartitions 、mapPartitionsWithIndex

  • mapPartitions

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T.
跟map类似,区别在于mapPartitions作用在RDD的每一个分区上,func需要实现从 Iterator< T > 到Iterator< U > 的转换。

scala> var rdd = sc.makeRDD(1 to 10,4);
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at makeRDD at <console>:24
scala> var rdd1 = rdd.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0;
     |     while (x.hasNext){
     |        i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[58] at mapPartitions at <console>:26

scala> rdd1.collect
res33: Array[Int] = Array(3, 12, 13, 27)

scala> rdd.partitions.size
res34: Int = 4

  • mapPartitionsWithIndex

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator< T >) => Iterator< U > when running on an RDD of type T.
跟mapPartitions 类似,区别在于 func需要实现(Int, Iterator< T >) 到 Iterator< U > 的类型转换,第一个整数代表分区的下标

scala> var rdd1 = rdd.mapPartitionsWithIndex{ (index,x) => {
     |     var result = List[String]()
     |     var i = 0
     |     while (x.hasNext){
     |        i+=x.next()
     |     }
     |     result.::(index+"|"+i).iterator
     |  }
     | }
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at mapPartitionsWithIndex at <console>:26

scala> rdd1.collect
res38: Array[String] = Array(0|3, 1|12, 2|13, 3|27)

RDD基本转换操作(5)

  • groupByKey

**def groupByKey(): RDD[(K, Iterable[V])] **
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] // 指定分区数
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

scala> var arr = Array(("1","YANG"),("1","YANG1"),("2","HELLO"),("2","HELLO2"),("3","WORLD"));
arr: Array[(String, String)] = Array((1,YANG), (1,YANG1), (2,HELLO), (2,HELLO2), (3,WORLD))

scala> var rdd = sc.makeRDD(arr);
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[61] at makeRDD at <console>:26

scala> var rdd1 = rdd.groupByKey();
rdd1: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:28

scala> rdd1.collect
res39: Array[(String, Iterable[String])] = Array((1,CompactBuffer(YANG, YANG1)), (2,CompactBuffer(HELLO, HELLO2)), (3,CompactBuffer(WORLD)))

  • reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",3),("B",3)));
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:24

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[64] at reduceByKey at <console>:26

scala> rdd1.collect
res40: Array[(String, Int)] = Array((A,6), (B,6))

scala> rdd.partitions.size
res41: Int = 4

scala> rdd1.partitions.size
res42: Int = 4

scala> var rdd1 = rdd.reduceByKey((x,y) => x+y,10);
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:26

scala> rdd1.collect
res43: Array[(String, Int)] = Array((A,6), (B,6))                               

scala> rdd1.partitions.size
res44: Int = 10
  • reduceByKeyLocally

类似与reduceByKey ,但是返回的不是RDD,而是Map

scala> rdd.collect
res45: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (B,3), (B,3))

scala> rdd.reduceByKeyLocally((x,y) => x+y)
res46: scala.collection.Map[String,Int] = Map(B -> 6, A -> 6)

相关文章

网友评论

      本文标题:Spark算子

      本文链接:https://www.haomeiwen.com/subject/placnttx.html