1. Examples of converting an RDD to a DataFrame:
Method 1: specify the schema programmatically
Run the following in the Spark shell (some of the echoed output is omitted below):
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> val fields = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))
scala> val schema = StructType(fields)
scala> val peopleRDD = sc.textFile("file:///home/hadoop/software/spark301/examples/src/main/resources/people.txt")
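The people.txt file ships with the Spark distribution; it contains three comma-separated lines, with a space after each comma:

Michael, 29
Andy, 30
Justin, 19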
scala> val rowRDD = peopleRDD.map(_.split(",")).map(row => Row(row(0), row(1).trim.toInt)) // trim matters: the age field in the sample file has a leading space after the comma
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("select name, age from people")
scala> results.map(a => "name: "+a(0)+", age: "+a(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael, ag...|
| name: Andy, age: 30|
|name: Justin, age...|
+--------------------+
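By default show() truncates each cell to 20 characters, which is why the longer rows above end in "...". Passing show(false) disables the truncation: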
scala> results.map(a => "name: "+a(0)+", age: "+a(1)).show(false)
+----------------------+
|value |
+----------------------+
|name: Michael, age: 29|
|name: Andy, age: 30 |
|name: Justin, age: 19 |
+----------------------+
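For readers running outside the shell, here is a minimal standalone sketch of the same programmatic-schema pipeline (my own addition, assuming a local master and the same file path); a compiled application must create the SparkSession itself, whereas the shell predefines spark and sc:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object RddToDataFrame {
  def main(args: Array[String]): Unit = {
    // The shell predefines `spark` and `sc`; a standalone app builds them.
    val spark = SparkSession.builder()
      .appName("RddToDataFrame")
      .master("local[*]")
      .getOrCreate()

    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val rowRDD = spark.sparkContext
      .textFile("file:///home/hadoop/software/spark301/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(row => Row(row(0), row(1).trim.toInt)) // trim strips the leading space before the age

    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    spark.sql("select name, age from people").show(false)

    spark.stop()
  }
}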
Method 2: infer the schema from a case class (reflection)
scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> import org.apache.spark.sql.Encoder
scala> import spark.implicits._
scala> case class Person(name:String, age:Long)
scala> val peopleDF = sc.textFile("file:///home/hadoop/software/spark301/examples/src/main/resources/people.txt").map(_.split(",")).map(attr => Person(attr(0),attr(1).trim().toInt)).toDF()
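Here toDF() infers the DataFrame schema by reflection from the fields of the Person case class (name: String, age: Long); this is what import spark.implicits._ enables. Note that attr(1).trim().toInt yields an Int, which Scala widens to the Long expected by the case class.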
scala> peopleDF.createOrReplaceTempView("people")
scala> val resultsDF = spark.sql("select name, age from people where age > 20")
scala> resultsDF.map(t => "name is : "+t(0)+", age is : "+t(1)).show()
scala> resultsDF.map(t => "name is : "+t(0)+", age is : "+t(1)).show(false)
+------------------------------+
|value |
+------------------------------+
|name is : Michael, age is : 29|
|name is : Andy, age is : 30 |
+------------------------------+
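Since the case class is already defined, one further step (my own addition, not in the original post) turns the DataFrame into a typed Dataset[Person] with as[Person], so fields can be accessed by name with compile-time checking:

scala> val peopleDS = peopleDF.as[Person]
scala> peopleDS.filter(p => p.age > 20).map(p => "name is : " + p.name + ", age is : " + p.age).show(false)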