- 浏览: 298914 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (165)
- hadoop (47)
- linux (11)
- nutch (7)
- hbase (7)
- solr (4)
- zookeeper (4)
- J2EE (1)
- jquery (3)
- java (17)
- mysql (14)
- perl (2)
- compass (4)
- suse (2)
- memcache (1)
- as (1)
- roller (1)
- web (7)
- MongoDB (8)
- struts2 (3)
- lucene (2)
- 算法 (4)
- 中文分词 (3)
- hive (17)
- noIT (1)
- 中间件 (2)
- maven (2)
- sd (0)
- php (2)
- asdf (0)
- kerberos 安装 (1)
- git (1)
- osgi (1)
- impala (1)
- book (1)
- python 安装 科学计算包 (1)
最新评论
-
dandongsoft:
你写的不好用啊
solr 同义词搜索 -
黎明lm:
meifangzi 写道楼主真厉害 都分析源码了 用了很久. ...
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
meifangzi:
楼主真厉害 都分析源码了
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
zhdkn:
顶一个,最近也在学习设计模式,发现一个问题,如果老是看别人的博 ...
Java观察者模式(Observer)详解及应用 -
lvwenwen:
木南飘香 写道
高并发网站的架构
taskTracker 生成map reduce 任务详解
1. 启动 TaskTracker ,执行main方法 new TaskTracker(conf) 启动taskTracker
2. taskTrack 构造方法初始化变量
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大map数 默认是2
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大reduce数 默认是2
mapred.disk.healthChecker.interval 磁盘健康度检查时间间隔 mills 默认是60*1000
构造rpc 链接jobTACKER
tasktracker.http.threads taskTracker 工作线程数,默认是40 ,copy数据的线程数
initialize(); 构造方法,该方法是一个独立于taskTracker的方法,可循环调用,在taskTracker 处于关闭状态时 仍可用
该方法主要用户构造一些 运行时目录 和心跳RPC 信息初始化分布式缓存开发map任务监听线程 初始化new TaskLauncher() 启动map 和reduce 任务抓取线程
下面详细的讲一下TaskLauncher类,该类为taskTracker 类的内部类,在启动taskTracker 的时候 通过独立的initialize()方法启动.
该类是一个线程类.通过addToTaskQueue() 方法将新的任务添加到 tasksToLaunch list (List<TaskInProgress> tasksToLaunch)中,这个list 很重要,jobTracker将需要job 通过assginTaks 将需要执行的task 通过心跳信息 传给taskTracker,taskTracker 的run()方法调用offerService()解析心跳信息,将解析得来的task执行信息 添加到这个list中, 然后启动run方法 时刻去查看 tasksToLaunch list中是不是有新的 任务放进来.r如果有则去执行,如果没有则调用tasksToLaunch.wait(); 等待.调用startNewTask 方法调用launchTaskForJob() 通过调用 launchTask 去执行map 和reduce任务, launchTask 要判断任务的状态. UNASSIGNED FAILED_UNCLEAN KILLED_UNCLEAN RUNNING
TaskTracker 主要是通过监听jobTracker 通过心跳信息传过来的task任务放在 task的队中 去执行task.
在这个过程中 还会有一些 例如 numFreeSlots 的判断 ,tip 完全是 同步的,等等
jobTracker 通过调用JobQueueTaskScheduler的assginTasks()方法 分配task,两种方式 生成tak
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
obtainNewNodeOrRackLocalMapTask 即为hadoop 机架感知功能,调度的时候根据location 因素去分配taskTracker
obtainNewNodeOrRackLocalMapTask 非机架感知
1. 启动 TaskTracker ,执行main方法 new TaskTracker(conf) 启动taskTracker
2. taskTrack 构造方法初始化变量
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大map数 默认是2
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大reduce数 默认是2
mapred.disk.healthChecker.interval 磁盘健康度检查时间间隔 mills 默认是60*1000
构造rpc 链接jobTACKER
tasktracker.http.threads taskTracker 工作线程数,默认是40 ,copy数据的线程数
initialize(); 构造方法,该方法是一个独立于taskTracker的方法,可循环调用,在taskTracker 处于关闭状态时 仍可用
该方法主要用户构造一些 运行时目录 和心跳RPC 信息初始化分布式缓存开发map任务监听线程 初始化new TaskLauncher() 启动map 和reduce 任务抓取线程
synchronized void initialize() throws IOException, InterruptedException { this.fConf = new JobConf(originalConf); // 绑定地址.taskTracker 的地址 绑定到rpc中 为传送心跳信息做准备 String address = NetUtils.getServerAddress(fConf, "mapred.task.tracker.report.bindAddress", "mapred.task.tracker.report.port", "mapred.task.tracker.report.address"); InetSocketAddress socAddr = NetUtils.createSocketAddr(address); String bindAddress = socAddr.getHostName(); int tmpPort = socAddr.getPort(); //初始化jvm 管理类 this.jvmManager = new JvmManager(this); // RPC 初始化 int max = maxMapSlots > maxReduceSlots ? maxMapSlots : maxReduceSlots; //set the num handlers to max*2 since canCommit may wait for the duration //of a heartbeat RPC //此处taskTracker 汇报的 处理任务的slot 在实际的基础上*2,因为在心跳汇报的阶段传输这段时间会空出来一部分slot.在新的heartbeat 过来的时候 有2倍的slot处理能力 this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager); this.taskReportServer.start(); // 初始化分布式缓存,在写mr代码的时候 讲一个文件写入DistributedCache 的时候, DistributedCache 在这个位置进行初始化 this.distributedCacheManager = new TrackerDistributedCacheManager( this.fConf, taskController); // start the thread that will fetch map task completion events //在该位置启动 map和reduce任务的处理线程 this.mapEventsFetcher = new MapEventsFetcherThread(); mapEventsFetcher.setDaemon(true); mapEventsFetcher.setName( "Map-events fetcher for all reduce tasks " + "on " + taskTrackerName); mapEventsFetcher.start(); //这里 初始化了两个类TaskLauncher reduce 和map 这两个类是具体的 生成map和reduce 的任务类. mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots); reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots); mapLauncher.start(); reduceLauncher.start();
下面详细的讲一下TaskLauncher类,该类为taskTracker 类的内部类,在启动taskTracker 的时候 通过独立的initialize()方法启动.
该类是一个线程类.通过addToTaskQueue() 方法将新的任务添加到 tasksToLaunch list (List<TaskInProgress> tasksToLaunch)中,这个list 很重要,jobTracker将需要job 通过assginTaks 将需要执行的task 通过心跳信息 传给taskTracker,taskTracker 的run()方法调用offerService()解析心跳信息,将解析得来的task执行信息 添加到这个list中, 然后启动run方法 时刻去查看 tasksToLaunch list中是不是有新的 任务放进来.r如果有则去执行,如果没有则调用tasksToLaunch.wait(); 等待.调用startNewTask 方法调用launchTaskForJob() 通过调用 launchTask 去执行map 和reduce任务, launchTask 要判断任务的状态. UNASSIGNED FAILED_UNCLEAN KILLED_UNCLEAN RUNNING
TaskTracker 主要是通过监听jobTracker 通过心跳信息传过来的task任务放在 task的队中 去执行task.
在这个过程中 还会有一些 例如 numFreeSlots 的判断 ,tip 完全是 同步的,等等
jobTracker 通过调用JobQueueTaskScheduler的assginTasks()方法 分配task,两种方式 生成tak
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
obtainNewNodeOrRackLocalMapTask 即为hadoop 机架感知功能,调度的时候根据location 因素去分配taskTracker
obtainNewNodeOrRackLocalMapTask 非机架感知
发表评论
-
博客地址变更
2013-08-16 10:29 1157all the guys of visiting the bl ... -
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
2013-04-09 17:33 0taskTracker 生成map reduce ... -
hadoop 源码分析(五)hadoop 任务调度TaskScheduler
2013-04-01 11:07 3901hadoop mapreduce 之所有能够实现job的运行 ... -
hadoop 源码分析(四)JobTracker 添加job 到schduler 队列中
2013-03-29 18:37 2841启动 JobTracker 1. 进入main方法: ... -
hadoop 源码分析(三) hadoop RPC 机制
2013-03-28 15:13 2373Hadoop 通信机制采用自己编写的RPC. 相比于 ... -
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker
2013-03-27 12:57 36891.JobClient 客户端类 通过 ... -
hadoop 源码分析(一) jobClient 提交到JobTracker
2013-03-26 13:41 3571Hadoop 用了2年多了.从最初一起创业的 ... -
RHadoop 安装教程
2013-02-01 17:18 1584RHadoop 环境安装 硬件: centos6 ... -
pig
2012-11-16 19:28 1173转自:http://www.hadoopor.c ... -
hadoop与hive的映射
2012-11-15 10:21 2339hadoop与hive的映射 ... -
hadoop distcp
2012-07-31 10:00 2783hadoop distcp 使用:distcp ... -
MapReduce中Mapper类和Reducer类4函数解析
2012-07-20 18:05 2082MapReduce中Mapper类和Reducer类4函数解析 ... -
hadoop metrics 各参数解释
2012-07-17 18:59 1476hadoop metrics 各参数解释 研究使用hadoo ... -
Hbase几种数据入库(load)方式比较
2012-07-17 14:52 13481. 预先生成HFile入库 这个地址有详细的说明http:/ ... -
Hadoop客户端环境配置
2012-05-11 14:59 1733Hadoop客户端环境配置 1. 安装客户端(通过端用户可以 ... -
hadoop 通过distcp进行并行复制
2012-05-02 15:25 2403通过distcp进行并行复制 前面的HDFS访问模型都集中于 ... -
linux crontab 执行hadoop脚本 关于hadoop环境变量引入
2012-04-10 12:11 0crontab问题 crontab的特点:PATH不全和无终 ... -
hadoop fs 命令封装
2012-04-09 09:39 0hadoop fs 命令封装 #!/usr/bin/env ... -
map-reduce编程核心问题
2012-02-22 13:38 12351-How do we break up a large p ... -
Hadoop Archives
2012-02-17 14:25 0Hadoop Archives 什么是Hadoop arch ...
相关推荐
Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce 论文。 == HDFS == HDFS (Hadoop Distributed File System),Hadoop 分布式文件系统。...TaskTracker,启动和管理Map和Reduce子任务的节点。
框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 通常,hadoop Map/Reduce框架和分布式...
map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 ...
map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和...
org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce.tools org.apache.hadoop.mapreduce.v2 org.apache.hadoop.mapreduce.v2.app.webapp.dao org.apache.hadoop.mapreduce.v2.hs....
5.2.1 Reduce侧的联结 5.2.2 基于DistributedCache的复制联结 5.2.3 半联结:map侧过滤后在reduce侧联结 5.3 创建一个Bloom filter 5.3.1 Bloom filter做了什么 5.3.2 实现一个Bloom filter 5.3.3 Hadoop 0.20...
1.Hadoop,Apache开源的分布式框架。2.HDFS,hadoop的分布式文件系统3....6.TaskTracker,hadoop调度程序,负责Map,Reduce 任务的具体启动和执行。7.Fuse,多文件系统内核程序,可将不同的文件系统mount成linux可读写模式
Hadoop集群安装的详细说明文档, 實作七: Hadoop 叢集安裝 前言 您手邊有兩台電腦,假設剛剛操作的電腦為"主機一" ,另一台則為"主機二" 。則稍後的環境如下 • 管理Data的身份 管理Job的身份 "主機一" namenode ...
865.1.3 预处理和后处理阶段的链接 865.2 联结不同来源的数据 895.2.1 Reduce侧的联结 905.2.2 基于DistributedCache的复制联结 985.2.3 半联结:map侧过滤后在reduce侧联结 1015.3 创建一个Bloom filter...
map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...
mapred.tasktracker.reduce.tasks.maximum dfs.data.dir 8.HDFS监控项含义 Configured Capacity DFS Used DFS Used% DFS Remaining% Live Nodes Dead Nodes 9.MapReduce监控项含义 Maps Reduces Map Task Capacity ...
第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单...
Hadoop的2.0版本的yarn的框架介绍啊 Hadoop yarnYARN 本身框架的优势是扩展性与支持多计算模型。对于扩展性目前主要体现在计算节点规模上,以前 JobTracker-TaskTracker 模型下最多大约在 5000 台机器左右,对于 ...
以Hadoop带的wordcount为例子(下面是启动行):用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控...
4.7.2 .........................................................................20 Map-reduce的运行状态界面 4.7.3 ..........................................................................................
JobClient 类将应⽤已经配置参数打包成 jar ⽂件存储到 hdfs,并把路径提交到 Jobtracker,然后由 JobTracker 创建每⼀个 Task(即 MapTask 和 ReduceTask)并将它们分发到各个 TaskTracker 服务中去执⾏ 2、...
Hadoop进阶地址:1 第一章 概述hadoop的体系架构块:DataNode:NameNode:Secondary NameNode:Hadoop1.X架构图Hadoop 2.Xhadoop2.0以后版本移除了jobtracker tasktracker,改由Yarn平台的resourcemanager负责统一...