`

hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程

阅读更多
taskTracker 生成map reduce 任务详解

1. 启动 TaskTracker ,执行main方法 new TaskTracker(conf) 启动taskTracker
2. taskTrack 构造方法初始化变量
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大map数 默认是2
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大reduce数 默认是2
mapred.disk.healthChecker.interval   磁盘健康度检查时间间隔 mills 默认是60*1000
构造rpc 链接jobTACKER
tasktracker.http.threads taskTracker 工作线程数,默认是40 ,copy数据的线程数
initialize(); 构造方法,该方法是一个独立于taskTracker的方法,可循环调用,在taskTracker 处于关闭状态时 仍可用
该方法主要用户构造一些 运行时目录 和心跳RPC 信息初始化分布式缓存开发map任务监听线程 初始化new TaskLauncher() 启动map 和reduce 任务抓取线程

synchronized void initialize() throws IOException, InterruptedException {
    this.fConf = new JobConf(originalConf);
    
   


    // 绑定地址.taskTracker 的地址 绑定到rpc中 为传送心跳信息做准备
    String address = 
      NetUtils.getServerAddress(fConf,
                                "mapred.task.tracker.report.bindAddress", 
                                "mapred.task.tracker.report.port", 
                                "mapred.task.tracker.report.address");
    InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
    String bindAddress = socAddr.getHostName();
    int tmpPort = socAddr.getPort();
    
//初始化jvm 管理类
    this.jvmManager = new JvmManager(this);

   
// RPC 初始化
    int max = maxMapSlots > maxReduceSlots ? 
                       maxMapSlots : maxReduceSlots;
    //set the num handlers to max*2 since canCommit may wait for the duration
//of a heartbeat RPC
//此处taskTracker 汇报的 处理任务的slot 在实际的基础上*2,因为在心跳汇报的阶段传输这段时间会空出来一部分slot.在新的heartbeat 过来的时候 有2倍的slot处理能力
    this.taskReportServer = RPC.getServer(this, bindAddress,
        tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
    this.taskReportServer.start();

   

    // 初始化分布式缓存,在写mr代码的时候 讲一个文件写入DistributedCache 的时候, DistributedCache 在这个位置进行初始化
    this.distributedCacheManager = new TrackerDistributedCacheManager(
        this.fConf, taskController);
  

// start the thread that will fetch map task completion events
//在该位置启动 map和reduce任务的处理线程
    this.mapEventsFetcher = new MapEventsFetcherThread();
    mapEventsFetcher.setDaemon(true);
    mapEventsFetcher.setName(
                             "Map-events fetcher for all reduce tasks " + "on " + 
                             taskTrackerName);
    mapEventsFetcher.start();

  //这里 初始化了两个类TaskLauncher reduce 和map 这两个类是具体的 生成map和reduce 的任务类.
    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
    mapLauncher.start();
    reduceLauncher.start();

下面详细的讲一下TaskLauncher类,该类为taskTracker 类的内部类,在启动taskTracker 的时候 通过独立的initialize()方法启动.
该类是一个线程类.通过addToTaskQueue() 方法将新的任务添加到 tasksToLaunch list (List<TaskInProgress> tasksToLaunch)中,这个list 很重要,jobTracker将需要job 通过assginTaks 将需要执行的task 通过心跳信息 传给taskTracker,taskTracker 的run()方法调用offerService()解析心跳信息,将解析得来的task执行信息 添加到这个list中, 然后启动run方法 时刻去查看 tasksToLaunch list中是不是有新的 任务放进来.r如果有则去执行,如果没有则调用tasksToLaunch.wait(); 等待.调用startNewTask 方法调用launchTaskForJob() 通过调用 launchTask 去执行map 和reduce任务, launchTask 要判断任务的状态. UNASSIGNED FAILED_UNCLEAN KILLED_UNCLEAN RUNNING

TaskTracker 主要是通过监听jobTracker 通过心跳信息传过来的task任务放在 task的队中 去执行task.

在这个过程中 还会有一些 例如 numFreeSlots 的判断 ,tip 完全是 同步的,等等
jobTracker 通过调用JobQueueTaskScheduler的assginTasks()方法 分配task,两种方式 生成tak
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
obtainNewNodeOrRackLocalMapTask 即为hadoop 机架感知功能,调度的时候根据location 因素去分配taskTracker
obtainNewNodeOrRackLocalMapTask 非机架感知


  • 大小: 11.3 KB
1
0
分享到:
评论

相关推荐

    大数据与云计算技术 Hadoop概论和快速入门 共40页.ppt

    Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce 论文。 == HDFS == HDFS (Hadoop Distributed File System),Hadoop 分布式文件系统。...TaskTracker,启动和管理Map和Reduce子任务的节点。

    hadoop 1.2.1 api 最新chm 伪中文版

    框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 通常,hadoop Map/Reduce框架和分布式...

    Hadoop权威指南 第二版(中文版)

     map阶段和reduce阶段  横向扩展  合并函数  运行一个分布式的MapReduce作业  Hadoop的Streaming  Ruby版本  Python版本  Hadoop Pipes  编译运行 第3章 Hadoop分布式文件系统  HDFS的设计  HDFS的概念 ...

    Hadoop权威指南(中文版)2015上传.rar

    map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和...

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce.tools org.apache.hadoop.mapreduce.v2 org.apache.hadoop.mapreduce.v2.app.webapp.dao org.apache.hadoop.mapreduce.v2.hs....

    Hadoop实战中文版

    5.2.1 Reduce侧的联结 5.2.2 基于DistributedCache的复制联结 5.2.3 半联结:map侧过滤后在reduce侧联结 5.3 创建一个Bloom filter 5.3.1 Bloom filter做了什么 5.3.2 实现一个Bloom filter 5.3.3 Hadoop 0.20...

    EasyHadoop实战手册

    1.Hadoop,Apache开源的分布式框架。2.HDFS,hadoop的分布式文件系统3....6.TaskTracker,hadoop调度程序,负责Map,Reduce 任务的具体启动和执行。7.Fuse,多文件系统内核程序,可将不同的文件系统mount成linux可读写模式

    Hadoop集群安装

    Hadoop集群安装的详细说明文档, 實作七: Hadoop 叢集安裝 前言 您手邊有兩台電腦,假設剛剛操作的電腦為"主機一" ,另一台則為"主機二" 。則稍後的環境如下 • 管理Data的身份 管理Job的身份 "主機一" namenode ...

    Hadoop实战中文版.PDF

    865.1.3 预处理和后处理阶段的链接 865.2 联结不同来源的数据 895.2.1 Reduce侧的联结 905.2.2 基于DistributedCache的复制联结 985.2.3 半联结:map侧过滤后在reduce侧联结 1015.3 创建一个Bloom filter...

    Hadoop实战(陆嘉恒)译

    map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...

    java大数据作业_1云计算、大数据、hadoop

    mapred.tasktracker.reduce.tasks.maximum dfs.data.dir 8.HDFS监控项含义 Configured Capacity DFS Used DFS Used% DFS Remaining% Live Nodes Dead Nodes 9.MapReduce监控项含义 Maps Reduces Map Task Capacity ...

    Hadoop实战

    第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单...

    Yarn框架代码详细分析V0.5

    Hadoop的2.0版本的yarn的框架介绍啊 Hadoop yarnYARN 本身框架的优势是扩展性与支持多计算模型。对于扩展性目前主要体现在计算节点规模上,以前 JobTracker-TaskTracker 模型下最多大约在 5000 台机器左右,对于 ...

    Hadoop二次开发必懂

    以Hadoop带的wordcount为例子(下面是启动行):用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控...

    Hadoop入门实战手册

    4.7.2 .........................................................................20 Map-reduce的运行状态界面 4.7.3 ..........................................................................................

    大数据平台常见面试题.pdf

    JobClient 类将应⽤已经配置参数打包成 jar ⽂件存储到 hdfs,并把路径提交到 Jobtracker,然后由 JobTracker 创建每⼀个 Task(即 MapTask 和 ReduceTask)并将它们分发到各个 TaskTracker 服务中去执⾏ 2、...

    BigDataLearning

    Hadoop进阶地址:1 第一章 概述hadoop的体系架构块:DataNode:NameNode:Secondary NameNode:Hadoop1.X架构图Hadoop 2.Xhadoop2.0以后版本移除了jobtracker tasktracker,改由Yarn平台的resourcemanager负责统一...

Global site tag (gtag.js) - Google Analytics