DataFrame

作者: 竞媒体 | 来源:发表于2020-05-21 11:28 被阅读0次

DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。

通过在分布式数据集中施加结构,让Spark用户利用Spark SQL来查询结构化的数据或使用Spark表达式方法(而不是lambda)

利用DataFrame加速PySpark

DataFrame和Catalyst优化器的意义是在和非优化的RDD查询比较时增加PySpark查询的性能。引入DataFrame之前,Python查询速度普遍比使用RDD的Scala查询慢。这种查询性能的降低源于Python和JVM之间的通信开销。

创建DataFrame

生成json数据 RDD

import findspark

findspark.init()

from pyspark import SparkContext, SparkConf

from pyspark.sql.session import SparkSession

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

conf = SparkConf().setAppName("wordcount")

sc =SparkContext(conf=conf)

spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()

people_json = [{"name":"Michael","age":22},{"name":"Andy", "age":30},{"name":"Justin", "age":19}]

schema = StructType([

        StructField("name", StringType(), True),

        StructField("age", IntegerType(), True)

    ])

stringJSONRDD = sc.parallelize(people_json)

people_dataframe = spark.createDataFrame(stringJSONRDD,schema)

people_dataframe.createOrReplaceTempView("table1")

print(spark.sql("select * from table1").collect())

输出:[Row(name='Michael', age=None), Row(name='Andy', age=30), Row(name='Justin', age=19)]

RDD的交互操作

有两种从RDD变换到DataFrame的不同方法:使用反射推断模式或以编程方式指定模式。

上面使用的就是编程方式指定模式

利用DataFrame API查询

people_dataframe.count()

返回DataFrame中的行数

运行筛选语句

people_dataframe.select("name","age").filter("age=19").show()

输出如下:

+------+---+

|  name|age|

+------+---+

|Justin| 19|

+------+---+

利用SQL查询

我们执行了.createOrReplaceTempView方法,可以使用SQL查询。

行数

spark.sql("select count(1) from table1").show()

利用where子句运行筛选语句

spark.sql("select name,age from table1 where age=19").show()

spark.sql("select name,age from table1 where name like 'a%'").show()

相关文章

网友评论

      本文标题:DataFrame

      本文链接:https://www.haomeiwen.com/subject/tyaaohtx.html