直接代码:
package com.soul.bigdata.task.task03
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Random
object SerializatioApp {
def main(args: Array[String]): Unit = {
val sparkconf = new SparkConf()
sparkconf.setMaster("local[2]").setAppName("SerializatioApp")
sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkconf.registerKryoClasses(Array(classOf[Info]))
val sc = new SparkContext(sparkconf)
val arr = new ArrayBuffer[Info]()
val nameArr = Array[String]("zhangsan", "lisi", "wangwu")
val genderArr = Array[String]("male", "female")
val addressArr = Array[String]("beijing", "shanghai", "shengzhen", "wenzhou", "hangzhou")
for (i <- 1 to 1000000) {
val name = nameArr(Random.nextInt(3))
val age = Random.nextInt(100)
val gender = genderArr(Random.nextInt(2))
val address = addressArr(Random.nextInt(5))
arr += (Info(name, age, gender, address))
}
val rdd = sc.parallelize(arr)
rdd.unpersist()
//Java MEMORY_ONLY 34.3 MB
//Java MEMORY_ONLY_SER 25.4 MB
//spark.serializer MEMORY_ONLY_SER 71.7 MB
//spark.serializer+register MEMORY_ONLY_SER 21.1 MB
rdd.persist(StorageLevel.MEMORY_ONLY_SER).collect()
Thread.sleep(100000)
sc.stop()
}
case class Info(name: String, age: Int, gender: String, addr: String)
}
- Java MEMORY_ONLY 34.3 MB
不使用序列化,将数据缓存到内存 - Java MEMORY_ONLY_SER 25.4 MB
使用Spark默认的Java序列化后,将数据缓存到内存 - spark.serializer MEMORY_ONLY_SER 71.7 MB
改用kryo序列化但是不注册,将数据缓存到内存 - spark.serializer+register MEMORY_ONLY_SER 21.1 MB
改用kryo序列化并注册自定义的类,将数据缓存到内存
网友评论