美文网首页
Spark之RDD和Dataset转换

Spark之RDD和Dataset转换

作者: 学师大术 | 来源:发表于2019-06-11 21:44 被阅读0次

版本

基于Spark 2.0+以上版本。scala ,java案例不多,记录下java代码进行RDD和dataset转换。

列举常见的两种方式

1.方式一:bean反射

//1.创建一个bean,对应表中字段
public class Zzltable implements Serializable {
    private String key=null;
    private String name=null;
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
//2.创建sparksession,并构造RDD
SparkSession sparkSession =SparkSession.builder().enableHiveSupport().getOrCreate();
List<String> data = new ArrayList<String>();
data.add("name1:name2");
data.add("name3:name4");
JavaRDD<String> jr= new JavaSparkContext(sparkSession.sparkContext()).parallelize(data);
 JavaRDD<Zzltable> jd = jr.map(new Function<String, Zzltable>() {
            public Zzltable call(String s) throws Exception {
                Zzltable z1 = new Zzltable();
                String[] arr = s.split(":");
                z1.setName(arr[1]);
                z1.setKey(arr[0]);
                return z1;
 }
});
//3.直接使用反射Zzltable
Dataset<Row> data1 = sparkSession.createDataFrame(jd,Zzltable.class);
//4.添加分区字段,分区字段为常量        
data1.withColumn("dt",lit("11")).write().partitionBy("dt").mode(SaveMode.Append).saveAsTable("example_zzl");

2.方式二:定义structType和structFiled

//1.使用RowFactory,构造RDD<Row>
       JavaRDD<Row> jdr = jr.map(new Function<String, Row>() {
            public Row call(String s) throws Exception {
                String[] arr = s.split(":");
                String name=arr[1];
                String id = arr[2];
               return RowFactory.create(name,id);
            }
        });
//2.构造字段类型structType、structFiled
 ArrayList<StructField> fields = new ArrayList<StructField>();
 fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
 fields.add(DataTypes.createStructField("id",DataTypes.StringType,true));
 StructType types = DataTypes.createStructType(fields);

//3.构造dataset
Dataset<Row> data2 = sparkSession.createDataFrame(jdr,types);
//4.保存   
data2.withColumn("dt",lit("11")).write().partitionBy("dt").mode(SaveMode.Append).saveAsTable("example_zz2");

总结

1.转化的方法都可在SparkSession.createDataFrame中获取,有大量API供您选择。
2.在java中如果需要withColumn添加列,尤其是需要添加常量的时候可以使用lit。在java中需要静态引入:import static org.apache.spark.sql.functions.lit;

相关文章

网友评论

      本文标题:Spark之RDD和Dataset转换

      本文链接:https://www.haomeiwen.com/subject/xzbcfctx.html