link实例之Wordcount详细步骤

1.我的IDE是IntelliJ IDEA.在官网上https://www.jetbrains.com/idea/下载最新版2018.2的IDEA,如下图。破解可以再http://idea.lanyus.com/上获取破解码进行破解,如下图。

2.当IDE准备就绪后,开始创建一个项目名为bbb的maven项目,如下图。

3.在新窗口打开bbb项目时,IDEA会提示我们是否自动导包。选择自动导包,如下图。

4.对pom.xml配置文件进行修改,如下代码。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xiao</groupId><artifactId>bbb</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.10</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.10</artifactId><version>1.2.0</version></dependency></dependencies></project>

5.在src/main/java/目录下新建一个类,我的类名为WordCount,如下代码。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {//定义socket的端口号int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("没有指定port参数,使用默认值9000");port = 9000;}//获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");//计算数据DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word:splits) {out.collect(new WordWithCount(word,1L));}}})//打平操作,把每行的单词转为<word,count>类型的数据.keyBy("word")//针对相同的word数据进行分组.timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小.sum("count");//把数据打印到控制台
        windowCount.print().setParallelism(1);//使用一个并行度//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute("streaming word count");}/*** 主要为了存储单词以及单词出现的次数*/public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word, long count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}}

6.开启IP为10.192.12.106的虚拟机,并开启该虚拟机的终端,在终端输入如下命令,该命令可以打开一个端口号为9000的监听,输入命令后光标会停留在如下图的地方。

nc -l 9000

7.切换回IDEA,在菜单栏Build->Build Project,然后运行该类,当控制台console输出如下图所示的信息时表示Wordcount成功的与9000的监听端口建立了连接。

8.在虚拟机终端开的光标停留出,输入hello hello world world world world,然后 回车。在IDEA的控制台会显示如下单词和词频的信息,表示成功。

9.接下来把项目bbb打jar包,上传Flink后台运行,进行如下图操作。

首先要保证Java Compiler版本为1.8。

然后选择File->Project Structure,进行修改。

10.在配置好Flink的虚拟机下,进入目录/opt/data/flink-1.3.2/bin中,输入如下命令,开启Flink的本地模式。(不会配置flink的小伙伴可以打开链接https://www.cnblogs.com/ALittleMoreLove/p/9396118.html)

./start-local.sh

11.在浏览器里输入开启Flink守护进程的虚拟机的IP和8081端口,进入如下Flink前端页面。

12.上传bbb.jar文件到Flink后端运行。

备注:在学习大数据的漫长道路上,我们会遇到各种各样奇怪的问题,在尝试了多种方法仍然无法解决后 如果再没有高人指点,经常一个问题就卡好几天。这种无奈与绝望的感觉我想各位自学大数据的小伙伴们应该深有体会。我个人解决问题通常有两种方法:一种是直接找大牛帮忙,另外一种是在网上找各种相关的博客和帖子,再从中总结出一套可以解决自己问题的方法。自己探索新知识时,往往是很艰辛的,遇到好多天也解决不了的问题也是很正常的,但是千万不要放弃,坚持下来就一定会有收获的!Wordcount实例令我躺了两天的坑,最后终于找到了解决的方法,希望这篇随笔可以对自学大数据的小伙伴提供一定的帮助。

转载于:https://www.cnblogs.com/ALittleMoreLove/p/9449992.html

Flink实例-Wordcount详细步骤相关推荐

  1. CAS_SSO单点登录实例详细步骤(转)、Tomcat ssl(https) 配置

    CAS_SSO单点登录实例详细步骤(转).Tomcat ssl(https) 配置 博客分类: SSO&CAS&Identity Java.Tomcat 0, 从CAS官网下载最新版本 ...

  2. flink on k8s部署方案实践--详细步骤

    背景 Flink-operator极大的方便了我们管理 Flink 集群及其作业,我们只需要自定义yaml文件就可以做到. Flink 官方还未给出 flink-operator 方案,不过 Goog ...

  3. 评分卡模型建模详细步骤-评分卡建模实例之scorecardpy

    目录 0.引言 1.scorecardpy介绍 2.评分卡建模过程 2.1数据加载 2.1变量筛选 2.2数据划分 2.3变量分箱 2.3.1 自动分箱 2.3.2 手动调整分箱 2.4变量转化woe ...

  4. oracle归档模式教程,Oracle从归档模式变成非归档模式详细步骤

    更改Oracle数据库的非归档模式需要重新启动数据库,在mount模式下修改,简要步骤1 以shutdown immediate方式关闭数据库2 启动实 Oracle从归档模式变成非归档模式详细步骤 ...

  5. redis3.0.0 集群安装详细步骤

    2019独角兽企业重金招聘Python工程师标准>>> redis3.0.0 集群安装详细步骤 博客分类: 缓存 Redis集群部署文档(centos6系统) (要让集群正常工作至少 ...

  6. mysql5.5在windows7下编译的详细步骤_Windows7下编译MySQL5.5的详细步骤

    由于在window7下编译MySQL5.5.19过程中遇到很多问题,所以再次把详细步骤写出来,以供大家参阅,这个是完整通过实验的步骤,网上一堆的东西不是少这个就是少那个,所以整理了下,希望帮助大家更好 ...

  7. redis3.0.2 分布式集群安装详细步骤

    redis3.0.2 分布式集群安装详细步骤 --(centos5.8 X64系统) 版本历史 时间 版本 说明 编写者 2015-06-5 1.0 redis3.0.2 分布式集群安装详细步骤 cs ...

  8. oracle11g32位安装流程_Oracle 11g服务器安装详细步骤图文详解

    Oracle 11g是在推出的最新数据库软件,Oracle 11g有400多项功能,经过了1500多个小时的测试,开发工作量达到了3.6万人/月,相当于1000名员工连续研发3年.Oracle 11g ...

  9. linux下安装DB2的详细步骤

    我也是才学习,在网上找了一个在linux下安装DB2 的步骤,共享给大家看看了. linux下安装DB2的详细步骤! 第一步:检查程序包及其版本 在软件包管理中查看下列软件包是否安装,如没有安装,先安 ...

最新文章

  1. 罗浩.ZJU | 如何看待 2020 届校招算法岗「爆炸」的情况?
  2. Asp.Net编码模型
  3. IntelliJ IDEA出现:This file is indented with tabs instead of 4 spaces的问题解决
  4. CSDN 字体颜色表
  5. 图像处理:python实现canny算子
  6. 今日arXiv精选 | 11篇EMNLP 2021最新论文
  7. nginx main error_page
  8. 的内怎么放_燕窝买回来怎么炖?资深窝友告诉你!
  9. MySQL客户端使用
  10. 用scratch实现网上“超人训练”游戏
  11. 【开源分享】多端发布的单商户商城系统
  12. 如何选择IT人才外包服务商?
  13. c语言 输入一行字符,分别统计出其中英文字母 空格 数字和其他字符的个数
  14. 计算机如何增加网络地址,如何添加网络打印机到电脑
  15. 电视hdr测试软件,HDR测试:各家效果差异太夸张_索尼 KD-65A1_液晶电视评测-中关村在线...
  16. 探秘Sophos反病毒实验室监测主流病毒全过程
  17. 网站设计的思考(藏)
  18. 长安十二时望楼传讯表情包生成器 —(后面有彩蛋)
  19. 指定时间几个月(自然月)之后的时间
  20. mysql sql语句生成日历表

热门文章

  1. 超强、超详细Redis数据库入门教程
  2. [实现]Javascript代码的另一种压缩与加密方法——代码图片转换
  3. IOS-资源最小化之点九图片的使用
  4. ArrayList转Json的2个坑
  5. springboot整合mysql5.7_springboot整合mybatis访问mysql,数据库
  6. python 魔法函数 运行时_16个python常用魔法函数
  7. php 表单变量,PHP学习笔记——访问表单变量
  8. alt+数字 符号大全_【BIM工具箱】Revit中特殊符号大全和输入技巧
  9. gridview 中使用 if else_前端代码中如何优化if/else
  10. 网站服务器停止响应,如何解决apache停止响应的问题