一、flink分析结果写入redis

1、下载flink-hadoop整合包,放入所有节点

2、KafkaToRedisWordCount

package cn._51doit.flink.day08;import cn._51doit.flink.day02.RedisSinkDemo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;import java.util.Properties;/*** 当前的程序能不能容错(保证数据的一致性)* 当前程序如果可以保证数据的一致性,是使用ExactlyOnce还是AtLeastOnce,使用的是AtLeastOnce* KafkaSource:可以记录偏移量,可以将偏移量保存到状态中(OperatorState)* keyBy后调用sum:sum有状态(ValueState)* RedisSink:使用HSET方法可以将数据覆盖(幂等性)*/
public class KafkaToRedisWordCount {//--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn //--redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021public static void main(String[] args) throws Exception{//System.setProperty("HADOOP_USER_NAME", "root");ParameterTool parameterTool = ParameterTool.fromArgs(args);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//可以间内存中的状态持久化到StateBackendenv.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));//设置状态存储的后端env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));//如果你手动canceljob后,不删除job的checkpoint数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置Kafka相关参数Properties properties = new Properties();//设置Kafka的地址和端口properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读properties.setProperty("auto.offset.reset", "earliest");//设置消费者组IDproperties.setProperty("group.id", parameterTool.get("groupId"));//开启checkpoint,不然让flink的消费(source对他的subtask)自动提交偏移量properties.setProperty("enable.auto.commit", "false");//创建FlinkKafkaConsumer并传入相关参数FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(parameterTool.getRequired("topic"), //要读取数据的Topic名称new SimpleStringSchema(), //读取文件的反序列化Schemaproperties //传入Kafka的参数);//设置在checkpoint是不将偏移量保存到kafka特殊的topic中,可设可不设//kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //使用addSource添加kafkaConsumerDataStreamSource<String> lines = env.addSource(kafkaConsumer);SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {//new Tuple2<String, Integer>(word, 1)collector.collect(Tuple2.of(word, 1));}}});//分组KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);//聚合SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);//将聚合后的结果写入到Redis中//调用Sink//summed.addSink()FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(parameterTool.getRequired("redisHost")).setPassword(parameterTool.getRequired("redisPwd")).setDatabase(9).build();summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisSinkDemo.RedisWordCountMapper()));env.execute();}private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}}}

备注:若redis挂了,flink继续写入数据,redis恢复,错过数据依旧写进来,因为;
取消flink, 不删除偏移量数据,重启后指定上次checkpoint,还能继续计算, 上面的案例就使用的这种方式或者使用savePoint,取消时手动保存。


二、从kafka读取数据,处理后写回kafka

package cn._51doit.flink.day09;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;/*** 从Kafka中读取数据,并且将数据处理后在写回到Kafka* 要求:保证数据的一致性* ExactlyOnce(Source可以记录偏移量【重放】,如果出现异常,的偏移量不更新),Sink要求支持事务* 开启Checkpointping,Source的偏移量保存到状态中(OperatorState),然后将处理的数据也保存状态中*/
public class KafkaToKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//开启checkpointingenv.enableCheckpointing(30000);env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));//设置Kafka相关参数Properties properties = new Properties();//设置Kafka的地址和端口properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读properties.setProperty("auto.offset.reset", "earliest");//设置消费者组IDproperties.setProperty("group.id", "g1");//没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量properties.setProperty("enable.auto.commit", "false");//创建FlinkKafkaConsumer并传入相关参数FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("doit2021", //要读取数据的Topic名称new SimpleStringSchema(), //读取文件的反序列化Schemaproperties //传入Kafka的参数);//使用addSource添加kafkaConsumerkafkaConsumer.setCommitOffsetsOnCheckpoints(false); //在checkpoint时,不将偏移量写入到kafka特殊的topic中DataStreamSource<String> lines = env.addSource(kafkaConsumer);SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));//使用的是AtLeastOnce
//        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
//                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
//        );//写入Kafka的topicString topic = "out2021";//设置Kafka相关参数properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");//创建FlinkKafkaProducerFlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(topic, //指定topicnew KafkaStringSerializationSchema(topic), //指定写入Kafka的序列化Schemaproperties, //指定Kafka的相关参数FlinkKafkaProducer.Semantic.EXACTLY_ONCE //指定写入Kafka为EXACTLY_ONCE语义);filtered.addSink(kafkaProducer);env.execute();}
}

2、定义KafkaStringSerializationSchema

package cn._51doit.flink.day09;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;import javax.annotation.Nullable;
import java.nio.charset.Charset;/*** 自定义String类型数据Kafka的序列化Schema*/
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {private String topic;private String charset;//构造方法传入要写入的topic和字符集,默认使用UTF-8public KafkaStringSerializationSchema(String topic) {this(topic, "UTF-8");}public KafkaStringSerializationSchema(String topic, String charset) {this.topic = topic;this.charset = charset;}//调用该方法将数据进行序列化@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {//将数据转成bytes数组byte[] bytes = element.getBytes(Charset.forName(charset));//返回ProducerRecordreturn new ProducerRecord<>(topic, bytes);}
}

大数据之flink数据一致性相关推荐

  1. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  2. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  3. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  4. 大数据入门--Flink(四)状态管理与容错机制

    状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...

  5. 大数据之flink教程

    第一章 Flink简介 1.1  初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...

  6. 大数据之Flink流式计算引擎

    Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...

  7. 【硬刚大数据】Flink在实时在实时计算平台和实时数仓中的企业级应用小结

    欢迎关注博客主页:https://blog.csdn.net/u013411339 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发于 CSDN博客! 本文首发CSDN论坛,未经过官 ...

  8. 大数据之flink共享资源槽

    算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行.这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加 ...

  9. 大数据之Flink的看了就可入门

    Flink介绍 介绍 原理 简单使用 初步编程 介绍 1 什么是Flink Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算. Flink起源于Stratosp ...

最新文章

  1. 编程模式 之美 -- 抽象工厂模式
  2. Mysql常用词汇_mysql 数据库常用单词
  3. 5、Makefile基础知识汇总(转自陈皓总述)
  4. java 端口8161_ActiveMQ_Windows和Linux版本的安装部署
  5. 鸿蒙开发-从搭建todolist待办事项来学习组件与js之间的交互
  6. app能不能跳转外部h5_轻羽微信小程序和H5的区别在哪里?主要有三点
  7. Spring MVC学习总结(11)——Spring MVC集成Swagger跨域问题
  8. SAP ERP 安全管理平台系统
  9. asp.net简单的投票系统代码 转载牛腩兄弟的
  10. 自己设计个动态屏保吧
  11. 数据分析之 假设检验
  12. 服务器挂载本地iso文件系统,CentOS系统中挂载光盘镜像ISO文件的教程
  13. 【技术贴】关于IE主页被篡改、假IE的根治方法。。。
  14. 微信群疯传!助力抢票不靠谱!还有风险…
  15. oracle常见语句(转载)
  16. ClickHouse快速安装-可视化工具连接-创建第一个ck库表(一)
  17. dsolve 的 用法
  18. 融合软泥怪 (优先队列)
  19. #Linux#进程间通信# 管道(pipe)-匿名管道pipe
  20. MySQL GIS功能介绍

热门文章

  1. zip压缩到指定目录
  2. 徒步旅行中的注意事项
  3. 五一出游-徒步旅行主旋律
  4. Unity URP 2020 设置DOTS
  5. C++版和MATLAB版调用摄像头显示画面
  6. 1.SpringBoot整合Mybatis(CRUD的实现)
  7. 基金 、 社保和QFII等机构的重仓股排名评测
  8. 云账户合法吗_云账户
  9. 浪潮云ERP到底属于什么水平?
  10. c语言书面作业,巢湖学院2011级网络工程1,2班C语言书面作业(江家宝)版