美文网首页
spark练手与防遗忘系列:spark-rdd基本操作

spark练手与防遗忘系列:spark-rdd基本操作

作者: BlueCat2016 | 来源:发表于2017-12-13 17:56 被阅读0次
# coding=utf8

from pyspark import SparkConf, SparkContext

'''
基本RDD“转换”运算
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# 创建RDD
intRDD = sc.parallelize([3, 1, 2, 5, 5])
stringRDD = sc.parallelize(['Apple', 'Orange', 'Grape', 'Banana', 'Apple'])
# RDD转换为Python数据类型
print (intRDD.collect())
print (stringRDD.collect())
print '-------------------------------------'
# map运算
print intRDD.map(lambda x: x + 1).collect()
print '-------------------------------------'
# filter运算
print intRDD.filter(lambda x: x < 3).collect()
print stringRDD.filter(lambda x: 'ra' in x).collect()
print '-------------------------------------'
# 去除重复元素
print intRDD.distinct().collect()
print '-------------------------------------'
# randomSplit运算
# randomSplit 运算将整个集合以随机数的方式按照比例分为多个RDD,比如按照0.4和0.6的比例将intRDD分为两个RDD,并输出

sRDD = intRDD.randomSplit([0.4, 0.6])
print len(sRDD)
print sRDD[0].collect()
print sRDD[1].collect()
print '-------------------------------------'
# groupBy运算
result = intRDD.groupBy(lambda x: x % 2).collect()
print result
print sorted([(x, sorted(y)) for (x, y) in result])
print '-------------------------------------'
'''
多个RDD转换运算
'''
# 并集运算
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])
print intRDD1.union(intRDD2).union(intRDD3).collect()
# 交集运算
print intRDD1.intersection(intRDD2).collect()
# 差集运算
print intRDD1.subtract(intRDD2).collect()
# 笛卡尔积运算
print intRDD1.cartesian(intRDD2).collect()
print '-------------------------------------'
'''
基本“动作”运算
'''
# 读取元素
# 取第一条数据
print (intRDD.first())
# 取前两条数据
print (intRDD.take(2))
# 升序排列,并取前3条数据
print (intRDD.takeOrdered(3))
# 降序排列,并取前3条数据
print (intRDD.takeOrdered(3, lambda x: -x))
print '-------------------------------------'
# 统计功能
# 统计
print (intRDD.stats())
# 最小值
print (intRDD.min())
# 最大值
print (intRDD.max())
# 标准差
print (intRDD.stdev())
# 计数
print (intRDD.count())
# 求和
print (intRDD.sum())
# 平均
print (intRDD.mean())
print '-------------------------------------'
'''
RDD Key-Value基本“转换”运算
'''
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
print kvRDD1.keys().collect()
print kvRDD1.values().collect()
# 筛选键小于某值的元素
print kvRDD1.filter(lambda x: x[0] < 2).collect()
# 筛选值小于某值的元素
print kvRDD1.filter(lambda x: x[1] < 5).collect()
# 将值进行平方处理
print kvRDD1.mapValues(lambda x: x ** 2).collect()
# 按照key排序
print kvRDD1.sortByKey().collect()
print kvRDD1.sortByKey(True).collect()
print kvRDD1.sortByKey(False).collect()
print '-------------------------------------'

# 合并相同key值的数据
print kvRDD1.reduceByKey(lambda x, y: x + y).collect()
print '-------------------------------------'
'''
多个RDD Key-Value“转换”运算
'''
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD2 = sc.parallelize([(3, 8)])

# 内连接
print kvRDD1.join(kvRDD2).collect()
# 左外连接
print kvRDD1.leftOuterJoin(kvRDD2).collect()
# 右外连接
print kvRDD1.rightOuterJoin(kvRDD2).collect()
# 删除相同key值数据
print kvRDD1.subtractByKey(kvRDD2).collect()
'''
Key-Value“动作”运算
'''
# 读取第一条数据
print kvRDD1.first()
# 读取前两条数据
print kvRDD1.take(2)
# 读取第一条数据的key值
print kvRDD1.first()[0]
# 读取第一条数据的value值
print kvRDD1.first()[1]

# 按key值统计
print kvRDD1.countByKey()
# lookup查找
print kvRDD1.lookup(3)

map与flatMap的区别

# coding=utf8

from pyspark import SparkConf, SparkContext

'''
###基本RDD“转换”运算
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# 创建RDD
intRDD = sc.parallelize([3, 1, 2, 5, 5])
kvRDD = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])

# map运算
print intRDD.map(lambda x: x + 1).collect()
# print intRDD.flatMap(lambda x: x + 1)
print '-------------------'
print kvRDD.map(lambda x: (x[0], x[1] + 1)).collect()
print kvRDD.flatMap(lambda x: (x[0], x[1] + 1)).collect()

# 运行结果:
# [4, 2, 3, 6, 6]
# -------------------
# [('A', 2), ('B', 3), ('C', 4)]
# ['A', 2, 'B', 3, 'C', 4]

map和flatMap的区别

play_rdd = sc.parallelize([
    (1, ("A", 1, ['a', 'b'])),
    (0, ("B", 2, ['a', 'c'])),
    (2, ("C", 3, ['b', 'd', 'e'])
     )])

video_play_rdd = play_rdd.map(lambda x: [(video, 1) for video in x[1][2]])
print video_play_rdd.collect()

video_play_rdd = play_rdd.flatMap(lambda x: [(video, 1) for video in x[1][2]])
print video_play_rdd.collect()

# 运行结果:
# [[('a', 1), ('b', 1)], [('a', 1), ('c', 1)], [('b', 1), ('d', 1), ('e', 1)]]
# [('a', 1), ('b', 1), ('a', 1), ('c', 1), ('b', 1), ('d', 1), ('e', 1)]

join运算

# coding=utf8

from pyspark import SparkConf, SparkContext

'''
基本RDD“转换”运算
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# 创建RDD
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3)])
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9)])

# join运算
res = kvRDD1.join(kvRDD2)
print res.collect()
res = res.map(lambda line: (line[0].split(':')[0], line[0].split(':')[1], line[1])).collect()
print res

运行结果:

[('A:111', (1, 12.5)), ('B:222', (2, 8.7)), ('C:333', (3, 34.9))]
[('A', '111', (1, 12.5)), ('B', '222', (2, 8.7)), ('C', '333', (3, 34.9))]

注意:如果kvRDD1或者kvRDD2中如果有None,则会报错,比如如下形式:

kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)])
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9), None])

可以采取如下办法:

# coding=utf8

from pyspark import SparkConf, SparkContext

'''
基本RDD“转换”运算
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# 创建RDD
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)]).filter(lambda line: line is not None)
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9), None]).filter(lambda line: line is not None)

# join运算
res = kvRDD1.join(kvRDD2)
print res.collect()
res = res.map(lambda line: (line[0].split(':')[0], line[0].split(':')[1], line[1])).collect()
print res

注:如果两个rdd里面每个元素,有的用中括号,有的用小括号,也是可以join的,如下所示:

# coding=utf8

from pyspark import SparkConf, SparkContext

'''
基本RDD“转换”运算
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# 创建RDD
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)]).filter(lambda line: line is not None)
kvRDD2 = sc.parallelize([["A:111", 12.5], ("A:111", 19.5), ["B:222", 8.7], ("C:333", 34.9), None]).filter(lambda line: line is not None)

# join运算
res = kvRDD1.join(kvRDD2)
print res.collect()

spark从多个文件或目录中读取数据:

#读取多个文件
distFile = sc.textFile("/input/data1.txt, /input/data2.txt") 
#读取多个目录
kvRDD1 = sc.textFile('aaa/*,bbb/*') 

相关文章

  • spark练手与防遗忘系列:spark-rdd基本操作

    map与flatMap的区别 map和flatMap的区别 join运算 运行结果: 注意:如果kvRDD1或者k...

  • 练手与防遗忘系列-numpy

    越来越发现,numpy,pandas,scipy,matplotlib这类库的各种函数就像背英语单词一样,一直无法...

  • Spark-RDD操作MySQL

    Spark支持通过Java JDBC访问关系型数据库,需要通过JdbcRDD进行访问,示例如下: 添加依赖 MyS...

  • spark-RDD算子操作分类

    RDD算子操作分类 测试用例说明 前置方法 后置方法 成员变量 1.taansformation(转换) 它可以实...

  • spark-rdd

    rdd Resilient Distributed DataSets 容错的 并行的数据结果 transform ...

  • RDD是什么

    基本操作 wordcount.txt文件 bin/spark-shell 操作命令 bin\spark-submi...

  • mongodb基本操作&spark操作mongodb

    mongodb基本操作 spark连接并读取mongodb spark写入mongodb

  • Spark核心编程:Spark基本工作原理与RDD

    Spark核心编程:Spark基本工作原理与RDD Spark基本工作原理 画图讲解Spark的基本工作原理1、分...

  • Spark-RDD介绍

    RDD 1 RDD介绍 Driver program:包含程序的main()方法,RDDs的定义和操作。管理节点,...

  • Spark-RDD分区

    RDD分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。所以对...

网友评论

      本文标题:spark练手与防遗忘系列:spark-rdd基本操作

      本文链接:https://www.haomeiwen.com/subject/eeszixtx.html