事件起因:
我司正在做tidb的调研, 需要使用tispark写数据, 核心代码如下:
sample.repartition(taskNum)
.rdd
.foreachPartition { iter =>
iter.sliding(1000, 1000).foreach { seq =>
// ...
// 此处的batchGet是将tispark中的api给暴露出来了, 本来是没有的
tiSession.createSnapshot().batchGet(...)
// ...
}
}
上述代码实在RDD
的foreachPartition
里面调用, 跑着跑着就报错误了, 主要是2类错误:
第一类: 如下,出现次数特别少
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
at java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:181)
at com.pingcap.tikv.KVClient.sendBatchGet(KVClient.java:194)
at com.pingcap.tikv.KVClient.batchGet(KVClient.java:101)
at com.pingcap.tikv.KVClient.batchGet(KVClient.java:91)
at com.pingcap.tikv.KVClient.batchGet(KVClient.java:86)
at com.pingcap.tikv.Snapshot.batchGet(Snapshot.java:71)
at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res7$3(<pastie>:68)
at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res7$3$adapted(<pastie>:61)
at $line45.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$935/1207528768.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:142
第二类:
container-launch failed ...大概类似这样的日志, 记不清了。。。
排查思路:
因为第一类错误出现较少,并且有比较明显的异常栈, 所以从该异常着手
看原因是nable to create new native thread
, 回想之前遇到的spark问题, 都没遇到过这样的,而且spark自己在线程使用方面也没出现过这样的问题, 那就再看看其他方面
当时排查问题的时候其实并没有仔细看堆栈, 而是凭直觉点进去看了batchGet
的源码实现
public List<KvPair> batchGet(List<ByteString> key) {
return new KVClient(session.getConf(), session.getRegionStoreClientBuilder())
.batchGet(key, timestamp.getVersion());
}
发现每次调用都会创建一个KVClient
, 就怀疑为什么不用一个对象池来处理, 那再往里面看看, 发现batchGet
的实现并没什么, 里面只是用多线程的方式去提交了请求
那回过头再看看KVClient
的构造函数
public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = conf;
this.clientBuilder = clientBuilder;
// TODO: ExecutorService executors =
// Executors.newFixedThreadPool(conf.getKVClientConcurrency());
executorService = Executors.newFixedThreadPool(20);
}
看到这里有线程池, 那有个问题这个线程池是初始化的时候就已经在建立线程了吗?
其实并不是的, 只是在真正创建的时候才会新建一个线程, 按理说我们一次请求也只会建立一个线程
再回过头去看batchGet
的源码就发现, 数据做了一些变化:
private List<KvPair> batchGet(BackOffer backOffer, Set<ByteString> keys, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
List<Batch> batches = new ArrayList<>();
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}
return sendBatchGet(backOffer, batches, version);
}
private List<KvPair> sendBatchGet(BackOffer backOffer, List<Batch> batches, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
for (Batch batch : batches) {
completionService.submit(
() -> {
RegionStoreClient client = clientBuilder.build(batch.region);
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
List<ByteString> keys = batch.keys;
try {
return client.batchGet(singleBatchBackOffer, keys, version);
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
logger.warn("ReSplitting ranges for BatchPutRequest");
// recursive calls
return batchGet(singleBatchBackOffer, batch.keys, version);
}
});
}
// ...
}
从上面可以看到我们请求过去的数据其实是又做了分批的, 根据region, 这里也可以看到一个查询tikv的优化点, 导致线程数会放大, 才会出现我们这样的问题, 其实只要我记得调用close方法就啥事都没有。。。
结尾: 很多细节只是在写博客的时候才发现的, 之前问题解决的时候, 并没有把整个问题理解的很透彻
网友评论