134.1 实时流计算介绍

  • 所谓实时流计算,就是近几年由于数据得到广泛应用之后,在数据持久性建模不满足现状的情况下,急需数据流的瞬时建模或者计算处理。
  • 这种实时计算的应用实例有金融服务、网络监控、电信数据管理、 Web 应用、生产制造、传感检测,等等。在这种数据流模型中,单独的数据单元可能是相关的元组(Tuple),如网络测量、呼叫记录、网页访问等产生的数据。
  • 但是,这些数据以大量、快速、时变(可能是不可预知)的数据流持续到达,由此产生了一些基础性的新的研究问题——实时计算。实时计算的一个重要方向就是实时流计算。

134.2 实时流计算过程

  • 以热卖产品的统计为例,看下传统的计算手段:

    • 将用户行为、log等信息清洗后保存在数据库中.
    • 将订单信息保存在数据库中.
    • 利用触发器或者协程等方式建立本地索引,或者远程的独立索引.
    • join订单信息、订单明细、用户信息、商品信息等等表,聚合统计20分钟内热卖产品,并返回top-10.
    • web或app展示.
      • 这是一个假想的场景,但假设你具有处理类似场景的经验,应该会体会到这样一些问题和难处:
    • 水平扩展问题(scale-out)
      • 显然,如果是一个具有一定规模的电子商务网站,数据量都是很大的。而交易信息因为涉及事务,所以很难直接舍弃关系型数据库的事务能力,迁移到具有更好的scale-out能力的NoSQL数据库中。
      • 那么,一般都会做sharding。历史数据还好说,可以按日期来归档,并可以通过批处理式的离线计算,将结果缓存起来。但是,这里的要求是20分钟内,这很难。
    • 性能问题
      • 这个问题,和scale-out是一致的,假设做了sharding,因为表分散在各个节点中,所以需要多次入库,并在业务层做聚合计算。
    • 业务扩展问题
      • 假设不仅仅要处理热卖商品的统计,还要统计广告点击、或者迅速根据用户的访问行为判断用户特征以调整其所见的信息,更加符合用户的潜在需求等,那么业务层将会更加复杂。
      • 所以需要一种实时计算的模型,而不是批处理模型。
      • 需要的这种模型,必须能够处理很大的数据,所以要有很好的scale-out能力,最好是,都不需要考虑太多一致性、复制的问题。
  • 那么,这种计算模型就是实时计算模型,也可以认为是流式计算模型

134.3 案例

  • 假设,的业务要求是:

    • 统计20分钟内最热的10个微博话题
  • 解决这个问题,需要考虑:
  • 数据源
这里,假设的数据,来自微博长连接推送的话题。
  • 问题建模
认为的话题是#号扩起来的话题,最热的话题是此话题出现的次数比其它话题都要多。
比如:@foreach_break : 你好,#世界#,我爱你,#微博#。
“世界”和“微博”就是话题。
  • 计算引擎采用storm
  • 定义时间
    • 时间的定义是一件很难的事情,取决于所需的精度是多少
    • 根据实际,一般采用tick来表示时刻这一概念。
    • 在storm的基础设施中,executor启动阶段,采用了定时器来触发“过了一段时间”这个事件。
    • 如下所示:
(defn setup-ticks! [worker executor-data](let [storm-conf (:storm-conf executor-data)tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)receive-queue (:receive-queue executor-data)context (:worker-context executor-data)](when tick-time-secs(if (or (system-id? (:component-id executor-data))(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))(= :spout (:type executor-data))))(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))(schedule-recurring(:user-timer worker)tick-time-secstick-time-secs(fn [](disruptor/publishreceive-queue[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]])))))))
  • bolt如何判断收到的tuple表示的是“tick”呢?

    • 负责管理bolt的executor线程,从其订阅的消息队列消费消息时,会调用到bolt的execute方法,那么,可以在execute中这样判断:
public static boolean isTick(Tuple tuple) {return tuple != null&& Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
  • 结合上面的setup-tick!的clojure代码,可以知道SYSTEM_TICK_STREAM_ID在定时事件的回调中就以构造函数的参数传递给了tuple,那么SYSTEM_COMPONENT_ID是如何来的呢?

    • 可以看到,下面的代码中,SYSTEM_TASK_ID同样传给了tuple:;;
    • 请注意SYSTEM_TASK_ID和SYSTEM_TICK_STREAM_ID
      (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)
    • 然后利用下面的代码,就可以得到SYSTEM_COMPONENT_ID:
    public String getComponentId(int taskId) {if(taskId==Constants.SYSTEM_TASK_ID) {return Constants.SYSTEM_COMPONENT_ID;} else {return _taskToComponent.get(taskId);}}
  • 滑动窗口

    • 有了上面的基础设施,还需要一些手段来完成“工程化”,将设想变为现实。
  • Topology
    String spoutId = "wordGenerator";String counterId = "counter";String intermediateRankerId = "intermediateRanker";String totalRankerId = "finalRanker";// 这里,假设TestWordSpout就是发送话题tuple的源builder.setSpout(spoutId, new TestWordSpout(), 5);// RollingCountBolt的时间窗口为9秒钟,每3秒发送一次统计结果到下游builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));// IntermediateRankingsBolt,将完成部分聚合,统计出top-n的话题builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));// TotalRankingsBolt, 将完成完整聚合,统计出top-n的话题builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
  • 上面的topology设计如下:
  • 将聚合计算与时间结合起来
    • tick事件,回调中会触发bolt的execute方法,那可以这么做:
RollingCountBolt:@Overridepublic void execute(Tuple tuple) {if (TupleUtils.isTick(tuple)) {LOG.debug("Received tick tuple, triggering emit of current window counts");// tick来了,将时间窗口内的统计结果发送,并让窗口滚动emitCurrentWindowCounts();}else {// 常规tuple,对话题计数即可countObjAndAck(tuple);}}// obj即为话题,增加一个计数 count++// 注意,这里的速度基本取决于流的速度,可能每秒百万,也可能每秒几十.// 内存不足? bolt可以scale-out.private void countObjAndAck(Tuple tuple) {Object obj = tuple.getValue(0);counter.incrementCount(obj);collector.ack(tuple);}// 将统计结果发送到下游private void emitCurrentWindowCounts() {Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();lastModifiedTracker.markAsModified();if (actualWindowLengthInSeconds != windowLengthInSeconds) {LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));}emit(counts, actualWindowLengthInSeconds);}
  • 上面的代码可能有点抽象,看下这个图就明白了,tick一到,窗口就滚动:
IntermediateRankingsBolt & TotalRankingsBolt:public final void execute(Tuple tuple, BasicOutputCollector collector) {if (TupleUtils.isTick(tuple)) {getLogger().debug("Received tick tuple, triggering emit of current rankings");// 将聚合并排序的结果发送到下游emitRankings(collector);}else {// 聚合并排序updateRankingsWithTuple(tuple);}}
  • 其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方法略有不同:
  • IntermediateRankingsBolt的聚合排序方法:
  @Overridevoid updateRankingsWithTuple(Tuple tuple) {// 这一步,将话题、话题出现的次数提取出来Rankable rankable = RankableObjectWithFields.from(tuple);// 这一步,将话题出现的次数进行聚合,然后重排序所有话题super.getRankings().updateWith(rankable);}
  • TotalRankingsBolt的聚合排序方法:
  @Overridevoid updateRankingsWithTuple(Tuple tuple) {// 提出来自IntermediateRankingsBolt的中间结果Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);// 聚合并排序super.getRankings().updateWith(rankingsToBeMerged);// 去0,节约内存super.getRankings().pruneZeroCounts();}
  • 而重排序方法比较简单粗暴,因为只求前N个,N不会很大:
  private void rerank() {Collections.sort(rankedItems);Collections.reverse(rankedItems);}
  • 下图可能就是想要的结果,完成了t0 - t1时刻之间的热点话题统计.

大数据视频推荐:
CSDN
人工智能算法竞赛实战
AIops智能运维机器学习算法实战
ELK7 stack开发运维实战
PySpark机器学习从入门到精通
AIOps智能运维实战
腾讯课堂
大数据语音推荐:
ELK7 stack开发运维
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

134.如何进行实时计算相关推荐

  1. flink实战教程-使用set实时计算当天网站uv

    文章目录 背景 案例讲解 模拟source 定义窗口 自定义聚合算子 处理输出结果 背景 对于web网站,我们一般会有这样的需求,实时的计算出来当天网站的uv,尽可能快的展示出来.今天我们就讲一下基于 ...

  2. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  3. 基于Apache Flink的爱奇艺实时计算平台建设实践

    导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  4. SLA 99.99%以上!饿了么实时计算平台3年演进历程

    来自:DBAplus社群 作者介绍 倪增光,饿了么BDI-大数据平台研发高级技术经理,曾先后就职于PPTV.唯品会.15年加入饿了么,组建数据架构team,整体负责离线平台.实时平台.平台工具的开发和 ...

  5. 唯品会实时计算平台的演进之路

    来自:DBAplus社群 本文根据王新春老师在[2018 DAMS中国数据资产管理峰会]现场演讲内容整理而成. 讲师介绍 王新春,唯品会高级经理.数据平台实时团队高级架构师,主要负责实时计算平台.实时 ...

  6. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  7. 实时计算Flink 产品定价——续费和变配

    续费 续费是指购买当前项目到期后项目增加的时长.续费时长单位为月或年,最小续费时长为一个月. 续费步骤 在项目总览里点击项目管理. 点击项目续费. 选择需要续费的时长,系统会根据计算资源以及续费时长为 ...

  8. Spark Streaming实时计算框架介绍

    随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在 ...

  9. 实时计算Flink——产品安全

    实时计算 Flink支持整体全链路实时计算的安全. 账号安全 账号安全分为实时计算账号安全以及数据存储账号安全,下面分别阐述. 实时计算账号安全 实时计算账号当前仅支持阿里云账号体系(包括登录用户名+ ...

最新文章

  1. 西北工业大学21计算机考研,西北工业大学2018年计算机考研879专业综合考试大纲...
  2. [NC15034]德玛西亚万岁
  3. spark匹配html字段,Apache Spark中的高效字符串匹配
  4. concurrent: wai notify notifyAll
  5. 面试精讲之面试考点及大厂真题 - 分布式专栏 19 系统中的降级熔断设计
  6. 检测到目标主机可能存在缓慢的http拒绝服务攻击_高防服务器能防住哪些攻击?“流量清洗”与它有什么关系?...
  7. 记录uluuuuuuu
  8. 阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第3节 两种获取Stream流的方式_2_Stream流中的常用方法_forEach...
  9. 用python画折线图
  10. php前台切图,php网页切图/js切图
  11. 系统发育树构建算法和软件
  12. Linux系统文件颜色代表的意思
  13. 单片机模拟iic从设备-主要代码(2)
  14. 解决“此实现不是 Windows 平台 FIPS 验证的加密算法的一部分”
  15. Oracle 12c统一审计
  16. 为什么要用CAT工具辅助翻译?为什么要用翻译管理系统?以memoQ为例
  17. 华为OD机试 - 开心消消乐
  18. [ app.json 文件内容错误] app.json: window.navigationBarTextStyle 字段需为 black,white【已解决】
  19. PHP在线客服系统源码
  20. TerraBuilder 操作制作MPT

热门文章

  1. 让人混淆的Person p=new Person();和Person p=null;
  2. 在线查字典/汉语字典大全/字典查询网站源码开发搭建
  3. python-OpenCV图像处理常用函数汇总(三)
  4. 全文检索服务器-Elasticsearch
  5. 谁说程序员不懂浪漫?用Python每天自动给女朋友免费发短信
  6. 青云科技以开放姿态打造低代码平台
  7. 解决el--checkbox 复选框的禁用方法
  8. python自动化客户端_如何使用Python自动化登录客户端,pywinauto确实很强大
  9. vue+echarts绘制3D地图
  10. 102页PPT!阿里巴巴数字智能工厂解决方案