美文网首页
Flink中使用异步Function之AsyncDataStre

Flink中使用异步Function之AsyncDataStre

作者: 和平菌 | 来源:发表于2019-05-22 17:32 被阅读0次

Flink处理数据时候,遇到比较耗时的操作时,需要异步处理数据。
例子如下:

 DataStream<Order> asyncStream = AsyncDataStream.unorderedWait(orderStream, new RichAsyncFunction<Order, Order>() {
            public transient ThreadPoolExecutor executor;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                executor = new ThreadPoolExecutor(5, //
                        10, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());//
            }

            @Override
            public void close() throws Exception {
                super.close();
                executor.shutdownNow();
            }

           @Override
            public void timeout(SkuOrder input, ResultFuture<SkuOrder> resultFuture) {
                //超时后的处理
            }

            @Override
            public void asyncInvoke(Order input, ResultFuture<Order> resultFuture) throws Exception {
                CompletableFuture.runAsync( ()->{
                    int mills = new Random().nextInt(10);
                    System.out.println("异步处理数据:" + Thread.currentThread().getId() + "|" + JSON.toJSONString(input));
                    try {
                        TimeUnit.SECONDS.sleep(mills);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    resultFuture.complete(Collections.singleton(input));
                },executor);
            }
        },1, TimeUnit.MINUTES, 1000).setParallelism(1);

说明:
1、AsyncDataStream有2个方法,unorderedWait表示数据不需要关注顺序,处理完立即发送,orderedWait表示数据需要关注顺序,为了实现该目标,操作算子会在该结果记录之前的记录为发送之前缓存该记录。这往往会引入额外的延迟和一些Checkpoint负载,因为相比于无序模式结果记录会保存在Checkpoint状态内部较长的时间。
2、Timeout配置,主要是为了处理死掉或者失败的任务,防止资源被长期阻塞占用。
3、最后一个参数Capacity表示同时最多有多少个异步请求在处理,异步IO的方式会导致更高的吞吐量,但是对于实时应用来说该操作也是一个瓶颈。限制并发请求数,算子不会积压过多的未处理请求,但是一旦超过容量的显示会触发背压。
该参数可以不配置,但是默认是100。

相关文章

网友评论

      本文标题:Flink中使用异步Function之AsyncDataStre

      本文链接:https://www.haomeiwen.com/subject/mrmuzqtx.html