文章目录

  • 官方API
  • 自定义Redis Sink

官方API

flink提供了专门操作redis的Redis Sink

依赖

 <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

Redis Sink 提供用于向Redis发送数据的接口的类。接收器可以使用三种不同的方法与不同类型的Redis环境进行通信:

场景 备注
FlinkJedisPoolConfig 单Redis服务器 适用于本地、测试场景
FlinkJedisClusterConfig Redis集群
FlinkJedisSentinelConfig Redis哨兵

使用

Redis Sink 核心类是 RedisMappe 是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法

RedisMapper

public interface RedisMapper<T> extends Function, Serializable {/*** 设置使用的redis数据结构类型,和key的名词* 通过RedisCommand设置数据结构类型* Returns descriptor which defines data type.** @return data type descriptor*/RedisCommandDescription getCommandDescription();/*** 设置value中的键值对 key的值* Extracts key from data.** @param data source data* @return key*/String getKeyFromData(T data);/*** 设置value中的键值对 value的值* Extracts value from data.** @param data source data* @return value*/String getValueFromData(T data);
}

RedisCommand

使用RedisCommand设置数据结构类型时和redis结构对应关系。

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZREM

Demo

package com.yljphp.demo.sinkimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtilsobject RedisBasicApp extends App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val text: DataStream[String] = env.socketTextStream("localhost", 9000)val data: DataStream[(String, String)] = text.filter(!StringUtils.isNullOrWhitespaceOnly(_)).flatMap(_.split(",")).map(("bin_test", _)).map(x=>{println(x)x})val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()val redisSink = new RedisSink[(String,String)](config,new MyRedisMapper)data.addSink(redisSink)env.execute("sink_demo")
}class MyRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.SADD, "bin_test")}override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

nc -lk 9000输入111,222,333,如下插入成功了。

127.0.0.1:6379> MONITOR
OK
1603866672.023727 [0 127.0.0.1:63764] "ECHO" "Test"
1603866685.772580 [0 127.0.0.1:63769] "SADD" "bin_test" "111"
1603866685.779417 [0 127.0.0.1:63769] "SADD" "bin_test" "222"
1603866685.779589 [0 127.0.0.1:63769] "SADD" "bin_test" "333"~/software/redis-6.0.8/src ./redis-cli
127.0.0.1:6379> smembers bin_test
1) "111"
2) "222"
3) "333"

使用 Redis Cluster:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());

使用 Redis Sentinel:

FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build();DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());

参考:Flink Redis Connector

自定义Redis Sink

Flink提供的Redis Sink API操作比较简单不过限制比较多,很多redis的操作不支持,可以用自定义Redis Sink,也比较简单。

依赖
依赖只需要引入jedis依赖即可。

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version>
</dependency>

Redis Sink类

package sinktoredis;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;/*** TODO 自定义 Redis Sink** @author liuzebiao* @Date 2020-2-18 10:26*/
public class MyRedisSink extends RichSinkFunction<Tuple3<String, String, Long>> {private transient Jedis jedis;@Overridepublic void open(Configuration config) {//        String host = parameters.getRequired("redis.host");String host = "127.0.0.1";
//        String password = parameters.get("redis.password", "");Integer port = 6379;Integer timeout = 5000;jedis = new Jedis(host, port, timeout);}@Overridepublic void invoke(Tuple3<String, String, Long> value, Context context) throws Exception {if (!jedis.isConnected()) {jedis.connect();}//保存jedis.hset(value.f0, value.f1, String.valueOf(value.f2));}@Overridepublic void close() throws Exception {jedis.close();}
}

Test测试类

package sinktoredis;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Test {public static void main(String[] args) throws Exception {// step1:获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// step2:read data// math,zhangsan,88// math,lisi,60// math,wangwu,100DataStreamSource<String> data = env.socketTextStream("localhost", 6666);// step3:transformdata.filter(t -> t.split(",").length > 2).map(new MapFunction<String, Tuple3<String, String, Long>>() {@Overridepublic Tuple3<String, String, Long> map(String value) throws Exception {return new Tuple3<String, String, Long>(value.split(",")[0],value.split(",")[1],Long.parseLong(value.split(",")[2]));}}).addSink(new MyRedisSink());env.execute("Test");}
}

Cluster版本


import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;/*** @Author: zhangbin19* @Date: 2020/10/27 4:03 PM* @Usage:*/
public class Redis extends BaseOutputstream {private static Logger logger = LoggerFactory.getLogger(Redis.class);private static final long serialVersionUID = 1L;//    private transient Jedis jedis;private transient JedisCluster cluster;private transient JedisPoolConfig jedisPoolConfig;private Map config;
//    private String host;Set<HostAndPort> nodes = new HashSet<>();
//    private Integer port;private Integer connTimeout;private Integer soTimeout;private Integer maxAttempts;private String password;private Integer maxIdle;private Integer maxTotal;@Overridepublic void init(Map config) throws Exception {log.info(Redis.class.getCanonicalName() + " init config:" + config);System.out.println("===============redis init begin============");this.config = config;if (config == null) {throw new Exception("init Hbase config is null");}if (!config.containsKey("host")) {throw new Exception("redis config host is not exist");}
//        if (!config.containsKey("port")) {//            throw new Exception("redis config port is not exist");
//        }if (!config.containsKey("connTimeout")) {throw new Exception("redis config connTimeout is not exist");}String hostStr = String.valueOf(config.get("host"));String[] hostArr = hostStr.split(",");if (hostArr == null || hostArr.length < 1) {throw new Exception("redis config host is not exist");}for (String host : hostArr) {String[] arr = host.split(":");if (arr == null || arr.length < 2) {throw new Exception("redis config host is error");}this.nodes.add(new HostAndPort(arr[0],Integer.parseInt(arr[1])));}
//        this.port = Integer.parseInt(String.valueOf(config.get("port")));this.connTimeout = Integer.parseInt(String.valueOf(config.getOrDefault("connTimeout", "5000")));this.soTimeout = Integer.parseInt(String.valueOf(config.getOrDefault("soTimeout", "3000")));this.maxAttempts = Integer.parseInt(String.valueOf(config.getOrDefault("maxAttempts", "3")));this.password = String.valueOf(config.getOrDefault("password",""));this.jedisPoolConfig = new JedisPoolConfig();this.maxIdle = (Integer) config.getOrDefault("maxIdle", "8");this.maxTotal = (Integer) config.getOrDefault("maxTotal", "8");// 最大空闲连接数, 默认8个jedisPoolConfig.setMaxIdle(maxIdle);// 最大连接数, 默认8个jedisPoolConfig.setMaxTotal(maxTotal);
//        jedis = new Jedis(host, port, timeout);cluster = new JedisCluster(nodes, connTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);if (cluster == null) {System.out.println("jedis is null!!! in init");}System.out.println("===============redis init end============");}@Overridepublic void excute(Map event) throws IOException {if (jedisPoolConfig == null) {jedisPoolConfig = new JedisPoolConfig();// 最大空闲连接数, 默认8个jedisPoolConfig.setMaxIdle(maxIdle);// 最大连接数, 默认8个jedisPoolConfig.setMaxTotal(maxTotal);}if (cluster == null) {cluster = new JedisCluster(nodes, connTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);}//        if (!jedis.isConnected()) {//            jedis.connect();
//        }String value = JSON.toJSONString(event);log.info("sink to redis:" + value);String[] arr = value.split(",");//保存if (arr.length < 3) {return;}for (String str : arr) {String[] tempArr = str.replaceAll("\"","").split(":");if (tempArr.length > 1) {System.out.println("key: " + tempArr[0] + ", value: " + tempArr[1]);cluster.sadd(tempArr[0], tempArr[1]);}}}@Overridepublic void close() {if (cluster != null) {try {cluster.close();} catch (Exception e) {logger.error(e.getMessage());}}}}

Flink Redis Sink相关推荐

  1. flink入门3-Flink连接Kafka、Redis,实现Kafka Source/Redis Sink

    本篇文章将会一步步实现如何使用Flink对接Kafka和Redis,并将Kafka中的数据存储到Redis中,这种场景也是真实项目会遇到的. 1.单机部署Kafka 1.1 下载Kafka压缩包,解压 ...

  2. 【Flink】Flink 自定义 redis sink

    1.概述 内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11 2.步骤 ...

  3. Flink的Sink_API_Demo (kafka sink 、redis sink 、es sink)

    文章目录 pom文件说明 说明 必要的前提 下面的代码,启动后手动往topic打入数据,该程序读出来,逻辑是通过split分割(盖戳),然后select分流,取applestream写入kafka指定 ...

  4. 基于 Kafka + Flink + Redis 的电商大屏实时计算案

    前言 阿里的双11销量大屏可以说是一道特殊的风景线.实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标.并且在实际操作中,肯定也不会仅仅计算一两个维 ...

  5. 基于Kafka+Flink+Redis的电商大屏实时计算案例

    前言 一年一度的双11又要到了,阿里的双11销量大屏可以说是一道特殊的风景线.实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标.并且在实际操作中, ...

  6. 阿里技术:基于Kafka+Flink+Redis的电商大屏实时计算案例

    作者:zhisheng cloud.tencent.com/developer/article/1558372 阿里的双11销量大屏可以说是一道特殊的风景线.实时大屏(real-time dashbo ...

  7. flink redis connector(支持flink sql)

    flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...

  8. Flink的sink实战之一:初探,2020-2021蚂蚁金服Java面试真题解析

    关于<Flink的sink实战>系列文章 本文是<Flink的sink实战>的第一篇,旨在初步了解sink,通过对基本API和addSink方法的分析研究,为后续的编码实战打好 ...

  9. flink mysql sink_聊聊flink的sink

    概述 flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中,flink 的sink和source的代码结 ...

最新文章

  1. nginx php7 fastcgi,[Mac php7 nginx]解决nginx FastCGI sent in stderr: “Primary script unknown”
  2. C#的Timer解析
  3. Java多线程(一)——多线程实现方法和生命周期
  4. thinkphp5项目--个人博客(八)
  5. 学生宿舍管理系统需求分析
  6. 实现微信小程序的分享转发功能(可以从分享页返回小程序首页)
  7. Keil V5.37.0.0 - 按 F12 无法跳转到定义
  8. 计算机网络工程用排线架,什么是网络配线架接法 简单学习网络配线架接法图解【详解】...
  9. c++语言程序中,要调用的函数必须在main()函数中定义,惠州学院C++考试复习题
  10. AjaxPro.2.dll基本使用
  11. procmon符号配置
  12. Web项目经理手册之项目经理需要铭记在心的话
  13. 用星坐标(Star Coordinates)表示高维数据
  14. 使用CloudCompare渲染好看的油麦菜点云的小方法
  15. 超高清视频体验-4K片源
  16. 电子设计竞赛前的点点滴滴
  17. 关于大学生工作日睡眠的调查
  18. Device eth0 does not seem————解决方法
  19. 大数据在营销中的应用
  20. 清华大学计算机科学与技术系分数线6,【清华大学分数线2017】2015-2016清华大学各省各专业录取分数线(6)...

热门文章

  1. 100个问题搞定Java虚拟机
  2. 当Kotlin完美邂逅设计模式之单例模式(一)
  3. 使用mshta.exe绕过应用程序白名单(多种方法)
  4. Clickhouse物化视图
  5. 解决element-ui中el-menu组件作为vue-router模式在刷新页面后default-active属性与当前路由页面不一致问题的方法
  6. 读书笔记---晚熟的人(莫言)
  7. c语言产生系统年月日,c语言中如何输入年月日
  8. 自动驾驶蓄势待发,车企如何拥抱智能化
  9. 感受云计算,从弹性计算开始
  10. 计算机组成原理——硬布线控制器设计(2)