美文网首页spark相关
关于spark往elasticsearch等外部源输出的问题

关于spark往elasticsearch等外部源输出的问题

作者: 黄药师ii | 来源:发表于2019-02-21 11:42 被阅读0次

一, 对于spark流的操作
1, spark流是由很多rdd组成的, 所以要调用foreachRDD
2, spark流是由很多rdd组成, 而这些rdd是被rdd action惰性执行的, 所以在里面定义的资源也惰性加载, 性能才会好.

******spark流输出es******

dstream.foreachRDD(rdd -> {
  //这里是drive空间, 初始化了没用
  rdd.foreachPartition(partitionOfRecords -> {
    ESLazyHolder.esurl = esurl;
    while (partitionOfRecords.hasNext()) {
      TransportClient client = ESLazyHolder.getInstance();
      partitionOfRecords.next();
      ......
    }
  });
});

******es惰性加载的类******

public class ESLazyHolder
{
    public volatile static String esurl;

    private static class LazyHolder
    {
        private final static transient TransportClient client = ESUtil.getClient(esurl);
        static
        {
            SysUtil.addhook(client);
        }
    }

    public static final TransportClient getInstance()
    {
        return LazyHolder.client;
    }
}

二, 对于rdd操作就更简单了, 不需要foreachRDD, 也不需要惰性加载

  rdd.foreachPartition(partitionOfRecords -> {
    if (partitionOfRecords.hasNext())
      try (final TransportClient client = ESUtil.getClient(esurl);)
      {
        while (partitionOfRecords.hasNext())
        {
          partitionOfRecords.next();
          ......
        }
      }
  });

后经测试, rdd在10个分片的情况下, 只初始化了7次, 实际用惰性加载在rdd的情况下也有更好的性能.

相关文章

网友评论

    本文标题:关于spark往elasticsearch等外部源输出的问题

    本文链接:https://www.haomeiwen.com/subject/fbkryqtx.html