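Problem:

I have a table whose primary key is a string type. When CDC reads it, the connector spends a very long time splitting the table, and the checkpoint stays in the "in progress" state. The log only prints INFO-level messages:

The checkpoint stays stuck there

The program keeps waiting:

Cause: during the full-data snapshot, the ChunkSplitter takes too long. I have asked in the community whether there is a solution.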
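To get a feel for why splitting can dominate on a large table, here is a minimal sketch that estimates how many chunks a table is cut into. It assumes the chunk count is roughly the row count divided by scan.incremental.snapshot.chunk.size (default 8096, from the parameter notes in this post). That is a simplification and not the connector's actual algorithm. The class name, method name, and the 50-million-row figure are hypothetical.

```java
/** Rough estimate of how many snapshot chunks a table is split into. */
final class ChunkEstimate {

    /** Default of scan.incremental.snapshot.chunk.size quoted in the parameter notes. */
    static final long DEFAULT_CHUNK_SIZE = 8096L;

    /** Ceiling division: number of chunks needed to cover rowCount rows. */
    static long estimateChunks(long rowCount, long chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 50_000_000L; // hypothetical table size, not taken from the original job
        System.out.println(estimateChunks(rows, DEFAULT_CHUNK_SIZE)); // default chunk size
        System.out.println(estimateChunks(rows, 100_000L));           // larger chunk size
    }
}
```

If each chunk boundary costs one lookup against MySQL (an assumption about how string keys are handled, not something confirmed here), a larger chunk size means fewer lookups, at the price of more memory per chunk.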

Reference notes:

Alibaba Cloud Flink CDC documentation:

MySQL CDC source table - Realtime Compute for Apache Flink - Alibaba Cloud

CDC parameters:

WITH parameters

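Parameter | Description | Required | Data type | Remarks
connector | Source table type. | | STRING | Can be set to mysql-cdc or mysql; the two are equivalent.
hostname | IP address or hostname of the MySQL database. | | STRING | None.
username | Username for the MySQL database service. | | STRING | None.
password | Password for the MySQL database service. | | STRING | None.
database-name | MySQL database name. | | STRING | The database name supports regular expressions, so data can be read from multiple databases.
table-name | MySQL table name. | | STRING | The table name supports regular expressions, so data can be read from multiple tables.
port | Port of the MySQL database service. | | INTEGER | Default: 3306.
server-id | A numeric ID for the database client. | | STRING | The ID must be globally unique within the MySQL cluster. It is recommended to set a different ID for every job that reads the same database. By default a random value between 5400 and 6400 is generated.

This parameter also accepts an ID range, for example 5400-5408. When incremental reading is enabled, multiple concurrent readers are supported; in that case an ID range is recommended so that each reader uses a different ID.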

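scan.incremental.snapshot.enabled | Whether to enable incremental snapshot. | | BOOLEAN | Enabled by default. Incremental snapshot is a new mechanism for reading the full-data snapshot. Compared with the old snapshot reading, it has several advantages:

  • When reading the full data, the Source can read in parallel.
  • When reading the full data, the Source supports chunk-level checkpoints.
  • When reading the full data, the Source does not need to acquire the global read lock (FLUSH TABLES WITH read lock).

If you want the Source to read in parallel, each parallel Reader needs a unique server ID. server-id must therefore be a range such as 5400-6400, and the number of IDs in the range must be greater than or equal to the parallelism.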

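scan.incremental.snapshot.chunk.size | Chunk size of the table (number of rows). | | Integer | Default: 8096. When incremental snapshot reading is enabled, the table is split into multiple chunks. Until a chunk has been read completely, its data is buffered in memory, so a chunk that is too large may cause an OOM. The smaller the chunk, the finer the granularity of failure recovery, but throughput also drops.
scan.snapshot.fetch.size | Maximum number of records fetched per pull when reading the full table data. | | Integer | Default: 1024.
scan.startup.mode | Startup mode for consuming data. | | STRING | Valid values:

  • initial (default): on the first start, scan the historical full data first, then read the latest Binlog data.
  • latest-offset: on the first start, skip the historical full data and read from the end of the Binlog (the latest Binlog position), that is, read only the changes made after the Connector started.

server-time-zone | Session time zone used by the database. | | STRING | For example Asia/Shanghai. This parameter controls how the MySQL TIMESTAMP type is converted to STRING. For more information, see Debezium temporal types.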
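debezium.min.row.count.to.stream.results | When the number of rows in a table exceeds this value, batch reading mode is used. | | INTEGER | Default: 1000. Flink reads MySQL source table data in one of the following ways:

  • Full read: load the whole table into memory at once. It is fast but consumes a matching amount of memory; if the source table is very large there is a risk of OOM.
  • Batch read: read in multiple passes, a fixed number of rows each time, until all data has been read. Large tables can be read without OOM risk, but reading is slower.
connect.timeout | Maximum time the connector waits after trying to connect to the MySQL database server before timing out. | | Duration | Default: 30 seconds.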
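The server-id rule in the table above (the ID range must hold at least as many IDs as the source parallelism) can be checked before submitting a job. The following is a minimal sketch of such a check. It only parses the "5400" and "5400-5408" formats described in the table; the class and method names are hypothetical and not part of Flink CDC.

```java
/** Parses a server-id setting and checks it against the source parallelism. */
final class ServerIdRange {
    final long start;
    final long end;

    ServerIdRange(long start, long end) {
        if (start > end) {
            throw new IllegalArgumentException("range start must not exceed range end");
        }
        this.start = start;
        this.end = end;
    }

    /** Accepts a single ID such as "5400" or an inclusive range such as "5400-5408". */
    static ServerIdRange parse(String text) {
        String[] parts = text.trim().split("-");
        if (parts.length == 1) {
            long id = Long.parseLong(parts[0].trim());
            return new ServerIdRange(id, id);
        }
        if (parts.length == 2) {
            return new ServerIdRange(
                    Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
        }
        throw new IllegalArgumentException("expected 'id' or 'start-end', got: " + text);
    }

    /** Number of distinct IDs in the inclusive range. */
    long size() {
        return end - start + 1;
    }

    /** True when every parallel reader can get its own ID. */
    boolean covers(int parallelism) {
        return size() >= parallelism;
    }

    public static void main(String[] args) {
        ServerIdRange range = ServerIdRange.parse("5400-5408");
        System.out.println(range.size());
        System.out.println(range.covers(8));
        System.out.println(range.covers(10));
    }
}
```

Running such a check at startup turns a misconfigured range into an immediate, readable failure instead of a problem that only surfaces once the parallel readers connect.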

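Versions:

Flink version: 1.13

Flink CDC version: 2.1.1

Scenario:

Using the Flink CDC stream API to read an entire MySQL database and write it directly into Doris.

About 100 GB of data across a few dozen tables, covering large and small tables, tables with many and few columns, columns with complex types, and so on.

What happened:

After running for a while the job dies with the following error: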

2022-02-11 18:33:59,461 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:196', splitKeyType=[`id` INT NOT NULL], splitStart=[17079248], splitEnd=[17165910], highWatermark=null} to subtask 0
2022-02-11 18:33:59,976 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator receives finished split offsets FinishedSnapshotSplitsReportEvent{finishedOffsets={plateform_stable_copy.order_address:196={ts_sec=0, file=mysql-bin.006361, pos=441499143, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18305222, row=0, event=0}}} from subtask 0.
2022-02-11 18:33:59,977 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:197', splitKeyType=[`id` INT NOT NULL], splitStart=[17165910], splitEnd=[17252572], highWatermark=null} to subtask 0
2022-02-11 18:34:00,079 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 57 (type=CHECKPOINT) @ 1644575640072 for job 01f4e4416ccf488091611165e921b83b.
2022-02-11 18:34:00,760 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler      [] - FATAL: Thread 'SourceCoordinator-Source: dataSourceStream -> processStream' produced an uncaught exception. Stopping the process...
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2022-02-11 18:34:00,768 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,788 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-02-11 18:34:00,789 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-02-11 18:34:00,791 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RUNNING to RESTARTING.
2022-02-11 18:34:00,795 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:12287
2022-02-11 18:34:00,800 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,816 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-dd18d903-98a1-4ae6-8e07-cd5a7bae3801/flink-web-ui
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://prod-qd-ct7-cdh-data-node03:9490 lost leadership
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2022-02-11 18:34:00,828 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,829 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping all currently running jobs of dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,830 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_181]
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_181]
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_181]
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274) [flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-11.0.jar:2.6.0-cdh5.16.2-11.0]
2022-02-11 18:34:00,832 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Job 01f4e4416ccf488091611165e921b83b was not finished by JobManager.
2022-02-11 18:34:00,832 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Shutting down cluster because job not finished
2022-02-11 18:34:00,834 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/1) (d6710965feba37b327b8f6083c4f53cc) switched from RUNNING to FAILED on container_e05_1641803753156_0100_01_000002 @ prod-qd-ct7-cdh-data-node03 (dataPort=29600).
java.util.concurrent.ExecutionException: Boxed Error
        at scala.concurrent.impl.Promise$.resolver(Promise.scala:59) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
Caused by: java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.subtaskFailed(SourceCoordinator.java:182) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$subtaskFailed$1(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.subtaskFailed(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.subtaskFailed(OperatorCoordinatorHolder.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.notifyCoordinatorOfCancellation(DefaultScheduler.java:561) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelExecutionVertex(DefaultScheduler.java:317) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_181]
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_181]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_181]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_181]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_181]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelTasksAsync(DefaultScheduler.java:309) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:253) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:234) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:229) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:133) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        ... 9 more
2022-02-11 18:34:00,840 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job dev-SingleInstanceData2doris(01f4e4416ccf488091611165e921b83b).
2022-02-11 18:34:00,842 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
        at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:604) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from RUNNING to CANCELING.
2022-02-11 18:34:00,848 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from CANCELING to CANCELED.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 1a3ac3e7fbadc81e49d53b75795c2c63.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2022-02-11 18:34:00,850 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.

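Problem analysis:

Going through the logs, monitoring and so on, I found no error thrown by the job code, and the job's automatic restart attempts also failed.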
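The fatal error in the log above says to check the earlier logs for the root cause. In a JobManager log of this size that is easier with a small filter. The sketch below keeps only lines that contain the ERROR level token or start with "Caused by:". It assumes the same log layout as the lines quoted in this post; the class name is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Filters a Flink log down to the lines most likely to name a root cause. */
final class LogScan {

    /** Returns, in order, every line that is an ERROR entry or a "Caused by:" line. */
    static List<String> suspiciousLines(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            if (line.contains(" ERROR ") || line.startsWith("Caused by:")) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java LogScan <path-to-log-file>");
            return;
        }
        // Read the whole file; fine for a one-off check on a single log.
        for (String hit : suspiciousLines(Files.readAllLines(Paths.get(args[0])))) {
            System.out.println(hit);
        }
    }
}
```

An empty result before the FatalExitExceptionHandler entry would match the observation above that no earlier error was logged, which in turn makes a resource problem more plausible than a code bug.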

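1. First I tried restoring the job from a checkpoint. After the restore it ran for a while and then hit the same error.
2. To check whether one table's data was the cause, I emptied the table that hit the problem and restored the job from the checkpoint. After running for a while the same error appeared again. I also noticed a pattern: large tables trigger the same problem, while the data of small tables gets written (CDC reads the database one table at a time).

3. So the problem is probably a resource issue, and the next step is to increase the resources of the Flink job. (Because this was a test writing only 100 GB, I had not allocated many resources.)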

Searching for the problem:

Search the Flink CDC community for the keywords:

This indicates that a fatal error has happened and caused the coordinator executor thread to exit  

Link:

https://github.com/ververica/flink-cdc-connectors/issues?q=This+indicates+that+a+fatal+error+has+happened+and+caused+the+coordinator+executor+thread+to+exit

So I increased the JobManager and TaskManager memory (to 4 times the previous size). The job is currently running.

Problem 2:
After the full (offline) data had been read, the YARN job died on its own.

2022-02-14 12:34:53,207 INFO  com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.006360, pos=93487577, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18300905, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}} to subtask 0
2022-02-14 12:34:58,680 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/3) (e6807bd2ac2a982054dd3bb62006a462) switched from RUNNING to FAILED on container_e05_1641803753156_0111_01_000002 @ prod-qd-ct7-cdh-data-node02 (dataPort=1818).
java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.RuntimeException: SplitFetcher thread 6349 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644813298581,db=,server_id=0,file=mysql-bin.006360,pos=93487577,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
        at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
2022-02-14 12:34:58,715 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,715 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,716 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 6 tasks should be restarted to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0. 
2022-02-14 12:34:58,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RUNNING to RESTARTING.
2022-02-14 12:34:58,719 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 1 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,719 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 2 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,726 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from CANCELING to CANCELED.
2022-02-14 12:34:58,727 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,365 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,367 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2022-02-14 12:34:59,426 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,426 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2022-02-14 12:34:59,569 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,570 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 8082a3ead0f36284e54f4bf28b8a695e
2022-02-14 12:35:18,727 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RESTARTING to RUNNING.
2022-02-14 12:35:18,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 8082a3ead0f36284e54f4bf28b8a695e from Checkpoint 93 @ 1644813292042 for 8082a3ead0f36284e54f4bf28b8a695e located at hdfs://nameservice1/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/8082a3ead0f36284e54f4bf28b8a695e/chk-93.
2022-02-14 12:35:18,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master state to restore
2022-02-14 12:35:18,732 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 2 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (1/3) (9975f5c8e492cda22add5c1abd34ba64) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (2/3) (097d045473dfbc8e980daf2ed5095b96) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: dataSourceStream -> processStream (3/3) (bfeda1f01099ea1fe1656a6f431a56f7) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (1/3) (41433c56861b77cb145c5e2eabda66ce) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (2/3) (d88b153f6c0db78c8b461b768d90cb0f) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - sinkDoris (3/3) (eee0927845d4dcff02ecf8a7af2810f8) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 1 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.

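Analysis:

The binlog file had probably been deleted: the exception says the connector is trying to read the binlog starting at mysql-bin.006360, pos 93487577, but this is no longer available on the server. During the later attempts to restore from a checkpoint or savepoint the binlog file was also missing, so the restore failed.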
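The availability check behind that error can be sketched as follows. This is a minimal illustration and not the connector's own code: the class `BinlogAvailability` and its method are hypothetical, and it assumes the list of file names has already been fetched from MySQL with `SHOW BINARY LOGS`. If the file recorded in the checkpoint is no longer in that list, the job cannot resume from it and needs a new snapshot. Keeping binlogs longer on the server (for example `expire_logs_days` on MySQL 5.x) makes this less likely.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: decides whether a checkpointed binlog file still exists on the server. */
final class BinlogAvailability {

    private BinlogAvailability() {}

    /**
     * @param serverBinlogFiles file names returned by SHOW BINARY LOGS (fetched elsewhere)
     * @param checkpointedFile  binlog file name stored in the checkpoint offset
     * @return true if the checkpointed file is still present on the server
     */
    static boolean isAvailable(List<String> serverBinlogFiles, String checkpointedFile) {
        if (checkpointedFile == null || checkpointedFile.isEmpty()) {
            // Assumption of this sketch: no recorded file means there is nothing to resume from.
            return false;
        }
        return serverBinlogFiles.contains(checkpointedFile);
    }

    public static void main(String[] args) {
        // The server-side names are made up; mysql-bin.006360 is the file from the log above.
        List<String> onServer = Arrays.asList("mysql-bin.006361", "mysql-bin.006362");
        System.out.println(isAvailable(onServer, "mysql-bin.006360"));
    }
}
```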

Problem:

2022-02-14 19:14:24
com.alibaba.fastjson.JSONException: write javaBean error, fastjson version 1.2.47, class java.nio.HeapByteBuffer, fieldName : post_dept
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:465)
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:120)
    at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:270)
    at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:44)
    at com.alibaba.fastjson.serializer.ListSerializer.write(ListSerializer.java:137)
    at com.alibaba.fastjson.serializer.JSONSerializer.write(JSONSerializer.java:281)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:673)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:611)
    at com.alibaba.fastjson.JSON.toJSONString(JSON.java:576)
    at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:248)
    at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:225)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:434)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.alibaba.fastjson.util.FieldInfo.get(FieldInfo.java:484)
    at com.alibaba.fastjson.serializer.FieldSerializer.getPropertyValueDirect(FieldSerializer.java:140)
    at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:249)
    ... 25 more
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:506)
    at java.nio.HeapByteBuffer.getChar(HeapByteBuffer.java:259)
    ... 32 more

2022-02-15 11:53:09
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to [B
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getBytes(Struct.java:168)
    at com.sjb.cdc.customization.JsonSchemaCustomization.foreachStruct(JsonSchemaCustomization.java:147)
    at com.sjb.cdc.customization.JsonSchemaCustomization.createData(JsonSchemaCustomization.java:110)
    at com.sjb.cdc.customization.JsonSchemaCustomization.deserialize(JsonSchemaCustomization.java:89)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

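Analysis:

Binary fields have to be checked for separately: putting a binary field straight into the JSON object fails (first stack trace). The second stack trace most likely comes from a DECIMAL column, which Debezium by default also exposes with the Connect schema type bytes but with a BigDecimal value, so getBytes cannot cast it. Checking the type alone is therefore not enough, and the field-mapping code further down also requires the schema name to be empty.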

for (Field field : afterStruct.schema().fields()) {
    String fieldName = field.name();
    // Binary column: turn the byte[] into a String before putting it into the JSON object
    if ("bytes".equals(field.schema().type().getName())) {
        byte[] bytes = afterStruct.getBytes(fieldName);
        if (bytes != null) {
            String fieldValue = new String(bytes);
            dataJson.put(fieldName, fieldValue);
        }
    }
}
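Both stack traces above involve a value type that cannot go straight into JSON (a `HeapByteBuffer` in the first, a `BigDecimal` read through `getBytes` in the second). A minimal sketch of a conversion step that covers these cases with the JDK alone is shown below. The class `JsonSafeValues` is hypothetical, and decoding binary data as UTF-8 text is an assumption that only makes sense when the column really holds text. For true binary content, Base64 would be the safer choice.

```java
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts values that a JSON serializer may reject into plain strings. */
final class JsonSafeValues {

    private JsonSafeValues() {}

    static Object toJsonSafe(Object value) {
        if (value instanceof byte[]) {
            // Assumption: the bytes are UTF-8 text.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (value instanceof ByteBuffer) {
            // Work on a duplicate so the caller's buffer position is left untouched.
            ByteBuffer copy = ((ByteBuffer) value).duplicate();
            byte[] bytes = new byte[copy.remaining()];
            copy.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
        if (value instanceof BigDecimal) {
            // Plain notation avoids scientific notation such as 1E+3.
            return ((BigDecimal) value).toPlainString();
        }
        // Everything else (String, Integer, Long, null, ...) is passed through unchanged.
        return value;
    }
}
```

In the loop above this would replace the direct `getBytes` call: read the field with `struct.get(field)` and pass the result through `toJsonSafe` before `dataJson.put`.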

Problem:

Field mapping when CDC reads from MySQL and writes to Doris. Here is the test code:

for (Field field : struct.schema().fields()) {
    String fieldName = field.name();
    String schemaName = field.schema().name();
    Object fieldValue = struct.get(field);
    if (pkNameList.contains(fieldName)) {
        partitionerNum += fieldValue.hashCode();
        dataJson.put("partitionerNum", partitionerNum);
    }
    // TODO: date/time fields are handled here
    try {
        if ("int32".equals(field.schema().type().getName()) && Date.SCHEMA_NAME.equals(schemaName)) {
            // DATE: number of days since the epoch
            if (fieldValue != null) {
                int day = (int) fieldValue;
                long second = day * 24L * 60 * 60;
                String dateStr = LocalDateTime.ofEpochSecond(second, 0, ZoneOffset.ofHours(8)).format(dateFormatter);
                dataJson.put(fieldName, dateStr);
            }
        } else if ("int64".equals(field.schema().type().getName()) && Timestamp.SCHEMA_NAME.equals(schemaName)) {
            // DATETIME: milliseconds since the epoch
            if (fieldValue != null) {
                long times = (long) fieldValue;
                String dateTime = LocalDateTime.ofEpochSecond(times / 1000 - 8 * 60 * 60, 0, ZoneOffset.ofHours(8)).format(dateTimeFormatter);
                dataJson.put(fieldName, dateTime);
            }
        } else if ("string".equals(field.schema().type().getName()) && "io.debezium.time.ZonedTimestamp".equals(schemaName)) {
            // TIMESTAMP: UTC string, shifted by 8 hours
            String timestampValueStr = struct.getString(fieldName);
            if (fieldValue != null) {
                LocalDateTime localDateTime = LocalDateTime.parse(timestampValueStr, timestampFormatter);
                LocalDateTime rsTime = localDateTime.plusHours(8);
                String timestampValue = rsTime.format(dateTimeFormatter);
                dataJson.put(fieldName, timestampValue);
            }
        } else if ("bytes".equals(field.schema().type().getName()) && StringUtils.isEmpty(schemaName)) {
            // Plain binary column (no logical schema name)
            byte[] bytes = struct.getBytes(fieldName);
            if (bytes != null) {
                String bytesValue = new String(bytes);
                dataJson.put(fieldName, bytesValue);
            }
        } else {
            if (fieldValue != null) {
                dataJson.put(fieldName, fieldValue);
            }
        }
    } catch (Exception ex) {
        String errorInfo = StringUtils.join(ex.getStackTrace(), "");
        logger.error("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
        DingProd.sendDingRabotProd("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
    }
}
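The date arithmetic in the first two branches can be written more directly with `java.time`. The sketch below is an illustration under stated assumptions, not the code used in the job: the class `DebeziumTimeValues` is hypothetical, and it relies on `io.debezium.time.Date` being days since the epoch and `io.debezium.time.Timestamp` being milliseconds since the epoch without a time zone. Subtracting 8 hours and then formatting at offset +8, as the code above does, gives the same result as formatting at UTC.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the DATE and DATETIME branches of the field mapping. */
final class DebeziumTimeValues {

    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DebeziumTimeValues() {}

    /** DATE column: the value is the number of days since 1970-01-01. */
    static String dateFromEpochDays(int epochDays) {
        return LocalDate.ofEpochDay(epochDays).format(DATE);
    }

    /** DATETIME column: the value is the wall-clock time expressed as epoch milliseconds. */
    static String dateTimeFromEpochMillis(long epochMillis) {
        long seconds = Math.floorDiv(epochMillis, 1000L);
        int nanos = (int) Math.floorMod(epochMillis, 1000L) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC).format(DATE_TIME);
    }
}
```

The output pattern drops the millisecond part. A pattern such as `yyyy-MM-dd HH:mm:ss.SSS` would keep it if the Doris column needs it.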

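Update:

MySQL column `timestmp_test2` timestamp(3)

The conversion code above throws an error for this column,

so a Hutool utility class has to be used for the conversion.

Hutool reference documentation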
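As an alternative to Hutool, the same conversion can be done with the JDK alone. This is a sketch under assumptions: the failure presumably happens because a `timestamp(3)` value carries fractional seconds that the fixed formatter pattern does not accept, and the string is assumed to end in `Z` (UTC), as in the `2022-02-16T07:34:01Z` example in these notes. `Instant.parse` accepts ISO-8601 instants with or without a fraction. The class `ZonedTimestampParser` is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the io.debezium.time.ZonedTimestamp branch. */
final class ZonedTimestampParser {

    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private ZonedTimestampParser() {}

    /**
     * @param zonedTimestamp ISO-8601 UTC string, e.g. 2022-02-16T07:34:01Z or 2022-02-16T07:34:01.123Z
     * @param targetZone     zone used by the target table, e.g. Asia/Shanghai
     */
    static String toLocalDateTimeString(String zonedTimestamp, ZoneId targetZone) {
        // Converting through the zone replaces the hard-coded plusHours(8).
        return Instant.parse(zonedTimestamp).atZone(targetZone).format(OUT);
    }
}
```

Usage: `ZonedTimestampParser.toLocalDateTimeString(struct.getString(fieldName), ZoneId.of("Asia/Shanghai"))`.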

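Problem:

The length of a MySQL char column does not map one-to-one to Doris.

We map the MySQL char type to Doris varchar, with the length multiplied by 3.

Reason: English and Chinese characters do not take the same amount of space. MySQL counts the char length in characters, while Doris counts the varchar length in bytes, and a Chinese character takes 3 bytes in UTF-8.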
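The factor of 3 can be checked with a few lines of Java. This is only an illustration: the class `CharLengthMapping` and its methods are hypothetical, and the factor assumes the MySQL column uses the 3-byte `utf8` charset. With `utf8mb4`, characters outside the Basic Multilingual Plane need 4 bytes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: sizes a Doris VARCHAR (bytes) from a MySQL CHAR (characters). */
final class CharLengthMapping {

    /** Maximum bytes per character in MySQL's utf8 charset. */
    static final int MAX_UTF8_BYTES_PER_CHAR = 3;

    private CharLengthMapping() {}

    /** Number of bytes the string occupies when encoded as UTF-8. */
    static int utf8ByteLength(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Doris VARCHAR length that can hold any value of a MySQL CHAR(mysqlCharLength). */
    static int dorisVarcharLength(int mysqlCharLength) {
        return mysqlCharLength * MAX_UTF8_BYTES_PER_CHAR;
    }

    public static void main(String[] args) {
        // Four ASCII characters versus four Chinese characters (written as Unicode escapes).
        System.out.println(utf8ByteLength("abcd"));
        System.out.println(utf8ByteLength("\u8fd0\u5355\u7f16\u53f7"));
        System.out.println(dorisVarcharLength(32));
    }
}
```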

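Notes:

#N/A is left over from the spreadsheet lookup; presumably it means Doris has no type with the same name. The "Doris type list (column E)" column is an independent list of Doris types and is not aligned with the MySQL type in the same row.

| MySQL type | Corresponding Doris type | How to handle? | Test result | Doris type list (column E) | Remarks (for column E) |
| --- | --- | --- | --- | --- | --- |
| bigint | bigint | | | bigint | |
| bit | #N/A | int | | bitmap | |
| blob | #N/A | varchar | | boolean | |
| char | char | | | char | |
| date | date | | | date | |
| datetime | datetime | | | datetime | |
| decimal | decimal | | | description | |
| double | double | | | keyword | |
| enum | #N/A | varchar | | decimal | |
| float | float | | | double | |
| int | int | | | float | |
| json | #N/A | string | | hll | In Doris this is TEXT |
| longblob | #N/A | varchar | | int | |
| longtext | #N/A | varchar | | largeint | |
| mediumblob | #N/A | varchar | | smallint | |
| mediumint | #N/A | bigint | | string | |
| mediumtext | #N/A | string | | tinyint | |
| set | #N/A | ? varchar | | varchar | |
| smallint | smallint | | | | |
| text | #N/A | string | | | |
| time | #N/A | varchar | | | |
| timestamp | #N/A | varchar | | | Actual MySQL value: 2022-02-16 15:34:01. Format read by CDC: 2022-02-16T07:34:01Z, so 8 hours must be added |
| tinyint | tinyint | | | | |
| tinytext | #N/A | varchar | | | |
| varbinary | #N/A | ? varchar | | | Same as binary |
| varchar | varchar | | | | |
| year | #N/A | varchar | | | |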

Problem:

Monitoring DDL statements with CDC

if (valueStruct.schema().name().equals("io.debezium.connector.mysql.SchemaChangeValue")) {
    String historyRecord = valueStruct.getString("historyRecord");
    JSONObject schemaChangeValueJson = JSONObject.parseObject(historyRecord);
    logger.error("Schema change info: " + schemaChangeValueJson);
    dataJson = schemaChangeValueJson;
    if (targetDb != null) {
        dataJson.put("targetDb", targetDb);
        dataJson.put("targetTb", table);
        // Pass the record downstream
        out.collect(dataJson);
        String content = "warn!!! DingTalk alert! Schema changed... target database: " + targetDb + ", target table: " + table;
        DingProd.sendDingRabotProd(content);
    }
} else {

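Problem:

When writing to Doris through the Flink Stream Load API, some data went missing, yet the program reported no error or exception.

Analysis:

The problem is probably in how Doris processes the load. The missing rows had some very large field values that exceeded the Doris column size, so those rows were filtered out.

Solution:

Add a check:

if numberFilteredRows > 0, raise an alert and log the details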
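A minimal sketch of that check is shown below. It assumes the Stream Load HTTP response body is available as a string and contains the `NumberFilteredRows` field of the Doris Stream Load result. The class `StreamLoadResultCheck` is hypothetical, and the regular expression stands in for whatever JSON library the job already uses (fastjson in this project).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: detects rows that Doris filtered out during a Stream Load. */
final class StreamLoadResultCheck {

    private static final Pattern FILTERED =
            Pattern.compile("\"NumberFilteredRows\"\\s*:\\s*(\\d+)");

    private StreamLoadResultCheck() {}

    /** Returns the filtered-row count, or -1 if the field is missing from the response. */
    static long filteredRows(String responseBody) {
        Matcher m = FILTERED.matcher(responseBody);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /** True when at least one row was filtered, i.e. data was silently dropped. */
    static boolean shouldAlert(String responseBody) {
        return filteredRows(responseBody) > 0;
    }

    public static void main(String[] args) {
        // Made-up response body for illustration only.
        String body = "{\"NumberTotalRows\":100,\"NumberLoadedRows\":97,\"NumberFilteredRows\":3}";
        if (shouldAlert(body)) {
            System.out.println("filtered rows: " + filteredRows(body));
        }
    }
}
```

In the job, the alert branch is where the log statement and the DingTalk notification would go.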

Update:

DolphinScheduler runs the Doris SQL scripts on a schedule:

mysql -h 192.168.xx.xx -P 9030 -uroot -proot -Dexample_db < sqlFile/dwd_finance_stable合并_20220304114348.sql

Update

A CDC bug:


1. A normal table structure, MySQL 5.x:
CREATE TABLE `order_sign_received` (
  `order_number` varchar(32) NOT NULL COMMENT '运单编号',
  `sign_people_mobile` varchar(12) DEFAULT NULL COMMENT '签收人电话',
  `function_code` varchar(32) DEFAULT NULL COMMENT '方法编码',
  PRIMARY KEY (`order_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
2. Execute this statement:
ALTER TABLE order_sign_received ADD(
  received_pay_method_changed TINYINT DEFAULT 0 NOT NULL COMMENT '签收修改了收款方式,0:否 1:是'
);
The following error is reported:
18:51:38,237 ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource   - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.000119/315091120
18:51:38,238 ERROR io.debezium.pipeline.ErrorHandler                             - Producer failure
io.debezium.DebeziumException: Error processing binlog event
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
    ... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
    ... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 27 more
18:51:38,307 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager  - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    ... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
    ... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
    ... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 27 more
18:51:38,315 WARN  org.apache.flink.runtime.taskmanager.Task                     - Source: config Source -> Sink: Print to Std. Out (1/1)#0 (d7461d16ef50d2d32d78e64e3718734f) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    ... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
    ... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
    ... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)... 27 moreException in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)at akka.dispatch.OnComplete.internal(Future.scala:264)at akka.dispatch.OnComplete.internal(Future.scala:261)at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at 
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategyat org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)at akka.actor.Actor$class.aroundReceive(Actor.scala)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)... 4 more
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exceptionat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the recordsat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default valueat io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default valueat com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)at java.lang.Iterable.forEach(Iterable.java:75)at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)... 27 more

In local testing, the debugger never steps into this code path.

-- Doris error

What we actually ran into:

[taskAppId=TASK-4882385308064_1-5934-10038]:[61] -  -> ERROR 1105 (HY000) at line 21: errCode = 2, detailMessage = Failed to create partition[rep_op_com_reconciliation_stat]. Timeout. Unfinished mark: 10004=257590, 10004=257594, 10004=257598

The error occurred at approximately:
[INFO] 2022-04-24 01:14:51.998

We narrowed the log search window to:

[2022-04-24 01:10:51.998, 2022-04-24 01:15:51.998]
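The window above runs from four minutes before the error timestamp to one minute after it. A minimal sketch of that calculation follows; the function name `log_window` and its default offsets are illustrative choices, not part of any Doris tooling.

```python
from datetime import datetime, timedelta

def log_window(event_time: str, before_minutes: int = 4, after_minutes: int = 1):
    """Return (start, end) of the log search window around an error timestamp."""
    # Timestamp layout as printed by the scheduler log line quoted above.
    fmt = "%Y-%m-%d %H:%M:%S.%f"
    t = datetime.strptime(event_time, fmt)
    return t - timedelta(minutes=before_minutes), t + timedelta(minutes=after_minutes)

start, end = log_window("2022-04-24 01:14:51.998")
print(start.isoformat(sep=" ", timespec="milliseconds"))  # 2022-04-24 01:10:51.998
print(end.isoformat(sep=" ", timespec="milliseconds"))    # 2022-04-24 01:15:51.998
```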

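Then locate backend node 10004, the backend ID that appears in the "Unfinished mark" list of the error message (the node can be looked up in the Doris web UI).

On node 10004, search be.INFO for log entries related to these tablet IDs:

tablet id = 257590

tablet id = 257594

tablet id = 257598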
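The lookup can be partly automated. The sketch below assumes the `backend_id=tablet_id` pair format seen in the error message above. It groups the tablet IDs by backend ID and then filters log lines that mention any of those IDs. Both function names are hypothetical helpers. Nothing is assumed about the be.INFO line layout beyond the tablet ID appearing as a whole number.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

def parse_unfinished_marks(message: str) -> Dict[int, List[int]]:
    """Group tablet IDs by backend ID from a 'Failed to create partition' message."""
    # Only look at the text after "Unfinished mark:" to avoid unrelated numbers.
    _, _, tail = message.partition("Unfinished mark:")
    grouped = defaultdict(list)
    for backend_id, tablet_id in re.findall(r"(\d+)=(\d+)", tail):
        grouped[int(backend_id)].append(int(tablet_id))
    return dict(grouped)

def lines_mentioning(lines: Iterable[str], tablet_ids: Iterable[int]) -> List[str]:
    """Return the lines that contain any of the tablet IDs as a whole number."""
    ids = [str(t) for t in tablet_ids]
    if not ids:
        return []
    pattern = re.compile(r"\b(?:" + "|".join(ids) + r")\b")
    return [ln.rstrip("\n") for ln in lines if pattern.search(ln)]

error = ("errCode = 2, detailMessage = Failed to create partition"
         "[rep_op_com_reconciliation_stat]. Timeout. "
         "Unfinished mark: 10004=257590, 10004=257594, 10004=257598")
marks = parse_unfinished_marks(error)
print(marks)  # {10004: [257590, 257594, 257598]}
```

On the backend host, something like `lines_mentioning(open("be.INFO"), marks[10004])` would then list the candidate log lines. The file path here is a placeholder for wherever the BE log directory lives.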

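Reference article:

Doris creates tables one Partition at a time. When the creation of a Partition fails, this error may be reported. Even if Partitions are not used, a problem during table creation is still reported as Failed to create partition, because, as mentioned earlier, Doris creates an unmodifiable default Partition for any table that does not specify one.

When this error occurs, it usually means a BE ran into a problem while creating tablets. You can troubleshoot with the following steps:

In fe.log, find the Failed to create partition log entry for the corresponding point in time. That entry contains a series of number pairs that look like {10001-10010}. The first number of each pair is the Backend ID and the second is the Tablet ID. The pair above means that creating the Tablet with ID 10010 failed on the Backend with ID 10001.
Go to the be.INFO log of that Backend and look for tablet id related entries in the corresponding time range; the error message can be found there.
Some common tablet creation failures are listed below (the list is not exhaustive):
The BE did not receive the task, in which case no tablet id related entries can be found in be.INFO. Or the BE created the tablet successfully but failed to report it. For both problems, see [the deployment and upgrade documentation] to check connectivity between FE and BE.
Memory pre-allocation failed. The byte length of a single row in the table may exceed 100KB.
Too many open files. The number of open file handles exceeds the Linux system limit. The Linux handle limit needs to be raised.
If tablet creation times out, you can also extend the timeout by setting tablet_create_timeout_second=xxx and max_create_table_timeout_second=xxx in fe.conf. tablet_create_timeout_second defaults to 1 second and max_create_table_timeout_second defaults to 60 seconds. The overall timeout is min(tablet_create_timeout_second * replication_num, max_create_table_timeout_second).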
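The timeout formula quoted above can be worked through with a small sketch. The function name is illustrative, and the defaults (1 second and 60 seconds) are the ones stated in the reference article; they may differ in other Doris versions.

```python
def create_timeout_seconds(replication_num: int,
                           tablet_create_timeout_second: int = 1,
                           max_create_table_timeout_second: int = 60) -> int:
    """Overall tablet-creation timeout, per the formula in the reference article."""
    return min(tablet_create_timeout_second * replication_num,
               max_create_table_timeout_second)

# With the defaults and 3 replicas, the overall timeout is only 3 seconds.
print(create_timeout_seconds(3))  # 3
# Raising the per-tablet timeout to 30 seconds hits the 60-second cap (3 * 30 = 90 > 60).
print(create_timeout_seconds(3, tablet_create_timeout_second=30))  # 60
```

This shows why raising only tablet_create_timeout_second stops helping once the product exceeds max_create_table_timeout_second. Going beyond the cap requires raising both values.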

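The CREATE TABLE command does not return for a long time.

The Doris CREATE TABLE command is synchronous. Its timeout is currently set in a simple way: (tablet num * replication num) seconds. If many tablets are being created and some of them fail, the command may wait for a long timeout before it returns an error.

Under normal conditions, a CREATE TABLE statement returns within a few seconds to a dozen or so seconds. If it takes more than one minute, it is best to cancel the operation and check the FE or BE logs for the related error.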
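————————————————
Copyright notice: this is an original article by CSDN blogger 「墨卿风竹」, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_43688472/article/details/122721626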
