点击上方蓝

字关注~

在做维表关联如果要求低延时,即维表数据的变更能够被立刻感知到,所以就要求在查询时没有缓存策略,直接查询数据库维表信息。
本篇以实时查询redis为例,要求redis 客户端支持异步查询,可以使用io.lettuce包,支持redis不同模式:单点模式、sentinel模式、集群模式,需要在pom中引入:

<dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.0.5.RELEASE</version>
</dependency>
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.24.Final</version></dependency>

关于其不同模式的用法可以参考:https://juejin.im/post/5d8eb73ff265da5ba5329c66
里面做了比较详细的说明,为方便测试使用单点模式,仍以广告业务为例,根据广告位ID从redis里面查询对位的广告主ID。

Redis中数据准备:

hmset 1 aid 1 cid 1
hmset 2 aid 1 cid 2

使用hash结构,key表示广告位ID、aid表示广告主ID、cid表示广告计划ID

定义RichAsyncFunction类型的RedisSide,异步查询Redis

class RedisSide extends RichAsyncFunction[AdData, AdData] {private var redisClient: RedisClient = _private var connection: StatefulRedisConnection[String, String] = _private var async: RedisAsyncCommands[String, String] = _override def open(parameters: Configuration): Unit = {val redisUri = "redis://localhost"redisClient = RedisClient.create(redisUri)connection = redisClient.connect()async = connection.async()}override def asyncInvoke(input: AdData, resultFuture: ResultFuture[AdData]): Unit = {val tid = input.tId.toStringasync.hgetall(tid).thenAccept(new Consumer[util.Map[String, String]]() {override def accept(t: util.Map[String, String]): Unit = {if (t == null || t.size() == 0) {resultFuture.complete(util.Arrays.asList(input))return}t.foreach(x => {if ("aid".equals(x._1)) {val aid = x._2.toIntvar newData = AdData(aid, input.tId, input.clientId, input.actionType, input.time)resultFuture.complete(util.Arrays.asList(newData))}})}})}//关闭资源override def close(): Unit = {if (connection != null) connection.close()if (redisClient != null) redisClient.shutdown()}}

主流程

case class AdData(aId: Int, tId: Int, clientId: String, actionType: Int, time: Long)object Demo1 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val kafkaConfig = new Properties();kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");val consumer = new FlinkKafkaConsumer[String]("topic1", new SimpleStringSchema(), kafkaConfig);val ds = env.addSource(consumer).map(x => {val a: Array[String] = x.split(",")AdData(0, a(0).toInt, a(1), a(2).toInt, a(3).toLong) //默认给0})val redisSide: AsyncFunction[AdData, AdData] = new RedisSideAsyncDataStream.unorderedWait(ds, redisSide, 5L, SECONDS, 1000).print()env.execute("Demo1")}
}

测试验证
生产数据:

1,clientId1,1,1571646006000
3,clientId1,1,1571646006000

输出:

AdData(1,1,clientId1,1,1571646006000)
AdData(0,3,clientId1,1,1571646006000)

验证完毕,也算是补上维表系列里面的空缺。

关注回复Flink获取更多信息~

flink维表关联系列之Redis维表关联:实时查询相关推荐

  1. [游戏开发]Python打表工具系列 [第二篇] [打表流程描简述]

    [上一篇链接] [游戏开发]Python打表工具系列 [第一篇][IDE开发环境部署] VSCode Python环境调试_Little丶Seven的博客-CSDN博客 [前言] 第二篇文章是对流程的 ...

  2. Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化

    2019独角兽企业重金招聘Python工程师标准>>> 引子 流计算中一个常见的需求就是为数据流补齐字段.因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度 ...

  3. 刘雨昕成为Swisse斯维诗胶原蛋白系列代言人

    50年的品质坚守,50年的创新追求,健合集团旗下澳大利亚专业营养品牌Swisse斯维诗,用50年的光彩岁月刻画了一个勇于突破,奋力向前的品牌故事.今年,Swisse斯维诗直面时代挑战,对品牌内涵进行融 ...

  4. 表单二维码怎么做?二维码怎么统计信息?

    通过二维码来统计信息是现在很多企业.社区等等经常会使用的一种方法,那么如何制作可以统计信息的表单二维码图片呢?下面教大家使用二维码生成器来在线制作二维码图片的方法,多种场景应用二维码在线生成,比如图片 ...

  5. Matlab系列之二维图形(上)

    Matlab系列之二维图形(上) 简要 绘制基础 plot 文字标注 (1) 添加图形标题 (2)添加坐标轴标注 (3)图例 (4)文本注释 示例 程序 结果 线型.点型和颜色 坐标轴设置 示例 结果 ...

  6. 方维分享系统,品牌无法设置分类关联

    方维分享系统,品牌无法设置分类关联 店铺分类那里不能设置分类关联 这样去修改,这是官方给出的方案,是方维3.0的一个bug: ALTER TABLE  `%DB_PREFIX%taobao_shop` ...

  7. redis分表_《面试官系列:深入数据库分区分库分表》

    一.为什么要分库分表 软件时代,传统应用都有这样一个特点:访问量.数据量都比较小,单库单表都完全可以支撑整个业务.随着互联网的发展和用户规模的迅速扩大,对系统的要求也越来越高.因此传统的MySQL单库 ...

  8. html 二维表_焦虑症自测表,自测你的焦虑程度

    SAS焦虑症自测表 焦虑,大概已经成为了当下最显著的一种时代病. 好像也没什么突发事件,怎么就焦虑得睡不好觉? 明明吃穿不愁了,为什么还是莫名地担心忧虑? 其实自己知道外面很安全,可是为什么一到公共场 ...

  9. windows服务器日志文件定期清理,运维编排场景系列-----定时清理Windows服务器日志...

    本文介绍在运维编排OOS的控制台,通过OOS服务下的定时运维功能,定时执行一个功能性模版,实现某些需定时管理服务器或定时管理其它服务的需求. 应用背景 运行中的实例内部运行了很多服务程序,随时间的推移 ...

最新文章

  1. socket 读取 所有 数据 java_Java Socket 读取服务器端返回数据
  2. 大道至简阅读笔记02
  3. 没有可用于当前位置的源代码
  4. Confluence 6 设置 Oracle 数据库准备
  5. TOMCAT websocket 多连接内存泄漏与jetty对比分析
  6. hazelcast入门教程_Hazelcast入门指南第4部分
  7. 字符变量赋值规则_Java的常量、变量、数据类型(基础篇二)
  8. cocos2d-x性能优化的那些事
  9. c++ mysql 导入sql_mysql导入sql文件命令和mysql远程登陆使用详解
  10. python查单词音标_有没有通过读音或音标就能查出英语单词的办法,比如发音查词软件?...
  11. Anti-aliasing and Continuity with Trapezoidal Shadow Maps
  12. 合上笔记本屏幕 Ubuntu 20.04 不休眠
  13. 揭秘 Cortex-A55,为何它是对未来数字世界举足轻重的处理器?
  14. 达梦数据库 年月周查询
  15. 用mac的chrome浏览器调试 Android 手机的网页
  16. kali中文输入法的安装
  17. 远程连接mscs下oracle,oracle10G_windows_MSCS_双机安装
  18. php往文件里面写入数据,PHP创建文件及写入数据(覆盖写入,追加写入)的方法详解...
  19. [原创]K8Cscan插件之C段旁站扫描\子域名扫描
  20. 关于接口的规范和文档总结

热门文章

  1. 管理综合联考可以用计算机嘛,考试可以带计算器吗?可以带草稿纸吗?
  2. Alias.StudioTools.Techniques.Art.To.Part
  3. Denso 盘点机应用介绍
  4. android 乱码问题
  5. python大气模型算法_关于预测空气质量机器学习哪个算法简单?
  6. 「自控原理」2.4 信号流图与梅逊公式、闭环传递函数
  7. 路漫漫其修远兮,吾将上下而求索——《深入浅出MFC》读后感
  8. python旅游小项目
  9. oracle 动态条件查询语句,教您Oracle动态查询语句的用法
  10. java每个类都定义构造方法吗,Java中的每个类都至少有一个构造方法,一个类中如果没有定义构造方法,系统会自动为这个类创建一个默认的构造方法。()...