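I. Working with Spark streams
1. A Spark stream (DStream) is made up of many RDDs, so you have to call foreachRDD to get at them.
2. Those RDDs are evaluated lazily, only when an RDD action runs, so any resource used inside them should be loaded lazily as well; that is what gives good performance.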
******Writing a Spark stream to ES******
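dstream.foreachRDD(rdd -> {
    // This part runs on the driver; initializing the client here is useless to the executors.
    rdd.foreachPartition(partitionOfRecords -> {
        // This part runs on the executor. Set the URL before the holder is first accessed.
        ESLazyHolder.esurl = esurl;
        while (partitionOfRecords.hasNext()) {
            // Returns the shared client of this JVM, creating it on first use only.
            TransportClient client = ESLazyHolder.getInstance();
            partitionOfRecords.next();
            ......
        }
    });
});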
******Lazily loaded ES holder class******
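public class ESLazyHolder
{
    // Must be assigned before the first call to getInstance().
    public volatile static String esurl;

    // Initialization-on-demand holder: the JVM initializes this nested class,
    // and therefore creates the client, only on first access.
    private static class LazyHolder
    {
        // "transient" dropped: it has no effect on a static field.
        private final static TransportClient client = ESUtil.getClient(esurl);
        static
        {
            // Registers a hook for the client (presumably to close it on JVM shutdown).
            SysUtil.addhook(client);
        }
    }

    public static final TransportClient getInstance()
    {
        return LazyHolder.client;
    }
}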
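The holder idiom above can be tried without Spark or Elasticsearch. The following is a minimal sketch under stated assumptions: FakeClient, LazyClientHolder and the host strings are hypothetical stand-ins, not part of any real API. It illustrates two properties the streaming code relies on: the client is built once per JVM, on first access, and the URL has to be assigned before that first access because later assignments are ignored.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for TransportClient; it only remembers the URL it was built with.
final class FakeClient {
    final String url;

    FakeClient(String url) {
        this.url = url;
    }
}

// Same shape as ESLazyHolder, with a counter added to observe initialization.
final class LazyClientHolder {
    // Must be assigned before the first getInstance() call.
    static volatile String url;

    // Counts how many times a client was actually built.
    static final AtomicInteger created = new AtomicInteger();

    private LazyClientHolder() {
    }

    // The JVM initializes this nested class on first access, once, in a thread-safe way.
    private static final class Holder {
        static final FakeClient CLIENT = build();

        private static FakeClient build() {
            created.incrementAndGet();
            return new FakeClient(url);
        }
    }

    static FakeClient getInstance() {
        return Holder.CLIENT;
    }
}

final class LazyHolderDemo {
    public static void main(String[] args) {
        LazyClientHolder.url = "es-host:9300";
        // Nothing has been built yet: the nested Holder class is still untouched.
        System.out.println("created before first use: " + LazyClientHolder.created.get());

        FakeClient first = LazyClientHolder.getInstance();
        // Changing the URL now has no effect on the client that already exists.
        LazyClientHolder.url = "other-host:9300";
        FakeClient second = LazyClientHolder.getInstance();

        System.out.println("same instance: " + (first == second));
        System.out.println("url used: " + first.url);
        System.out.println("created after two calls: " + LazyClientHolder.created.get());
    }
}
```

Because the value is frozen at first access, a job that needs to talk to different clusters from the same executor JVM would need a different design, for example a map of clients keyed by URL.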
II. Working with a plain RDD is simpler: there is no need for foreachRDD, and no need for lazy loading either
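rdd.foreachPartition(partitionOfRecords -> {
    // Skip empty partitions so that no client is created for them.
    if (partitionOfRecords.hasNext())
    {
        // try-with-resources closes the client once the partition has been processed.
        try (final TransportClient client = ESUtil.getClient(esurl))
        {
            while (partitionOfRecords.hasNext())
            {
                partitionOfRecords.next();
                ......
            }
        }
    }
});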
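Later testing showed that, for an RDD with 10 partitions, the client was initialized only 7 times; in practice, lazy loading gives better performance for plain RDDs as well.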
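The source does not say how the 10 partitions were distributed. One way a count of 7 could arise is that the partitions ran on 7 executor JVMs, each of which builds the lazily held client once. The sketch below only models that counting and does not use Spark; the placement (partition i on executor i % 7) is an assumption chosen for illustration, and every partition is assumed to be non-empty.

```java
import java.util.HashSet;
import java.util.Set;

final class ClientInitCountDemo {
    // Per-partition strategy: one client is built for every (non-empty) partition.
    static int perPartitionInits(int[] executorOfPartition) {
        return executorOfPartition.length;
    }

    // Lazy-holder strategy: one client per executor JVM that runs at least one partition.
    static int lazyHolderInits(int[] executorOfPartition) {
        Set<Integer> jvmsWithClient = new HashSet<>();
        for (int executorId : executorOfPartition) {
            // Only the first partition seen on a JVM triggers client creation.
            jvmsWithClient.add(executorId);
        }
        return jvmsWithClient.size();
    }

    // Hypothetical placement: 10 partitions, partition i runs on executor i % 7.
    static int[] samplePlacement() {
        int[] placement = new int[10];
        for (int i = 0; i < placement.length; i++) {
            placement[i] = i % 7;
        }
        return placement;
    }

    public static void main(String[] args) {
        int[] placement = samplePlacement();
        System.out.println("per-partition clients: " + perPartitionInits(placement));
        System.out.println("lazy-holder clients:   " + lazyHolderInits(placement));
    }
}
```

Under these assumptions the per-partition approach builds 10 clients and the holder builds 7. The gap widens as the number of partitions per executor grows, which is where the holder would be expected to pay off.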