美文网首页
Spark误用client导致`unable to create

Spark误用client导致`unable to create

作者: 天之見證 | 来源:发表于2020-02-26 22:04 被阅读0次

事件起因:

我司正在做tidb的调研, 需要使用tispark写数据, 核心代码如下:

sample.repartition(taskNum)
  .rdd
  .foreachPartition { iter =>
     iter.sliding(1000, 1000).foreach { seq =>
       // ...
       // 此处的batchGet是将tispark中的api给暴露出来了, 本来是没有的
       tiSession.createSnapshot().batchGet(...)
       // ...
     }
}

上述代码实在RDDforeachPartition 里面调用, 跑着跑着就报错误了, 主要是2类错误:

第一类: 如下,出现次数特别少

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
    at java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:181)
    at com.pingcap.tikv.KVClient.sendBatchGet(KVClient.java:194)
    at com.pingcap.tikv.KVClient.batchGet(KVClient.java:101)
    at com.pingcap.tikv.KVClient.batchGet(KVClient.java:91)
    at com.pingcap.tikv.KVClient.batchGet(KVClient.java:86)
    at com.pingcap.tikv.Snapshot.batchGet(Snapshot.java:71)
    at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res7$3(<pastie>:68)
    at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res7$3$adapted(<pastie>:61)
    at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$935/1207528768.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:142

第二类:

container-launch failed ...大概类似这样的日志, 记不清了。。。

排查思路:

因为第一类错误出现较少,并且有比较明显的异常栈, 所以从该异常着手

看原因是nable to create new native thread, 回想之前遇到的spark问题, 都没遇到过这样的,而且spark自己在线程使用方面也没出现过这样的问题, 那就再看看其他方面

当时排查问题的时候其实并没有仔细看堆栈, 而是凭直觉点进去看了batchGet的源码实现

public List<KvPair> batchGet(List<ByteString> key) {
  return new KVClient(session.getConf(), session.getRegionStoreClientBuilder())
      .batchGet(key, timestamp.getVersion());
}

发现每次调用都会创建一个KVClient, 就怀疑为什么不用一个对象池来处理, 那再往里面看看, 发现batchGet的实现并没什么, 里面只是用多线程的方式去提交了请求

那回过头再看看KVClient的构造函数

public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
  Objects.requireNonNull(conf, "conf is null");
  Objects.requireNonNull(clientBuilder, "clientBuilder is null");
  this.conf = conf;
  this.clientBuilder = clientBuilder;
  // TODO: ExecutorService executors =
  // Executors.newFixedThreadPool(conf.getKVClientConcurrency());
  executorService = Executors.newFixedThreadPool(20);
}

看到这里有线程池, 那有个问题这个线程池是初始化的时候就已经在建立线程了吗?

其实并不是的, 只是在真正创建的时候才会新建一个线程, 按理说我们一次请求也只会建立一个线程

再回过头去看batchGet的源码就发现, 数据做了一些变化:

private List<KvPair> batchGet(BackOffer backOffer, Set<ByteString> keys, long version) {
  Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
  List<Batch> batches = new ArrayList<>();

  for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
    appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
  }
  return sendBatchGet(backOffer, batches, version);
}

private List<KvPair> sendBatchGet(BackOffer backOffer, List<Batch> batches, long version) {
  ExecutorCompletionService<List<KvPair>> completionService =
      new ExecutorCompletionService<>(executorService);
  for (Batch batch : batches) {
    completionService.submit(
        () -> {
          RegionStoreClient client = clientBuilder.build(batch.region);
          BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
          List<ByteString> keys = batch.keys;
          try {
            return client.batchGet(singleBatchBackOffer, keys, version);
          } catch (final TiKVException e) {
            // TODO: any elegant way to re-split the ranges if fails?
            singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
            logger.warn("ReSplitting ranges for BatchPutRequest");
            // recursive calls
            return batchGet(singleBatchBackOffer, batch.keys, version);
          }
        });
  }
  // ...
}

从上面可以看到我们请求过去的数据其实是又做了分批的, 根据region, 这里也可以看到一个查询tikv的优化点, 导致线程数会放大, 才会出现我们这样的问题, 其实只要我记得调用close方法就啥事都没有。。。

结尾: 很多细节只是在写博客的时候才发现的, 之前问题解决的时候, 并没有把整个问题理解的很透彻

相关文章

网友评论

      本文标题:Spark误用client导致`unable to create

      本文链接:https://www.haomeiwen.com/subject/lbsgchtx.html