2021年大数据Flink(十六):流批一体API Connectors Redis
目录
Redis
API
使用RedisCommand设置数据结构类型时和redis结构对应关系
需求
代码实现
Redis
API
通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。
Apache Flink Streaming Connector for Redis
RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个接口中的三个方法,如下所示
1.getCommandDescription() :
设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
2.String getKeyFromData(T data):
设置value 中的键值对key的值
3.String getValueFromData(T data);
设置value 中的键值对value的值
使用RedisCommand设置数据结构类型时和redis结构对应关系
Data Type |
Redis Command [Sink] |
HASH |
HSET |
LIST |
RPUSH, LPUSH |
SET |
SADD |
PUBSUB |
PUBLISH |
STRING |
SET |
HYPER_LOG_LOG |
PFADD |
SORTED_SET |
ZADD |
SORTED_SET |
ZREM |
需求
将Flink集合中的数据通过自定义Sink保存到Redis
代码实现
package cn.it.connectors;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;/*** Author lanson* Desc* 需求:* 接收消息并做WordCount,* 最后将结果保存到Redis* 注意:存储到Redis的数据结构:使用hash也就是map* key value* WordCount (单词,数量)*/
public class ConnectorsDemo_Redis {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割并记为1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分组KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.Sinkresult.print();// * 最后将结果保存到Redis// * 注意:存储到Redis的数据结构:使用hash也就是map// * key value// * WordCount (单词,数量)//-1.创建RedisSink之前需要创建RedisConfig//连接单机版RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();//连接集群版Redis//HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));//FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();//连接哨兵版Redis//Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));//FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();//-3.创建并使用RedisSinkresult.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));//5.executeenv.execute();}/*** -2.定义一个Mapper用来指定存储到Redis中的数据结构*/public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "WordCount");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}}
}
2021年大数据Flink(十六):流批一体API Connectors Redis相关推荐
- 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】
导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...
- 大数据架构如何做到流批一体?
阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...
- pb数据窗口怎么调用视图_大数据架构如何做到流批一体?
阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...
- 2021年大数据Flink(十五):流批一体API Connectors Kafka
目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...
- 2021年大数据Flink(十四):流批一体API Connectors JDBC
目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector 代码演示 package ...
- 2021年大数据Flink(六):Flink On Yarn模式
目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...
- 大数据Flink(十四):流批一体API Connectors JDBC
文章目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector
- 2021年大数据Flink(十):流处理相关概念
目录 流处理相关概念 数据的时效性 流处理和批处理 流批一体API DataStream API 支持批执行模式 API 编程模型 流处理相关概念 数据的时效 ...
- 2021年大数据Kafka(六):❤️安装Kafka-Eagle❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 安装Kafka-Eagle 一.Kafka-eagle基本介 ...
最新文章
- 鲁棒,抗遮挡的对柔性手抓取的物体6D姿态估计
- 对象的引用和clone
- java的reflection
- python炫酷特效代码_推荐几个炫酷的 Python 开源项目
- fortinate防火墙使用本地用户三步开通PPTP ***
- mysql多源复制 知乎_MySQL多主一从(多源复制)同步配置
- VB.NET工作笔记004---认识wsf文件
- OpenCV编程-无法解析的外部符号 void __cdecl cv::cvtColor
- cisco初级随堂笔记1
- kafka安装完整步骤_还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家...
- 计算机联锁设备施工论文,毕业设计论文-计算机联锁设计
- 33.iptables备份与恢复 firewalld的9个zone以及操作 service的操作
- GoLang之什么是workstealing(5)
- php 与 html 的混合编程
- paip 刮刮卡砸金蛋抽奖概率算法跟核心流程
- html怎么放边框,html怎么设置边框
- 云计算的认识和看法_我的关于云计算的看法和认识
- pandas如何统计均线、移动平均线的方法rolling总结
- 数据可视化系列-01大数据可视化基础
- MySee创业团队:舍我其谁