DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。
通过在分布式数据集中施加结构,让Spark用户利用Spark SQL来查询结构化的数据或使用Spark表达式方法(而不是lambda)
利用DataFrame加速PySpark
DataFrame和Catalyst优化器的意义是在和非优化的RDD查询比较时增加PySpark查询的性能。引入DataFrame之前,Python查询速度普遍比使用RDD的Scala查询慢。这种查询性能的降低源于Python和JVM之间的通信开销。
创建DataFrame
生成json数据 RDD
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
conf = SparkConf().setAppName("wordcount")
sc =SparkContext(conf=conf)
spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()
people_json = [{"name":"Michael","age":22},{"name":"Andy", "age":30},{"name":"Justin", "age":19}]
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
stringJSONRDD = sc.parallelize(people_json)
people_dataframe = spark.createDataFrame(stringJSONRDD,schema)
people_dataframe.createOrReplaceTempView("table1")
print(spark.sql("select * from table1").collect())
输出:[Row(name='Michael', age=None), Row(name='Andy', age=30), Row(name='Justin', age=19)]
RDD的交互操作
有两种从RDD变换到DataFrame的不同方法:使用反射推断模式或以编程方式指定模式。
上面使用的就是编程方式指定模式
利用DataFrame API查询
people_dataframe.count()
返回DataFrame中的行数
运行筛选语句
people_dataframe.select("name","age").filter("age=19").show()
输出如下:
+------+---+
| name|age|
+------+---+
|Justin| 19|
+------+---+
利用SQL查询
我们执行了.createOrReplaceTempView方法,可以使用SQL查询。
行数
spark.sql("select count(1) from table1").show()
利用where子句运行筛选语句
spark.sql("select name,age from table1 where age=19").show()
spark.sql("select name,age from table1 where name like 'a%'").show()












网友评论