使用Flink集群环境进行数据处理
前言
上篇文章记录了搭建分布式Flink集群环境的过程 搭建Flink集群环境
这篇文章咱们聊一聊Flink客户端如何对接Flink集群环境的过程
示例:Flink读取Hadoop中的文件 然后通过集群环境进行数据处理的过程
Hadoop
Hadoop集群环境搭建
搭建大数据运行环境之一
搭建大数据运行环境之二
Hadoop集群端口说明
Hadoop集群搭建过程异常情况
不能格式化存储目录
详细异常信息
org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty
journalnode的端口是8485
处理方式
每一个hadoop journalnode节点上将指定目录删除即可
rm -rf /usr/local/hadoop/jn/data/nameservices001
上传文件到hdfs
cd /usr/local/hadoop/sbin# 创建文件夹hdfs dfs -mkdir /hdfsdata# 文件sudo vi /home/aaa.txt# 上传文件到指定文件夹hdfs dfs -put /home/aaa.txt /hdfsdata
上传文件异常
Hadoop DataNode 节点启不来
详细异常信息
File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation
查看WebUI DataNode情况
http://192.168.84.128:50070/dfshealth.html#tab-datanode
解决方法一
停止集群
cd /usr/local/hadoop/sbin./stop-all.sh
删除在hdfs中配置的data目录
查看data目录
在core-site.xml中配置的hadoop.tmp.dir对应文件件
cat /usr/local/hadoop/etc/hadoop/core-site.xml
删除
rm -rf /usr/local/hadoop/tmp/*
重新格式化
./hadoop namenode -format
重新启动集群
./start-all.sh
解决方法二
如果上面的方法还是不能启动DataNode那么使用这个方法
当执行文件系统格式化时会在namenode数据文件夹(即配置文件中dfs.name.dir在本地系统的路径)中保存一个current/VERSION文件记录namespaceID标志了所有格式化的namenode版本如果我们频繁的格式化namenode那么datanode中保存(即dfs.data.dir在本地系统的路径)的current/VERSION文件只是你地第一次格式化时保存的namenode的ID因此就会造成namenode和datanode之间的ID不一致
解决方法A:(推荐)
删除DataNode的所有资料及将集群中每个datanode节点的/dfs/data/current中的VERSION删除然后重新执行hadoop namenode -format进行格式化重启集群,错误消失
解决方法B:
将name/current下的VERSION中的clusterID复制到data/current下的VERSION中,覆盖掉原来的clusterID
查看DataNode情况
DataNode已经起来了
查看上传文件
http://192.168.84.128:50070
该文件路径
hdfs://192.168.84.128:8020/hdfsdata/aaa.txt
Flink读取数据源并处理数据
DEMO源码
https://gitee.com/pingfanrenbiji/flink-examples-streaming
Flink读取hdfs文件并处理数据
创建flink执行环境
第一个参数:远程flink集群 jobmanager ip地址
第二个参数:8081是jobmanager webui端口
第三个参数:是当前文件夹所在的jar包
数据源
读取hdfs文件数据
各种算子简介
以单词计数为例
先要将字符串数据解析成单词和次数 使用tuple2表示第一个字段是单词 第二个字段是次数次数初始值设置成1
flatmap
flatmap来做解析的工作一行数据可能有多个单词
keyBy
将数据流按照单词字段即0号索引字段做分组keyBy(int index) 得到一个以单词为key的tuple2数据流
timeWindow
在流上指定想要的窗口并根据窗口中的数据计算结果每5秒聚合一次单词数每个窗口都是从零开始统计的
timeWindow 指定想要5秒的翻滚窗口(Tumble)
sum
第三个调用为每个key每个窗口指定了sum聚合函数按照次数字段(即1号索引字段想家)得到结果数据流将每5秒输出一次 这5秒内每个单词出现的次数
将数据打印到控制台
所有算子操作(创建源、聚合、打印)只是构建了内部算子操作的图形
只有在execute被调用时才会在提交到集群或本地计算机上执行
执行报错 找不到代码异常
具体异常信息
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2
解决方法
将当前目录文件夹打包成jar包
使用maven插件maven-jar-plugin
第三个参数指向该jar包
在FLink Web UI查看该任务的执行过程
编译异常
无效的标记
--add-exports=java.base/sun.net.util=ALL-UNNAMED
不支持hdfs文件系统
具体异常信息
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded
处理方式
下载 flink hadoop资源jar包
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
放入flink 安装包 lib目录下
每个节点都需要放上该jar包 然后重启flink集群环境
当前操作节点hadoop namenode节点为standby状态
具体详细信息
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
解决方法
重新格式化2个namenode节点即可
具体详见
搭建大数据运行环境之二
遗留问题
flink数据源来自于socket数据
启动socket服务并输入数据
问题是
Flink并没有监听到该socket数据暂时还没有找到原因 了解的朋友们请联系我 指导我一下哦
如果本地环境是可以监听到的
后记
为了解决这个问题我请教了下 “Apache Flink China社区”钉钉群里面的谢波老师他告诉我:
通过java或scala一般创建本地执行环境 即
'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();'
很少有
'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(ip,port,jarfiles);'
这样用的
若使用flink分布式环境 那么通过web ui界面 上传jar包的方式来完成
这也就解释了为什么我没有找到相关资料只能靠自己'摸着石头过河'了
结语
在了解一件新事物的时候 按照自己的想法 一番努力和挣扎之后也许方向是错误的 但也会对它更进一步的了解了
使用Flink集群环境进行数据处理相关推荐
- 搭建Flink集群环境
下载最新的Flink安装包 https://www.apache.org/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.t ...
- 如何在 Flink 集群部署 Alink?
简介:在 Flink 集群部署 Alink,需要部署三个 Jar 包(本文会有一个部分专门讲述如何获取),对于不同 Flink 集群环境,方式有些区别,本文主要讨论 Standalone 集群和 Ku ...
- [Flink课程]---- 9.1 使用Ambari 搭建Flink 集群
转自: https://blog.csdn.net/high2011/article/details/90272331 lee / ambari-flink-service: https://gite ...
- 大数据介绍、集群环境搭建、Hadoop介绍、HDFS入门介绍
大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 文章目录 大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 1.课前资料 2.课程整体介绍 3.大数据介绍 3.1 什么是大数 ...
- kafka 基础知识梳理及集群环境部署记录
一.kafka基础介绍 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特 ...
- 大数据 -- Hadoop集群环境搭建
首先我们来认识一下HDFS, HDFS(Hadoop Distributed File System )Hadoop分布式文件系统.它其实是将一个大文件分成若干块保存在不同服务器的多个节点中.通过联网 ...
- 《Pyflink》Flink集群安装,Python+Flink调研
Flink集群安装,Python+Flink调研 Flink集群部署 下载对应版本安装包:https://flink.apache.org/downloads.html 实验环境为hadoop2.7, ...
- 实时计算框架:Flink集群搭建与运行机制
一.Flink概述 1.基础简介 Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算.Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算.主要特性包 ...
- 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)
高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...
最新文章
- 尝鲜:windows 7 来了
- POJ 放苹果问题(递归)
- 交换机工作原理_什么是POE交换机,它有什么好处?
- OpenStack的部署T版(九)——控制台部署
- 工作178:moment使用
- C#LeetCode刷题之#88-合并两个有序数组(Merge Sorted Array)
- Linux学习笔记:用户、用户组、文件系统和网络
- cas无法使用_【漫画】CAS原理分析!无锁原子类也能解决并发问题!
- 流水账一周小记[2009-9-20]
- Linux登录时执行
- python写web界面读取txt_web端自动化——Python读取txt文件、csv文件、xml文件
- 虚拟汽车加油问题 (贪心算法)
- Windows搭建基于EClipse的CppUTest单元测试环境
- pscs6怎么做html模板,ps cs6设计个人作品网页模板教程(6)
- 【机器学习】【决策树】ID3算法,Python代码实现生成决策树的系统
- 云祺与南非最大移动支付公司iVeri携手合作
- python网络爬虫——爬取嗅事百科
- 使用Wechaty搭建微信文件日程匣子
- replace、replaceAll、replaceFirst的区别
- 在北上深杭做Java开发如何拿到三万月薪,需要什么程度技术?
热门文章
- spin lock自旋锁
- Idea加快开发的10个技巧
- 在GDI+中如何实现以左下角为原点的笛卡尔坐标系
- mysql5.5分别把这两个数据库同步到不同的从服务器
- 如何做一名出色的屌丝码农?
- tomcat报错“The specified JRE installation does not exist”
- C#中委托和事件的区别
- asp.net 的web.config文件编写
- 第一百八十二节,jQuery-UI,知问前端--日历 UI
- Microsoft Azure Remoteapp使用自定义镜像创建桌面服务