欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

本文由@从流域到海域翻译,发表于腾讯云+社区

map()和reduce()是在集群式设备上用来做大规模数据处理的方法,用户定义一个特定的映射,函数将使用该映射对一系列键值对进行处理,直接产生出一系列键值对。

Map Reduce和流处理

Hadoop的Map / Reduce模型在并行处理大量数据方面非常出色。它提供了一个通用的分区机制(基于数据的关键)来分配不同机器上的聚合式工作负载。基本上, map / reduce的算法设计都是关于如何在处理过程中的不同阶段为记录值选择正确的key。

然而,“时间维度”与数据的其他维度属性相比具有非常不同的特征,特别是在涉及实时数据处理时。它对面向批处理的Map/Reduce模型提出了一系列不同的挑战。

  1. 实时处理需要非常低的响应延迟,这意味着没有太多的数据能够在“时间”维度上进行处理。
  2. 从多个数据源收集到的数据可能没有全部到达汇总点。
  3. 在Map/Reduce的标准模型中,reduce阶段在map阶段完成之前无法启动。而且在下载到reducer之前,所有处理过程的中间数据都保存在磁盘中。所有这些都显著增加了处理的延迟。

尽管Hadoop Map/Reduce是针对批处理的工作负载而设计的,但某些应用程序(如欺诈检测,广告显示,网络监控需要实时响应以处理大量数据),现在已开始考虑各种调整Hadoop的方法以使其适合更实时的处理环境。在本篇文章中,我尝试了一些基于Map/Reduce模型的执行低延迟并行处理的技术。

常用流处理模型

在这个模型中,数据是在各种各样的OLTP系统中生成的,这些系统更新了事务数据存储,并异步发送其他数据用于分析处理。分析处理过程将输出写入到决策模型,该决策模型会将信息反馈给OLTP系统来进行实时决策。

注意与OLTP系统分离的分析处理的“异步性质”,在该方式下OLTP系统不会放慢速度等待分析处理完成。无论如何,我们仍然需要尽快进行分析处理,否则决策模型将不能反映当前世界的真实场景,它将不会很有用处。什么程度的延迟可容忍的是应用程序指定的。

在Map/Reduce中进行微批处理

一种方法是根据时间窗(例如每小时)将数据分成小批量,并将每批中收集的数据提交给Map/Reduce作业。这需要分段机制,以便OLTP应用程序可以继续独立于分析处理。而作业调度程序用于规范生产者和消费者,基于此它们每个生产者或消费者都可以独立进行。(生产者和消费者是在操作系统理论中对产生数据和处理数据的程序的称呼,译者注)

连续性Map/Reduce

这里让我们想象一下有关Map/Reduce执行模型的一些可能的修改,以使其适应实时流处理。我并不担心Hadoop在线原型(HOP)所采用的方法的向后兼容性 。

长时间运行

第一种修改方法是使mapper和reducer长时间运行。因此,我们不能等待map阶段结束之后才开始reduce阶段,因为map阶段永远不会结束。这意味着mapper在完成处理后会将数据推送到reducer,并让reducer对数据进行排序。这种方法的缺点是它没有机会去运行地图侧的combine()函数以降低带宽使用率。它还将更多的工作量转移到正需要进行分类的reducer。

注意在延迟和优化之间需要有一个折衷。优化需要更多的数据在源头(即Mapper)就进行累积,如此即可以执行本地合并(即:结合在一起)。不幸的是,低延迟需要尽快发送数据,因此没有太多时间使大量累积操作可以完成。

HOP提出了一种自适应流控制机制,在该方式下数据会被尽快推送到Reducer,直到Reducer被重载并退回(使用某种流量控制协议)。然后mapper将缓冲处理后的消息并在发送给reducer之前执行combine()函数。这种方法将会自动地来回移动Reducer和Mapper之间的聚合工作负载。

时间窗口:切片和范围

这是一个“时间片(time slice)”概念和一个“时间范围(time range)”的概念。“切片(Slice)”定义了执行reduce处理之前所累计结果的时间窗口。这也是mapper在发送到reducer之前应积累的最小数据量。

“范围(Range)”定义了结果所汇总的时间窗口。它可以是一个具有明确起点定义的界标窗口或者是跳跃窗口的(考虑移动的界标场景)。它也可以是一个滑动窗口,其中从当前时间开始聚合的固定大小的窗口。

在从每个mapper接收到特定时间片后,reducer可以启动聚合处理并将结果与之前的聚合结果进行合并。切片(大小)可以根据mapper发送的数据量来进行动态调整。

增量处理

请注意,reducer需要在收到所有mapper中相同时间片的所有记录后计算聚合片值。之后,它会调用用户定义的merge()函数将切片值与范围值合并。如果范围需要刷新(例如达到跳转窗口边界),将调用init()函数来获取刷新的范围值。如果范围值需要更新(当某个切片值超出滑动范围时),则会调用unmerge()函数。

以下是我们如何在每小时更新(即:一小时大小切片)的情况下,在24小时滑动窗口内跟踪平均命中率(即:每小时总命中数)的示例。

# Call at each hit record
map(k1, hitRecord) {site = hitRecord.site# lookup the slice of the particular keyslice = lookupSlice(site)if (slice.time - now > 60.minutes) {# Notify reducer whole slice of site is sentadvance(site, slice)slice = lookupSlice(site)}emitIntermediate(site, slice, 1)
}combine(site, slice, countList) {hitCount = 0for count in countList {hitCount += count}# Send the message to the downstream nodeemitIntermediate(site, slice, hitCount)
}
复制代码
# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {hitCount = 0for count in countList {hitCount += count}sv = SliceValue.newsv.hitCount = hitCountreturn sv
}# Called at each jumping window boundary
init(slice) {rangeValue = RangeValue.newrangeValue.hitCount = 0return rangeValue
}# Called after each reduce()
merge(rangeValue, slice, sliceValue) {rangeValue.hitCount += sliceValue.hitCount
}# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {rangeValue.hitCount -= sliceValue.hitCount
}
复制代码

问答
比较好的MapReduce例子有哪些?
相关阅读
MapReduce极简教程
大数据运算模型 MapReduce 原理
如何为Hadoop选择最佳弹性MapReduce框架

此文已由作者授权腾讯云+社区发布,原文链接:https://cloud.tencent.com/developer/article/1122471?fromSource=waitui

Map Reduce和流处理相关推荐

  1. Hadoop Map/Reduce的工作流

    问题描述 我们的数据分析平台是单一的Map/Reduce过程,由于半年来不断地增加需求,导致了问题已经不是那么地简单,特别是在Reduce阶段,一些大对象会常驻内存.因此越来越顶不住压力了,当前内存问 ...

  2. [ZZ]Map/Reduce hadoop 细节

    转自:Venus神庙原文:http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html 分布式计算(Map/Reduce) 分布式计 ...

  3. Hadoop简介(1):什么是Map/Reduce

    看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop整体有所了解了. Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Red ...

  4. Hadoop Map/Reduce教程

    Hadoop Map/Reduce教程 目的     先决条件     概述     输入与输出     例子:WordCount v1.0         源代码         用法        ...

  5. 用通俗易懂的大白话讲解Map/Reduce原理

    Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Reduce,分布式文件系统HDFS,以及分布式数据库Hbase,同时Hadoop的相关项目也很丰 ...

  6. python内置函数map reduce filter详解,面试必备知识

    面试时候经常会考到 map reduce filter 这三个内置函数的使用 map() 函数 map() 会根据提供的函数对指定序列做映射. 第一个参数 function 以参数序列中的每一个元素调 ...

  7. python3函数中lambda/filter/map/reduce的用法

    lambda/filter/map/reduce这几个函数面试中很肯定会用到,本篇主要介绍这几个函数的用法. 1.lambda 匿名函数,用法如下: # lambada 参数,参数,参数 : 返回的表 ...

  8. Hadoop学习:Map/Reduce初探与小Demo实现

    一.    概念知识介绍 Hadoop MapReduce是一个用于处理海量数据的分布式计算框架.这个框架攻克了诸如数据分布式存储.作业调度.容错.机器间通信等复杂问题,能够使没有并行 处理或者分布式 ...

  9. MAP/REDUCE:Google和Nutch实现异同及其他

    /*版权声明:可以任意转载,转载时请务必标明文章原始出处和作者信息 .*/ 张俊林                       timestamp:2006年11月26日 设计要素 nutch包含以下 ...

最新文章

  1. 从网上下载的jar包导入到本地maven库
  2. Observer观察者设计模式
  3. 就业模拟试题(.NET部分)
  4. 破旧立新 “云”称霸
  5. NPOI导出Excel2007-xlsx
  6. linux 下安装安装rzsz命令
  7. mdl文件是c语言,MDL文件扩展名 - 什么是.mdl以及如何打开? - ReviverSoft
  8. 软件测试的工作流程是什么
  9. Java代理的几种方式
  10. 南阳oj 28 大数阶乘
  11. js 删除QQ空间的说说
  12. [转载]三、二、一 …… Geronimo!,第 2 部分: 构建 Geronimo
  13. Presto架构和使用总结
  14. 稳压器功能一览[转]
  15. Hive的HQL的执行过程(怎么转换成MR、Spark等任务)
  16. 非常好理解的python re正则表达式入手
  17. Pixel-level Extrinsic Self Calibration of High Resolution LiDAR and Camera in Targetless Environment
  18. 微信小程序 - 跨域问题
  19. Android应用该用H5开发还是原生开发?
  20. 第八篇,字符数组和字符指针详细讲解。

热门文章

  1. NPER用计算机怎么算,计算机财务管理第三章详解.doc
  2. cad设计院常用字体_趣谈 | 那些年我们看过的电气图纸(附CAD/EPLAN区别)
  3. Python学习笔记(4):Python如何设置类似C语言静态函数
  4. linux获取文件的md5,linux shell 获取文件md5的命令linux操作系统 -电脑资料
  5. linux操作系统网络,网络安装linux操作系统
  6. 可观测性PHP秩判据,线性系统的可控性与可观测性
  7. php 图片系统,Linger
  8. java防止上传恶意文件_从补丁分析到在野利用:揭秘CVE20201464 Windows文件签名验证绕过漏洞疑云...
  9. ajax mysql点赞_php+mysql结合Ajax实现点赞功能完整实例
  10. html高度为零,html中父div高度为0的原因