实时计算在点评的使用场景

类别一:Dashboard、实时DAU、新激活用户数、实时交易额等

♦  Dashboard类:北斗(报表平台)、微信(公众号)和云图(流量分析)等

♦  实时DAU:包括主APP(Android/iPhone/iPad)、团APP、周边快查、PC、M站

♦  新激活用户数:主APP

♦  实时交易额:闪惠/团购交易额

以报表平台为例,下图是一张APP UV的实时曲线图,它以分钟级别粒度展现了 实时的DAU数据和曲线。

从图中可以看见一个尖点,这个尖点就是当天push过后带来的用户,这样可以看到实时的运营效率。

类别二:搜索、推荐、安全等

以搜索为例:用户在点评的每一步有价值的操作(包括:搜索、点击、浏览、购买、收藏等),都将实时、智能地影响搜索结果排序,从而显著提升用户搜索体验、搜索转化率。

某用户 搜索“ 火锅 ”,当他 在搜索结果页 点击了“ 重庆高老九火锅 ”后, 再次刷新搜索结果列表时,该商户的排序就会提升到顶部 。

再结合其他的一些实时反馈的个性化推荐策略,最终使团购的交易额有了明显的增加,转化率提升了2个多点。

实时计算在业界的使用场景

场景1:阿里JStorm

♦  双11实时交易数据

场景2:360Storm

♦  抢票软件验证码自动识别:大家用360浏览器在12306上买票的时候,验证码自动识别是在Storm上计算完成的。

♦  网盘图片缩略图生成:360网盘的缩略图也是实时生成出来的,这样可以节约大量的文件数量和存储空间。

♦  实时入侵检测

♦  搜索热词推荐

场景3:腾讯TDProcess

分布式K/V存储引擎TDEngine和支持数据流计算的TDProcess,TDProcess是基于Storm的计算引擎,提供了通用的计算模型,如Sum、Count、PV/UV计算和TopK统计等。

场景4:京东Samza

整个业务主要应用订单处理,实时分析统计出待定区域中订单各个状态的量:待定位、待派工、待拣货、待发货、待配送、待妥投等。

点评如何构建实时计算平台

点评的实时计算平台是一个端到端的方案,从下面的平台架构图,可以看出整体架构是一个比较长的过程,包括了数据源、数据的传输通道、计算、存储和对外服务等。

实时计算平台首先解决的问题是,数据怎么获取,如何拿到那些数据。现在做到了几乎所有点评线上产生的数据都可以毫秒级拿到,封装对应的数据输入源Spout。

通过Blackhole支持日志类实时获取,包括打点日志、业务Log、Nginx日志等。 整合Puma Client第一时间获取数据库数据变更。整合Swallow获取应用消息。Blackhole是团队开发的类Kafka系统,主要目标是批量从业务方拉取日志时做到数据的完整性和一致性,然后也提供了实时的消费能力。Puma是以MySQL binlog为基础开发的,这样可以实时拿到数据库的update、delete、insert操作。

Swallow是点评的MQ系统。通过整合各种传输通道,并且封装相应的Spout,做业务开发的同学就完全不用关心数据怎样可靠获取,只需要写自己的业务逻辑就可以了。解决了数据和传输问题后,计算过程则在Storm中完成。

如果在Storm计算过程中或计算出结果后,需要与外部存储系统交互,也提供了一个data-service服务 ,通过点评的RPC框架提供接口,用户不用关心实际Redis/HBase这些系统的细节和部署情况, 以及这个数据到底是在Redis还是HBase中的,可以根据SLA来做自动切换;

同时计算的结果也是通过data-service服务,再反馈到线上系统。就拿刚刚搜索结果的例子,搜索业务在用户再次搜索的时候会根据userId请求一次data-service,然后拿到这个用户的最近浏览记录,并重新排序结果,返回给用户。

这样的好处就是实时计算业务和线上其他业务完全解耦,实时计算这边出现问题,不会导致线上业务出现问题。

Storm基础知识简单介绍

Apache Storm( http://storm.apache.org/)是由Twitter开源的分布式实时计算系统。Storm可以非常容易、可靠地处理无限的数据流。对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。Storm可以使用何编程语言进行开发。

Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面运行的是MapReduce的Job,而在Storm上面运行的是Topology。

Storm和Hadoop一个非常关键的区别是Hadoop的MapReduce Job最终会结束,而Storm的Topology会一直运行(除非显式地杀掉)。

Storm基本概念:

Nimbus和Supervisor之间的通讯是依靠ZooKeeper来完成,并且Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的。可以用kill-9来杀死Nimbus和Supervisor进程,然后再重启它们,它们可以继续工作。

在Storm中,Spout是Topology中产生源数据流的组件。通常Spout获取从Kafka、MQ等的数据,然后调用nextTuple函数,发射数据出去供Bolt消费。

图中的Spout就发射出去了两条数据流。而Bolt是在Topology中接受Spout的数据,然后执行处理的组件。Bolt在接收到消息后会调用execute函数,用户可以在其中执行自己想要的操作。

为什么用Storm呢,因为Storm有它的优点:

易用性

只要遵守Topology,Spout,Bolt的编程规范即可开发出一个扩展性极好的应用,像底层RPC,Worker之间冗余,数据分流之类的操作,开发者完全不用考虑。

扩展性

当某一级处理单元速度不够时,直接配置一下并发数,即可线性扩展性能。

健壮性

当Worker失效或机器出现故障时, 自动分配新的Worker替换失效Worker。

准确性

采用Acker机制,保证数据不丢失。采用事务机制,保证数据准确性。刚刚介绍了一些Storm的基础概念和特性,再用一张比较完整的图来回顾一下整个Storm的体系架构:

Storm提交一个作业的时候,是通过Thrift的Client执行相应的命令来完成。Nimbus针对该Topology建立本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不同的Worker上,调度的结果写入Zookeeper中。

Zookeeper上建立assignments节点,存储Task和Supervisor中Worker的对应关系。在Zookeeper上创建workerbeats节点来监控Worker的心跳。Supervisor去Zookeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行。

每个Worker上运行多个Task,Task由Executor来具体执行。Worker根据Topology信息初始化建立Task之间的连接,相同Worker内的Task通过DisrupterQueue来通信,不同Worker间默认采用Netty来通信,然后整个Topology就运行起来了。

如何保证业务运行可靠性

首先Storm自身有很多容错机制,也加了很多监控信息,方便业务同学监控自己的业务状态。

在Storm上,遇到的一个很基本的问题就是,各个业务是运行的Worker会跑在同一台物理机上。曾经有位同学就在自己的Worker中起了200多个线程来处理json,结果就是这台机器的CPU都被他的Worker吃光了,其他的业务也跟着倒霉。

因此也使用CGroup做了每个Worker的资源隔离,主要限制了CPU和Memory的使用。相对而言JStorm在很多方面要完善一些,JStorm自己就带资源隔离。对应监控来说,基本的主机维度的监控在ganglia上可以看见,比如现在集群的运行状况。下图是现在此时的集群的网络和负载:

这些信息并不能保证业务就OK,因此将Storm上的很多监控信息和点评的开源监控系统Cat集成在了一起,从Cat上可以看见更多的业务运行状态信息。

比如在Cat中我可以看见整个集群的TPS,现在已经从30多万降下来了。 然后我可以设置若干的报警规则, 如:连续N分钟降低了50%可以报警。然后也监控了各个业务Topology的TPS、Spout输入、Storm的可用Slot等的变化。

这个图就是某个业务的TPS信息, 如果TPS同比或者环比出现问题,也可以报警给业务方。

Storm使用经验分享

1.使用组件的并行度代替线程池

Storm自身是一个分布式、多线程的框架,对每个Spout和Bolt,都可以设置其并发度;它也支持通过rebalance命令来动态调整并发度,把负载分摊到多个Worker上。

如果自己在组件内部采用线程池做一些计算密集型的任务,比如JSON解析,有可能使得某些组件的资源消耗特别高,其他组件又很低,导致Worker之间资源消耗不均衡,这种情况在组件并行度比较低的时候更明显。

比如某个Bolt设置了1个并行度,但在Bolt中又启动了线程池,这样导致的一种后果就是,集群中分配了这个Bolt的Worker进程可能会把机器的资源都给消耗光了,影响到其他Topology在这台机器上的任务的运行。如果真有计算密集型的任务,可以把组件的并发度设大,Worker的数量也相应提高,让计算分配到多个节点上。

为了避免某个Topology的某些组件把整个机器的资源都消耗光的情况,除了不在组件内部启动线程池来做计算以外,也可以通过CGroup控制每个Worker的资源使用量。

2.不要用DRPC批量处理大数据

RPC提供了应用程序和StormTopology之间交互的接口,可供其他应用直接调用,使用Storm的并发性来处理数据,然后将结果返回给调用的客户端。这种方式在数据量不大的情况下,通常不会有问题,而当需要处理批量大数据的时候,问题就比较明显了。

(1)处理数据的Topology在超时之前可能无法返回计算的结果。

(2)批量处理数据,可能使得集群的负载短暂偏高,处理完毕后,又降低回来,负载均衡性差。

批量处理大数据不是Storm设计的初衷,Storm考虑的 是时效性和批量之间的均衡,更多地看中前者。需要准实时地处理大数据量,可以考虑Spark Stream等批量框架。

3.不要在Spout中处理耗时的操作

Spout中nextTuple方法会发射数据流,在启用Ack的情况下,fail方法和ack方法会被触发。

需要明确一点,在Storm中Spout是单线程(JStorm的Spout分了3个线程,分别执行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗时,某个消息被成功执行完毕后,Acker会给Spout发送消息,Spout若无法及时消费,可能造成ACK消息超时后被丢弃,然后Spout反而认为这个消息执行失败了,造成逻辑错误。反之若fail方法或者ack方法的操作耗时较多,则会影响Spout发射数据的量,造成Topology吞吐量降低。

4.注意fieldsGrouping的数据均衡性

fieldsGrouping是根据一个或者多个Field对数据进行分组,不同的目标Task收到不同的数据,而同一个Task收到的数据会相同。

假设某个Bolt根据用户ID对数据进行fieldsGrouping,如果某一些用户的数据特别多,而另外一些用户的数据又比较少,那么就可能使得下一级处理Bolt收到的数据不均衡,整个处理的性能就会受制于某些数据量大的节点。可以加入更多的分组条件或者更换分组策略,使得数据具有均衡性。

5.优先使用localOrShuffleGrouping

localOrShuffleGrouping是指如果目标Bolt中的一个或者多个Task和当前产生数据的Task在同一个Worker进程里面,那么就走内部的线程间通信,将Tuple直接发给在当前Worker进程的目的Task。否则,同shuffleGrouping。

localOrShuffleGrouping的数据传输性能优于shuffleGrouping,因为在Worker内部传输,只需要通过Disruptor队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高,而网络开销和序列化开销占主要地位的情况下,可以优先使用localOrShuffleGrouping来代替shuffleGrouping。

6.设置合理的MaxSpoutPending值

在启用Ack的情况下,Spout中有个RotatingMap用来保存Spout已经发送出去,但还没有等到Ack结果的消息。RotatingMap的最大个数是有限制的,为p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通过setMaxSpoutPending方法来设定),num-tasks是Spout的Task数。如果不设置MaxSpoutPending的大小或者设置得太大,可能消耗掉过多的内存导致内存溢出,设置太小则会影响Spout发射Tuple的速度。

7.设置合理的Worker数

Worker数越多,性能越好?先看一张Worker数量和吞吐量对比的曲线(来源于JStorm文档:

https://github.com/alibaba/jstorm/tree/master/docs/ 0.9.4.1jstorm性能测试.docx)。

从图可以看出,在12个Worker的情况下,吞吐量最大,整体性能最优。这是由于一方面,每新增加一个Worker进程,都会将一些原本线程间的内存通信变为进程间的网络通信,这些进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率。

另一方面,每新增加一个Worker进程,都会额外地增加多个线程(Netty发送和接收线程、心跳线程、SystemBolt线程以及其他系统组件对应的线程等),这些线程切换消耗了不少CPU,sys系统CPU消耗占比增加,在CPU总使用率受限的情况下,降低了业务线程的使用效率。

8.平衡吞吐量和时效性

Storm的数据传输默认使用Netty。在数据传输性能方面,有如下的参数可以调整:

storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分别为接收消息线程和发送消息线程的数量。

netty.transfer.batch.size是指每次 Netty Client向 Netty Server发送的数据的大小,如果需要发送的Tuple消息大于netty.transfer.batch.size,则Tuple消息会按照netty.transfer.batch.size进行切分,然后多次发送。

storm.messaging.netty.buffer_size为每次批量发送的Tuple序列化之后的TaskMessage消息的大storm.messaging.netty.flush.check.interval.ms表示当有TaskMessage需要发送的时候, Netty Client检查可以发送数据的频率。

降低storm.messaging.netty.flush.check.interval.ms的值,可以提高时效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升网络传输的吐吞量,使得网络的有效载荷提升(减少TCP包的数量,并且TCP包中的有效数据量增加),通常时效性就会降低一些。因此需要根据自身的业务情况,合理在吞吐量和时效性直接的平衡。

除了这些参数,怎么找到Storm中性能的瓶颈,可以通过如下的一些途径来进行:

在Storm的UI中,对每个Topology都提供了相应的统计信息,其中有3个参数对性能来说参考意义比较明显,包括Execute latency、Process latency和Capacity。

分别看一下这3个参数的含义和作用。

(1)Execute latency:消息的平均处理时间,单位为毫秒。

(2)Process latency:消息从收到到被ack掉所花的时间,单位为毫秒。如果没有启用Acker机制,那么Process latency的值为0。

(3)Capacity:计算公式为Capacity = Bolt或者Executor调用execute方法处理的消息数量 * 消息平均执行时间 /时间区间。这个值越接近1,说明Bolt或者Executor基本一直在调用execute方法,因此并行度不够,需要扩展这个组件的Executor数量。为了在Storm中达到高性能,在设计和开发Topology的时候,需要注意以下原则。

(1)模块和模块之间解耦,模块之间的层次清晰,每个模块可以独立扩展,并且符合流水线的原则。

(2)无状态设计,无锁设计,水平扩展支持。

(3)为了达到高的吞吐量,延迟会加大;为了低延迟,吞吐量可能降低,需要在二者之间平衡。

(4)性能的瓶颈永远在热点,解决热点问题。

(5)优化的前提是测量,而不是主观臆测。收集相关数据,再动手,事半功倍。

关于计算框架的后续问题

目前Hadoop/Hive专注于离线分析业务,每天点评有1.6万个离线分析任务。Storm专注于实时业务,实时每天会处理100亿+条的数据。

在这两个框架目前有很大的gap,一个是天级别,一个是秒级别,然后有大量的业务是准实时的,比如分钟级别。因此会使用Spark来做中间的补充。

Spark Streaming + Spark SQL也能够降低很大的开发难度。相对而言,目前Storm的学习和开发成本还是偏高。要做一个10万+TPS的业务在Storm上稳定运行,需要对Storm了解比较深入才能做到,不然会发现有这样或者那样的问题。

揭秘大众点评的大数据实时计算相关推荐

  1. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  2. 大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图

    大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图 http://edu.51cto.com/roadmap/view/id-29.html http://my.oschina.net/i ...

  3. .NET 大数据实时计算--学习笔记

    摘要 纯 .Net 自研大数据实时计算平台,在中通快递服务数百亿包裹,处理数据万亿计!将分享大数据如何落地以及设计思路,技术重难点. 目录 背景介绍 计算平台架构 项目实战 背景介绍 计算平台架构 分 ...

  4. Flink大数据实时计算系列-案例初体验:HotPages

    Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...

  5. 1. 大数据实时计算介绍

    Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架.它的底层,其实,也是基于我们之前讲解的Spark Core的.基本的计算模型,还是基于内存的大数据实时 ...

  6. Flink大数据实时计算系列-Flink的Operator Chains的优化机制

    Flink大数据实时计算系列-Flink的Operator Chains的优化机制 目录 Flink改变并行度 并行度改为3 并行度改为2 Flink Operator Chains Flink gr ...

  7. Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法、Presto的介绍与使用场景

    Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法.Presto的介绍与使用场景 Presto的安装与使用 目录 Flink写出多个parquet小文件处理方法 Presto ...

  8. Flink大数据实时计算系列-列式存储parquet文件格式介绍、Flink进行rowformat格式文件保存

    Flink大数据实时计算系列-列式存储parquet文件格式介绍 Flink进行rowformat格式文件保存 列式存储parquet文件格式介绍

  9. Flink大数据实时计算系列-Flink的processFunction TimerService的基础用法

    Flink大数据实时计算系列-Flink的processFunction TimerService的基础用法 目录 Flink的processFunction TimerService的基础用法 Fl ...

  10. Flink大数据实时计算系列-Flink的Keyed Windows 对比 Non-Keyed Windows的区别

    Flink大数据实时计算系列-Flink的Keyed Windows 对比 Non-Keyed Windows的区别 目录 Flink的Keyed Windows 对比 Non-Keyed Windo ...

最新文章

  1. 网络推广方案分享网站想要更快的优化到首页的技巧!
  2. C++ Primer 5th笔记(chap 17 标准库特殊设施)正则表达式
  3. 一篇文章弄懂Java多线程基础和Java内存模型
  4. vs2015提示中文
  5. C#实现反射调用动态加载的DLL文件中的方法
  6. Mybatis多表模型
  7. 《重来》值得你多看几遍
  8. Package.json 属性说明
  9. Fragstats 4.2 批处理(geotiff格式)
  10. 本机查看文件服务器所有共享的文件,谁访问,谁打开,
  11. 2023东北大学计算机考研信息汇总
  12. java.lang.IllegalStateException: Failed to convert message:‘‘ to outbound message.
  13. 13_Pinia组件库
  14. 这是我见过最牛逼的Shell脚本!
  15. 银河麒麟最新V10系统下载链接
  16. dwg格式文件怎么打开
  17. Java后端响应出来的图片流在HTML中显示
  18. 意想不到!这个神奇的bug让我加班到深夜
  19. js+css 灵活层叠 绝对/相对 定位 2
  20. Transformer 五年引用超四万,其中六位作者创立五家创业公司

热门文章

  1. 使用nim master为oracle2分区安装os,使用 NIM 资源部署自定义的 AIX 系统
  2. 案例:Java多态实现饮料购买咖啡、可乐、矿泉水2.0
  3. 程序猿给娃取名的正确姿势
  4. 如何选择适合你的兴趣爱好(二十七),外语
  5. 保研复习笔记:概率论与数理统计
  6. 如何将MAC的文件存储至NAS?
  7. android拼图小游戏代码,Android实现拼图小游戏
  8. 2019级第二次月赛暨软件计科联合新生赛题解
  9. 怎么将小程序添加到我的小程序?
  10. OpenLayers使用高德导航接口实现动画animate