The full error message is as follows:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
    at KeyBy.main(KeyBy.java:63)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:506)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.lang.RuntimeException: Assigned key must not be null!
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.NullPointerException: Assigned key must not be null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:49)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:58)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 9 more

The original code is as follows:


import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *     Reduce: rolling aggregation based on a ReduceFunction; the result of each
 *     aggregation step is emitted to the downstream operator.
 */
public class KeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input: user actions. A user clicks or browses a product at some point in time,
        // together with the product price. Each element below contains:
        // user ID, timestamp, user action, product ID, product price.
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // Transformation: keyBy repartitions the data by user ID.
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // Transformation: rolling aggregation with reduce. Here, the total product price
        // per user is accumulated.
        SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
            @Override
            public UserAction reduce(UserAction value1, UserAction value2) throws Exception {
                int newProductPrice = value1.getProductPrice() + value2.getProductPrice();
                return new UserAction(value1.getUserID(), -1, "", "", newProductPrice);
            }
        });

        // Output: print the result of every rolling aggregation step to the console.
        // 3> UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=8)
        // 3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=16)
        // 3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=24)
        // 4> UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        // 4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=20)
        // 4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=30)
        // 4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=40)
        result.print();

        env.execute();
    }
}

UserAction.java

public class UserAction {
    private String userId;    // user ID
    private long timestamp;   // event timestamp
    private String behavior;  // user behavior (pv, buy, cart, fav)
    private String itemId;    // product ID
    private int price;        // product price

    public UserAction(String user, long timestamp, String behavior, String itemId, int price) {
        this.userId = userId;  // self-assignment: the "user" parameter is never used, so userId stays null
        this.timestamp = timestamp;
        this.itemId = itemId;
        this.behavior = behavior;
        this.price = price;
    }

    public String getUserID() {
        return userId;
    }

    public int getProductPrice() {
        return price;
    }

    public String getBehavior() {
        return behavior;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
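
A quick way to confirm what the stack trace is complaining about: with the UserAction above, getUserID() already returns null right after construction, and that null is exactly the key the KeySelector hands to Flink. The minimal check below is my own illustration (the class name NullKeyCheck is not part of the original post); it assumes the UserAction class exactly as shown above.

public class NullKeyCheck {
    public static void main(String[] args) {
        // "userID1" is passed as the constructor's "user" parameter, but the constructor
        // executes this.userId = userId, a self-assignment, so the field keeps its default null.
        UserAction action = new UserAction("userID1", 1293984000L, "click", "productID1", 10);
        System.out.println(action.getUserID()); // prints: null
        // Inside keyBy, this null is forwarded to KeyGroupRangeAssignment.assignKeyToParallelOperator,
        // which rejects it with "Assigned key must not be null!".
    }
}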

Solution:

The root cause is in the UserAction constructor: the parameter is named user, but the constructor body executes this.userId = userId, which assigns the field to itself, so userId is never set and getUserID() returns null. keyBy then hands that null key to KeyGroupStreamPartitioner, which fails Flink's Preconditions.checkNotNull with "Assigned key must not be null!". The fix is to make the parameter name match the field, i.e. change

public UserAction(String user, long timestamp, String behavior, String itemId, int price)

to

public UserAction(String userId, long timestamp, String behavior, String itemId, int price)
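
For completeness, a minimal sketch of the corrected constructor (everything else in UserAction stays unchanged); with the parameter renamed, this.userId = userId now binds the argument instead of the field to itself:

public UserAction(String userId, long timestamp, String behavior, String itemId, int price) {
    this.userId = userId;  // no longer a self-assignment: the field now receives the argument
    this.timestamp = timestamp;
    this.behavior = behavior;
    this.itemId = itemId;
    this.price = price;
}

Equivalently, the parameter name could stay user as long as the body assigns this.userId = user. And if null keys could legitimately appear in the data itself, a filter such as source.filter(a -> a.getUserID() != null) before keyBy would also avoid the exception, but in this case the data is fine and the constructor is the real problem.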
