#2020云栖大会#阿里云海量offer来啦!投简历、赢阿里云限量礼品及阿里云ACA认证免费考试资格!>>>

用过 Zeppeli n 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的 主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗? 如果你不想用 Zeppelin UI,但又想用 Z eppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?
如果你有这样的诉求,那么 Zeppelin Client API (SDK) 就是你所需要的东西。

Zeppelin 简介

对于不熟悉 Zeppelin 的人,可以用一句话来解释 Zeppelin:大数据引擎的入口,交互式大数据分析平台底座。Zeppelin 最大的特点是连接多种引擎,具有可插拔式,下面这张图例举了一些常用的引擎,当然 Zeppelin 还支持其他很多引擎,这里就不一一例举。

虽然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,对于很多不熟悉 Zeppelin 的人来说使用 Rest API 门槛太高,所以 Zeppelin 专门开发了一个 Client API (SDK),方便大家做集成。Zeppelin Client API (SDK)分为 2 个层面的的东西(接下来会逐个详细介绍):

  • Zeppelin Client API (Low Level API)

  • Session API (High Level API)

Zeppelin Client API (Low Level API)

Zeppelin Client API 可以在 Note 和 Paragraph 的粒度进行操作。你可以先在 notebook 里写好代码 (比如开发阶段在 notebook 里写代码,做测试),然后用 Low Level API 用编程的方式把 Job 跑起来(比如生产阶段把作业定时调度起来)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例举几个重要的接口(这些 API 都比较直观,我就不多做解释了)。

public String createNote(String notePath) throws Exception public void deleteNote(String noteId) throws Exception public NoteResult executeNote(String noteId) throws Exception public NoteResult executeNote(String noteId,                               Map<String, String> parameters) throws Exception                              public NoteResult queryNoteResult(String noteId) throws Exception public NoteResult submitNote(String noteId) throws Exceptionpublic NoteResult submitNote(String noteId,                              Map<String, String> parameters) throws Exception                              public NoteResult waitUntilNoteFinished(String noteId) throws Exceptionpublic String addParagraph(String noteId,                            String title,                            String text) throws Exception                           public void updateParagraph(String noteId,                             String paragraphId,                             String title,                             String text) throws Exception                            public ParagraphResult executeParagraph(String noteId,                                        String paragraphId,                                        String sessionId,                                        Map<String, String> parameters) throws Exception                                        public ParagraphResult submitParagraph(String noteId,                                       String paragraphId,                                       String sessionId,                                       Map<String, String> parameters) throws Exception                                       public void cancelParagraph(String noteId, String paragraphId)    public ParagraphResult queryParagraphResult(String noteId, String paragraphId)     public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那这些 API 能用来做什么呢? 
一个典型的用途是我们在 Zeppelin 里写好代码,做好测试,然后在第三方系统里集成进来。比如下面的代码就是把 Zeppelin 自带的 Spark Basic Features 用编程的方式跑起来,你不仅可以跑 Zeppelin Note,还可以拿到运行结果 (ParagraphResult)。怎么处理运行结果,就留给你发挥想象的空间吧(可以在你的系统里展示出来,或者可视化出来,或者传给其他系统做消费等等)。
此外,对于 Dynamic forms(动态控件,比如文本框,下拉框等等),你还可以动态的提供参数,如下面例子里的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");ZeppelinClient zClient = new ZeppelinClient(clientConfig);String zeppelinVersion = zClient.getVersion();System.out.println("Zeppelin version: " + zeppelinVersion);ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);Map<String, String> parameters = new HashMap<>();parameters.put("maxAge", "40");paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);parameters = new HashMap<>();parameters.put("marital", "married");paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);

这下面这张图就是上面我们要 Zeppelin Client API 跑的 Zeppelin 自带的 Spark Basic Features。


Session API (High Level API)

Session API 是 Zeppelin 的high level api,Session API 里没有 Note,Paragraph 的概念,粒度是你提交的代码。Session API里最重要的class就是 ZSession,这也是Session API的入口,一个 ZSession 代表一个独立的Zeppelin Interpreter 进程,对于 Flink 来说就是一个独立的 Flink Session Cluster。下面例举一些典型的接口(这些 API 都比较直观,我就不多做解释了)。

public void start() throws Exceptionpublic void start(MessageHandler messageHandler) throws Exceptionpublic void stop() throws Exceptionpublic ExecuteResult execute(String code) throws Exceptionpublic ExecuteResult execute(String subInterpreter,                             Map<< span="">String, String> localProperties,                             String code,                             StatementMessageHandler messageHandler) throws Exceptionpublic ExecuteResult submit(String code) throws Exceptionpublic ExecuteResult submit(String subInterpreter,                            Map<< span="">String, String> localProperties,                            String code,                            StatementMessageHandler messageHandler) throws Exception                           public void cancel(String statementId) throws Exception public ExecuteResult queryStatement(String statementId) throws Exceptionpublic ExecuteResult waitUntilFinished(String statementId) throws Exception

那这个 API 能用来做什么呢?   一个典型的用途是就是我们动态创建 Session (Zeppelin Interpreter 进程),动态的提交运行代码,并拿到运行结果。 比如你不想用 Zeppelin 的 UI,要自己做一个 Flink 的开发管理平台,那么你就可以自己做 UI,让用户在 UI 上配置 Flink Job,输入 SQL,然后把所有的这些信息发送到后端,后端调用 ZSession 来运行 Flink Job。

下面的 Java 代码就是用编程的方式调用了 2 条 Flink SQL 语句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中读取源源不断发送过来更新的 SQL 运行结果 (怎么来使用这个结果就靠你的想象力了)。
需要说明的是像 Flink Interpreter 这种流式结果数据更新是通过 WebSocket 实现的,所以下面的代码里有会有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,这些 MessageHandler 就是用来处理通过 WebSocket 发送过来的流式数据结果。下面是 2 条我们在 Zeppelin 里运行的 Flink SQL。

接下来我们会用 Zeppelin Session API 来跑着这 2 条 Flink SQL,然后我们会在MyStatementMessageHandler1,MyStatementMessageHandler2 里拿到结果展示出来。

ZSession session = null;try {    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");    Map<< span="">String, String> intpProperties = new HashMap<>();    session = ZSession.builder()        .setClientConfig(clientConfig)        .setInterpreter("flink")        .setIntpProperties(intpProperties)        .build();    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.    // otherwise you have to use a global MessageHandler.    session.start(new CompositeMessageHandler());    System.out.println("Flink Web UI: " + session.getWeburl());    System.out.println("-----------------------------------------------------------------------------");    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));    ExecuteResult result = session.execute(initCode);    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());    // run flink ssql    Map<< span="">String, String> localProperties = new HashMap<>();    localProperties.put("type", "update");    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",                            new MyStatementMessageHandler1());    session.waitUntilFinished(result.getStatementId());    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",                            new MyStatementMessageHandler2());    session.waitUntilFinished(result.getStatementId());} catch (Exception e) {    e.printStackTrace();} finally {    if (session != null) {        try {            session.stop();        } catch (Exception e) {            e.printStackTrace();        }    }}public static class MyStatementMessageHandler1 implements StatementMessageHandler {    @Override    public void onStatementAppendOutput(String statementId, int index, String output) {        System.out.println("MyStatementMessageHandler1, append output: " + output);    }    @Override    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {        System.out.println("MyStatementMessageHandler1, update output: " + output);    }}public static class MyStatementMessageHandler2 implements StatementMessageHandler {    @Override    public void onStatementAppendOutput(String statementId, int index, String output) {        System.out.println("MyStatementMessageHandler2, append output: " + output);    }    @Override    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {        System.out.println("MyStatementMessageHandler2, update output: " + output);    }}

除了编程方式跑 Flink Job,这个 Session API 还能给我们带来什么呢?
在 Zeppelin 里如果你可以通过 %flink.conf 来对你的 Flink Cluster 进行非常丰富的配置,但是 %flink.conf 是纯文本的配置,不熟悉 Flink 的人很容易配错(如下图)。如果你是自己做 Flink 开发平台的话就可以做一个更完整的 UI,用一些下拉框等等把一些配置选项固定下来,用户只要选择就行了,不需要自己输入文本来配置。

还有下面这类 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比较容易写错的,同理你可以在自己 UI 里用一些控件把这些选项提前固定下来,而不是让用户输入文本的方式。

我相信 Zeppelin Client API 还有很多可以发挥和想象的空间,大家脑洞起来吧。
▼ 视频演示  ▼

更多 Zeppelin 技术干货及使用交流可加入 Flink on Zeppelin 钉钉群。
(钉钉扫码加群)


专注大数据技术、架构、实战

关注我,带你不同角度看数据架构

本文分享自微信公众号 - 大数据每日哔哔(bb-bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

Zeppelin SDK :Flink 平台建设的基石相关推荐

  1. 基于Apache Flink的爱奇艺实时计算平台建设实践

    导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  2. Flink从入门到精通100篇(二十三)-基于Apache Flink的爱奇艺实时计算平台建设实践

    前言 随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  3. 知乎的 Flink 数据集成平台建设实践

    简介:本文由知乎技术平台负责人孙晓光分享,主要介绍知乎 Flink 数据集成平台建设实践.内容如下: 1. 业务场景 : 2. 历史设计 : 3. 全面转向 Flink 后的设计 : 4. 未来 Fl ...

  4. 美团基于 Flink 的实时数仓平台建设新进展

    摘要:本文整理自美团实时数仓平台负责人姚冬阳在 Flink Forward Asia 2021 实时数仓专场的演讲.主要内容包括: 平台建设现状 遇到的问题及解决 未来规划 点击查看直播回放 & ...

  5. 阿里PB级Kubernetes日志平台建设实践

    阿里PB级Kubernetes日志平台建设实践 QCon是由InfoQ主办的综合性技术盛会,每年在伦敦.北京.纽约.圣保罗.上海.旧金山召开.有幸参加这次QCon10周年大会,作为分享嘉宾在刘宇老师的 ...

  6. 干货分享 | 阿里PB级Kubernetes日志平台建设实践

    嘉宾 | 元乙 随着近两年的发展,Kubernetes 早已成为容器编排领域的标准,现在非常多的企业基于 Kubernetes 构建整个微服务的开发.运维平台,而日志是其中必不可少的核心功能.本文整理 ...

  7. 京东到家机器学习平台建设

    文|巩学超/戴枫/魏铮 编辑|刘慧卿/闫文广 目录 前言 机器学习平台总体架构 模型训练平台 特征模型管理平台 在线模型预测服务 算法应用实践 总结和展望 1. 前言 京东到家作为行业领先的即时零售平 ...

  8. ​网易游戏实时 HTAP 计费风控平台建设

    摘要:本文整理自网易互娱资深工程师, Flink Contributor, CDC Contributor 林佳,在 FFA 实时风控专场的分享.本篇内容主要分为五个部分: 实时风控业务会话 会话关联 ...

  9. 美团图数据库平台建设及业务实践

    图数据结构,能够更好地表征现实世界.美团业务相对较复杂,存在比较多的图数据存储及多跳查询需求,亟需一种组件来对千亿量级图数据进行管理,海量图数据的高效存储和查询是图数据库研究的核心课题.本文介绍了美团 ...

最新文章

  1. 决策树算法十问及经典面试问题
  2. Unity持久化存储之PlayerPrefs的使用
  3. Sql语句里的递归查询
  4. python读写excel模块pandas_Python3使用pandas模块读写excel操作示例
  5. 好想找一个灵魂伴侣,然后带着他一起周游世界,会实现吗?
  6. All Things Intelligence—Baidu World 2020
  7. springboot整合rpc远程调用_SpringBoot—-JsonRpc跨语言远程调用协议 - Java天堂
  8. matlab使用xlsread报错,matlab的IO操作复习
  9. 【python】词云图制作
  10. svn locked 怎么解决
  11. html表格制作练习
  12. 如何用MATLAB预测未来人口,使用matlab对将来人口总数进行预测
  13. go实现简单的chan
  14. python opendr_《网络工程师的Python之路》出书了!
  15. 小程序webview嵌入h5兼容iphone安全区域
  16. 用Python掌握QQ群聊天记录数据分析
  17. 物联网导论-自动识别技术
  18. 光功率 博科交换机_华为交换机查看光功率的方法请大神指教
  19. vue项目中videoPlayer 的 src 视频地址参数动态修改---方法
  20. 收购家乐福中国,苏宁要做新零售第三极?

热门文章

  1. 集合差异比较算法及性能测试
  2. Android生成keystore方法
  3. Træfɪk入门教程
  4. Docker-Compose快速搭建Oracle-11G系统
  5. Linux系统下搭建BUG管理系统---禅道
  6. ZooKeeper动态重新配置
  7. Fedora/RedHat上搭建MariaDB
  8. 什么样才能叫精通java_Java学到什么程度才能叫精通?
  9. js this指向问题,同级this指向同级,非同级this指向全局
  10. C#LeetCode刷题之#500-键盘行(Keyboard Row)