
  • 一、目标
  • 二、方案
    • 2.1、 自己实现demo:
    • 2.2、 社区实现:


希望使用flink sql来关联维度表,但是想用异步IO的方式关联。


当前社区进展:目前Flink SQL 中的connector都没实现异步io关联维表,但是接口是已经支持了的,可以自定义实现;HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用。

2.1、 自己实现demo:
    /*** An async lookup function which find matched rows with the given fields. NOTE: We have to* declare it as public because it will be used in code generation.*/public static class AsyncTestValueLookupFunction extends AsyncTableFunction<Row> {private static final long serialVersionUID = 1L;private final Map<Row, List<Row>> mapping;private transient boolean isOpenCalled = false;private transient ExecutorService executor;protected AsyncTestValueLookupFunction(Map<Row, List<Row>> mapping) {this.mapping = mapping;}@Overridepublic void open(FunctionContext context) throws Exception {RESOURCE_COUNTER.incrementAndGet();isOpenCalled = true;executor = Executors.newSingleThreadExecutor();}public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... inputs) {checkArgument(isOpenCalled, "open() is not called.");final Row key = Row.of(inputs);if (Arrays.asList(inputs).contains(null)) {throw new IllegalArgumentException(String.format("Lookup key %s contains null value, which should not happen.",key));}CompletableFuture.supplyAsync(() -> {List<Row> list = mapping.get(key);if (list == null) {return Collections.<Row>emptyList();} else {return list;}},executor).thenAccept(resultFuture::complete);}@Overridepublic void close() throws Exception {RESOURCE_COUNTER.decrementAndGet();if (executor != null && !executor.isShutdown()) {executor.shutdown();}}}
2.2、 社区实现:


