本篇文章将会一步步实现如何使用Flink对接Kafka和Redis,并将Kafka中的数据存储到Redis中,这种场景也是真实项目会遇到的。

1、单机部署Kafka

1.1 下载Kafka压缩包,解压

下载链接:http://archive.apache.org/dist/kafka/

此处我下载的版本是kafka_2.12-3.2.0,这个版本需要将–bootstrap-server localhost:9092参数替代原先的–-zookeeper localhost:2181参数,否则执行命令会报错:

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized optionat joptsimple.OptionException.unrecognizedOption(OptionException.java:108)at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)at joptsimple.OptionParser.parse(OptionParser.java:396)at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)at kafka.admin.TopicCommand.main(TopicCommand.scala)w

1.2 启动内置Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

1.3 启动Kafka

bin/kafka-server-start.sh config/server.properties

1.4 常用Kafka指令

  • 创建topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic newTopic --partitions 1 --replication-factor 1
  • 删除topic
./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic newTopic
  • 查看所有的topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 向topic中推送数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic newTopic
  • 消费topic中数据
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newTopic  --from-beginning

2、单机部署Redis

下载链接:http://download.redis.io/releases/redis-5.0.7.tar.gz

这个文章很多,推荐参考:https://www.runoob.com/redis/redis-install.html

3、项目实现

3.1 pom.xml配置

<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><!-- 连接kafka依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.12.2</version></dependency><!--连接redis依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.2</version></dependency><!-- 如果将程序作为 Maven 项目开发,则必须添加 flink-clients 模块的依赖 必须要有 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.2</version></dependency><!-- 解决 Failed to load class "org.slf4j.impl.StaticLoggerBinder". --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency>
</dependencies>

3.2 Kafka Source

//从kafka读取数据
Properties properties = new Properties();
//指定kafka的Broker地址
properties.setProperty("bootstrap.servers", "localhost:9092");
//指定组ID
properties.setProperty("group.id", "summo-group");
//如果没有记录偏移量,第一次从最开始消费
properties.setProperty("auto.offset.reset", "earliest");
//kafka的消费者不自动提交偏移量
properties.setProperty("enable.auto.commit", "false");
//kafkaSource
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("newTopic", new SimpleStringSchema(), properties);

3.3 Redis Sink

Index.java

public class Index {/*** key*/private String key;/*** value*/private String value;public Index(String key, String value) {this.key = key;this.value = value;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getValue() {return value;}public void setValue(String value) {this.value = value;}@Overridepublic String toString() {return "Index{" +"key='" + key + ''' +", value=" + value +'}';}
}

MyReadSink.java

import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class MyReadSink extends RedisSink<Index> {public MyReadSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<Index> redisSinkMapper) {super(flinkJedisConfigBase, redisSinkMapper);}
}

MyReadMapper.java

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
public class MyReadMapper implements RedisMapper<Index> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET, "index_key");}@Overridepublic String getKeyFromData(Index index) {return index.getKey();}@Overridepublic String getValueFromData(Index index) {return index.getValue();}
}

3.4 启动入口

public class Test {private static final Logger LOG = LoggerFactory.getLogger(Test.class);public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//从kafka读取数据Properties properties = new Properties();//指定kafka的Broker地址properties.setProperty("bootstrap.servers", "localhost:9092");//指定组IDproperties.setProperty("group.id", "summo-group");//如果没有记录偏移量,第一次从最开始消费properties.setProperty("auto.offset.reset", "earliest");//kafka的消费者不自动提交偏移量properties.setProperty("enable.auto.commit", "false");//kafkaSourceFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("newTopic", new SimpleStringSchema(), properties);SingleOutputStreamOperator<String> lines = executionEnvironment.addSource(kafkaSource).name("kafka source").setParallelism(1);DataStream<Index> indexDataStream = lines.map(line -> {String[] fields = line.split(":");try {LOG.info("当前输入的key:[{}];当前输入的value:[{}]", fields[0], fields[1]);return new Index(fields[0], fields[1]);} catch (Exception e) {LOG.error("输入的数据格式不规范");}return null;}).filter(index -> null != index).name("transfer node").setParallelism(1);//定义jedis连接配置FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();//设置sinkindexDataStream.addSink(new MyReadSink(config, new MyReadMapper())).name("redis sink");//执行executionEnvironment.execute("kafka消费的数据存入redis");}
}

3.5 演示效果

Kafka窗口

控制台console

Redis窗口

flink入门3-Flink连接Kafka、Redis,实现Kafka Source/Redis Sink相关推荐

  1. Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错

    kafka的安装与配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120488287 首先在kafka中创建一个topic,名称 ...

  2. Flink教程(04)- Flink入门案例

    文章目录 01 引言 02 开发前准备 2.1 API 2.2 编程模型 03 入门案例 3.1 项目搭建 3.2 代码实现 3.2.1 基于DataSet 3.2.2 基于DataStream 3. ...

  3. flink入门_Flink入门:读取Kafka实时数据流,实现WordCount

    本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上.通过本文你可以了解如何编写和运行Flink程序. 代码拆解 首先要设置Flink的执行环境: ...

  4. 轻松入门进阶Flink第十课 Flink 面试

    第39讲:Flink 面试-基础篇 到目前为止,关于 Flink 的学习我们就告一段落了,接下来我们将进入最后一个面试模块的学习.在当前大背景下,面试这一关是求职者必须要面对的,也能从侧面考察对 Fl ...

  5. 第一天:什么是Flink、WordCount入门、Flink安装、并行度

    1. 初识 Flink 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.目前比较流行的大数据处理引擎 Apach ...

  6. Flink 入门教程

    大数据处理的应用场景 大数据是近些年才出现的吗,人们是近些年才发现大数据的利用价值的吗?其实不然,早在几十年前,数学分析就已经涉猎金融行业了,人们依托于金融和数学知识来建立数学模型,利用金融市场所产的 ...

  7. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  8. 轻松入门进阶Flink第一课 Flink基础

    开篇词:实时计算领域最锋利的武器 Flink 你好,欢迎来到 Flink 专栏,我是王知无,目前在某一线互联网公司从事数据平台架构和研发工作多年,算是整个大数据开发领域的老兵了. 我最早从 Relea ...

  9. flink入门_阿里巴巴为何选择Flink?20年大佬分11章讲解Flink从入门到实践!

    前言 Apache Flink 是德国柏林工业大学的几个博士生和研究生从学校开始做起来的项目,之前叫做 Stratosphere.他们在2014 年开源了这个项目,起名为 Flink. Apache ...

最新文章

  1. web.xml 配置中classpath: 与classpath*:的区别
  2. CYPRESS USB芯片win10驱动
  3. so 问题来了,你现在值多少钱?
  4. Java锁详解之改进读写锁StampedLock
  5. 透视大数据,未来市场谁主沉浮?这个4月,3W企服大数据OpenForm等你报名!
  6. 深度学习图片分类CNN模板
  7. phpmyadmin管理mysql_用phpMyAdmin管理MySQL数据库_MySQL
  8. 凯利公式计算器安卓_华为MatePad Pro 5G评测:一屏双任务打破安卓平板生态限制...
  9. mysql +cobar_Cobar源码解析(一)
  10. 9月TIOBE编程语言排行榜出炉,这个语言或成最大赢家!
  11. 债券收益率预测模型_基于时间序列模型的可转换债券收益率的实证研究
  12. 醉林疯的PTA 7-2 换硬币 (20分)
  13. for(int i:nums){.....}的含义
  14. 翻译:swift 5 iOS Accessibility从入门到精通
  15. Unity 知识点 - 3D游戏 - 视角跟随和键盘移动
  16. Netty面试题和答案
  17. 解决enter键Typora不能单换行的问题
  18. android 触摸屏干扰,如何解决电容触摸屏的抗干扰问题?
  19. 2021年创业项目:知识付费副业做网课赚钱
  20. 零基础学习C语言如何入门(内附工具书推荐+视频教程)

热门文章

  1. 华为机考108题(c++)(41-51)
  2. 在Linux下制作工资表(转)
  3. Wiley开放科学大使访谈——刘永鑫
  4. 2022-2028全球与中国电穿孔缓冲液市场现状及未来发展趋势
  5. 前端开发:Vue报错Computed property “show“ was assigned to but it has no setter的解决方法
  6. 1 0.99999的悖论_宇宙年龄只有138亿年,宽度却有930亿光年,这是悖论吗?
  7. 使用POI读取EXCEL大文件时,在解析数据的过程中对数据完成处理转换
  8. Ubuntu 22.04安装搜狗输入法
  9. c 语言的产生及发展过程,在意识的产生和发展过程中,起决定性作用的是( ) A.人脑B.语言C.物质D.劳动 - 赏学吧...
  10. Linux系统简介(简单粗暴)