`

hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker

阅读更多

1.JobClient 客户端类 通过调用 JobSubmissionProtocol 接口 的 submit 方法提交作业
2. JobSubmissionProtocol  接口为 JobClient 和JobTracker 共同执行的接口,因此它是一个可代理的接口
3. 调用 createRPCProxy() 通过远程RPC 调用实现动态代理 JobTracker 类的 submitJob 方法

 private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {
    return (JobSubmissionProtocol)[color=red][size=medium] RPC.getProxy[/size][/color](JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr, 
        UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  }


打开RPC 类 的 getProxy 方法

public static VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
      Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {

    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    VersionedProtocol proxy =
        (VersionedProtocol)[color=red][size=medium] Proxy.newProxyInstance[/size][/color](
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                  clientVersion);
    if (serverVersion == clientVersion) {
      return proxy;
    } else {
      throw new VersionMismatch(protocol.getName(), clientVersion, 
                                serverVersion);
    }
  }


通过RPC  远程调用连接到服务端后.通过传入代理接口 获取到JobTracker 类
代理模式 就详细的讲了,如有必要 可翻看java代理模式

这里 getPoxy() 方法中调用  new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)) 通过new Invoker() 获得client.该cleintt 先判断ClientCache 是不是已经含有该client 如果有则加+1 如果没有 则new Client 生成一个 代码如下

 private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      // Construct & cache client.  The configuration is only used for timeout,
      // and Clients have connection pools.  So we can either (a) lose some
      // connection pooling and leak sockets, or (b) use the same timeout for all
      // configurations.  Since the IPC is usually intended globally, not
      // per-job, we choose (a).
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }
/* 每次 +1 */
 synchronized void incCount() {
    refCount++;
  }
  


JobTracker 将其加入到job队列中

该过程 完成了 用户通过JobClient 像JobTracker 提交作业的流程,提交的过程中不是通过直接提交而是通过了rpc 通信 创建JobClient 代理 通过代理模式提交


  • 大小: 24.4 KB
8
1
分享到:
评论
2 楼 黎明lm 2013-03-28  
meifangzi 写道
楼主真厉害  都分析源码了

用了很久.一直没写出来,博客荒废了很久,重新分析下 就当复习了,一起交流学习
1 楼 meifangzi 2013-03-28  
楼主真厉害  都分析源码了

相关推荐

Global site tag (gtag.js) - Google Analytics