flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台
背景
zeppelin不提供per job模式
实时平台开发周期长
基于zeppelin开发一个简易实时平台
开发zeppelin Interpreter
提交sql任务
提交jar任务
背景
随着flink的蓬勃发展,zeppelin社区也大力推进flink与zeppelin的集成.zeppelin的定位是一种使用sql或者scala等语言的一个交互式的分析查询分析工具。
Web-based notebook that enables data-driven,interactive data analytics and collaborative documents with SQL, Scala and more.
所以zeppelin与flink或者是其他的解释器集成的时候,就会有这么一个架构的特点,我需要启动一个处理数据的服务,相关的任务都提交到这个上面,拿flink来说,就是需要启动一个flink的集群,比如local、remote、session模式的集群。当我们执行一些flink sql的时候,都是提交到这个集群来执行的。
zeppelin不提供per job模式
但是我们在生产环境中,对于一些flink的流式任务,我们一般会采用per job的模式提交任务,主要是为了任务资源的隔离,每个任务互不影响。目前zeppelin是不支持这种模式的。所以很多公司都会开发一个自己的实时流式任务计算平台,可以实现使用sql或者jar的方式通过平台来提交任务到集群,避免了底层一些复杂的操作,使一些只会sql的人也能开发flink任务。
实时平台开发周期长
但是开发一个实时计算平台其实是相对比较复杂的,它需要有前端的写sql的页面,后端的提交逻辑,以及前后端的交互等等。所以我的想法是既然zeppelin已经提供了我们做一个实时平台的很多的功能,比如写sql的页面、前后端交互、提交任务、获取任务的状态等等,那么我们是不是可以用zeppelin来开发一个简化版的实时计算平台呢。
基于zeppelin开发一个简易实时平台
今天我们谈谈怎么通过zeppelin来实现一个简易的实时平台,目的是可以把flink的sql和jar的流式任务以per job的方式提交到yarn集群。
我们简单的看下zeppelin中flink 解释器的源码,他底层是使用了flink scala shell,具体相关内容可以参考 Flink Scala REPL :https://ci.apache.org/projects/flink/flink-docs-stable/ops/scala_shell.html.
zeppelin在提交flink的任务的时候,会判断下集群是否启动,如果没有启动flink集群,会根据设置的模式(local、yarn)先启动一个非隔离模式的flink集群(remote模式需要提前启动好一个集群),然后客户端保持着和服务器的连接,后续有用户提交的任务,就把任务提交到刚起启动的集群。我研究了一下代码觉得在这个上面加一个per job模式的话可能会破坏原来的架构,改动还会比较大,所以后来想自己做一个zepplin的解释器,功能就是通过sql或者jar的方式专门用来提交flink的流式任务。
开发zeppelin Interpreter
具体zeppelin的Interpreter的开发可以参考这篇文章。
https://zeppelin.apache.org/docs/0.9.0-preview1/development/writing_zeppelin_interpreter.html
核心的代码就是继承抽象类Interpreter,实现其中的几个方法,我们简单来讲讲。
public abstract class Interpreter {
/** * 初始化的时候调用,可以在这个里面加一些系统初始化的工作,这个方法只调用一次。 * 写过flink自定义source和sink的同学应该不会陌生。 */ @ZeppelinApi public abstract void open() throws InterpreterException;
/** * * 释放Interpreter资源,也只会被调用一次。 */ @ZeppelinApi public abstract void close() throws InterpreterException;
/** * 异步的运行输入框里面的代码并返回结果。. * * @param st 就是页面那个框里你输入的东西 */ @ZeppelinApi public abstract InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException;
}
除了上面列出来的这几个,还有其他的几个,我这里就不罗列代码了,大家有兴趣的可以自己看下。
底层我使用的是flink application模式来提交的任务,在open里面做一些提交flink初始化的工作,比如构造配置文件,启动yarnClient等等。在interpret方法解析内容,执行提交任务的工作。
最终我们实现了可以通过jar包和sql的方式来提交任务到yarn集群。
提交sql任务
我们可以指定一些任务的参数,比如jobname,并行度、checkpoint间隔等等,页面大概长这个样子,提交任务之后,可以在yarn集群看到相关的任务。
提交jar任务
首先把相应的jar上传到hdfs相关路径,然后提交任务之前,指定jar的路径,以及jobname、并行度等等,正文就不需要写什么了,然后把这个任务提交到yarn集群。
目前只是实现了一些核心的功能,还有一些其他的功能需要后续完善。
更多内容,欢迎关注我的公众号【大数据技术与应用实战】
flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台相关推荐
- flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink
简介: Flink 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在刚刚发布的 Flink 1.10 中,PyFlink 添加了对 Python UDFs 的支持,现在可以 ...
- flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路
导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...
- flink sql udf jar包_Java动态加载Jar实例解析
导读:在实际项目开发中,有时会遇到需动态加载jar包的应用场景.如将Flink的UDF自定义方法制作成方法库(既打成一个Jar),在提交给Flink集群的Job中动态加载jar便可使用.下面将举一个简 ...
- flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
可能会有一些截图中会有错误提示,是因为本地的包一直包下载有问题,截完图已经下载好了. 创建包结构 创建一个基础信息类 所有输出到mysql数据库中的自定义MR任务的自定义key均需要实现该抽象类 代码 ...
- hive内置函数_flink教程flink modules详解之使用hive函数
modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用 ...
- Flink SQL Gateway REST Endpoint 使用教程
介绍 SQL Gateway 是一种支持远程多个客户机并发执行 SQL 的服务.它提供了一种提交 Flink Job.查找元数据和在线分析数据的简单方法.SQL Gateway 由可插拔 Endpo ...
- java poi jar maven_导出maven项目依赖的jar包(图文教程)
注意使用mvn命令是需要配置好maven的环境变量 一.导出到自定义目录中 在maven项目下创建lib文件夹,输入以下命令: mvn dependency:copy-dependencies -Do ...
- Springboot jar包外置教程
Springboot jar包外置教程 入职的第一个小任务是:使springboot项目的外部依赖jar包与项目本身打成的jar包分离.每次默认的把springboot项目打成可运行jar包, ...
- 基于Apache Flink的爱奇艺实时计算平台建设实践
导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...
最新文章
- android sql 顺序执行问题,怎么才能执行第一个完毕,才能继续执行第二个
- SAP S/4HANA Central Procurement – 采购的未来
- java程序如何优化--技巧总结
- SQL SERVER 2000安装遇到的问题小汇总(转载)
- c++类与类之间关系
- Index of school
- Blazor Server 应用程序中进行 HTTP 请求
- python远程执行shell 防止注入脚本_解决 window 上python远程执行shell paramiko 下令 Permission denied...
- Zookeeper的简介及命令行操作
- python搭建https代理服务器_使用NGINX作为HTTPS正向代理服务器
- [NAACL19]一个更好更快更强的序列标注成分句法分析器
- 网络编程实战之在线电子词典
- mysql 嵌套查询优化
- 对音频压缩概念的一些误解--记一次与音视频压缩专家的对话
- 15款超好用的新浪微博短链接在线生成器(新浪t.cn、腾讯url.cn)
- 沈劭劼居然还是大疆的....大疆真的可怕。大疆如果做一款室内无人机不分分钟秒杀其他。
- 【跨境电商平台规则与合规研讨会】在跨境驿站顺利召开
- 对接微信公众号出现【invalid ip xxx.xxx.xxx.xxx 】
- 应届生必看的职场建议
- vue el-form表单验证,多表单验证及动态数据项表单验证