Flink Redis Sink
Table of Contents
- Official API
- Custom Redis Sink

Official API
Flink provides a dedicated Redis Sink (from the Apache Bahir project) for writing data to Redis.

Dependency

```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
Classes
The Redis sink exposes classes for sending data to Redis. It can talk to three different kinds of Redis deployments, each configured through its own class:
Class | Scenario | Notes |
---|---|---|
FlinkJedisPoolConfig | Single Redis server | suited to local and test setups |
FlinkJedisClusterConfig | Redis Cluster | |
FlinkJedisSentinelConfig | Redis Sentinel | |
Usage
The core of the Redis sink is the RedisMapper interface. To use the sink, write your own mapper class that implements its three methods:
RedisMapper

```java
public interface RedisMapper<T> extends Function, Serializable {

    /**
     * Sets the Redis data structure to use (via RedisCommand) and the
     * additional key. Returns a descriptor which defines the data type.
     *
     * @return data type descriptor
     */
    RedisCommandDescription getCommandDescription();

    /**
     * Extracts the key from the input data.
     *
     * @param data source data
     * @return key
     */
    String getKeyFromData(T data);

    /**
     * Extracts the value from the input data.
     *
     * @param data source data
     * @return value
     */
    String getValueFromData(T data);
}
```
RedisCommand
The RedisCommand set when building the RedisCommandDescription corresponds to Redis data structures as follows:
Data Type | Redis Command [Sink] |
---|---|
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD, ZREM |
Demo
```scala
package com.yljphp.demo.sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtils

object RedisBasicApp extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val text: DataStream[String] = env.socketTextStream("localhost", 9000)

  val data: DataStream[(String, String)] = text
    .filter(s => !StringUtils.isNullOrWhitespaceOnly(s))
    .flatMap(_.split(","))
    .map(("bin_test", _))
    .map(x => {
      println(x)
      x
    })

  val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("localhost")
    .setPort(6379)
    .build()

  val redisSink = new RedisSink[(String, String)](config, new MyRedisMapper)

  data.addSink(redisSink)

  env.execute("sink_demo")
}

class MyRedisMapper extends RedisMapper[(String, String)] {

  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SADD, "bin_test")

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}
```
Start a listener and type `111,222,333` into it:

```shell
nc -lk 9000
```

The MONITOR trace shows the inserts succeeded:
```
127.0.0.1:6379> MONITOR
OK
1603866672.023727 [0 127.0.0.1:63764] "ECHO" "Test"
1603866685.772580 [0 127.0.0.1:63769] "SADD" "bin_test" "111"
1603866685.779417 [0 127.0.0.1:63769] "SADD" "bin_test" "222"
1603866685.779589 [0 127.0.0.1:63769] "SADD" "bin_test" "333"
```

```
~/software/redis-6.0.8/src ./redis-cli
127.0.0.1:6379> smembers bin_test
1) "111"
2) "222"
3) "333"
```
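The MONITOR lines follow directly from the demo pipeline: each comma-separated token of a socket line becomes a ("bin_test", token) tuple, which the mapper turns into `SADD bin_test <token>`. A stdlib-only Java sketch of that per-line transformation (the class and method names here are illustrative, not part of the connector):

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {
    // Mimics flatMap(_.split(",")) + map(("bin_test", _)) + the SADD mapper.
    static List<String> toCommands(String line) {
        List<String> commands = new ArrayList<>();
        for (String token : line.split(",")) {
            commands.add("SADD bin_test " + token);
        }
        return commands;
    }

    public static void main(String[] args) {
        for (String cmd : toCommands("111,222,333")) {
            System.out.println(cmd); // e.g. SADD bin_test 111
        }
    }
}
```

Because the target is a SET, re-sending the same line is idempotent: SADD simply ignores members that already exist.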
Using a Redis Cluster (note: the cluster setup takes a FlinkJedisClusterConfig, which is where setNodes lives):

```java
FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
        .setNodes(new HashSet<InetSocketAddress>(
                Arrays.asList(new InetSocketAddress(5601))))
        .build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
```
Using Redis Sentinel:

```java
FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
        .setMasterName("master")
        .setSentinels(...)
        .build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
```
Reference: Flink Redis Connector
Custom Redis Sink
The Redis Sink API that Flink/Bahir provides is easy to use but quite limited: many Redis operations are not supported. A custom Redis sink covers those cases and is also straightforward to write.
Dependency

Only the Jedis dependency is needed:

```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
```
Redis Sink class
```java
package sinktoredis;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/**
 * Custom Redis Sink
 *
 * @author liuzebiao
 * @Date 2020-2-18 10:26
 */
public class MyRedisSink extends RichSinkFunction<Tuple3<String, String, Long>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration config) {
        // String host = parameters.getRequired("redis.host");
        String host = "127.0.0.1";
        // String password = parameters.get("redis.password", "");
        Integer port = 6379;
        Integer timeout = 5000;
        jedis = new Jedis(host, port, timeout);
    }

    @Override
    public void invoke(Tuple3<String, String, Long> value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // write the record as a hash field: HSET f0 f1 f2
        jedis.hset(value.f0, value.f1, String.valueOf(value.f2));
    }

    @Override
    public void close() throws Exception {
        jedis.close();
    }
}
```
Test class
```java
package sinktoredis;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static void main(String[] args) throws Exception {
        // step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // step 2: read data, e.g.
        // math,zhangsan,88
        // math,lisi,60
        // math,wangwu,100
        DataStreamSource<String> data = env.socketTextStream("localhost", 6666);

        // step 3: transform and sink
        data.filter(t -> t.split(",").length > 2)
            .map(new MapFunction<String, Tuple3<String, String, Long>>() {
                @Override
                public Tuple3<String, String, Long> map(String value) throws Exception {
                    String[] fields = value.split(",");
                    return new Tuple3<>(fields[0], fields[1], Long.parseLong(fields[2]));
                }
            })
            .addSink(new MyRedisSink());

        env.execute("Test");
    }
}
```
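The map step above is plain positional CSV parsing. Isolated as a stdlib-only helper (the class name ParseSketch is illustrative), the logic is:

```java
public class ParseSketch {
    // "math,zhangsan,88" -> {subject, name, score}; assumes the upstream
    // filter already guaranteed at least three comma-separated fields.
    static Object[] parse(String value) {
        String[] fields = value.split(",");
        return new Object[]{fields[0], fields[1], Long.parseLong(fields[2])};
    }

    public static void main(String[] args) {
        Object[] t = parse("math,zhangsan,88");
        System.out.println(t[0] + " / " + t[1] + " / " + t[2]);
    }
}
```

Splitting once and reusing the array (rather than calling split three times, as a first draft might) avoids redundant work per record.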
Cluster version
```java
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * @Author: zhangbin19
 * @Date: 2020/10/27 4:03 PM
 */
public class Redis extends BaseOutputstream {

    private static Logger logger = LoggerFactory.getLogger(Redis.class);
    private static final long serialVersionUID = 1L;

    private transient JedisCluster cluster;
    private transient JedisPoolConfig jedisPoolConfig;
    private Map config;
    Set<HostAndPort> nodes = new HashSet<>();
    private Integer connTimeout;
    private Integer soTimeout;
    private Integer maxAttempts;
    private String password;
    private Integer maxIdle;
    private Integer maxTotal;

    @Override
    public void init(Map config) throws Exception {
        logger.info(Redis.class.getCanonicalName() + " init config:" + config);
        this.config = config;
        if (config == null) {
            throw new Exception("init Redis config is null");
        }
        if (!config.containsKey("host")) {
            throw new Exception("redis config host is not exist");
        }
        if (!config.containsKey("connTimeout")) {
            throw new Exception("redis config connTimeout is not exist");
        }
        // "host" is a comma-separated list of host:port pairs
        String hostStr = String.valueOf(config.get("host"));
        String[] hostArr = hostStr.split(",");
        if (hostArr.length < 1) {
            throw new Exception("redis config host is not exist");
        }
        for (String host : hostArr) {
            String[] arr = host.split(":");
            if (arr.length < 2) {
                throw new Exception("redis config host is error");
            }
            this.nodes.add(new HostAndPort(arr[0], Integer.parseInt(arr[1])));
        }
        this.connTimeout = Integer.parseInt(String.valueOf(config.getOrDefault("connTimeout", "5000")));
        this.soTimeout = Integer.parseInt(String.valueOf(config.getOrDefault("soTimeout", "3000")));
        this.maxAttempts = Integer.parseInt(String.valueOf(config.getOrDefault("maxAttempts", "3")));
        this.password = String.valueOf(config.getOrDefault("password", ""));
        this.maxIdle = Integer.parseInt(String.valueOf(config.getOrDefault("maxIdle", "8")));
        this.maxTotal = Integer.parseInt(String.valueOf(config.getOrDefault("maxTotal", "8")));
        this.jedisPoolConfig = new JedisPoolConfig();
        // max idle connections, default 8
        jedisPoolConfig.setMaxIdle(maxIdle);
        // max total connections, default 8
        jedisPoolConfig.setMaxTotal(maxTotal);
        cluster = new JedisCluster(nodes, connTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);
    }

    @Override
    public void excute(Map event) throws IOException {
        // lazily rebuild the pool and cluster client after deserialization
        if (jedisPoolConfig == null) {
            jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxTotal(maxTotal);
        }
        if (cluster == null) {
            cluster = new JedisCluster(nodes, connTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);
        }
        String value = JSON.toJSONString(event);
        logger.info("sink to redis:" + value);
        String[] arr = value.split(",");
        if (arr.length < 3) {
            return;
        }
        // save each "key":"value" pair of the serialized event as a set member
        for (String str : arr) {
            String[] tempArr = str.replaceAll("\"", "").split(":");
            if (tempArr.length > 1) {
                cluster.sadd(tempArr[0], tempArr[1]);
            }
        }
    }

    @Override
    public void close() {
        if (cluster != null) {
            try {
                cluster.close();
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
        }
    }
}
```
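Note that excute() serializes the event map to JSON and then splits the flat string on commas and colons instead of iterating the map, which is fragile: the leading "{" stays attached to the first key, and any value containing "," or ":" breaks the split. A stdlib-only sketch of that extraction, with braces stripped as well (a small deviation from the code above; the class name is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EventSplitSketch {
    // Mimics excute(): strip quotes (and braces), split on commas,
    // then split each "k:v" pair on the colon.
    static Map<String, String> extract(String json) {
        Map<String, String> pairs = new LinkedHashMap<>();
        for (String part : json.replaceAll("[{}\"]", "").split(",")) {
            String[] kv = part.split(":");
            if (kv.length > 1) {
                pairs.put(kv[0], kv[1]);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String json = "{\"subject\":\"math\",\"name\":\"zhangsan\",\"score\":\"88\"}";
        System.out.println(extract(json));
    }
}
```

Iterating the event map directly (entrySet()) would avoid the round trip through JSON entirely and sidestep the escaping problems.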