最近的一个项目,需求是首先把如下一张表实现由窄表转宽表的透视效果,然后再和另一个表做一个两表按主键的merge操作。

主要麻烦的地方在上面这张表的透视上。
透视以后的效果应该是
country_code,device_code,value1_value2,value1_value3,value2_value2,value2_value3,value3_value2,value3_value3
US,ios-all,....
CN,ios-all,...
其中 value1_value2,是value字段的值为1时,value2的值。
value1_value3,是value字段的值为1时,value3的值。
依次类推。
其实用pandas或者pyspark比较简单的方式是按照value的值对value2/value3做两次透视,然后做一次按主键的join。
但是我开始没想到。
然后就写出了以下方式的两种代码。推荐使用pyarrow+pandas方式的,不需要groupby。也不需要自己写python的循环和if-else。效率应该高点。
代码如下,
PyArrow + Pandas实现方式
arrow_agg_test.py
import pandas as pd
from pyarrow import dataset as ds
def test_csv_agg_list():
ds_ = ds.dataset("./data.csv", format="csv")
def get_projection_tb(col):
value2_col = "value{}_value2".format(col)
value3_col = "value{}_value3".format(col)
projection = {
"country_code": ds.field("country_code"),
"device_code": ds.field("device_code"),
value2_col: ds.field("value2"),
value3_col: ds.field("value3"),
"value": ds.field("value")
}
value_col_tb = ds_.to_table(columns=projection, filter=ds.field("value")==col)
return value_col_tb.to_pandas().drop(["value"], axis=1).reset_index(drop=True)
sub_tb_list = list()
for i in range(1, 5):
sub_tb_list.append(get_projection_tb(i))
for i in range(0, len(sub_tb_list)-1):
if i == 0:
final_tb = pd.merge(sub_tb_list[0], sub_tb_list[1], on=["country_code", "device_code"])
else:
final_tb = pd.merge(final_tb, sub_tb_list[i+1], on=["country_code", "device_code"])
print(final_tb.head())
纯Pandas实现方式,
pandas_agg_test.py
import pandas as pd
def test_csv_agg_list():
#显示所有列
pd.set_option('display.max_columns', None)
#显示所有行
pd.set_option('display.max_rows', None)
#设置value的显示长度为100,默认为50
pd.set_option('max_colwidth',4000)
## 做 groupby 和 flattern 拍平
df = pd.read_csv("./data.csv")
res_df = df.groupby(["country_code", "device_code"]).agg(lambda x: list(x)).reset_index()
print(res_df)
appended_cols = {
"value1_value2":[],
"value2_value2":[],
"value3_value2":[],
"value4_value2":[],
"value1_value3":[],
"value2_value3":[],
"value3_value3":[],
"value4_value3":[]
}
for _, row_ in res_df.iterrows():
idx = 0
for val_ in row_["value"]:
if val_ == 1:
appended_cols["value1_value2"].append(row_["value2"][idx])
appended_cols["value1_value3"].append(row_["value3"][idx])
elif val_ == 2:
appended_cols["value2_value2"].append(row_["value2"][idx])
appended_cols["value2_value3"].append(row_["value3"][idx])
elif val_ == 3:
appended_cols["value3_value2"].append(row_["value2"][idx])
appended_cols["value3_value3"].append(row_["value3"][idx])
elif val_ == 4:
appended_cols["value4_value2"].append(row_["value2"][idx])
appended_cols["value4_value3"].append(row_["value3"][idx])
idx += 1
for key, vals in appended_cols.items():
res_df[key] = vals
res_df = res_df.drop(["value", "value2", "value3"], axis=1).reset_index(drop=True)
print(res_df)
## 做join 取另一个dataframe的值
right_df = pd.read_csv("./data1.csv").reset_index(drop=True)
print(right_df)
final_df = pd.merge(res_df, right_df, on=["country_code", "device_code"])
## To & from json testing
js = final_df.to_dict()
print(js)
res = pd.DataFrame(js)
print(res)
程序输出如下,

网友评论