通过使用 Flink DataStream Connectors 数据流连接器连接到 Redis 缓存数据库,并提供数据流输入与输出操作;

示例环境

java.version: 1.8.x
flink.version: 1.11.1
redis:3.2

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.redis;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;/*** @Description 从redis中读取数据输出到DataStream数据流中*/publicclass DataStreamSource {/*** 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/*/publicstaticvoid main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String key = "props";//实现RichSourceFunction抽象方法,加载数据源数据到流中DataStream<Tuple2<String, String>> dataStream = env.addSource(new RichSourceFunction<Tuple2<String, String>>(){private JedisPool jedisPool = null;@Overridepublicvoid run(SourceContext<Tuple2<String, String>> ctx) throws Exception {jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379, Protocol.DEFAULT_TIMEOUT);Jedis jedis = jedisPool.getResource();try{ctx.collect(Tuple2.of(key, jedis.get(key)));}catch(Exception e){e.printStackTrace();}finally{if (jedis != null){//用完即关,内部会做判断,如果存在数据源与池,则回滚到池中jedis.close();}}}@Overridepublicvoid cancel() {try {super.close();}catch(Exception e){}if (jedisPool != null){jedisPool.close();jedisPool = null;}}});dataStream.print();env.execute("flink redis source");}}

数据流输出

DataStreamSink.java

package com.flink.examples.redis;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Description 将数据流写入到redis中*/publicclass DataStreamSink {/*** 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/*/publicstaticvoid main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//1.写入数据到流中String [] words = newString[]{"props","student","build","name","execute"};DataStream<Tuple2<String, Integer>> sourceStream = env.fromElements(words).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String v) throws Exception {return Tuple2.of(v, RandomUtils.nextInt(1000, 9999));}});sourceStream.print();//2.实例化FlinkJedisPoolConfig 配置redisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();//3.写入到redis,实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redissourceStream.addSink(new RedisSink<>(conf, new RedisMapper<Tuple2<String, Integer>>(){@Overridepublic RedisCommandDescription getCommandDescription() {returnnew RedisCommandDescription(RedisCommand.SET, null);//通过实例化传参,设置hash值的key//return new RedisCommandDescription(RedisCommand.HSET, key);}@OverridepublicString getKeyFromData(Tuple2<String, Integer> tuple2) {return tuple2.f0;}@OverridepublicString getValueFromData(Tuple2<String, Integer> tuple2) {return tuple2.f1.toString();}}));env.execute("flink redis sink");}}

数据展示

Flink 系例 之 Connectors 连接 Redis相关推荐

  1. Flink 系例 之 Connectors 连接 Kafka

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  2. Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  3. Flink 系例 之 Connectors 连接 MySql

    通过使用 Flink DataStream Connectors 数据流连接器连接到 Mysql 数据源,并基于 JDBC 提供数据流输入与输出操作: 示例环境 java.version: 1.8.x ...

  4. Flink 系例 之 CountWindowAll

    countWindowAll 数量窗口 (不分区数量滚动窗口[滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠]) 示例环境 java.version: 1.8.x ...

  5. Flink 系例 之 Project

    Project算子:从数据流的元数组中,重新排例参数并指定不同的下标位,返回新的数据流 示例环境 java.version: 1.8.x flink.version: 1.11.1 示例数据源 (项目 ...

  6. Flink 系例 之 DataStream Connectors 与 示例模块

    官方介绍  Flink 中的 API Flink 为流式 / 批式处理应用程序的开发提供了不同级别的抽象. Flink API 最底层的抽象为有状态实时流处理.其抽象实现是 Process Funct ...

  7. Flink的Sink_API_Demo (kafka sink 、redis sink 、es sink)

    文章目录 pom文件说明 说明 必要的前提 下面的代码,启动后手动往topic打入数据,该程序读出来,逻辑是通过split分割(盖戳),然后select分流,取applestream写入kafka指定 ...

  8. Jedis连接Redis集群

    连接集群版 把jedis依赖的jar包 添加到工程中 //连接redis集群 @Test public void testJedisCluster() throws Exception {//创建一个 ...

  9. java连接redis存取数据(详细)

    声明:本文章仅供参考,学无止境,若有不足之处请指出,非常感谢! 源代码+相关工具下载:https://download.csdn.net/download/corleone_4ever/1081125 ...

最新文章

  1. 一种新的攻击方式:使用Outlook 表单进行横向渗透和常驻
  2. get 和 post
  3. Kafka Sender线程如何发送数据
  4. 复习笔记(四)——C++内联函数
  5. Test Report
  6. nedc和epa续航里程什么意思_NEDC、WLTP和EPA续航里程标准谁最真实?看比亚迪工程师怎么说...
  7. php maximum,解决PHP程序运行时:Fatal error: Maximum execution time of 30 seconds exceeded in的错误提示...
  8. mysql 4升级,MySQL_Sql_打怪升级_进阶篇_进阶4:常见函数
  9. 09_期望极大法EM2_统计学习方法
  10. 数据可视化常用LED字体
  11. 16个最佳软件配置管理工具
  12. 太宰治小说《人间失格》读后感及txt、epub、mobil电子图书下载
  13. 从《计算机网络》到TCP/IP
  14. 串口转以太网项目开发(1)-- 修改默认的设备树文件
  15. 【数据挖掘】金融风控 Task02 数据分析
  16. Task10 BERT
  17. SAP 内向交货单介绍
  18. 瑞星2008免费版下载
  19. php 心愿墙系统源码,php开发表白墙 |源码|微信表白|微信表白墙|吐槽墙|心愿墙|网站留言板源码...
  20. UDP-B-L-阿拉伯糖二钠盐,UDP-b-L-arabinopyranose disodium salt,15839-78-8

热门文章

  1. servers split sql_SQL中实现SPLIT函数几种方法总结(必看篇)
  2. 刘汝佳《算法竞赛入门经典(第二版)》习题(三)
  3. 列表查询java_查询订单列表示例代码
  4. pycharm的debug调试指定循环次数进入断点调试
  5. 【企业架构设计实战】大数据架构最佳实践
  6. sqlsever2000和mysql_数据库迁移 SQLServer2000到MYsql
  7. element ui el-time-picker 时间选择其组件的坑点记录
  8. 毕设做好了,论文怎么办?关于论文我不得不说的几个问题
  9. mysql 主从ppt_mysql主从配置
  10. 计算机专硕都发什么论文,比较好写的计算机研究生发论文题目 计算机研究生发论文标题怎样定...