前言碎语

昨天博主写了《windows环境下flink入门demo实例》实现了官方提供的最简单的单词计数功能,今天升级下,将数据源从socket流换成生产级的消息队列kafka来完成一样的单词计数功能。本文实现的重点主要有两个部分,一是kafka环境的搭建,二是如何使用官方提供的flink-connector-kafka_2.12来消费kafka消息,其他的逻辑部分和上文类似。

进入正题

本篇博文涉及到的软件工具以及下载地址:

Apache Flink :https://flink.apache.org/downloads.html ,请下载最新版1.7.x,选择单机版本

kafka:http://kafka.apache.org/downloads ,请下载最新的2.1.0

第一步:安装kafka,并验证

从上面的下载地址选择二进制包下载后是个压缩包,解压后的目录如下:

进入binwindows下,找到kafka-server-start.bat和zookeeper-server-start.bat。配置文件在config目录下,主要配置一些日志和kafka server和zookeeper,都默认就好。如果你本地已经有zk的环境,就可以忽略zk,不然按照下面的步骤执行即可。

1. 启动zk服务

执行:zookeeper-server-start.bat ....configzookeeper.properties

2.启动kafka服务

执行:kafka-server-start.bat ....configserver.properties

3.创建test主题

执行:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.查看上一步的主题是否创建成功,成功的话控制台会输出test

执行:kafka-topics.bat --list --zookeeper localhost:2181

5.订阅test主题消息

执行:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

6.发布消息

执行:kafka-console-producer.bat --broker-list localhost:9092 --topic test

以上步骤成功后,我们需要验证下是否都成功了。在第六条指令的窗口中输入abc。如果在第5个指令窗口输出了就代表kafka环境ok了。然后可以关掉第5个指令窗口,下面就让Flink来消费kafka的消息

第二步:编写消费kafka消息的Flink job

基础步骤参考《windows环境下flink入门demo实例》一文。唯一的区别就是因为要消费kafka中的数据,所以需要引入一个kafka连接器,官方已提供到maven仓库中,引入最新版本即可,如下:

org.apache.flink    flink-connector-kafka_2.12    1.7.1

然后新建一个KafkaToFlink类 ,代码逻辑和昨天的一样,都是从一段字符串中统计每个词语出现的次数,这个场景比较像我们的热搜关键字,我标题简化为热词统计了。主要的代码如下:

/** * Created by kl on 2019/1/30. * Content :消费kafka数据 */public class KafkaToFlink {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(5000);        /**         * 这里主要配置KafkaConsumerConfig需要的属性,如:         * --bootstrap.servers localhost:9092 --topic test --group.id test-consumer-group         */        ParameterTool parameterTool = ParameterTool.fromArgs(args);        DataStream dataStream = env.addSource(new FlinkKafkaConsumer(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));        DataStream windowCounts = dataStream.rebalance().flatMap(new FlatMapFunction() {            public void flatMap(String value, Collector out) {                System.out.println("接收到kafka数据:" + value);                for (String word : value.split("s")) {                    out.collect(new WordWithCount(word, 1L));                }            }        }).keyBy("word")                .timeWindow(Time.seconds(2))                .reduce(new ReduceFunction() {                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {                        return new WordWithCount(a.word, a.count + b.count);                    }                });        windowCounts.print().setParallelism(1);        env.execute("KafkaToFlink");    }}

注意下这个地方:ParameterTool.fromArgs(args);我们所有的关于KafkaConsumerConfig的配置,都是通过启动参数传入的,然后Flink提供了一个从args中获取参数的工具类。这里需要配置的就三个信息,和我们在命令窗口创建订阅一样的参数即可

第三步:验证Flink job是否符合预期

将应用打成jar包后通过Flink web上传到Flink Server。然后,找到你提交的job,输入如下的启动参数,提交submit即可:

成功运行的job的页面如下图,如果下图框框中的指标一直在转圈圈,那么很有可能是因为你运行了其他的job,导致Available Task Slots不够用了。

默认的Flink的Slots配置是1,当出现任务插槽不够用时,上图圈圈转一会就会失败,然后打开job manager 点击log就可以看到job因为没有可用的任务插槽而失败了。

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2, slots allocated: 0at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:535)at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)at akka.dispatch.OnComplete.internal(Future.scala:258)at akka.dispatch.OnComplete.internal(Future.scala:256)at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

上面的问题可以通过修改conf/flink-conf.yaml中的taskmanager.numberOfTaskSlots来设置,具体指单个TaskManager可以运行的并行操作员或用户功能实例的数量。如果此值大于1,则单个TaskManager将获取函数或运算符的多个实例。这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的操作员或功能实例之间划分。此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半)。当然,如果你修改了配置文件,Flink Server是需要重启的。重启成功后,可以在大盘看到,如下图箭头:

一切就绪后,在kafka-console-producer窗口中输入字符串回车,就会在flink job窗口中看到相关的信息了,效果前文一样,如图:

文末结语

本文算昨天hello wrod入门程序的升级版,实现了消费kafka中的消息来统计热词的功能。后面生产环境也打算使用kafka来传递从mysql binlog中心解析到的消息,算是一个生产实例的敲门砖吧。正如博主昨天所说的,落地的过程肯定会有很多问题,像上面的taskmanager.numberOfTaskSlots的设置。后面会继续将我们落地过程中的问题记录下来,欢迎关注凯京科技一起交流。

kafka jar包_Windows环境下Flink消费Kafka实现热词统计相关推荐

  1. 【Kafka】Flink 消费 kafka 部分 分区 一直不提交 offset

    文章目录 1.概述 1.1 查看消费组 1.2 检测topic 1.3 内置topic 1.4 flink数据监控 1.5 提交offset指标 M.扩展 1.概述 一个环境,突然发现,Flink消费 ...

  2. 【kafka】Flink 消费 kafka Received unknown topic topic/partition may not exist Describe access to it

    1.场景1 很相似的问题参考:[Kafka]kafka console received unknown topic or partition error listOffset request 1.1 ...

  3. 【FLink】Flink 消费 kafka 消费组 死掉 Marking the coordinator dead for group 造成数据重复消费

    文章目录 1.概述 2.源码分析 2.2 能不能设置多次提交呢? 2.3 监控日志 1.概述 首先参考几个案例: [Flink]Flink Kafka 消费卡死 消费组卡死 topic无写入 实际有数 ...

  4. 【Flink】flink消费kafka报错 KafkaConsumer.assign Ljava/util/List

    文章目录 1.概述 1.概述 flink消费kafka上数据时报错: Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients. ...

  5. flink 消费 kafka offset 自动提交

    flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费 如果开启了checkpoint 以 checkpoint为准 ,enable.auto.com ...

  6. Flink消费kafka,某partition突然从头开始消费,yarn部署,无报错,很奇怪

    Flink消费kafka,某partition突然从头开始消费,yarn per job部署,ui页面无报错,检查点也没有异常,很神奇,不知道什么原因?

  7. phpstudy集成包Windows环境下数据库自动备份

    phpstudy集成包Windows环境下数据库自动备份 这里我们会用到Windows下的任务计划程序,Windows服务器同理,如下图所示: 接着我们还需要用到mysqldump这个工具,phpst ...

  8. Flink学习笔记(八):flink热词统计

    我们在网页上经常可以看到比如 百度热榜,微博热搜 这样的排行数据,那么我们在进行网络搜索的时候如何统计这些数据呢? 热词统计有很多中方法,这里主要记录下flink如何进行热词统计. 一.场景 小白在网 ...

  9. java 热词推荐搜索实现,Flink 热词统计(1): 基础功能实现

    本系列文章将从热词统计需求讲起,讲解flink在实际项目中的应用.部署.容错.项目中的代码为scala所编写,没有用 java 的原因是scala语言看起来更加简练,同时与java语言的兼容性较好,可 ...

最新文章

  1. U平方Net:深入使用嵌套的U型结构进行显著目标检测
  2. 使用Flex Bison 和LLVM编写自己的编译器[zz]
  3. 每日英语:Go Ahead, Hit the Snooze Button
  4. python的power bi转换基础
  5. 无法安装某些更新或程序
  6. 自学Python Day1
  7. 3dmax如何删除重叠部分
  8. 内存超频trfc_就是这么的简单,微星主板内存超频分享
  9. explain mysql 耗时_借助慢查询日志和explain命令分析 MySQL慢查询语句分析总结
  10. obj文件格式学习(自用)
  11. mysql 分区 线性hash_MySQL表分区(3)哈希分区-hash
  12. vcpu和cpu的关系
  13. 为什么要用python处理excel-以Excel处理为目的学习python还是VBA?
  14. The JAVA_HOME environment variable is not defined correctly This environment variable is needed to r
  15. 小程序滚动穿透解决方案
  16. mysql的group语句_MySQL中distinct与group by语句的一些比较及用法讲解
  17. 示波器的存储深度设置多大好
  18. 2018考研数学一解析 ​​​
  19. H - 互质数的个数(一)
  20. php多进程更新微信用户信息,php之swoole多进程发送微信模板消息

热门文章

  1. MVC 使用 Ueditor富文本编辑器
  2. JavaScript Oriented[探究面向对象的JavaScript高级语言特性]
  3. jquery 表单 清空
  4. 【c】正负数二进制表示
  5. 初学UML之-------用例图
  6. 电脑开机3秒就重启循环_手机怎么才能投屏到电脑上?3个方法,小屏1秒变大屏,涨知识了...
  7. Git(8)-- 撤消操作(git commit --amend、git reset 和 git checkout 命令详解)
  8. 抖音新特效:蚂蚁呀嘿安卓教程
  9. Win10 Powershell ssh到WSL
  10. 浏览器获取CA认证流程