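Flink CDC + Doris: issues encountered in production (continuously updated)
Problem:
One of my tables has a string-typed primary key. When CDC reads it, the split phase runs for a very long time and the checkpoint stays in the "in progress" state. The log shows only INFO messages:
The checkpoint stays stuck there.
The job keeps waiting:
Cause: during the full-data load, the ChunkSplitter takes too long. I have asked in the community whether there is a solution.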
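A rough illustration of why a string primary key can make splitting slow. As far as I understand the Flink CDC 2.x MySQL source, chunk boundaries can be computed arithmetically from MIN/MAX only for evenly distributed numeric keys; for other key types each boundary is found with a separate ordered query, so the number of queries grows with rows / chunk size. The sketch below simulates this with an in-memory sorted list. The function names and the fake key set are hypothetical, and this is not the connector's code.

```python
import math

def split_by_boundary_queries(sorted_keys, chunk_size):
    """Simulate splitting a non-numeric key column: each chunk boundary is
    found with one ordered lookup (one SQL query against a real table)."""
    chunks, queries = [], 0
    pos = 0
    while pos < len(sorted_keys):
        window = sorted_keys[pos:pos + chunk_size]  # the next chunk_size keys
        queries += 1                                # one simulated boundary query
        chunks.append((window[0], window[-1]))      # (first key, last key) of the chunk
        pos += chunk_size
    return chunks, queries

def estimated_boundary_queries(row_count, chunk_size=8096):
    """Rough number of boundary queries for a table with row_count rows."""
    return math.ceil(row_count / chunk_size)

# Hypothetical string keys standing in for a table's primary key column.
keys = [f"order-{i:07d}" for i in range(100_000)]
chunks, queries = split_by_boundary_queries(keys, 8096)
print(len(chunks), queries)
print(estimated_boundary_queries(100_000_000))
```

Under this model, raising scan.incremental.snapshot.chunk.size reduces the number of boundary queries, at the cost of more memory per chunk.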
Reference notes:
Alibaba Cloud Flink CDC documentation:
MySQL CDC source table - Realtime Compute for Apache Flink - Alibaba Cloud
CDC parameters:
WITH parameters
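Parameter | Description | Required | Data type | Notes |
---|---|---|---|---|
connector | Source table type. | Yes | STRING | Can be set to mysql-cdc or mysql; the two are equivalent. |
hostname | IP address or hostname of the MySQL database. | Yes | STRING | None. |
username | Username for the MySQL database service. | Yes | STRING | None. |
password | Password for the MySQL database service. | Yes | STRING | None. |
database-name | MySQL database name. | Yes | STRING | The database name supports regular expressions, so data can be read from multiple databases. |
table-name | MySQL table name. | Yes | STRING | The table name supports regular expressions, so data can be read from multiple tables. |
port | Port of the MySQL database service. | No | INTEGER | Default: 3306. |
server-id | A numeric ID for the database client. | No | STRING | The ID must be globally unique within the MySQL cluster. Use a different ID for each job that reads the same database. By default a random value between 5400 and 6400 is generated. The parameter also accepts an ID range, for example 5400-5408. With incremental reading enabled, the source supports parallel reads; in that case set an ID range so that each parallel reader uses a different ID. |
scan.incremental.snapshot.enabled | Whether to enable incremental snapshot. | No | BOOLEAN | Enabled by default. Incremental snapshot is a new mechanism for reading the full-data snapshot and has many advantages over the old snapshot read. If you want the source to read in parallel, each parallel reader needs a unique server ID, so server-id must be a range such as 5400-6400, and the size of the range must be greater than or equal to the parallelism. |
scan.incremental.snapshot.chunk.size | Chunk size of the table (in rows). | No | INTEGER | Default: 8096. With incremental snapshot reading enabled, the table is split into multiple chunks. A chunk's data is buffered in memory until the chunk has been fully read, so a chunk that is too large can cause an out-of-memory error. Smaller chunks give finer-grained failure recovery but lower throughput. |
scan.snapshot.fetch.size | Maximum number of records fetched per request when reading the table's full data. | No | INTEGER | Default: 1024. |
scan.startup.mode | Startup mode for consuming data. | No | STRING | See the Alibaba Cloud documentation for the accepted values. |
server-time-zone | Session time zone used by the database. | No | STRING | For example, Asia/Shanghai. This parameter controls how the MySQL TIMESTAMP type is converted to STRING. For more information, see Debezium temporal types. |
debezium.min.row.count.to.stream.results | When the table's row count exceeds this value, batched reading is used. | No | INTEGER | Default: 1000. |
connect.timeout | Maximum time the connector waits after attempting to connect to the MySQL database server before timing out. | No | Duration | Default: 30 seconds. |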
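The server-id rule in the table above (a range at least as wide as the source parallelism) is easy to check before submitting a job. A minimal sketch, assuming the range is inclusive at both ends; `server_id_range_size` and `covers_parallelism` are hypothetical helpers, not part of the connector.

```python
def server_id_range_size(server_id: str) -> int:
    """Number of IDs in a server-id setting such as '5400' or '5400-5408'."""
    if "-" in server_id:
        low, high = (int(part) for part in server_id.split("-", 1))
        if high < low:
            raise ValueError(f"invalid server-id range: {server_id}")
        return high - low + 1  # assumption: both ends are inclusive
    int(server_id)  # raises ValueError if the value is not numeric
    return 1

def covers_parallelism(server_id: str, parallelism: int) -> bool:
    """True if every parallel reader can get its own server ID."""
    return server_id_range_size(server_id) >= parallelism

print(covers_parallelism("5400-5408", 4))
print(covers_parallelism("5400", 3))
```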
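Versions:
Flink version: 1.13
Flink CDC version: 2.1.1
Scenario:
Use the Flink CDC DataStream API to read an entire MySQL database and write it directly to Doris.
Roughly 100 GB of data across a few dozen tables, covering large and small tables, tables with many and with few columns, columns with complex types, and so on.
Symptom:
After running for a while, the job dies with the following error: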
2022-02-11 18:33:59,461 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:196', splitKeyType=[`id` INT NOT NULL], splitStart=[17079248], splitEnd=[17165910], highWatermark=null} to subtask 0
2022-02-11 18:33:59,976 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator receives finished split offsets FinishedSnapshotSplitsReportEvent{finishedOffsets={plateform_stable_copy.order_address:196={ts_sec=0, file=mysql-bin.006361, pos=441499143, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18305222, row=0, event=0}}} from subtask 0.
2022-02-11 18:33:59,977 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, splitId='plateform_stable_copy.order_address:197', splitKeyType=[`id` INT NOT NULL], splitStart=[17165910], splitEnd=[17252572], highWatermark=null} to subtask 0
2022-02-11 18:34:00,079 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 57 (type=CHECKPOINT) @ 1644575640072 for job 01f4e4416ccf488091611165e921b83b.
2022-02-11 18:34:00,760 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread 'SourceCoordinator-Source: dataSourceStream -> processStream' produced an uncaught exception. Stopping the process...
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2022-02-11 18:34:00,768 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,788 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-02-11 18:34:00,789 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-02-11 18:34:00,791 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RUNNING to RESTARTING.
2022-02-11 18:34:00,795 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:12287
2022-02-11 18:34:00,800 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleEventFromOperator(SourceCoordinator.java:156) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:81) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:209) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:130) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,816 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-dd18d903-98a1-4ae6-8e07-cd5a7bae3801/flink-web-ui
2022-02-11 18:34:00,828 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://prod-qd-ct7-cdh-data-node03:9490 lost leadership
2022-02-11 18:34:00,828 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2022-02-11 18:34:00,828 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2022-02-11 18:34:00,829 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2022-02-11 18:34:00,829 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,829 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping all currently running jobs of dispatcher akka.tcp://flink@prod-qd-ct7-cdh-data-node03:5662/user/rpc/dispatcher_1.
2022-02-11 18:34:00,830 INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_181]
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_181]
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_181]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274) [flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-11.0.jar:2.6.0-cdh5.16.2-11.0]
2022-02-11 18:34:00,832 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job 01f4e4416ccf488091611165e921b83b was not finished by JobManager.
2022-02-11 18:34:00,832 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting down cluster because job not finished
2022-02-11 18:34:00,834 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (1/1) (d6710965feba37b327b8f6083c4f53cc) switched from RUNNING to FAILED on container_e05_1641803753156_0100_01_000002 @ prod-qd-ct7-cdh-data-node03 (dataPort=29600).
java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:59) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
Caused by: java.lang.Error: This indicates that a fatal error has happened and caused the coordinator executor thread to exit. Check the earlier logsto see the root cause of the problem.
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_181]
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_181]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.runInEventLoop(SourceCoordinator.java:312) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.subtaskFailed(SourceCoordinator.java:182) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$subtaskFailed$1(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.subtaskFailed(RecreateOnResetOperatorCoordinator.java:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.subtaskFailed(OperatorCoordinatorHolder.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.notifyCoordinatorOfCancellation(DefaultScheduler.java:561) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelExecutionVertex(DefaultScheduler.java:317) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_181]
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_181]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_181]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_181]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_181]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_181]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_181]
at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelTasksAsync(DefaultScheduler.java:309) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:253) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:234) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:229) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:133) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:997) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:548) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
... 9 more
2022-02-11 18:34:00,840 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job dev-SingleInstanceData2doris(01f4e4416ccf488091611165e921b83b).
2022-02-11 18:34:00,842 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (01f4e4416ccf488091611165e921b83b) switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:604) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.0.jar:1.13.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.0.jar:1.13.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.0.jar:1.13.0]
2022-02-11 18:34:00,845 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from RUNNING to CANCELING.
2022-02-11 18:34:00,848 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/1) (1a3ac3e7fbadc81e49d53b75795c2c63) switched from CANCELING to CANCELED.
2022-02-11 18:34:00,850 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution 1a3ac3e7fbadc81e49d53b75795c2c63.
2022-02-11 18:34:00,850 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2022-02-11 18:34:00,850 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
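Analysis:
The logs and monitoring show no error raised by the application code, and the job's automatic restart attempts fail as well.
1. First I tried restoring the job from a checkpoint. After running for a while it failed with the same error.
2. To check whether one table's data was the cause, I emptied the table that hit the problem and restored the job from the checkpoint. The same error came back after a while, and a pattern emerged: large tables trigger the failure, while data from small tables gets written (CDC reads the database one table at a time).
3. So the cause is probably insufficient resources; the next step is to give the Flink job more. (This was a test load of 100 GB, so the job had been given modest resources.)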
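To see which table the snapshot phase was working on when the job died (step 2 above), the enumerator's "Assign split" lines can be scanned. A small sketch using only a regular expression; the pattern is written against the log format shown in this post and may need adjusting for other connector versions, and `last_assigned_split` is a hypothetical helper.

```python
import re

# Matches lines such as:
# ... Assign split MySqlSnapshotSplit{tableId=db.table, splitId='db.table:197', ...
ASSIGN_RE = re.compile(
    r"Assign split MySqlSnapshotSplit\{tableId=(?P<table>[^,]+), "
    r"splitId='(?P<split>[^']+)'"
)

def last_assigned_split(log_lines):
    """Return (table, split_id) of the last snapshot split assignment, or None."""
    last = None
    for line in log_lines:
        match = ASSIGN_RE.search(line)
        if match:
            last = (match.group("table"), match.group("split"))
    return last

sample = [
    "2022-02-11 18:33:59,977 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - "
    "Assign split MySqlSnapshotSplit{tableId=plateform_stable_copy.order_address, "
    "splitId='plateform_stable_copy.order_address:197', splitKeyType=[`id` INT NOT NULL], "
    "splitStart=[17165910], splitEnd=[17252572], highWatermark=null} to subtask 0",
    "2022-02-11 18:34:00,079 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - "
    "Triggering checkpoint 57 (type=CHECKPOINT) @ 1644575640072 for job 01f4e4416ccf488091611165e921b83b.",
]
print(last_assigned_split(sample))
```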
Searching for the issue:
Search the Flink CDC community for the keyword:
This indicates that a fatal error has happened and caused the coordinator executor thread to exit
URL:
https://github.com/ververica/flink-cdc-connectors/issues?q=This+indicates+that+a+fatal+error+has+happened+and+caused+the+coordinator+executor+thread+to+exit
I therefore increased the JobManager and TaskManager memory (by 4x); the job is currently running.
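On a YARN deployment, a memory increase like this can be expressed through Flink's `jobmanager.memory.process.size` and `taskmanager.memory.process.size` options, passed as `-D` arguments to `flink run`. The sketch below only builds those arguments; the baseline sizes are made-up placeholders, since the original values are not recorded here, and `scaled_memory_args` is a hypothetical helper.

```python
def scaled_memory_args(jm_mb: int, tm_mb: int, factor: int = 4):
    """Build -D arguments that scale JobManager/TaskManager process memory."""
    return [
        f"-Djobmanager.memory.process.size={jm_mb * factor}m",
        f"-Dtaskmanager.memory.process.size={tm_mb * factor}m",
    ]

# Placeholder baselines (assumed, not taken from the job in this post).
args = scaled_memory_args(jm_mb=1024, tm_mb=2048)
print(" ".join(args))
```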
Problem 2:
After the offline (full snapshot) data had been read, the YARN job died on its own.
2022-02-14 12:34:53,207 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.006360, pos=93487577, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18300905, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}} to subtask 0
2022-02-14 12:34:58,680 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (1/3) (e6807bd2ac2a982054dd3bb62006a462) switched from RUNNING to FAILED on container_e05_1641803753156_0111_01_000002 @ prod-qd-ct7-cdh-data-node02 (dataPort=1818).
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.RuntimeException: SplitFetcher thread 6349 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644813298581,db=,server_id=0,file=mysql-bin.006360,pos=93487577,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
2022-02-14 12:34:58,715 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,715 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,716 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 6 tasks should be restarted to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RUNNING to RESTARTING.
2022-02-14 12:34:58,719 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 1 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 2 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,726 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from CANCELING to CANCELED.
2022-02-14 12:34:58,727 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,365 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,367 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2022-02-14 12:34:59,426 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,426 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2022-02-14 12:34:59,569 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,570 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 8082a3ead0f36284e54f4bf28b8a695e
2022-02-14 12:35:18,727 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RESTARTING to RUNNING.
2022-02-14 12:35:18,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 8082a3ead0f36284e54f4bf28b8a695e from Checkpoint 93 @ 1644813292042 for 8082a3ead0f36284e54f4bf28b8a695e located at hdfs://nameservice1/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/8082a3ead0f36284e54f4bf28b8a695e/chk-93.
2022-02-14 12:35:18,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2022-02-14 12:35:18,732 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 2 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (1/3) (9975f5c8e492cda22add5c1abd34ba64) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (097d045473dfbc8e980daf2ed5095b96) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (bfeda1f01099ea1fe1656a6f431a56f7) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (41433c56861b77cb145c5e2eabda66ce) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d88b153f6c0db78c8b461b768d90cb0f) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (eee0927845d4dcff02ecf8a7af2810f8) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 1 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
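Analysis:
The binlog file had probably been deleted. Later attempts to recover from a checkpoint or savepoint also found that the binlog file no longer existed, so the job could not be restored.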
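One way to confirm this before restoring is to compare the binlog file recorded in the offset (`mysql-bin.006360` in the exception above) with the files the server still holds, as listed by `SHOW BINARY LOGS`. The following is only a minimal sketch: the class and method names are hypothetical, not part of Flink CDC, and the list of server files is made-up sample data that would normally be fetched from MySQL.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Flink CDC API): checks whether a checkpointed binlog file still exists. */
class BinlogAvailability {

    /** Extracts the numeric suffix of a binlog file name, e.g. "mysql-bin.006360" -> 6360. */
    static long sequenceOf(String binlogFile) {
        return Long.parseLong(binlogFile.substring(binlogFile.lastIndexOf('.') + 1));
    }

    /**
     * @param serverBinlogs file names as listed by SHOW BINARY LOGS
     * @param offsetFile    binlog file name stored in the checkpoint offset
     * @return true if the server still holds the file the job wants to resume from
     */
    static boolean isStillAvailable(List<String> serverBinlogs, String offsetFile) {
        return serverBinlogs.contains(offsetFile);
    }

    public static void main(String[] args) {
        // Made-up sample data: in practice this list comes from SHOW BINARY LOGS.
        List<String> serverBinlogs = Arrays.asList("mysql-bin.006365", "mysql-bin.006366");
        String offsetFile = "mysql-bin.006360";
        if (!isStillAvailable(serverBinlogs, offsetFile)) {
            long oldest = serverBinlogs.stream()
                    .mapToLong(BinlogAvailability::sequenceOf)
                    .min()
                    .orElse(-1L);
            System.out.println(offsetFile + " has been purged; oldest sequence on server: " + oldest);
        }
    }
}
```

If the file is gone, the offset in the checkpoint can no longer be resumed, which matches the "Reconfigure the connector to use a snapshot when needed" hint in the exception message.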
Problem:
2022-02-14 19:14:24
com.alibaba.fastjson.JSONException: write javaBean error, fastjson version 1.2.47, class java.nio.HeapByteBuffer, fieldName : post_dept
at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:465)
at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:120)
at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:270)
at com.alibaba.fastjson.serializer.MapSerializer.write(MapSerializer.java:44)
at com.alibaba.fastjson.serializer.ListSerializer.write(ListSerializer.java:137)
at com.alibaba.fastjson.serializer.JSONSerializer.write(JSONSerializer.java:281)
at com.alibaba.fastjson.JSON.toJSONString(JSON.java:673)
at com.alibaba.fastjson.JSON.toJSONString(JSON.java:611)
at com.alibaba.fastjson.JSON.toJSONString(JSON.java:576)
at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:248)
at com.sjb.cdc.customization.Data2dorisCustomization$3.process(Data2dorisCustomization.java:225)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:434)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.alibaba.fastjson.util.FieldInfo.get(FieldInfo.java:484)
at com.alibaba.fastjson.serializer.FieldSerializer.getPropertyValueDirect(FieldSerializer.java:140)
at com.alibaba.fastjson.serializer.JavaBeanSerializer.write(JavaBeanSerializer.java:249)
... 25 more
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getChar(HeapByteBuffer.java:259)
... 32 more
2022-02-15 11:53:09
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to [B
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getBytes(Struct.java:168)
at com.sjb.cdc.customization.JsonSchemaCustomization.foreachStruct(JsonSchemaCustomization.java:147)
at com.sjb.cdc.customization.JsonSchemaCustomization.createData(JsonSchemaCustomization.java:110)
at com.sjb.cdc.customization.JsonSchemaCustomization.deserialize(JsonSchemaCustomization.java:89)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
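Analysis:
Binary fields were encountered and need to be checked for separately; putting a binary field directly into the JSON causes the error.
for (Field field : afterStruct.schema().fields()) {
    String fieldName = field.name();
    // Binary columns arrive with the Connect schema type "bytes".
    // DECIMAL columns also report "bytes", which is what triggers the ClassCastException above;
    // the later version of this loop adds a schema-name check for that case.
    if ("bytes".equals(field.schema().type().getName())) {
        byte[] bytes = afterStruct.getBytes(fieldName);
        if (bytes != null) {
            String fieldValue = new String(bytes);
            dataJson.put(fieldName, fieldValue);
        }
    }
}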
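Both stack traces above come down to a value type that the JSON step cannot handle: a `java.nio.HeapByteBuffer` reaching fastjson, and a `BigDecimal` being read as `byte[]`. The sketch below shows one way to normalise such values before they are put into the JSON object. It uses only the JDK; the class and method names are hypothetical, and decoding binary data as UTF-8 text is an assumption that only fits columns that really contain text.

```java
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts values that JSON serializers choke on into plain strings. */
class JsonSafeValues {

    static Object toJsonSafe(Object value) {
        if (value instanceof byte[]) {
            // Assumption: the bytes hold UTF-8 text.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (value instanceof ByteBuffer) {
            // duplicate() keeps the caller's buffer position untouched.
            ByteBuffer copy = ((ByteBuffer) value).duplicate();
            byte[] bytes = new byte[copy.remaining()];
            copy.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
        if (value instanceof BigDecimal) {
            // Plain notation avoids exponents such as 1E+3.
            return ((BigDecimal) value).toPlainString();
        }
        return value; // everything else is passed through unchanged
    }

    public static void main(String[] args) {
        System.out.println(toJsonSafe(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8))));
        System.out.println(toJsonSafe(new BigDecimal("1.50")));
    }
}
```

Dispatching on the runtime type of the value rather than on the schema type sidesteps the case where a DECIMAL column reports the schema type "bytes" but carries a `BigDecimal`.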
Problem:
Field-mapping handling when CDC reads from MySQL and writes to Doris. Here is the test code:
for (Field field : struct.schema().fields()) {
    String fieldName = field.name();
    String schemaName = field.schema().name();
    Object fieldValue = struct.get(field);
    if (pkNameList.contains(fieldName)) {
        partitionerNum += fieldValue.hashCode();
        dataJson.put("partitionerNum", partitionerNum);
    }
    // todo: time fields are handled here
    try {
        if ("int32".equals(field.schema().type().getName()) && Date.SCHEMA_NAME.equals(schemaName)) {
            // DATE: days since the epoch
            if (fieldValue != null) {
                int day = (int) fieldValue;
                long second = day * 24 * 60 * 60L;
                String dateStr = LocalDateTime.ofEpochSecond(second, 0, ZoneOffset.ofHours(8)).format(dateFormatter);
                dataJson.put(fieldName, dateStr);
            }
        } else if ("int64".equals(field.schema().type().getName()) && Timestamp.SCHEMA_NAME.equals(schemaName)) {
            // DATETIME: milliseconds since the epoch
            if (fieldValue != null) {
                long times = (long) fieldValue;
                String dateTime = LocalDateTime.ofEpochSecond(times / 1000 - 8 * 60 * 60, 0, ZoneOffset.ofHours(8)).format(dateTimeFormatter);
                dataJson.put(fieldName, dateTime);
            }
        } else if ("string".equals(field.schema().type().getName()) && "io.debezium.time.ZonedTimestamp".equals(schemaName)) {
            // TIMESTAMP: UTC string, shifted by 8 hours
            String timestampValueStr = struct.getString(fieldName);
            if (fieldValue != null) {
                LocalDateTime localDateTime = LocalDateTime.parse(timestampValueStr, timestampFormatter);
                LocalDateTime rsTime = localDateTime.plusHours(8);
                String timestampValue = rsTime.format(dateTimeFormatter);
                dataJson.put(fieldName, timestampValue);
            }
        } else if ("bytes".equals(field.schema().type().getName()) && StringUtils.isEmpty(schemaName)) {
            // Raw binary only: an empty schema name excludes logical types such as DECIMAL
            byte[] bytes = struct.getBytes(fieldName);
            if (bytes != null) {
                String bytesValue = new String(bytes);
                dataJson.put(fieldName, bytesValue);
            }
        } else {
            if (fieldValue != null) {
                dataJson.put(fieldName, fieldValue);
            }
        }
    } catch (Exception ex) {
        String errorInfo = StringUtils.join(ex.getStackTrace(), "");
        logger.error("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
        DingProd.sendDingRabotProd("table:" + dataJson.getString("canal_table") + ",fieldName = " + fieldName + ",msg:" + errorInfo);
    }
}
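The date and datetime branches of this loop can also be written directly with `java.time`, which avoids the manual second arithmetic. This is a sketch under assumptions: it takes `Date.SCHEMA_NAME` and `Timestamp.SCHEMA_NAME` to refer to Debezium's `io.debezium.time.Date` (days since the epoch) and `io.debezium.time.Timestamp` (milliseconds since the epoch, holding the DATETIME wall-clock value), and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the int32 date and int64 datetime branches. */
class DebeziumTimeSketch {

    private static final DateTimeFormatter DATE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Days since 1970-01-01 -> "yyyy-MM-dd". */
    static String formatEpochDay(int days) {
        return LocalDate.ofEpochDay(days).toString();
    }

    /** Milliseconds since the epoch -> "yyyy-MM-dd HH:mm:ss", read as UTC so the wall-clock value is kept. */
    static String formatEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(DATE_TIME);
    }

    public static void main(String[] args) {
        System.out.println(formatEpochDay(19039));
        System.out.println(formatEpochMillis(1645025641000L));
    }
}
```

Reading the millisecond value as UTC gives the same result as subtracting eight hours and then applying a +8 offset, as the loop does.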
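Update:
mysql `timestmp_test2` timestamp(3)
The conversion code above throws an error for this column,
so the Hutool utility classes are used for the conversion instead.
Hutool reference documentation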
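For comparison, here is a JDK-only sketch of the same conversion. It is not the Hutool approach used above; it relies on `java.time.Instant.parse`, which accepts ISO-8601 instants with or without fractional seconds. That the failure comes from the fractional part of a `timestamp(3)` value is an assumption, as are the target zone `Asia/Shanghai` and the class and method names.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: converts a UTC ISO-8601 timestamp string to local "yyyy-MM-dd HH:mm:ss". */
class ZonedTimestampSketch {

    // Assumption: the target time zone is UTC+8.
    private static final ZoneId TARGET_ZONE = ZoneId.of("Asia/Shanghai");
    private static final DateTimeFormatter OUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String toLocal(String zonedTimestamp) {
        // Instant.parse handles both "...:01Z" and "...:01.123Z".
        return Instant.parse(zonedTimestamp).atZone(TARGET_ZONE).format(OUT);
    }

    public static void main(String[] args) {
        System.out.println(toLocal("2022-02-16T07:34:01Z"));
        System.out.println(toLocal("2022-02-16T07:34:01.123Z"));
    }
}
```

The output pattern drops the milliseconds; if they are needed downstream, the pattern can be extended with `.SSS`.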
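Problem:
The size of a MySQL char column does not map one-to-one to Doris.
We map the MySQL char type to Doris varchar and triple the size.
Reason: English and Chinese characters take up different numbers of bytes.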
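The factor of three can be seen by counting UTF-8 bytes: an ASCII letter takes one byte, while a common Chinese character takes three. The sketch below illustrates this and the resulting length rule. The class and method names are hypothetical, and the 65533 upper bound for Doris varchar is an assumption to check against the Doris version in use.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: sizes a Doris varchar from a MySQL char/varchar length. */
class CharLengthSketch {

    // Assumption: maximum varchar length accepted by Doris.
    static final int DORIS_VARCHAR_MAX = 65533;

    /** Number of bytes the string occupies in UTF-8. */
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** MySQL length is in characters; reserve 3 bytes per character on the Doris side. */
    static int dorisVarcharLength(int mysqlCharLength) {
        return (int) Math.min((long) mysqlCharLength * 3, DORIS_VARCHAR_MAX);
    }

    public static void main(String[] args) {
        System.out.println(utf8Bytes("ab"));            // two ASCII letters
        System.out.println(utf8Bytes("\u4e2d\u6587"));  // two Chinese characters
        System.out.println(dorisVarcharLength(32));
    }
}
```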
Notes:
mysql | Corresponding Doris type | How to handle? | Test result | doris | Notes (about column E) |
bigint | bigint | bigint | |||
bit | #N/A | ? | int | bitmap | |
blob | #N/A | varchar | boolean | ||
char | char | char | |||
date | date | date | |||
datetime | datetime | datetime | |||
decimal | decimal | description | |||
double | double | keyword | |||
enum | #N/A | ? | varchar | decimal | |
float | float | double | |||
int | int | float | |||
json | #N/A | ? | string | hll | This is TEXT in Doris |
longblob | #N/A | varchar | int | ||
longtext | #N/A | varchar | largeint | ||
mediumblob | #N/A | varchar | smallint | ||
mediumint | #N/A | bigint | string | ||
mediumtext | #N/A | string | tinyint | ||
set | #N/A | ? | varchar | varchar | |
smallint | smallint | ||||
text | #N/A | string | |||
time | #N/A | varchar | |||
timestamp | #N/A | varchar | Actual value in MySQL: 2022-02-16 15:34:01; format read by CDC: 2022-02-16T07:34:01Z; 8 hours must be added | ||
tinyint | tinyint | ||||
tinytext | #N/A | varchar | |||
varbinary | #N/A | ? | varchar | Same as binary | |
varchar | varchar | ||||
year | #N/A | varchar |
Problem:
Monitoring DDL statements in CDC
if (valueStruct.schema().name().equals("io.debezium.connector.mysql.SchemaChangeValue")) {
    String historyRecord = valueStruct.getString("historyRecord");
    JSONObject schemaChangeValueJson = JSONObject.parseObject(historyRecord);
    logger.error("Schema change info: " + schemaChangeValueJson);
    dataJson = schemaChangeValueJson;
    if (targetDb != null) {
        dataJson.put("targetDb", targetDb);
        dataJson.put("targetTb", table);
        // Pass the data downstream
        out.collect(dataJson);
        String content = "warn!!! DingTalk alert! Schema has changed... target database: " + targetDb + ", target table: " + table;
        DingProd.sendDingRabotProd(content);
    }
} else {
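Problem:
When writing to Doris through the Flink Stream Load API, we found that data was missing, yet the program reported no errors or exceptions.
Analysis:
The problem probably lies in Doris's internal processing. The missing rows contained some very large field values that exceeded the Doris column size, so those rows were filtered out.
Solution:
Add a check:
if numberFilteredRows > 0, raise an alert and print the details.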
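The check could look roughly like the sketch below. It assumes the Stream Load response body is JSON with a `NumberFilteredRows` field (the `numberFilteredRows` mentioned above), and it extracts the number with a regular expression only to stay free of dependencies; in the real job the JSON library already in use would be the natural choice. The class name, method names and sample response are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: detects rows silently filtered by a Stream Load request. */
class StreamLoadCheck {

    private static final Pattern FILTERED =
            Pattern.compile("\"NumberFilteredRows\"\\s*:\\s*(\\d+)");

    /** Returns the filtered-row count from the response body, or -1 if the field is absent. */
    static long filteredRows(String responseBody) {
        Matcher m = FILTERED.matcher(responseBody);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /** True when at least one row was filtered and an alert should be raised. */
    static boolean shouldAlert(String responseBody) {
        return filteredRows(responseBody) > 0;
    }

    public static void main(String[] args) {
        // Made-up sample response for illustration only.
        String body = "{\"Status\":\"Success\",\"NumberTotalRows\":10,"
                + "\"NumberLoadedRows\":8,\"NumberFilteredRows\":2}";
        if (shouldAlert(body)) {
            System.out.println("Stream Load filtered " + filteredRows(body) + " rows: " + body);
        }
    }
}
```

Because a load with filtered rows can still report success, the check has to look at the row count rather than the status alone.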
Update:
DolphinScheduler runs the Doris SQL scripts on a schedule:
mysql -h 192.168.xx.xx -P 9030 -uroot -proot -Dexample_db < sqlFile/dwd_finance_stable合并_20220304114348.sql
Update:
A CDC bug:
1. The normal table structure (MySQL 5.x):
CREATE TABLE `order_sign_received` (
  `order_number` varchar(32) NOT NULL COMMENT 'waybill number',
  `sign_people_mobile` varchar(12) DEFAULT NULL COMMENT 'signer phone number',
  `function_code` varchar(32) DEFAULT NULL COMMENT 'method code',
  PRIMARY KEY (`order_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
2. The statement executed:
ALTER TABLE order_sign_received ADD (
  received_pay_method_changed TINYINT DEFAULT 0 NOT NULL COMMENT 'payment method changed at sign-off, 0: no, 1: yes'
);
The following error was reported:
18:51:38,237 ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.000119/315091120
18:51:38,238 ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: Error processing binlog event
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
18:51:38,307 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
18:51:38,315 WARN org.apache.flink.runtime.taskmanager.Task - Source: config Source -> Sink: Print to Std. Out (1/1)#0 (d7461d16ef50d2d32d78e64e3718734f) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategyat org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)at akka.actor.Actor$class.aroundReceive(Actor.scala)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)... 4 more
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exceptionat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the recordsat org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default valueat io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default valueat com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)at java.lang.Iterable.forEach(Iterable.java:75)at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)... 27 more
In local testing, the debugger never steps into this code path.
-- Doris error
What we actually ran into:
[taskAppId=TASK-4882385308064_1-5934-10038]:[61] - -> ERROR 1105 (HY000) at line 21: errCode = 2, detailMessage = Failed to create partition[rep_op_com_reconciliation_stat]. Timeout. Unfinished mark: 10004=257590, 10004=257594, 10004=257598
The error occurred at roughly:
[INFO] 2022-04-24 01:14:51.998
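We narrow the search to the following time window:
[2022-04-24 01:10:51.998, 2022-04-24 01:15:51.998]
Then locate backend node 10004 (its ID is shown in the Doris web UI).
On node 10004, search be.INFO for log entries related to these tablet IDs: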
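The window above runs from four minutes before the error to one minute after it. A minimal sketch of deriving it from the error timestamp follows; the function name and the 4/1 minute defaults are assumptions read off this one example, not a Doris convention.

```python
from datetime import datetime, timedelta
from typing import Tuple

LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

def search_window(error_time: str, before_min: int = 4, after_min: int = 1) -> Tuple[str, str]:
    """Return (start, end) around error_time, formatted like the scheduler log."""
    t = datetime.strptime(error_time, LOG_TIME_FORMAT)
    start = t - timedelta(minutes=before_min)
    end = t + timedelta(minutes=after_min)
    # %f renders six digits (microseconds); drop the last three to keep milliseconds.
    return (start.strftime(LOG_TIME_FORMAT)[:-3], end.strftime(LOG_TIME_FORMAT)[:-3])

start, end = search_window("2022-04-24 01:14:51.998")
print(start, "->", end)
```

Widen the defaults if the BE logs show nothing in that range; a failure that ends in a timeout may have started earlier than the reported error time.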
tablet id = 257590
tablet id = 257594
tablet id = 257598
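Searching be.INFO for these IDs by hand is error-prone, because a shorter ID can match inside a longer number. Below is a small sketch of a filter that keeps only lines mentioning one of the tablet IDs as a whole number. The helper name and the sample lines are made up for illustration; they are not real BE log output.

```python
import re
from typing import Iterable, List

def lines_mentioning_tablets(lines: Iterable[str], tablet_ids: Iterable[int]) -> List[str]:
    """Keep lines that contain any of the tablet IDs as a standalone number."""
    alternatives = "|".join(str(t) for t in tablet_ids)
    # Digit lookarounds stop 257590 from matching inside 1257590 or 2575901.
    pattern = re.compile(rf"(?<!\d)(?:{alternatives})(?!\d)")
    return [line for line in lines if pattern.search(line)]

# Hypothetical sample lines, only to exercise the filter.
sample = [
    "made-up line: create tablet 257590 failed",
    "made-up line: unrelated tablet 1257590",
    "made-up line: report for tablet 257598 timed out",
]
for hit in lines_mentioning_tablets(sample, [257590, 257594, 257598]):
    print(hit)
```

In practice, pass an open file handle for the backend's be.INFO in place of the sample list. The log path depends on the deployment.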
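Reference article:
Doris creates tables one Partition at a time. When a Partition fails to be created, this error may be reported. Even if you do not use Partitions, a problem during table creation is still reported as Failed to create partition because, as mentioned earlier, Doris creates a single unmodifiable default Partition for any table that does not specify one.
This error usually means that a BE ran into a problem while creating tablets. Troubleshoot as follows:
In fe.log, find the Failed to create partition entry at the corresponding time. The entry contains a series of number pairs such as {10001-10010}. The first number is the Backend ID and the second is the Tablet ID. The pair above means that creating the tablet with ID 10010 failed on the Backend with ID 10001.
Go to the be.INFO log of that Backend and look for entries mentioning the tablet ID within the same time period to find the error message.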
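The number pairs can be pulled out of the error text mechanically. The sketch below groups tablet IDs by backend ID. It reads the `backend=tablet` form seen in the error above (`10004=257590`) and, as an assumption, also accepts the `backend-tablet` form described in the steps. The function name is hypothetical.

```python
import re
from collections import defaultdict
from typing import Dict, List

def parse_unfinished_marks(message: str) -> Dict[int, List[int]]:
    """Map backend ID -> tablet IDs listed after 'Unfinished mark:'."""
    # Only look at the text after the marker so other numbers (errCode, line) are ignored.
    _, _, tail = message.partition("Unfinished mark:")
    by_backend: Dict[int, List[int]] = defaultdict(list)
    for backend_id, tablet_id in re.findall(r"(\d+)\s*[=-]\s*(\d+)", tail):
        by_backend[int(backend_id)].append(int(tablet_id))
    return dict(by_backend)

message = (
    "ERROR 1105 (HY000) at line 21: errCode = 2, detailMessage = Failed to create "
    "partition[rep_op_com_reconciliation_stat]. Timeout. Unfinished mark: "
    "10004=257590, 10004=257594, 10004=257598"
)
print(parse_unfinished_marks(message))
```

For the message in this post, all three tablets belong to backend 10004, so only that node's be.INFO needs to be checked.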
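Some common tablet creation failures include, but are not limited to:
The BE did not receive the task, in which case be.INFO contains no entries for the tablet ID; or the BE created the tablet but failed to report it. For either problem, see [Deployment and Upgrade documentation] to check connectivity between FE and BE.
Memory pre-allocation failed. A row in the table may exceed 100 KB in byte length.
Too many open files. The number of open file handles exceeds the Linux system limit; raise the system's file handle limit.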
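If tablet creation times out, you can also extend the timeout by setting tablet_create_timeout_second=xxx and max_create_table_timeout_second=xxx in fe.conf. tablet_create_timeout_second defaults to 1 second and max_create_table_timeout_second defaults to 60 seconds; the overall timeout is min(tablet_create_timeout_second * replication_num, max_create_table_timeout_second).
The CREATE TABLE command does not return for a long time.
Doris's CREATE TABLE command is synchronous. Its timeout is currently set quite simply, to (tablet num * replication num) seconds. If many tablets are being created and some of them fail, the error may only be returned after a long timeout.
Normally a CREATE TABLE statement returns within a few seconds to a dozen or so seconds. If it takes more than a minute, cancel the operation and check the FE or BE logs for the error.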
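To see how the two fe.conf settings interact, here is a minimal sketch of the overall timeout exactly as the formula is quoted in this article. The function name is hypothetical, and the formula is taken from the text as written; the actual calculation may differ between Doris versions.

```python
def overall_create_timeout_seconds(
    replication_num: int,
    tablet_create_timeout_second: int = 1,       # default per the article
    max_create_table_timeout_second: int = 60,   # default per the article
) -> int:
    """min(tablet_create_timeout_second * replication_num, max_create_table_timeout_second)."""
    return min(
        tablet_create_timeout_second * replication_num,
        max_create_table_timeout_second,
    )

# With the defaults and 3 replicas, the per-tablet term is the limit.
print(overall_create_timeout_seconds(3))
# Raising tablet_create_timeout_second to 30 makes the 60-second cap the limit.
print(overall_create_timeout_seconds(3, tablet_create_timeout_second=30))
```

Under this formula, raising tablet_create_timeout_second alone stops helping once the product reaches max_create_table_timeout_second, so both values may need to be increased together.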
————————————————
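Copyright notice: this is an original article by CSDN blogger 「墨卿风竹」, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_43688472/article/details/122721626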