• 背景

    • zeppelin不提供per job模式

    • 实时平台开发周期长

  • 基于zeppelin开发一个简易实时平台

  • 开发zeppelin Interpreter

    • 提交sql任务

    • 提交jar任务

背景

随着flink的蓬勃发展,zeppelin社区也大力推进flink与zeppelin的集成.zeppelin的定位是一种使用sql或者scala等语言的一个交互式的分析查询分析工具。

Web-based notebook that enables data-driven,interactive data analytics and collaborative documents with SQL, Scala and more.

所以zeppelin与flink或者是其他的解释器集成的时候,就会有这么一个架构的特点,我需要启动一个处理数据的服务,相关的任务都提交到这个上面,拿flink来说,就是需要启动一个flink的集群,比如local、remote、session模式的集群。当我们执行一些flink sql的时候,都是提交到这个集群来执行的。

zeppelin不提供per job模式

但是我们在生产环境中,对于一些flink的流式任务,我们一般会采用per job的模式提交任务,主要是为了任务资源的隔离,每个任务互不影响。目前zeppelin是不支持这种模式的。所以很多公司都会开发一个自己的实时流式任务计算平台,可以实现使用sql或者jar的方式通过平台来提交任务到集群,避免了底层一些复杂的操作,使一些只会sql的人也能开发flink任务。

实时平台开发周期长

但是开发一个实时计算平台其实是相对比较复杂的,它需要有前端的写sql的页面,后端的提交逻辑,以及前后端的交互等等。所以我的想法是既然zeppelin已经提供了我们做一个实时平台的很多的功能,比如写sql的页面、前后端交互、提交任务、获取任务的状态等等,那么我们是不是可以用zeppelin来开发一个简化版的实时计算平台呢。

基于zeppelin开发一个简易实时平台

今天我们谈谈怎么通过zeppelin来实现一个简易的实时平台,目的是可以把flink的sql和jar的流式任务以per job的方式提交到yarn集群

我们简单的看下zeppelin中flink 解释器的源码,他底层是使用了flink scala shell,具体相关内容可以参考 Flink Scala REPL :https://ci.apache.org/projects/flink/flink-docs-stable/ops/scala_shell.html.

zeppelin在提交flink的任务的时候,会判断下集群是否启动,如果没有启动flink集群,会根据设置的模式(local、yarn)先启动一个非隔离模式的flink集群(remote模式需要提前启动好一个集群),然后客户端保持着和服务器的连接,后续有用户提交的任务,就把任务提交到刚起启动的集群。我研究了一下代码觉得在这个上面加一个per job模式的话可能会破坏原来的架构,改动还会比较大,所以后来想自己做一个zepplin的解释器,功能就是通过sql或者jar的方式专门用来提交flink的流式任务。

开发zeppelin Interpreter

具体zeppelin的Interpreter的开发可以参考这篇文章。

https://zeppelin.apache.org/docs/0.9.0-preview1/development/writing_zeppelin_interpreter.html

核心的代码就是继承抽象类Interpreter,实现其中的几个方法,我们简单来讲讲。

public abstract class Interpreter {

  /**  * 初始化的时候调用,可以在这个里面加一些系统初始化的工作,这个方法只调用一次。  * 写过flink自定义source和sink的同学应该不会陌生。   */  @ZeppelinApi  public abstract void open() throws InterpreterException;

  /**   *    * 释放Interpreter资源,也只会被调用一次。   */  @ZeppelinApi  public abstract void close() throws InterpreterException;

    /**   * 异步的运行输入框里面的代码并返回结果。.   *   * @param st 就是页面那个框里你输入的东西   */  @ZeppelinApi  public abstract InterpreterResult interpret(String st,                                              InterpreterContext context)      throws InterpreterException;    

}

除了上面列出来的这几个,还有其他的几个,我这里就不罗列代码了,大家有兴趣的可以自己看下。

底层我使用的是flink application模式来提交的任务,在open里面做一些提交flink初始化的工作,比如构造配置文件,启动yarnClient等等。在interpret方法解析内容,执行提交任务的工作。

最终我们实现了可以通过jar包和sql的方式来提交任务到yarn集群。

提交sql任务

我们可以指定一些任务的参数,比如jobname,并行度、checkpoint间隔等等,页面大概长这个样子,提交任务之后,可以在yarn集群看到相关的任务。

在这里插入图片描述

提交jar任务

首先把相应的jar上传到hdfs相关路径,然后提交任务之前,指定jar的路径,以及jobname、并行度等等,正文就不需要写什么了,然后把这个任务提交到yarn集群。

在这里插入图片描述

目前只是实现了一些核心的功能,还有一些其他的功能需要后续完善。

更多内容,欢迎关注我的公众号【大数据技术与应用实战】

image

flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台相关推荐

  1. flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink

    简介: Flink 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在刚刚发布的 Flink 1.10 中,PyFlink 添加了对 Python UDFs 的支持,现在可以 ...

  2. flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  3. flink sql udf jar包_Java动态加载Jar实例解析

    导读:在实际项目开发中,有时会遇到需动态加载jar包的应用场景.如将Flink的UDF自定义方法制作成方法库(既打成一个Jar),在提交给Flink集群的Job中动态加载jar便可使用.下面将举一个简 ...

  4. flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

    可能会有一些截图中会有错误提示,是因为本地的包一直包下载有问题,截完图已经下载好了. 创建包结构 创建一个基础信息类 所有输出到mysql数据库中的自定义MR任务的自定义key均需要实现该抽象类 代码 ...

  5. hive内置函数_flink教程flink modules详解之使用hive函数

    modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用 ...

  6. Flink SQL Gateway REST Endpoint 使用教程

    介绍 SQL Gateway 是一种支持远程多个客户机并发执行 SQL 的服务.它提供了一种提交 Flink  Job.查找元数据和在线分析数据的简单方法.SQL Gateway 由可插拔 Endpo ...

  7. java poi jar maven_导出maven项目依赖的jar包(图文教程)

    注意使用mvn命令是需要配置好maven的环境变量 一.导出到自定义目录中 在maven项目下创建lib文件夹,输入以下命令: mvn dependency:copy-dependencies -Do ...

  8. Springboot jar包外置教程

    Springboot jar包外置教程     入职的第一个小任务是:使springboot项目的外部依赖jar包与项目本身打成的jar包分离.每次默认的把springboot项目打成可运行jar包, ...

  9. 基于Apache Flink的爱奇艺实时计算平台建设实践

    导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

最新文章

  1. android sql 顺序执行问题,怎么才能执行第一个完毕,才能继续执行第二个
  2. SAP S/4HANA Central Procurement – 采购的未来
  3. java程序如何优化--技巧总结
  4. SQL SERVER 2000安装遇到的问题小汇总(转载)
  5. c++类与类之间关系
  6. Index of school
  7. Blazor Server 应用程序中进行 HTTP 请求
  8. python远程执行shell 防止注入脚本_解决 window 上python远程执行shell paramiko 下令 Permission denied...
  9. Zookeeper的简介及命令行操作
  10. python搭建https代理服务器_使用NGINX作为HTTPS正向代理服务器
  11. [NAACL19]一个更好更快更强的序列标注成分句法分析器
  12. 网络编程实战之在线电子词典
  13. mysql 嵌套查询优化
  14. 对音频压缩概念的一些误解--记一次与音视频压缩专家的对话
  15. 15款超好用的新浪微博短链接在线生成器(新浪t.cn、腾讯url.cn)
  16. 沈劭劼居然还是大疆的....大疆真的可怕。大疆如果做一款室内无人机不分分钟秒杀其他。
  17. 【跨境电商平台规则与合规研讨会】在跨境驿站顺利召开
  18. 对接微信公众号出现【invalid ip xxx.xxx.xxx.xxx 】
  19. 应届生必看的职场建议
  20. vue el-form表单验证,多表单验证及动态数据项表单验证

热门文章

  1. FreeNAS安装与应用—安装篇
  2. Day12 前端html
  3. NOIP2009 潜伏者
  4. (2014年2月7日升级)Ubuntu-14.04-Alpha2-32位简体中文优化封装版
  5. CentOS5.8下varnish-2.1.5的安装配置
  6. 【WinCE】SD card技术了解并WINCE下SDHC驱动开发(updated)
  7. dedecms模板中首页实现分页的方法
  8. 【主机】vnc 介绍以及安装时注意的问题
  9. 支持企业互联网的正确故障恢复方法
  10. cvMinMaxLoc函数实例