我们知道TaskTracker在默认情况下,每个3秒就行JobTracker发送一个心跳包,也就是在这个心跳包中包含对任务的请求。JobTracker返回给TaskTracker的心跳包中包含有各种action(任务),如果有满足在此TaskTracker上执行的任务的话,该任务也就包含在心跳包的响应中。在TaskTracker端有线程专门等待map或reduce任务,并从队列中取出执行。

1. TaskTracker发送心跳包

  TaskTracker是作为一个单独的JVM运行的,它启动以后一直处于offerService()函数中,每隔3秒就执行一次transmitHeartBeat函数,如下所示:

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

  该函数具体代码为:

  HeartbeatResponse transmitHeartBeat(long now) throws IOException {  ......    if (status == null) {synchronized (this) {status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses(sendCounters), failures, maxMapSlots,maxReduceSlots); }} //// 检查是否可以接受新的任务//
    boolean askForNewTask;long localMinSpaceStart;synchronized (this) {askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart;}......    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted,justInited,askForNewTask, heartbeatResponseId);
......return heartbeatResponse;}

  我们从中可以看出,TaskTracker首先创建一个TaskTrackerStatus对象,其中包含有TaskTracker的各种信息,比如,map slot的数目,reducer slot槽的数目,TaskTracker所在的主机名等信息。然后,对TaskTracker的空闲的slot以及磁盘空间进行检查,如果满足相应的条件时,最终就会通过JobClient(为JobTracker的代理)将心跳信息发送给JobTracker,并得到JobTracker的响应HeartbeatResponse。如下所示,JobClient是InterTrackerProtocol的一个实例,而JobTracker实现了InterTrackerProtocol这个接口。

    this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {public Object run() throws IOException {return RPC.waitForProxy(InterTrackerProtocol.class,InterTrackerProtocol.versionID,jobTrackAddr, fConf);}});

    那么,TaskTracker怎样通过JobTracker的代理与JobTracker进行通信呢?它是通过RPC调用JobTracker的heartbeat(......)方法而实现的。

2. TaskTracker端获取任务

  TaskTracker接收到任务后,会将它们放入到相应的LinkedList中,LinkedList实现了List和Queue接口,它是基于链表实现的FIFO的队列。

heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){ for(TaskTrackerAction action: actions) {if (action instanceof LaunchTaskAction) {addToTaskQueue((LaunchTaskAction)action);......}}  ......

  private void addToTaskQueue(LaunchTaskAction action) {
    if (action.getTask().isMapTask()) {
      mapLauncher.addToTaskQueue(action);
    } else {
      reduceLauncher.addToTaskQueue(action);
    }
    }

 

  TaskTracker启动的时候,创建了两个线程:mapLauncher和reduceLauncher,它们分别处理map任务和reduce任务,map任务有mapLauncher负责将其放入到LinkedList中,reduce任务有reducerLauncher负责将其放入到它维护的LinkedList中。

  public void addToTaskQueue(LaunchTaskAction action) {synchronized (tasksToLaunch) {TaskInProgress tip = registerTask(action, this);tasksToLaunch.add(tip);tasksToLaunch.notifyAll();}}

  mapLauncher或者是reducerLauncher根据接收到的action,创建对应的TaskTracker.TaskInProgress对象,并放入到队列中,唤醒等待的线程进行处理。 如下所示,该线程负责从taskToLaunch中获取task,当有空间的slot时,执行这个task。

  synchronized (tasksToLaunch) {while (tasksToLaunch.isEmpty()) {tasksToLaunch.wait();}//get the TIPtip = tasksToLaunch.remove(0);task = tip.getTask();LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");}
.....//得到空闲的slot后,启动这个taskstartNewTask(tip);

  这样,TaskTracker就得到了待处理的任务,具体如何执行请参考下一篇博客。

转载于:https://www.cnblogs.com/yueliming/p/3278196.html

TaskTracker获取并执行map或reduce任务的过程(一)相关推荐

  1. TaskTracker执行map或reduce任务的过程(二)

    上次说到,当MapLauncher或ReduceLancher(用于执行任务的线程,它们扩展自TaskLauncher),从它们所维护的LinkedList也即队列中获取到TaskInProgress ...

  2. Yarn中map、reduce任务运行容器YarnChild分析

    在对Yarn上MRAppMaster组件详解以及任务资源申请.启动的源码分析的分析中可以知道,真正用于执行MapTask任务.ReduceTask任务的进程容器为YarnChild进程,接下来对该Ya ...

  3. MapReduce剖析笔记之五:Map与Reduce任务分配过程

    转载:https://www.cnblogs.com/esingchan/p/3940565.html 在上一节分析了TaskTracker和JobTracker之间通过周期的心跳消息获取任务分配结果 ...

  4. c++ map用法_Python专题——五分钟带你了解map、reduce和filter

    点击上方蓝字,和我一起学技术.今天是Python专题第6篇文章,给大家介绍的是Python当中三个非常神奇的方法:map.reduce和filter.不知道大家看到map和reduce的时候有没有什么 ...

  5. java filter函数的用法_5分钟掌握Python | Map、Reduce和Filter如何运用?

    文末有惊喜哦 天给大家介绍的是Python当中三个非常神奇的方法:map.reduce和filter. 一.Map Map除了地图之外,另一个英文本意是映射.在C++和Java一些语言当中,将map进 ...

  6. hadoop学习之:Map、Reduce详解

    Hadoop学习重点主要为HDFS.MapReduce 部分: 接下来重点描述一下MAP与Reduce 的过程. 看了好多资料,如果有错误的地方请大家指出. MAP部分: 下图是官方给予的关于MapR ...

  7. hadoop中map和reduce的数量设置问题

    转载http://my.oschina.net/Chanthon/blog/150500 map和reduce是hadoop的核心功能,hadoop正是通过多个map和reduce的并行运行来实现任务 ...

  8. python中的zip、map、reduce 、lambda、filter函数的使用

    飞机票 lambda函数 lambda只是一个表达式,函数体比def简单很多. lambda的主体是一个表达式,而不是一个代码块.仅仅能在lambda表达式中封装有限的逻辑进去. lambda表达式是 ...

  9. c++ map用法_5分钟掌握Python | Map、Reduce和Filter如何运用?

    - 点击上方"中国统计网"订阅我吧!- 今天给大家介绍的是Python当中三个非常神奇的方法:map.reduce和filter. Map Map除了地图之外,另一个英文本意是映射 ...

最新文章

  1. OpenCV优化:图像的遍历4种方式
  2. python集合加个逗号_8.Python集合与字符串
  3. AssertionError: Path does not exist: py-faster-rcnn/data/VOCdevkit2007/VOC2007/ImageSets/Main
  4. ubuntu 10.04源 更新源列表 选择
  5. java context.write_Channel.write() 和 ChannelHandlerContext.write() 的区别
  6. CSDN光合计划-纯干货福利-推荐几个算法、分布式、数据库全系列学习教程(企业实用技术类)
  7. 互联网晚报 | 3月26日 星期六 |​ 竞拍规则优化,部分城市土地市场有所回暖;​​武汉房贷利率下调...
  8. java的Teacher类_java类的继承 - osc_tauwfamo的个人空间 - OSCHINA - 中文开源技术交流社区...
  9. 前后落差大用什么词语_【刺激】全国最长滑索!“白云飞索”全长1500米,落差200多米!...
  10. python做病毒传播的空间数据_利用4行Python代码监测每一行程序的运行时间和空间消耗...
  11. js图片 area 颜色_JS提取图片的主体颜色
  12. 【论文笔记】Deep Neural Networks for Object Detection
  13. python 修改pdf_使用Python编辑PDF
  14. python docx 表格复制粘贴_python Word 表格转 Excel
  15. 图片特效展示(鼠标移入移除特效)
  16. 基于facades数据集的图像成任务,完成各种GAN的对比实验
  17. 单片机中断实验2 EX0
  18. 使用kd树数据结构,实现k-means聚类加速
  19. Vue全家桶之VueX(六)
  20. 2021年信创产业融资分析报告

热门文章

  1. 【spring-boot】启用数据缓存功能
  2. 1102示波器使用方法_你知道示波器的探头是怎样工作的吗?示波器探头的使用方法...
  3. mysql pt-kill_percona-toolkit之pt-kill 杀掉mysql查询或连接的方法
  4. ubuntu20输入法qiehuan_Ubuntu20.04安装搜狗输入法
  5. 微信有电脑客户端吗_微信电脑版有多难用,你们真的没感觉吗
  6. java 前端页面传过来的值怎么防止篡改_反爬虫,到底是怎么回事儿?
  7. centos7磁盘挂载
  8. awk使用shell中的变量
  9. 适用于 Python 的 10 大最佳 IDE,你 Pick 哪一款?
  10. Excel,Python,SQL?数据分析师的技能树要怎么点?