flink kafka实现反序列化:

package Flink_Kafka;import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import java.io.IOException;
import java.util.Properties;//kafka的反序列化
public class KafkaSimpleSche {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties consumerProperties = new Properties();//设置服务consumerProperties.setProperty("bootstrap.servers","s101:9092");//设置消费者组consumerProperties.setProperty("group.id","con56");//自动提交偏移量consumerProperties.setProperty("enable.auto.commit","true");consumerProperties.setProperty("auto.commit.interval.ms","2000");//SimpleStringSchema()这是系统提供了一个kafka反序列化,我们也可以自定义一个反序列化类
//        DataStream<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<>("browse_topic",new SimpleStringSchema(),consumerProperties));DataStreamSource<BrowseLog> browse_topic = env.addSource(new FlinkKafkaConsumer010<>("browse_topic", new BrowseLogDeserializationSchema(), consumerProperties));browse_topic.print();env.execute("FlinkKafkaConsumer");}//自定义一个kafka的反序列化类public static class BrowseLogDeserializationSchema implements DeserializationSchema<BrowseLog> {@Overridepublic BrowseLog deserialize(byte[] bytes) throws IOException {return JSON.parseObject(new String(bytes),BrowseLog.class);}@Overridepublic boolean isEndOfStream(BrowseLog browseLogDeserializationSchema) {return false;}@Overridepublic TypeInformation<BrowseLog> getProducedType() {return TypeInformation.of(BrowseLog.class);}}public static class BrowseLog{private String userID;private String eventTime;private int productPrice;public BrowseLog(){}public BrowseLog(String userID, String eventTime, int productPrice) {this.userID = userID;this.eventTime = eventTime;this.productPrice = productPrice;}public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public int getProductPrice() {return productPrice;}public void setProductPrice(int productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "BrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", productPrice=" + productPrice +'}';}}
}

Flink的kafka写一个消费者consumer:

package Flink_Kafka;import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;import java.util.Properties;public class KafkaConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties consumerProperties = new Properties();//设置服务consumerProperties.setProperty("bootstrap.servers","s101:9092");//设置消费者组consumerProperties.setProperty("group.id","con56");//自动提交偏移量consumerProperties.setProperty("enable.auto.commit","true");consumerProperties.setProperty("auto.commit.interval.ms","2000");//SimpleStringSchema()这是系统提供了一个kafka反序列化,我们也可以自定义一个反序列化类FlinkKafkaConsumer010<String> consumer10 = new FlinkKafkaConsumer010<>("browse_topic",new SimpleStringSchema(),consumerProperties);
//            consumer10.setStartFromLatest();//获取数据consumer10.setStartFromEarliest();DataStreamSource<String> dataStreamSource = env.addSource(consumer10).setParallelism(6);DataStream<BrowseLog> process = dataStreamSource.process(new ProcessFunction<String, BrowseLog>() {@Overridepublic void processElement(String s, Context context, Collector<BrowseLog> collector) throws Exception {try{BrowseLog browseLog = JSON.parseObject(s,BrowseLog.class);if(browseLog !=null){collector.collect(browseLog);}}catch (Exception e){System.out.print("解析Json异常,异常信息是:"+e.getMessage());}}}).setParallelism(10);DataStream<String> map = process.map(new MapFunction<BrowseLog, String>() {@Overridepublic String map(BrowseLog browseLog) throws Exception {String jsonString=JSON.toJSONString(browseLog);return jsonString;}});Properties producerProperties = new Properties();producerProperties.setProperty("bootstrap.servers","s101:9092");map.addSink(new FlinkKafkaProducer010("topic_flinktest",new SimpleStringSchema(),producerProperties));env.execute("KafkaConsumer");}public static class BrowseLog{private String userID;private String eventTime;private int productPrice;public BrowseLog(){}public BrowseLog(String userID, String eventTime, int productPrice) {this.userID = userID;this.eventTime = eventTime;this.productPrice = productPrice;}public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public int getProductPrice() {return productPrice;}public void setProductPrice(int productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "BrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", productPrice=" + productPrice +'}';}}}

Kafka的容错性机制:

上面的程序当中,我们可以实现将数据写入到Kafka当中,但是不支持exactly-once语义,因此不能保证数据的绝对安全性,在分析Kafka Producer的容错性时,
需要根据Kafka的不同版本来进行分析。
1、Kafka 0.8版本
只能保证 at most once:因为只能保证最多一次,所以可能会造成数据丢失;
2、Kafka0.9版本or0.10版本
如果Flink开启了CheckPoint机制,默认可以保证at least once语义,即至少消费一次,但是发送的数据有可能造成重复,虽然可以保证at least once语义,
但是需要开启2个参数(不开也行,默认就是响应的作用):

myProducer.setLogFailuresOnly(false):默认该值就是false,表示当producer向kafka发送数据失败是,是否需要打印日志。
false:不打印日志,直接抛出异常,导致应用重启,从而实现at least once语义;
true:发送数据失败的时候,打印日志(数据丢失),不能实现at least once
myProducer.setFlushOnCheckpoint(true):默认值是true,true可以保证 at least once
true:当kafka写数据的时候,只有返回ack,Flink才会执行checkpoint
false:不需要等待返回ack,Flink就会执行checkpoint
因为myProduce.setLogFailuresOnly(false)这个参数的缘故,建议修改Kafka生产者的重试次数,retries这个参数默认是0,建议修改为3
3、kafka 0.11版本
在Kafka011版本当中,当Flink开启了Checkpoint机制,则针对FlinkKafkaProducer011就可以提供exactly-once语义
在具体使用的时候,可以选择具体语义,支持以下三项:
semantic.NONE:可能丢失重复
semantic.AT_LEAST_ONCE:(默认值),不会丢失,但可能重复
semantic.EXACTLY_ONCE:使用Kafka事物提供exactly-once语义

Flink的并行度和Kafka的partition的结合相关推荐

  1. 【Flink】flink并行度与kafka分区(partition)设置

    1.概述 默认: [Flink]FlinkConsumer是如何保证一个partition对应一个thread的 当分区与并行度不一样呢? 2.原理 采用取模运算:平衡 kafka partition ...

  2. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

  3. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  4. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  5. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  6. Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)

    #################################################################################################### ...

  7. Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑

    一.需求 Flume向kafka发送数据时,同一个flume发送到kafka的数据总是固定在某一个partition中.而业务需求是发送的数据在所有的partition平均分布 二.实现 Flume的 ...

  8. centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  9. flink java 并行度_flink solt和并行度

    简介 Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群,flink on yarn都是要启动这两个角色.JobManager主要是负责接受客 ...

最新文章

  1. 华为自研编程语言「仓颉」火上热搜,已正式开启内测,成员辟谣:不是中文编程...
  2. syslog(),closelog()与openlog()--日志操作函数
  3. Fedora 30将获得Bash 5.0,淘汰Yum推迟到Fedora 31
  4. 第一次运行Spring Boot有感
  5. 论文理解 R-FCN:基于区域的全卷积网络来检测物体
  6. 每次创建maven都要重新设置set,如何将本地maven设置为默认的maven
  7. 如何自己找出SMBDA服务使用的端口号
  8. c++ STL 全排列
  9. Silverlight实例教程 - Validation数据验证基础属性和事件
  10. python列表去括号_python的常用序列
  11. python类中导入库_python导入库的具体方法
  12. 如何写好产品需求文档?
  13. matlab 3.BPF封装 巴特沃斯带通滤波器
  14. 邮箱服务申请数字证书
  15. 微信小程序 —— 成员管理及开发管理
  16. win10安装Visual Studio 2019失败
  17. CF106C Buns动态规划解决多重背包
  18. 【新能源】新能源之锂电池产业链梳理
  19. HTML中文字间距调整
  20. iphone主屏幕动态壁纸_iPhoneXLivePhoto动态壁纸

热门文章

  1. flask高级编程-循环引用
  2. 使用pdf.js在移动端预览pdf文档
  3. Linux下more命令C语言实现实践 (Unix-Linux编程实践教程)
  4. json序列化后日期如何变回来
  5. css3 menu 手机菜单3
  6. 临时表、表变量、CTE的比较
  7. android开发:input类型
  8. 无法装载文件或者汇编的AjaxControlToolkit
  9. 在Intellij idea 中YAML文件出现代码提示
  10. idea中配置Springboot热部署