导读:本文主要带来 streamx 在我司中实时任务的应用,我们的部署方式是onyarn的模式,然后使用的版本是自己编译的1.2.1,都是基于Flink做的任务开发。

本文通过一下几个点展开:

  • joyme介绍
  • streamx 调研及部署
  • streamx sql 作业开发
  • streamx custom code 作业开发
  • streamx 监控
  • streamx 作业的任务问题定位及测试
  • 社区印象
  • 总结

joyme介绍

乐我无限科技有限公司孵化于知名互联网公司猎豹移动。公司核心产品 LiveMe 是一款全球知名的社交娱乐产品,于 2016 年 4 月上线,并迅速 成为美国时尚年轻人最喜爱的社交产品之一,美国 Google Play 社交榜 长期排名第一。

streamx 调研及部署

streamx也是我们遇见的必然,基于我们之前实时作业的开发模式,我们不得不寻找一个开源的平台来支撑我们的业务工作。

  1. 自己打包到服务器上flink run提交作业
  2. flinksql 通过自己研发的老平台提交,老平台后台开发人员已经离职,后续的代码没有人维护
  3. 一部分sparkstreaming作业
  4. 实时作业有scala 开发,有java开发

基于这些原因吧,我们需要一个开源平台来管理我们的实时作业,同时我们也做一次重构,统一开发模式,统一开发语言,将项目集中管理。

第一次遇见streamx就基本确定了,搭建以后进行了一些操作,界面友好,flink多版本支持,权限管理,作业监控主要的一些功能已经满足我们的需求,社区也是很活跃,从1.1开始见证了streamx的功能的完善,也相信会不断的完善。

streamx sql作业开发

sql开发的模式提供了我们很大的便利,关于一些简单的指标开发,我们不需要写一堆的代码。sql也方便我们很多同学开发工作,毕竟我们一些做仓库的同学在编写代码方面还是有些难度。

打开界面添加新任务,默认 Development Mode 就是sql模式。我们在Flink sql部分开始写我们的sql逻辑。

Flink sql 部分,按照Flink 官网查看我们需要开发的逻辑sql就可以,一般我们就是这三部分。接入source ,中间逻辑处理,最后sink。我们基本的source是kafka,也会有写维表mysql去做关联,最后sink 部分es,redis,mysql。

-- 连接kafka

CREATE TABLE source_table (

`Data` ROW<

uid STRING>

) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = '主题',

'connector.properties.bootstrap.servers'='broker地址',

'connector.startup-mode' = 'latest-offset',

'update-mode' = 'append',

'format.type' = 'json',

'connector.properties.group.id' = '消费组id',

'format.derive-schema' = 'true'

);

//落地表sink

CREATE TABLE sink_table (

`uid` STRING

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://xxx/xxx?characterEncoding=utf8&useSSL=false',

'connector.username' = 'username',

'connector.password' = 'password',

'connector.table' = 'tablename',

'connector.write.flush.max-rows' = '50',

'connector.write.flush.interval' = '2s',

'connector.write.max-retries' = '3'

);

// 代码逻辑过程

INSERT INTO sink_table

SELECT  Data.uid  FROM source_table;

Dependency 部分,关于依赖这里提供了两种方式,一种是pom的坐标方式,把我们需要的依赖坐标写上去,或者是从我们本地上传已经下载好的jar包。这两种也可以混着用,上传完毕点击应用然后我们提交作业的时候就会加载我们的依赖。

Application conf 部分,我们这里只是设置了checkpoint 和 savepoint这两个配置。一个保存点的位置,二是多久执行一次checkpoint。其他的配置基本没有动,可以根据自己需要去配置,在使用yarn.application.queue 的时候是没生效的,关于这个我们是通过下面的动态参数进行重新指定的。

剩下的部分,我们的应用名称肯定是必须的,剩下的一些参数配置就要根据我们的作业去配置,你的量大了,就需要内存,并行度给多一些。有时候需要根据作业的运行情况再次进行配置。

Dynamic Option,动态选项配置这里我们配置了基本的一个,yarn队列名称。也有一些配置了开启增量的checkpoint选项和状态过期时间,基本的这些参数都可以从flink的官网去查询到。之前有一些作业确实经常出现内存溢出的问题,然后再加上增量参数和过期参数以后,作业的运行情况好多了。但是过期时间我觉得还需验证一下,还有就是sql作业设计到状态这种比较大和逻辑复杂的情况下,我个人感觉还是用我们streaming代码来实现比较好控制一些。

-Dyarn.application.queue= yarn队列名称

-Dstate.backend.incremental=true

-Dtable.exec.state.ttl=过期时间

完成配置以后提交,然后再application界面进行部署。

streamx custom code作业开发

streaming作业我们使用flink java进行开发,将之前spark scala,flink scala,flink java的作业进行了重构,然后工程整合到了一起,目的就是为了维护起来方便。

custom code 作业需要我们提交我们代码到git,然后配置我们的项目

配置完成以后,根据我们对应的项目进行编译也就完成项目的打包环节。这样后面的constom code作业也可以引用。每次需要上线都需要进行编译才可以,否则只能是上次编译的代码。这里有个问题,就是因为我们公司的gitlab其实账号密码都是定期需要我们去更新的,也是为了安全。这样就会导致,我们streamx已经配置好的项目还是我们之前的密码,结果导致编译失败,所以针对这个问题,我目前的解决方案就是直接去修改数据库的项目表密码字段。当然这里我们可以直接修改源码,简单的实现一个update的操作。

新建任务,选择custom code ,选择flink版本,选择我们的项目以及模块 jar包,选择我们开发的应用模式,Apache Flink,程序主函数入库类,任务的名称。

任务的并行度,监控的方式,内存大小根据我们任务需要进行配置。

Dynamic Option,动态选项参数这里,我们写了基本的两个配置。指定yarn资源队列名称,flink 依赖的jar位置

-Dyarn.application.queue=yarn_queue

-Dyarn.provided.lib.dirs=/streamx/xxxx/lib

Program Args,程序的参数根据我们程序的需要定义我们的参数,比如 我们统一启动的类是StartJobApp,那么启动作业就需要传入作业的full name告诉我们启动类要去找哪个类来启动此次任务,也就是一个反射机制。

--className xxx.xxxx.Test

作业配置完成以后同样也是submit提交,然后application界面部署任务。

streamx 监控

streamx的监控需要在setting模块去配置我们的发送邮件

然后在我们的任务里面,需要配置重启策略,然后监控在多久内几次异常,然后是报警还是重启的策略,同时发送报警要发到哪个邮箱。目前我这个版本1.2.1只是有邮件的发送。

当我们的作业出现失败的情况下,我们就可以接收到报警邮箱。这报警还是很好看的有木有,可以清楚看到我们的哪个作业,什么状态。也可以点击下面的具体地址进行查看。

关于报警这一块目前我们基于streamx的t_flink_app表进行了一个定时任务的开发。为什么要这么做?因为发送邮件这种通知,我们大部分人可能不会去及时去看。所以我们选择监控每个任务的状态去把对应的监控信息发送我们的飞书报警群,这样可以及时发现问题去解决任务。一个简单的python脚本,然后配置了crontab 去定时执行的。

import MySQLdb
import json
import requests

def connect_mysql():
    db = MySQLdb.connect("mysqlhost", "database", "password", "dstream", charset='utf8')

cursor = db.cursor()

cursor.execute("select STATE,JOB_NAME,ALERT_EMAIL  from t_flink_app where state in (-9,15,8,9)")

data = cursor.fetchall()

db.close()
    return data

def alert( data):
    for row in data:
      send(row)

def send(row):
    webhook = '飞书机器人的hook地址'
    payload_mes = {
        "msg_type": "text",
        "content": {
            "text": '作业:' + row[1].encode('utf-8') + ',状态:' + row[0].encode('utf-8')
        }
    }

state='KILLED'

if int(row[0].encode('utf-8')) == 8:
        state='FAILING'
    elif int(row[0].encode('utf-8')) == 9:
        state='FAILED'
    elif int(row[0].encode('utf-8')) == 15:
        state='LOST'
    else:
        state='KILLED'

email=row[2]

if email is None:
        email='-'

payload_mes = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "flink实时作业告警",
                    "content": [
                        [{
                            "tag": "text",
                            "text": "作业名称: " + row[1].encode('utf-8')
                        }],
                           [ {
                                "tag": "text",
                                "text": "作业状态: " + state
                            }],
                            [{
                                "tag": "text",
                                "text": "报警邮件: " + email.encode('utf-8')
                            }]
                    ]
                }
            }
        }
    }
    headers = {
        'Content-Type': 'application/json'
    }

res = requests.request('POST', webhook, headers=headers, data=json.dumps(payload_mes))

print(res.text)

if __name__ == '__main__':
    data = connect_mysql()
    alert(data)
    print(data)

streamx 作业的任务问题定位及测试环节

关于我们作业的异常问题,基本分了这么几种情况。

1、作业启动不起来的问题,就是我们写完作业然后点击运行部署。发现起不来,这时候我们需要看界面的详情信息的日志。在我们的任务列表中有一个眼镜的功能,点进去。然后start logs中会找到我们提交的作业日志信息,点进去查看,如果有明显的提示信息,我们直接解决就可以了。如果没有,就需要我们去我们的后台部署任务的地方 logs/下面的streamx.out,打开以后会找到我们启动失败的日志信息。

2、如果是我们任务已经起来了,但是任务失败了。其实这时候就是交给我们的集群了,但是同样可以用 第一种情况 的方式打开我们作业的具体日志,在这里也可以看到,我们的作业是因为什么?是sql的connector 不存在,还是我们代码的哪行代码空指针了,都可以看到具体的堆栈信息。有了具体信息,我们就可以对症下药了。

3、测试环节

其实做大数据的开发上线跟我们后台开发的测试上线还是有区别的,我们大多数的测试都是自己测试,然后我们接入的环境基本也是正式的环境,我们没有测试环境很多时候上线以后也是需要正式环境去测试再发现问题再进行修改再次上下的这样一个来回过程。

1、代码作业,本地代码测试,自己写的代码,我们本地肯定需要进行代码模拟数据进行一个代码正常的运行。没问题上线后通知业务测试进行验证,同时我们也用正式的环境账号进行一定的测试验证结果。都没问题了清除之前的数据,正式上线。

2、 sql作业,我们去接入一个消息,去做一个指标,最后落地。然后落地以后我们会对数据进行大致的一个校验工作,完了没问题。我们通知业务方的测试进行一个数据的验证。全部验证通过以后,上线。

3、做大数据的我们经常遇到的就是数据问题,关于实时数据的问题,我们是经常会做一些历史数据补数的脚本方案。需要在某个时间点进行这个动作,使得数据能够保持正确。

社区印象

很多时候我们在群里讨论问题,都会得到社区小伙伴的即时响应。提交的一些issue在当下能解决的,基本也会在下一个版本或者最新的代码分支中进行修复。在群里,我们也看到很多不是社区的小伙伴,也在积极互相帮助去解决问题。群里也有很多其他社区的大佬,很多小伙伴也积极加入了社区的开发工作。整个社区给我的感觉还是很活跃!

总结

目前我们线上运行 60个实时作业,sql与code差不多一半一半。后续也会有更多的实时任务进行上线。很多同学都会担心streamx稳不稳定的问题,就我个人感觉而言,streamx只是一个帮助你部署,监控的一个工具。到底稳不稳,还是要看自家的hadoop yarn集群稳不稳定(我们用的onyan模式),已经跟streamx其实关系不大了。还有就是你自己写的sql 或者是代码健不健壮。更多的是这两方面是我们应该考虑,这两方面没问题再充分利用streamx的灵活性才能让我们作业更好的运行,单从一方面说streamx稳不稳定,实属耍流氓。

streamx如何支持我司线上实时作业相关推荐

  1. AI+音视频双引擎驱动,保司线上服务能力全面升级 | 爱分析报告

    报告编委 张扬 爱分析联合创始人&首席分析师 孙文瑞 爱分析高级分析师 廖耘加 爱分析分析师 外部专家(按姓氏拼音排序) 段磊 容联云音视频负责人 徐靖辰 声网数字化转型政企行业总监 特别鸣谢 ...

  2. 金山“云”上音乐节 —— 一文带你看懂如何支持一场线上演出

    疫情当下,线上演艺活动受到巨大冲击,娱乐行业展开一场自救大行动,把演出从线下搬到了线上. 转到线上的演出目前主要是以直播形式出现在各大直播平台,比如最近迅速蹿火的One Third(OT)云蹦迪.&q ...

  3. 线上实时监测推广效果,App投放渠道数据分析

    近几年APP推广市场发生了很大的变化,新的APP越来越多,流量红利的消息也越来越多,流量转化率越来越差. 很多时候公司推广费用的支出没有达到预期效果 如何才能合理分配广告中的每一笔费用? 我们知道一半 ...

  4. 动态引入js文件-支持cdn等线上地址

    动态引入cdn js文件,并使用js中的变量常量,亲测有效 原文链接:https://blog.csdn.net/Jie_1997/article/details/112011603 function ...

  5. 第四范式推出业界首个基于持久内存、支持毫秒级恢复的万亿维线上预估系统...

    线上预估服务是AI在企业应用落地的关键环节,企业通常会采用分布式计算架构在内存中完成实时数据处理和高达万亿维的模型特征存储,并通过多集群副本解决传统纯内存(DRAM)天生的易"失" ...

  6. SDCC 2016线上架构峰会顾问团、嘉宾、议题、日程大公开(免费报名)

    始建于2007年的SDCC,已经成功举办七届,历届技术讲师超过550位,参会人数超过7500人,成为技术圈口碑卓越的技术会议品牌.今年开始,为创造更多的技术交流和分享机会,CSDN已将SDCC拓展为关 ...

  7. 海外直播软件 Bigo 的 TiDB 4.0 线上实践

    作者介绍:徐嘉埥,Bigo DBA,TUG 华南区大使. Bigo 于 2014 年成立,是一家高速发展的科技公司.Bigo 基于强大的音视频处理技术.全球音视频实时传输技术.人工智能技术.CDN 技 ...

  8. 真实时在线K歌,全音乐版权支持|网易云信一体化实时合唱解决方案有一套!...

    在线K歌:井喷式增长的新社交 随着互联网技术与网络音乐的发展,在线K歌这种新兴的声音社交方式越来越为人熟知,短短几年内便积累了大量的拥趸:据统计,截至2018年,在线K歌行业设备数已超过2亿,网民渗透 ...

  9. 从难免的线上bug说起代码的思考

    经常是某司线上又出bug了,然后是给公司造成多少损失,追根究底总是可以找到一些原因,诸如:写代码逻辑考虑不全面,或者代码有硬伤,也有测试不充分,甚至不测试都有,也有是运维的问题等等. 我对测试部专业, ...

最新文章

  1. 图像数据读取及数据扩增方法
  2. 第130天:移动端-rem布局
  3. PHP安装zip拓展,以及libzip安装问题
  4. SSM关联码表的多个字段显示中文流程以及sql写法
  5. lodoop打印控件详解
  6. python炫酷特效代码_推荐几个炫酷的 Python 开源项目
  7. 图Graph--最小生成树
  8. hibernate 各种主键生成策略(转)
  9. tomcat原理及作用,MySQL数据中有很多换行符和回车符
  10. cv2 和matplotlib中画图时的颜色选取
  11. matlab gui界面设计 打开文件,matlab GUI界面设计 点击按钮加载.mat数据的所有变量到工作空间中...
  12. 马氏距离 java实现_马氏距离(Mahalanobis Distence) [python]
  13. ImportError: cannot import name ‘_validate_lengths‘ 解决方法
  14. 突破NVIDIA NVENC并发Session数目限制
  15. Spring boot JPA+Gradle+QueryDSL 完美配置生成Q文件依赖
  16. matlab的损失函数mse,MSELoss损失函数
  17. 『清华ERNIE』 与 『百度ERNIE』 的爱恨情仇
  18. Springboot+Vue整合笔记【超详细】
  19. 版式文件 流式文件_画册版式设计的重要性!
  20. 利用云服务器搭建网站

热门文章

  1. 在C中将二进制转换为十进制
  2. 万向和肖风的区块链版图
  3. Win11任务栏修改方法(更新中)
  4. 深度相机(八)--OpenNI及与Kinect for windows SDK的比较
  5. 经超计算机求职电视剧,经超电影,电视剧全集_经超影视作品大全推荐 - 剧知晓...
  6. 我实测了国内外GPT,问了10个问题,差点把电脑砸了...
  7. 【Linux】详解socket编程接口
  8. XTP dockingpane的使用方法
  9. 传统企业怎么通过抖音引流到微信
  10. ai人工智能大爆发_人工智能解释了大爆炸之前发生的事情