目录

Redis

API

使用RedisCommand设置数据结构类型时和redis结构对应关系

需求

代码实现


Redis

API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

Apache Flink Streaming Connector for Redis

RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个接口中的三个方法,如下所示

1.getCommandDescription() :

设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型

2.String getKeyFromData(T data):

设置value 中的键值对key的值

3.String getValueFromData(T data);

设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系

Data Type

Redis Command [Sink]

HASH

HSET

LIST

RPUSH, LPUSH

SET

SADD

PUBSUB

PUBLISH

STRING

SET

HYPER_LOG_LOG

PFADD

SORTED_SET

ZADD

SORTED_SET

ZREM

需求

将Flink集合中的数据通过自定义Sink保存到Redis

代码实现

package cn.it.connectors;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;/*** Author lanson* Desc* 需求:* 接收消息并做WordCount,* 最后将结果保存到Redis* 注意:存储到Redis的数据结构:使用hash也就是map* key            value* WordCount    (单词,数量)*/
public class ConnectorsDemo_Redis {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割并记为1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分组KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.Sinkresult.print();// * 最后将结果保存到Redis// * 注意:存储到Redis的数据结构:使用hash也就是map// * key            value// * WordCount      (单词,数量)//-1.创建RedisSink之前需要创建RedisConfig//连接单机版RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();//连接集群版Redis//HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));//FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();//连接哨兵版Redis//Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));//FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();//-3.创建并使用RedisSinkresult.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));//5.executeenv.execute();}/*** -2.定义一个Mapper用来指定存储到Redis中的数据结构*/public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "WordCount");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}}
}

2021年大数据Flink(十六):流批一体API Connectors ​​​​​​​​​​​​​​Redis相关推荐

  1. 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】

    导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...

  2. 大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  3. pb数据窗口怎么调用视图_大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  4. 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

    目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...

  5. 2021年大数据Flink(十四):流批一体API Connectors JDBC

    目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector 代码演示 package ...

  6. 2021年大数据Flink(六):Flink On Yarn模式

    目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...

  7. 大数据Flink(十四):流批一体API Connectors JDBC

    文章目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector

  8. 2021年大数据Flink(十):流处理相关概念

    目录 流处理相关概念 数据的时效性 ​​​​​​​流处理和批处理 ​​​​​​​流批一体API DataStream API 支持批执行模式 API 编程模型 ​​​​​​​流处理相关概念 数据的时效 ...

  9. 2021年大数据Kafka(六):❤️安装Kafka-Eagle❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 安装Kafka-Eagle 一.Kafka-eagle基本介 ...

最新文章

  1. 鲁棒,抗遮挡的对柔性手抓取的物体6D姿态估计
  2. 对象的引用和clone
  3. java的reflection
  4. python炫酷特效代码_推荐几个炫酷的 Python 开源项目
  5. fortinate防火墙使用本地用户三步开通PPTP ***
  6. mysql多源复制 知乎_MySQL多主一从(多源复制)同步配置
  7. VB.NET工作笔记004---认识wsf文件
  8. OpenCV编程-无法解析的外部符号 void __cdecl cv::cvtColor
  9. cisco初级随堂笔记1
  10. kafka安装完整步骤_还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家...
  11. 计算机联锁设备施工论文,毕业设计论文-计算机联锁设计
  12. 33.iptables备份与恢复 firewalld的9个zone以及操作 service的操作
  13. GoLang之什么是workstealing(5)
  14. php 与 html 的混合编程
  15. paip 刮刮卡砸金蛋抽奖概率算法跟核心流程
  16. html怎么放边框,html怎么设置边框
  17. 云计算的认识和看法_我的关于云计算的看法和认识
  18. pandas如何统计均线、移动平均线的方法rolling总结
  19. 数据可视化系列-01大数据可视化基础
  20. MySee创业团队:舍我其谁

热门文章

  1. 使用rancher对Docker容器服务升级
  2. 2022-2028年中国分散式风电行业投资分析及前景预测报告
  3. Vue 自定义权限指令
  4. dataframe,python,numpy 问题索引1
  5. Pytorch 多 GPU 并行处理机制
  6. 命名实体识别入门教程(必看)
  7. 缓存击穿、缓存穿透、缓存雪崩
  8. NVIDIA GPU上的Tensor线性代数
  9. Auto ML自动特征工程
  10. Python MemoryError 问题