1. Checking for duplicate data, unobserved (missing) data, and anomalous data (outliers)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Name") \
.getOrCreate()
df = spark.createDataFrame(
[(1,144.5,5.9,33,"M"),
(2,167.2,5.4,45,"F"),
(3,124.1,5.2,23,"F"),
(4,144.5,5.9,33,"M"),
(5,133.2,5.7,54,"F"),
(3,124.1,5.2,23,"F"),
(5,129.2,5.3,42,"M")
],["id","weight","height","age","gender"]
)
# find out the completely duplicated rows
df.show()
print ("Count of rows {}".format(df.count()))
print ("count of distinct rows {}".format(df.distinct().count()))
df = df.dropDuplicates()
df.show()
#find the duplicate rows, ignoring the id column
print ("Count of rows {}".format(df.count()))
rs = df.select([c for c in df.columns if c != "id"]).distinct().count()
print ("Count of distinct rows (ignoring id) {}".format(rs))
df = df.dropDuplicates(subset=[c for c in df.columns if c != "id"])
df.show()
#check whether any ids are duplicated
import pyspark.sql.functions as fn
df.agg(fn.count("id").alias("count"),fn.countDistinct("id").alias("distinct")).show()
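The aggregate above only tells you that some ids repeat, not which ones; a grouped count lists them. A minimal sketch, reusing the same df:
# list the ids that occur more than once
df.groupBy("id") \
    .count() \
    .filter("count > 1") \
    .show()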
#add new ids
df.withColumn("new_id",fn.monotonically_increasing_id()).show()
2. Handling missing data
Check which features contain a large number of missing values and decide, based on their practical meaning, whether to drop them.
Filling missing data:
Categorical variables can be given a "missing" value, forming an extra (third) category.
Continuous or numeric variables can be filled with the mean, a quantile, and so on (a median-based Imputer sketch follows the example below).
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
spark = SparkSession.builder \
.appName("TestName") \
.getOrCreate()
df = spark.createDataFrame([
(1,143.5,5.6,28,"M",100000),
(2,167.2,5.4,45,"M",None),
(3,None,5.2,None,None,None),
(4,144.5,5.9,33,"M",None),
(5,133.2,5.7,54,"F",None),
(6,124.1,5.2,None,"F",None),
(7,129.2,5.3,42,"M",76000)],
["id","weight","height","age","gender","income"])
df.createOrReplaceTempView("df")
#show the number of null fields per row
re = df.rdd.map(lambda x:(x["id"],sum([c is None for c in x]))).collect()
print (re)
df.where("id == 3").show()
spark.sql("select * from df where id = 3").show()
#drop the income column
df_noincome = df.select([c for c in df.columns if c != "income"])
df_noincome.show()
#keep only rows with at least 3 non-null fields (thresh = minimum number of non-null values)
df_noincome.dropna(thresh=3).show()
#fill the nulls: column means for the numeric columns, "missing" for gender
means = df_noincome.agg(*[fn.mean(c).alias(c) for c in df_noincome.columns if c != "gender"]).toPandas().to_dict('records')[0]
means["gender"] = "missing"
df_new = df_noincome.fillna(means)
df_new.show()
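As mentioned above, quantile-based imputation is another option. pyspark.ml.feature.Imputer (available since Spark 2.2) can fill numeric columns with the mean or median in one step; the sketch below assumes median imputation is wanted, and the *_imp output column names are arbitrary:
from pyspark.ml.feature import Imputer

num_cols = ["weight","height","age"]
# Imputer expects float/double inputs, so cast the numeric columns first
df_num = df_noincome.select("id","gender",*[df_noincome[c].cast("double").alias(c) for c in num_cols])
imputer = Imputer(inputCols=num_cols,
                  outputCols=[c + "_imp" for c in num_cols],  # hypothetical output names
                  strategy="median")                          # default strategy is "mean"
imputer.fit(df_num).transform(df_num).show()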
3. Outliers
Outliers are observations that deviate markedly from the distribution of the rest of the sample. Generally, if all values fall roughly within the range [Q1 - 1.5*IQR, Q3 + 1.5*IQR], the data can be considered essentially free of outliers. IQR is the interquartile range, i.e. the difference between the upper and lower quartiles, that is, the distance between the 75th and 25th percentiles.
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
spark = SparkSession.builder \
.appName("TestName") \
.getOrCreate()
df = spark.createDataFrame([
(1,143.5,5.3,28),
(2,154.2,5.5,45),
(3,342.3,5.1,99),
(4,144.5,5.5,33),
(5,133.2,5.4,54),
(6,124.1,5.1,21),
(7,129.2,5.3,42)],
["id","weight","height","age"])
cols = ["weight","height","age"]
bounds = {}
for col in cols:
    quantile = df.approxQuantile(col,[0.25,0.75],0.05)
    IQR = quantile[1] - quantile[0]
    bounds[col] = [quantile[0] - 1.5 * IQR, quantile[1] + 1.5 * IQR]
outliers = df.select(*['id'] + [ ((df[c] < bounds[c][0]) | (df[c] > bounds[c][1])).alias(c + "_o") for c in cols])
outliers.show()
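The flags above only mark the outlying cells; to drop the affected rows, one option is to join the flags back onto the data and keep rows where no flag is set. A minimal sketch along those lines, reusing df, cols, and outliers:
# join the flags back onto df and keep rows with no flagged column
df_flagged = df.join(outliers, on="id")
no_outliers = df_flagged.filter(~(df_flagged["weight_o"] | df_flagged["height_o"] | df_flagged["age_o"]))
no_outliers.select(["id"] + cols).show()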
4. Feature correlation
from pyspark.sql.types import *
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("TestName") \
.getOrCreate()
sc = spark.sparkContext
fraud = sc.textFile("file:///home/njliu/prc/pyspark/04/ccFraud.csv")
header = fraud.first()
fraud = fraud.filter(lambda x:x!=header) \
.map(lambda row:[int(elem) for elem in row.split(",")])
fields = [StructField(h[1:-1],IntegerType(),True) for h in header.split(",")]
schema =StructType(fields)
fraud_df = spark.createDataFrame(fraud,schema)
fraud_df.groupby("gender").count().show()
numerical = ["balance","numTrans","numIntlTrans"]
#desc = fraud_df.describe(numerical)
#desc.show()
# features correlation
n_numerical = len(numerical)
cor = []
for i in range(0,n_numerical):
    temp = [None] * i
    for j in range(i,n_numerical):
        temp.append(fraud_df.corr(numerical[i],numerical[j]))
    cor.append(temp)
print (cor)
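The nested loop calls corr once per column pair; the full correlation matrix can also be computed in a single pass with pyspark.ml.stat.Correlation (Spark 2.2+). A minimal sketch under that assumption, reusing fraud_df and numerical:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# pack the numeric columns into one vector column, then compute the Pearson correlation matrix
assembler = VectorAssembler(inputCols=numerical, outputCol="features")
vec_df = assembler.transform(fraud_df).select("features")
corr_matrix = Correlation.corr(vec_df, "features").head()[0]
print (corr_matrix.toArray())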