Implementing Exactly-Once from Kafka to MySQL with Flink

Background

I recently used Flink in a project to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there are plenty of Flink-reads-Kafka examples online, but I could not find one that actually solves the duplicate-consumption problem, so I searched the Flink documentation for this scenario. The official docs do not provide a Flink-to-MySQL Exactly-Once example either, but they do ship a comparable solution for end-to-end exactly-once delivery: FlinkKafkaProducer011, which guarantees that messages written to Kafka through it are Exactly-Once. It achieves this mainly by extending TwoPhaseCommitSinkFunction; for what that class does, see the previous article: https://blog.51cto.com/simplelife/2401411.

Implementation Idea

In short, the idea is to implement the four methods of this class (beginTransaction, preCommit, commit, and abort) to get pre-commit semantics: after an element has gone through its own processing it is pre-committed, the real commit only happens if the pre-commit succeeds, and if the pre-commit fails, abort is called to roll the transaction back. Combined with Flink's checkpoint mechanism, which stores the offsets of the topic's partitions, this gives end-to-end exactly-once.

An example of the effect: suppose a checkpoint is taken every 10 s and FlinkKafkaConsumer011 consumes Kafka messages in real time. After a message is consumed and processed, a pre-commit against the database is performed. If the pre-commit succeeds, the real insert happens 10 s later at the next checkpoint; if the insert succeeds, the checkpoint completes and Flink automatically records the consumed offsets (the checkpoint data can be stored on HDFS). If the pre-commit fails, say at the 5-second mark, the Flink job keeps restarting (the restart strategy is configurable; a minimal configuration sketch follows below), the next checkpoint is not taken, and the checkpoint still records the offsets of the last successful consumption. The data read during the failed interval was consumed but failed in the pre-commit; note that it was never actually inserted, because when preCommit fails, commit never runs. Once the faulty data has been dealt with and the Flink job is restarted, it automatically resumes from the last successful checkpoint, which is how exactly-once from Kafka to MySQL is achieved.
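The restart strategy mentioned above is an ordinary setting on the execution environment. As a reference, here is a minimal sketch (my own addition, not part of the project code below) of a fixed-delay restart strategy using the Flink 1.x DataStream API; the attempt count and delay are arbitrary example values:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry the failed job up to 3 times, waiting 10 seconds between attempts;
        // if all attempts fail, the job stays FAILED and has to be resubmitted manually.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}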

The concrete implementation consists of three classes.

StreamDemoKafka2Mysql.java

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:40 PM
 *
 * Consumes Kafka messages and sinks them (via a custom sink) into MySQL,
 * guaranteeing exactly-once from Kafka to MySQL.
 */
@SuppressWarnings("all")
public class StreamDemoKafka2Mysql {

    private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1 so the message order is easy to follow during testing; it can be increased.
        env.setParallelism(1);

        // Checkpoint settings
        // Trigger a checkpoint every 10 s (checkpoint interval)
        env.enableCheckpointing(10000);
        // Use EXACTLY_ONCE checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Ensure at least 1 s between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must complete within 10 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoint data when the job is cancelled, so it can be restored later if needed
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend: checkpoints can be stored on HDFS; by default they are kept in memory. Local-file example:
        //env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

        // Kafka consumer settings
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzy:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group2");
        // Partition discovery interval
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

        // SimpleStringSchema returns only the message value; JSONKeyValueDeserializationSchema also exposes
        // the key, the value, and the metadata (topic, partition, offset).
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 =
                new FlinkKafkaConsumer011<>(topic_ExactlyOnce, new JSONKeyValueDeserializationSchema(true), props);

        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        streamSource.print();

        // Send the data to the downstream sink
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");

        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
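The state backend lines above are commented out, so checkpoints stay in memory or in a local file. The HDFS variant mentioned in the implementation idea would look roughly like the sketch below (my own illustration; the namenode address and path are placeholders for your own cluster, and the Hadoop filesystem dependencies need to be on the classpath):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsStateBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep checkpoint data on HDFS instead of in the JobManager's memory or a local directory.
        env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
    }
}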

MySqlTwoPhaseCommitSink.java

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:47 PM
 *
 * Custom Kafka-to-MySQL sink: extends TwoPhaseCommitSinkFunction to implement two-phase commit.
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    private static final Logger log = LoggerFactory.getLogger(MySqlTwoPhaseCommitSink.class);

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Executes the database insert; called once for every element.
     * @param connection the current transaction (a JDBC connection with auto-commit disabled)
     * @param objectNode the deserialized Kafka record
     * @param context sink context
     * @throws Exception
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        log.info("start invoke...");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        log.info("===>date:" + date + " " + objectNode);
        log.info("===>date:{} --{}", date, objectNode);
        String value = objectNode.get("value").toString();
        log.info("objectNode-value:" + value);
        JSONObject valueJson = JSONObject.parseObject(value);
        String value_str = (String) valueJson.get("value");
        String sql = "insert into `mysqlExactlyOnce_test` (`value`,`insert_time`) values (?,?)";
        PreparedStatement ps = connection.prepareStatement(sql);
        ps.setString(1, value_str);
        Timestamp value_time = new Timestamp(System.currentTimeMillis());
        ps.setTimestamp(2, value_time);
        log.info("record to insert: {}--{}", value_str, value_time);
        // Execute the insert (not yet committed)
        ps.execute();
        // Manually trigger an exception to exercise the failure path
        if (Integer.parseInt(value_str) == 15) {
            System.out.println(1 / 0);
        }
    }

    /**
     * Obtains a connection and switches it to manual commit (done inside getConnection).
     * @return the connection that represents the new transaction
     * @throws Exception
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        log.info("start beginTransaction.......");
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        return connection;
    }

    /**
     * Pre-commit: the actual pre-commit work (the uncommitted INSERT) already happened in invoke.
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...");
    }

    /**
     * Commits the transaction if invoke completed normally.
     * @param connection
     */
    @Override
    protected void commit(Connection connection) {
        log.info("start commit...");
        DBConnectUtil.commit(connection);
    }

    /**
     * Rolls back the transaction if invoke threw; the next checkpoint will not be taken either.
     * @param connection
     */
    @Override
    protected void abort(Connection connection) {
        log.info("start abort rollback...");
        DBConnectUtil.rollback(connection);
    }
}

DBConnectUtil.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:58 PM
 */
public class DBConnectUtil {

    private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);

    /**
     * Obtains a connection with auto-commit disabled.
     *
     * @param url
     * @param user
     * @param password
     * @return
     * @throws SQLException
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            log.error("failed to load com.mysql.jdbc.Driver", e);
        }
        try {
            conn = DriverManager.getConnection(url, user, password);
            log.info("obtained connection {} successfully...", conn);
        } catch (SQLException e) {
            log.error("failed to obtain connection, url:" + url + ",user:" + user, e);
            throw e;
        }
        // Switch to manual commit so the two-phase-commit sink controls the transaction
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Commits the transaction.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                log.error("failed to commit transaction, Connection:" + conn, e);
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Rolls back the transaction.
     *
     * @param conn
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                log.error("failed to roll back transaction, Connection:" + conn, e);
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Closes the connection.
     *
     * @param conn
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                log.error("failed to close connection, Connection:" + conn, e);
            }
        }
    }
}

Testing the Code

To make it easy to produce test messages, I use a small program that writes the numbers 1 to 20 to Kafka, one per second:

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaUtils {

    // private static final String broker_list = "localhost:9092";
    private static final String broker_list = "zzy:9092";
    // Topic used for the Flink Kafka-to-MySQL exactly-once test
    private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

    public static void writeToKafka2() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer producer = new KafkaProducer(props); // raw, old-style producer usage, deprecated
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i <= 20; i++) {
                MysqlExactlyOncePOJO mysqlExactlyOnce = new MysqlExactlyOncePOJO(String.valueOf(i));
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic_ExactlyOnce, null, null, JSON.toJSONString(mysqlExactlyOnce));
                producer.send(record);
                System.out.println("sent: " + JSON.toJSONString(mysqlExactlyOnce));
                Thread.sleep(1000);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka2();
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok generates the getters/setters, the no-args constructor, and the all-args constructor
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MysqlExactlyOncePOJO {
    private String value;
}

By the time the number 15 is sent, one checkpoint should already have been taken and the second one is almost due. The data consumed before the first checkpoint has been successfully inserted into the database. When the number 15 is consumed, the manually injected exception fires, so at that point the database should contain only the data committed at the first checkpoint; the data of the second checkpoint interval is not inserted (the pre-commit has failed, so no real commit happens). The log from my run:

19/06/01 14:52:07 INFO TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Setting restore state in the FlinkKafkaConsumer: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}

19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:

auto.commit.interval.ms = 5000

auto.offset.reset = latest

bootstrap.servers = [zzy:9092]

check.crcs = true

client.id =

connections.max.idle.ms = 540000

enable.auto.commit = true

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = flink-consumer-group2

heartbeat.interval.ms = 3000

interceptor.classes = null

internal.leave.group.on.close = true

isolation.level = read_uncommitted

key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 500

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

request.timeout.ms = 305000

retry.backoff.ms = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

session.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.

19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0

19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f

19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}

19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:

auto.commit.interval.ms = 5000

auto.offset.reset = latest

bootstrap.servers = [zzy:9092]

check.crcs = true

client.id =

connections.max.idle.ms = 540000

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = flink-consumer-group2

heartbeat.interval.ms = 3000

interceptor.classes = null

internal.leave.group.on.close = true

isolation.level = read_uncommitted

key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 500

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

request.timeout.ms = 305000

retry.backoff.ms = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

session.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.

19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0

19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f

{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"12"}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: record to insert: 12--2019-06-01 14:52:07.616

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"13"}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: record to insert: 13--2019-06-01 14:52:07.617

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"14"}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: record to insert: 14--2019-06-01 14:52:07.618

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"15"}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: record to insert: 15--2019-06-01 14:52:07.619

{"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}

{"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}

{"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start abort rollback...

19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.

java.lang.ArithmeticException: / by zero

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)

at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)

at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)

at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)

at java.lang.Thread.run(Thread.java:748)

19/06/01 14:52:07 INFO Task: Freeing task resources for Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903).

19/06/01 14:52:07 INFO Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) [FAILED]

19/06/01 14:52:07 INFO TaskExecutor: Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) c284f48cd0b113da4f68fd835e643903.

19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.

java.lang.ArithmeticException: / by zero

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)

at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)

at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)

at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)

at java.lang.Thread.run(Thread.java:748)

19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RUNNING to FAILING.

...

19/06/01 14:52:07 INFO TaskExecutor: Discarding the results produced by task execution c284f48cd0b113da4f68fd835e643903.

19/06/01 14:52:07 INFO ExecutionGraph: Try to restart or fail the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) if no longer possible.

19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state FAILING to RESTARTING.

19/06/01 14:52:07 INFO ExecutionGraph: Restarting the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89).

19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RESTARTING to CREATED.

19/06/01 14:52:07 INFO CheckpointCoordinator: Restoring job a7188181ec45ab397d21bb1f928c7b89 from latest valid checkpoint: Checkpoint 3 @ 1559371921807 for a7188181ec45ab397d21bb1f928c7b89.

19/06/01 14:52:07 INFO CheckpointCoordinator: No master state to restore

19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state CREATED to RUNNING.

19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to SCHEDULED.

19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from SCHEDULED to DEPLOYING.

19/06/01 14:52:07 INFO ExecutionGraph: Deploying Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (attempt #33) to localhost

19/06/01 14:52:07 INFO TaskExecutor: Received task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1).

19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to DEPLOYING.

19/06/01 14:52:07 INFO Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING]

19/06/01 14:52:07 INFO Task: Loading JAR files for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].

19/06/01 14:52:07 INFO Task: Registering task at network: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].

19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.

19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.

19/06/01 14:52:07 INFO StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)

19/06/01 14:52:07 INFO TwoPhaseCommitSinkFunction: MySqlTwoPhaseCommitSink 0/1 - restoring state

19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start commit...

19/06/01 14:52:07 ERROR DBConnectUtil: failed to commit transaction, Connection:com.mysql.jdbc.JDBC4Connection@69ae3a8c

java.sql.SQLException: Unexpected exception encountered during query.

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)

at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2523)

at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1547)

at com.zzy.bigdata.flink.streaming.DBConnectUtil.commit(DBConnectUtil.java:56)

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:103)

at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:30)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommit(TwoPhaseCommitSinkFunction.java:200)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:395)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:353)

at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)

at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)

at java.lang.Thread.run(Thread.java:748)

The logs show that the values 1 to 11 were successfully written to the database. When the value 15 was consumed, the job failed: the last lines show the rollback, which also closed the connection, and the commit attempted afterwards during recovery therefore failed as well. The values 12 to 15 consumed in this interval were never inserted into the database, no checkpoint was taken for them, and the checkpoint still holds the offset of the last successful consumption.

Database table: mysqlExactlyOnce_test

CREATE TABLE `mysqlExactlyOnce_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Data in the table:
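To inspect what actually landed in the table, a small read-only query is enough. The sketch below is my own addition (not from the original post); it reuses the DBConnectUtil helper and the same URL and credentials as the sink, so adjust them to your environment:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CheckMysqlExactlyOnce {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
        try (Connection conn = DBConnectUtil.getConnection(url, "root", "123456");
             PreparedStatement ps = conn.prepareStatement(
                     "select `value`, `insert_time` from `mysqlExactlyOnce_test` order by id");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // In the failed-at-15 run above, only the values committed by successful checkpoints (1-11) should appear.
                System.out.println(rs.getString("value") + "\t" + rs.getTimestamp("insert_time"));
            }
        }
    }
}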
