# coding=utf8
from pyspark import SparkConf, SparkContext
'''
Basic RDD "transformation" operations
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Create the RDDs
intRDD = sc.parallelize([3, 1, 2, 5, 5])
stringRDD = sc.parallelize(['Apple', 'Orange', 'Grape', 'Banana', 'Apple'])
# Collect the RDDs back into Python data types
print(intRDD.collect())
print(stringRDD.collect())
print('-------------------------------------')
# map operation
print(intRDD.map(lambda x: x + 1).collect())
print('-------------------------------------')
# filter operation
print(intRDD.filter(lambda x: x < 3).collect())
print(stringRDD.filter(lambda x: 'ra' in x).collect())
print('-------------------------------------')
# Remove duplicate elements
print(intRDD.distinct().collect())
print('-------------------------------------')
# randomSplit operation
# randomSplit divides the collection randomly into multiple RDDs according to the given
# proportions; here intRDD is split into two RDDs with weights 0.4 and 0.6, and both are printed
sRDD = intRDD.randomSplit([0.4, 0.6])
print(len(sRDD))
print(sRDD[0].collect())
print(sRDD[1].collect())
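# Illustrative addition (not part of the original example; sRDDSeeded is just a local name
# for this sketch): randomSplit also accepts an optional seed, which makes the split
# reproducible across runs; the sizes are still only approximately proportional to the weights.
sRDDSeeded = intRDD.randomSplit([0.4, 0.6], seed=42)
print(sRDDSeeded[0].collect())
print(sRDDSeeded[1].collect())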
print('-------------------------------------')
# groupBy operation
result = intRDD.groupBy(lambda x: x % 2).collect()
print(result)
print(sorted([(x, sorted(y)) for (x, y) in result]))
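# Illustrative addition: groupBy returns (key, ResultIterable) pairs, so mapValues can
# materialize and sort each group instead of the list comprehension above.
print(sorted(intRDD.groupBy(lambda x: x % 2).mapValues(sorted).collect()))
# Expected: [(0, [2]), (1, [1, 3, 5, 5])]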
print('-------------------------------------')
'''
Transformations on multiple RDDs
'''
# Union
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])
print(intRDD1.union(intRDD2).union(intRDD3).collect())
# Intersection
print(intRDD1.intersection(intRDD2).collect())
# Set difference
print(intRDD1.subtract(intRDD2).collect())
# Cartesian product
print(intRDD1.cartesian(intRDD2).collect())
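# Illustrative addition: the Cartesian product contains len(intRDD1) * len(intRDD2) pairs.
print(intRDD1.cartesian(intRDD2).count())
# Expected: 10 (5 * 2)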
print('-------------------------------------')
'''
Basic "action" operations
'''
# Reading elements
# The first element
print(intRDD.first())
# The first two elements
print(intRDD.take(2))
# The first 3 elements in ascending order
print(intRDD.takeOrdered(3))
# The first 3 elements in descending order
print(intRDD.takeOrdered(3, lambda x: -x))
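# Illustrative addition: top(n) is a shortcut for taking the n largest elements in
# descending order, equivalent to the takeOrdered call above.
print(intRDD.top(3))
# Expected: [5, 5, 3]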
print('-------------------------------------')
# Statistics
# Summary statistics
print(intRDD.stats())
# Minimum
print(intRDD.min())
# Maximum
print(intRDD.max())
# Standard deviation
print(intRDD.stdev())
# Count
print(intRDD.count())
# Sum
print(intRDD.sum())
# Mean
print(intRDD.mean())
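# Illustrative addition: stdev() above is the population standard deviation; the sample
# variants exist as well. For this data stdev() is 1.6 and sampleStdev() is roughly 1.789.
print(intRDD.variance())
print(intRDD.sampleStdev())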
print('-------------------------------------')
'''
Basic key-value RDD "transformation" operations
'''
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
print(kvRDD1.keys().collect())
print(kvRDD1.values().collect())
# Keep elements whose key is less than a given value
print(kvRDD1.filter(lambda x: x[0] < 2).collect())
# Keep elements whose value is less than a given value
print(kvRDD1.filter(lambda x: x[1] < 5).collect())
# Square the values
print(kvRDD1.mapValues(lambda x: x ** 2).collect())
# Sort by key (ascending by default)
print(kvRDD1.sortByKey().collect())
print(kvRDD1.sortByKey(True).collect())
print(kvRDD1.sortByKey(False).collect())
print('-------------------------------------')
# Merge values that share the same key
print(kvRDD1.reduceByKey(lambda x, y: x + y).collect())
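# Illustrative addition: groupByKey keeps every value per key instead of reducing them;
# mapValues makes the groups printable.
print(sorted(kvRDD1.groupByKey().mapValues(sorted).collect()))
# Expected: [(1, [2]), (3, [4, 6]), (5, [6])]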
print('-------------------------------------')
'''
Key-value "transformation" operations on multiple RDDs
'''
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD2 = sc.parallelize([(3, 8)])
# Inner join
print(kvRDD1.join(kvRDD2).collect())
# Left outer join
print(kvRDD1.leftOuterJoin(kvRDD2).collect())
# Right outer join
print(kvRDD1.rightOuterJoin(kvRDD2).collect())
# Remove elements whose key also appears in the other RDD
print(kvRDD1.subtractByKey(kvRDD2).collect())
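# Expected results for the four operations above (added for reference; element order may vary):
#   join:           [(3, (4, 8)), (3, (6, 8))]
#   leftOuterJoin:  [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]
#   rightOuterJoin: [(3, (4, 8)), (3, (6, 8))]
#   subtractByKey:  [(5, 6), (1, 2)]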
'''
Key-value "action" operations
'''
# The first element
print(kvRDD1.first())
# The first two elements
print(kvRDD1.take(2))
# Key of the first element
print(kvRDD1.first()[0])
# Value of the first element
print(kvRDD1.first()[1])
# Count elements per key
print(kvRDD1.countByKey())
# Look up all values for a key
print(kvRDD1.lookup(3))
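# Added for reference: countByKey() returns a dict-like object, e.g. {3: 2, 5: 1, 1: 1},
# and lookup(3) returns every value stored under key 3, i.e. [4, 6].
# Illustrative addition: collectAsMap() turns the pair RDD into a plain Python dict
# (when a key appears more than once, only one of its values is kept).
print(kvRDD1.collectAsMap())
# Expected: something like {1: 2, 3: 6, 5: 6}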
The difference between map and flatMap
# coding=utf8
from pyspark import SparkConf, SparkContext
'''
Basic RDD "transformation" operations
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Create the RDDs
intRDD = sc.parallelize([3, 1, 2, 5, 5])
kvRDD = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])
# map operation
print(intRDD.map(lambda x: x + 1).collect())
# flatMap expects the mapped function to return an iterable, so the commented line below
# would raise an error once an action such as collect() is run:
# print(intRDD.flatMap(lambda x: x + 1).collect())
print('-------------------')
print(kvRDD.map(lambda x: (x[0], x[1] + 1)).collect())
print(kvRDD.flatMap(lambda x: (x[0], x[1] + 1)).collect())
# Output:
# [4, 2, 3, 6, 6]
# -------------------
# [('A', 2), ('B', 3), ('C', 4)]
# ['A', 2, 'B', 3, 'C', 4]
Another example of the difference between map and flatMap
play_rdd = sc.parallelize([
    (1, ("A", 1, ['a', 'b'])),
    (0, ("B", 2, ['a', 'c'])),
    (2, ("C", 3, ['b', 'd', 'e'])),
])
video_play_rdd = play_rdd.map(lambda x: [(video, 1) for video in x[1][2]])
print(video_play_rdd.collect())
video_play_rdd = play_rdd.flatMap(lambda x: [(video, 1) for video in x[1][2]])
print(video_play_rdd.collect())
# Output:
# [[('a', 1), ('b', 1)], [('a', 1), ('c', 1)], [('b', 1), ('d', 1), ('e', 1)]]
# [('a', 1), ('b', 1), ('a', 1), ('c', 1), ('b', 1), ('d', 1), ('e', 1)]
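A small follow-up (not in the original): with the flattened (video, 1) pairs produced by
flatMap, reduceByKey can count the plays per video directly.
print(sorted(video_play_rdd.reduceByKey(lambda a, b: a + b).collect()))
# Expected: [('a', 2), ('b', 2), ('c', 1), ('d', 1), ('e', 1)]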
The join operation
# coding=utf8
from pyspark import SparkConf, SparkContext
'''
Basic RDD "transformation" operations
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Create the RDDs
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3)])
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9)])
# join operation
res = kvRDD1.join(kvRDD2)
print(res.collect())
res = res.map(lambda line: (line[0].split(':')[0], line[0].split(':')[1], line[1])).collect()
print(res)
Output:
[('A:111', (1, 12.5)), ('B:222', (2, 8.7)), ('C:333', (3, 34.9))]
[('A', '111', (1, 12.5)), ('B', '222', (2, 8.7)), ('C', '333', (3, 34.9))]
Note: if kvRDD1 or kvRDD2 contains a None element, the join raises an error, for example with:
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)])
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9), None])
One way to handle this is to filter out the None entries first:
# coding=utf8
from pyspark import SparkConf, SparkContext
'''
Basic RDD "transformation" operations
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Create the RDDs, dropping any None elements
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)]).filter(lambda line: line is not None)
kvRDD2 = sc.parallelize([("A:111", 12.5), ("B:222", 8.7), ("C:333", 34.9), None]).filter(lambda line: line is not None)
# join operation
res = kvRDD1.join(kvRDD2)
print(res.collect())
res = res.map(lambda line: (line[0].split(':')[0], line[0].split(':')[1], line[1])).collect()
print(res)
Note: the two RDDs can also be joined when some of their elements are written as lists (square brackets) and others as tuples (parentheses), as shown below:
# coding=utf8
from pyspark import SparkConf, SparkContext
'''
Basic RDD "transformation" operations
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Create the RDDs, dropping any None elements
kvRDD1 = sc.parallelize([("A:111", 1), ("B:222", 2), ("C:333", 3), ("E:555", 5)]).filter(lambda line: line is not None)
kvRDD2 = sc.parallelize([["A:111", 12.5], ("A:111", 19.5), ["B:222", 8.7], ("C:333", 34.9), None]).filter(lambda line: line is not None)
# join operation
res = kvRDD1.join(kvRDD2)
print(res.collect())
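# Expected result (added for reference; element order may vary):
# [('A:111', (1, 12.5)), ('A:111', (1, 19.5)), ('B:222', (2, 8.7)), ('C:333', (3, 34.9))]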
Reading data from multiple files or directories in Spark:
# Read several files at once (comma-separated paths)
distFile = sc.textFile("/input/data1.txt,/input/data2.txt")
# Read several directories at once (glob patterns)
kvRDD1 = sc.textFile('aaa/*,bbb/*')
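A related note (an illustrative sketch, not from the original): a single directory path also
works and reads every file inside it, and wholeTextFiles returns (filename, content) pairs;
the "/input" path and the variable names below are just placeholders.
# Read every file under one directory
allLines = sc.textFile("/input")
# Read whole files as (path, content) pairs
filePairs = sc.wholeTextFiles("/input")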