本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。

代码拆解

首先要设置Flink的执行环境:

// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream

// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";// Source
FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);

使用Flink算子处理这个数据流:

// Transformations
// 使用Flink算子对输入流的文本进行操作
// 按空格切词、计数、分区、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("s");// 输出结果 (word, 1)for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

这里使用的是Flink提供的DataStream级别的API,主要包括转换、分组、窗口和聚合等操作。

将数据流打印:

// Sink
wordCount.print();

最后执行这个程序:

// execute
env.execute("kafka streaming word count");

env.execute 是启动Flink作业所必需的,只有在execute()被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行。

完整代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {public static void main(String[] args) throws Exception {// 创建Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// SourceFlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);DataStream<String> stream = env.addSource(consumer);// Transformations// 使用Flink算子对输入流的文本进行操作// 按空格切词、计数、分区、设置时间窗口、聚合DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("s");// 输出结果 (word, 1)for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// SinkwordCount.print();// executeenv.execute("kafka streaming word count");}
}

执行程序

我们在Kafka入门简介这篇文章中曾提到如何启动一个Kafka集群,并向某个Topic内发送数据流。在本次Flink作业启动之前,我们还要按照那篇文章中提到的方式启动一个Kafka集群,创建对应的Topic,并向Topic中写入数据。

Intellij Idea调试执行

在IntelliJ Idea中,点击绿色按钮,执行这个程序。下图中任意两个绿色按钮都可以启动程序。

IntelliJ Idea下方会显示程序中输出到标准输出上的内容,包括本次需要打印的结果。

恭喜你,你的第一个Flink程序运行成功!

在集群上提交作业

第一步中我们已经下载并搭建了本地集群,接着我们在模板的基础上添加了代码,并可以在IntelliJ Idea中调试运行。在生产环境,一般需要将代码编译打包,提交到集群上。

注意,这里涉及两个目录,一个是我们存放我们刚刚编写代码的工程目录,简称工程目录,另一个是从Flink官网下载解压的Flink主目录,主目录下的bin目录中有Flink提供好的命令行工具。

进入工程目录,使用Maven命令行编译打包:

# 使用Maven将自己的代码编译打包
# 打好的包一般放在工程目录的target子文件夹下
$ mvn clean package

回到刚刚下载解压的Flink主目录,使用Flink提供的命令行工具flink,将我们刚刚打包好的作业提交到集群上。命令行的参数--class用来指定哪个主类作为入口。我们之后会介绍命令行的具体使用方法。

$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar

这时,仪表盘上就多了一个Flink程序。

程序的输出会打到Flink主目录下面的log目录下的.out文件中,使用下面的命令查看结果:

$ tail -f log/flink-*-taskexecutor-*.out

停止本地集群:

$ ./bin/stop-cluster.sh

Flink开发和调试过程中,一般有几种方式执行程序:

  1. 使用IntelliJ Idea内置的运行按钮。这种方式主要在本地调试时使用。
  2. 使用Flink提供的标准命令行工具向集群提交作业,包括Java和Scala程序。这种方式更适合生产环境。
  3. 使用Flink提供的其他命令行工具,比如针对Scala、Python和SQL的交互式环境。这种方式也是在调试时使用。

flink入门_Flink入门:读取Kafka实时数据流,实现WordCount相关推荐

  1. 使用python读取kafka实时topic数据demo,包括安装kafka module

    1. 安装kafka module kafka-python安装,转载:https://blog.csdn.net/see_you_see_me/article/details/78468421 1. ...

  2. ETL数据交换平台,支持多种实时数据流接入

    RestCloud ETL数据交换平台是基于微服务架构完全自主研发和创新的新一代数据集成平台,通过叠加API服务平台即可快速落地构建一个轻量级的数据中台.平台通过可视化的拖.拉.拽即可完成数据集成流程 ...

  3. Flink:从入门到放弃

    文章目录 前言 一.Flink简介 1. Flink组件栈 2. Flink基石 3. Fink的应用场景 3.1 Event-driven Applications[事件驱动] 3.2 Data A ...

  4. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  5. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  6. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  7. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  8. Flink学习记录--入门篇

    前言 流式计算可能在日常不多见,主要统计一个阶段内的PV.UV,在风控场景很常见,比如统计某个用户一天内同地区下单总量来判断该用户是否为异常用户.还有一些大数据处理场景,如将某一段时间生成的日志按需要 ...

  9. python读取输入流_Python读取实时数据流教程

    今天小编就为大家分享一篇Python读取实时数据流示例,具有很好的参考价值,希望对大家有所帮助.一起跟随小编过来看看吧 1.#coding:utf-8 chose = [ ('foo',1,2), ( ...

最新文章

  1. 光流 速度_[论文笔记] FlowNet 光流估计
  2. 依赖注入在 dotnet core 中实现与使用:2 使用 Extensions DependencyInjection
  3. P1501 [国家集训队]Tree II
  4. Oracle Warehouse Builder 自动化ETL处置处罚历程(1)
  5. 《KyLin学习理解》-01-KyLin麒麟的简介及其思想
  6. ES11新特性_绝对全局对象globalThis---JavaScript_ECMAScript_ES6-ES11新特性工作笔记067
  7. [当当网,你意欲何为]之二:无奈,配送之痛
  8. 基于卷积神经网络的草莓病害检测(新数据+基础算法?=SCI)
  9. 开课吧Java课堂之PrintWriter类的运用
  10. WPF中查找指定类型的父控件
  11. 阿里云:我们为全面服务政企市场做好了准备!
  12. CAM350 简单使用
  13. 信息化与计算机基础课课堂融合,高等学校计算机基础课程多元教学系列教材:网页设计与制作...
  14. 微信OpenIdUnionID
  15. Flash 控件的安装
  16. 记录一下unity3d资源加载Resources.Load资源加载的坑
  17. 微信公众号开发(一)——测试账号申请
  18. lambda表达式写法
  19. 远程过程调用失败 异常来自 HRESULT:0x800706BE
  20. 程序员的真实工资是多少?

热门文章

  1. 监控利器--Cacti
  2. model里使用汉字页面崩掉
  3. 英文字典。怎样设计数据结构
  4. 已知数组存放一批QQ号码,QQ号码最长为11位,最短为5位String[] strs = {“12345“,“67891“,“12347809933“,“98765432102“,“67891“,“1
  5. java string 加密_java字符串加密解密
  6. 信息学奥赛一本通 2042:【例5.10】稀疏矩阵
  7. 信息学奥赛一本通 2073:【例2.16 】三角形面积
  8. 信息学奥赛一本通(1016:整型数据类型存储空间大小)
  9. 4-adjacent(AtCoder-2686)
  10. 家谱树(信息学奥赛一本通-T1351)