当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

opensource

拥有积分:55
这家伙太懒,还没有签名!

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

Hbase的RPC

opensource 发表于 2年前 (2014-09-22 09:32:23)  |  评论(0)  |  阅读次数(3154)| 0 人收藏此文章,   我要收藏   

HbaseRPC主要由HBaseRPCRpcEngineHBaseClientHBaseServerVersionedProtocol 5个概念组成。

1HBaseRPChbase RPC的实现类,核心方法:

1)、RpcEngine getProtocolEngine():返回RpcEngine对象

2)、<T extends VersionedProtocol> T waitForProxy():调用RpcEnginegetProxy()方法,返回一个远程代理对象,比如:第一次访问HRegionServer时需要执行该方法,设置代理后,会缓存该对象到HConnectionImplementation中。

 

 

2RpcEngine接口,其实现类:WritableRpcEngine,核心方法:

1)、VersionedProtocol getProxy():返回代理对象,HRegionServerHMaster均是VersionedProtocol的实现类,即返回的对象可代理执行HRegionServerHMaster的方法;

2)、Object[] call():调用程序接口,最终是经过HBaseClient的内部类Connection通过socket方式完成;

       3)、RpcServer getServer():返回RpcServer的实现类,有一个抽象实现: HBaseServerHBaseServer的子类:WritableRpcEngine.Server

4)、stopProxy()

 

 

3HBaseClientRPCclient端实现,最核心的方法是call(),通过该方法可执行服务端的方法,该类中有一个重要的内部类:HBaseClient.Connection,该类封装了socket,具体原理就是把要执行的方法通过socket告诉服务端,服务端通过HBaseServer类从socket中读出client端的调用方法,然后执行相关类的相应方法,结果再通过socket传回。

 

 

4HBaseServerRPCserver端实现,HBaseServer有两个重要的内部类,一个是HBaseServer.Connection,另一个是Handler类,这里的Connectionsocket中读出call方法并放入callQueue队列中,Handler类从该队列中取出call方法(比如:scan查询时执行的一次next(),该方法会执行到服务端HRegionServernext(),这里next就是call方法)并执行,结果通过socket输出给client端,HandlerThread的子类,在RS启动时就会创建所有的Handle线程,然后一直执行,具体的handler线程数可以通过配置项hbase.regionserver.handler.count配置,默认是10

 

 

5VersionedProtocol,该接口的类图如下:



 

 

可进行RPC调用的类必须是该接口的实现类,hbase client、 RS、Master相互之间的访问总结为:

  • HBase Client 通过HMasterInterface接口访问HMasterServer,通过HRegionInterface接口访问HRegionServer;
  • HRegionServer通过HMasterRegionInterface接口访问HMasterServer;
  • HMaster通过HRegionInterface接口访问HRegionServer,在访问RS时Master就是HBase Client的角色。

scan查询为例画时序图,通过时序图详细理解HBaseRPC调用过程



 

 

 

关于HBaseRPC一些知识补充如下:

1HBaseClient缓存了HBaseClient.Connection,默认一个client应用与每个RS均只有一个socket链接,可以通过以下两个配置项修改:

1)hbase.client.ipc.pool.type:默认为RoundRobin,共三种,如下:

// PoolMap类的createPool方法,在HBaseClient缓存connection时会调用,从pool中获取connection时,如果缓存的数量没有达到poolMaxSize,则会返回null,从而创建新的connection对象

protected Pool<V> createPool() {

    switch (poolType) {

    case Reusable: return new ReusablePool<V>(poolMaxSize); //复用的池

    case RoundRobin: return new RoundRobinPool<V>(poolMaxSize); //轮询

    case ThreadLocal:  return new ThreadLocalPool<V>(); //本地线程

    }

    return null;

  }

//PoolMap.RoundRobinPool类的get方法,缓存的数量没有达到poolMaxSize,则会返回null,这时会创建新的connection对象,不同的poolType有不同的实现,比如:ReusablePoolget方法是:return poll();

public R get() {

      if (size() < maxSize) {

        return null;

      }

      nextResource %= size();

      R resource = get(nextResource++);

      return resource;

    }

2)、hbase.client.ipc.pool.sizesocket链接池大小,默认为1

 

2、详解HBaseClientgetConnection()方法:

//HBaseClientgetConnection方法,默认一个regionserver和一个master均只会建立一个socket链接,可以通过修改hbase.client.ipc.pool.size(默认值为1)增加socket链接数,可参看本文档上面几行的内容

protected Connection getConnection(InetSocketAddress addr,

                                   Class<? extends VersionedProtocol> protocol,

                                   User ticket,

                                   int rpcTimeout,

                                   Call call)

                                   throws IOException, InterruptedException {

    if (!running.get()) {

      throw new IOException("The client is stopped");

    }

Connection connection;

//一个regionserver对应一个ConnectionId

    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);

synchronized (connections) {

//connectionsPoolMap类的实例,如果connectionsremoteId对应的链接数量小于hbase.client.ipc.pool.size的配置值则会返回null

      connection = connections.get(remoteId);

      if (connection == null) {

        connection = createConnection(remoteId);

//一个regionserver对应一个ConnectionId

        connections.put(remoteId, connection);

      }

    }

    connection.addCall(call);

 

    //如果没有socket链接则建立socket链接

    connection.setupIOstreams();

    return connection;

  }

 

//ConnectionIdequals方法

public boolean equals(Object obj) {

     if (obj instanceof ConnectionId) {

       ConnectionId id = (ConnectionId) obj;

       return address.equals(id.address) && protocol == id.protocol &&

              ((ticket != null && ticket.equals(id.ticket)) ||

               (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;

     }

     return false;

}

 

//HBaseClient内部类:ConnectionsetupIOstreams方法,在getConnection()中被调用

protected synchronized void setupIOstreams()

        throws IOException, InterruptedException {

//如果有可用的socket对象则直接返回

      if (socket != null || shouldCloseConnection.get()) {

        return;

      }

 

      if (failedServers.isFailedServer(remoteId.getAddress())) {

        IOException e = new FailedServerException(

            "This server is in the failed servers list: " + remoteId.getAddress());

        markClosed(e);

        close();

        throw e;

      }

 

      try {

        setupConnection();

        this.in = new DataInputStream(new BufferedInputStream

            (new PingInputStream(NetUtils.getInputStream(socket))));

        this.out = new DataOutputStream

            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));

        writeHeader();

 

        // update last activity time

        touch();

 

        // start the receiver thread after the socket connection has been set up

        start();

      } catch (Throwable t) {

        failedServers.addToFailedServers(remoteId.address);

        IOException e;

        if (t instanceof IOException) {

          e = (IOException)t;

        } else {

          e = new IOException("Could not set up IO Streams", t);

        }

        markClosed(e);

        close();

 

        throw e;

      }

}

//HBaseClient内部类:ConnectionsetupConnection方法,在setupIOstreams ()中被调用

//在这里创建socket对象

protected synchronized void setupConnection() throws IOException {

      short ioFailures = 0;

      short timeoutFailures = 0;

      while (true) {

        try {

          this.socket = socketFactory.createSocket();

          this.socket.setTcpNoDelay(tcpNoDelay);

          this.socket.setKeepAlive(tcpKeepAlive);

          // connection time out is 20s

          NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf));

          if (remoteId.rpcTimeout > 0) {

            pingInterval = remoteId.rpcTimeout; // overwrite pingInterval

          }

          this.socket.setSoTimeout(pingInterval);

          return;

        } catch (SocketTimeoutException toe) {

          /* The max number of retries is 45,

           * which amounts to 20s*45 = 15 minutes retries.

           */

          handleConnectionFailure(timeoutFailures++, maxRetries, toe);

        } catch (IOException ie) {

          handleConnectionFailure(ioFailures++, maxRetries, ie);

        }

      }

    }

 

3Hbase有三个链接类:

org.apache.hadoop.hbase.client.HConnection

org.apache.hadoop.hbase.ipc.HBaseClient.Connection

org.apache.hadoop.hbase.ipc.HBaseServer.Connection

 l  HConnection接口

实现类:HConnectionImplementation,该链接是clienthbase集群这个层面的链接对象,一个集群的一个client就一个该链接对象,在该对象持有

   1)  RpcEngine对象

   2)   ZooKeeperWatcher 对象

   3)    masterRPC代理对象:HMasterInterface

   4)    regionserverRPC代理对象:HRegionInterface(一个rs对应一个HRegionInterface代理对象)

   5)     缓存的regionlocation信息

   6)    线程池batchPoolbatchPool用于HTable的如下方法:



 

 

其中批量get查询apiget(List<Get> gets)会调用batch方法,而单条查询get不会,只要是可能涉及多个regionserver的操作均会使用多线程处理,像批量get、批量deleteputscan一次只能查询一个RS,因此虽然功能上是批量查询数据,但是不需要线程池。

有两个地方均可能创建该批量操作的线程池(注意是线程池不是连接池),分别是HTablePoolHConnectionImplementation,如果通过HTablePool获取HTable对象则采用HTablePool的线程池,如果采用HConnectionImplementation获取HTable对象,则采用HConnectionImplementation的线程池,这个取决于hbase client程序的用法,HTablePool已经是不推荐的方式,0.94.12的版本推荐通过HConnection获取HTable

// HTable的默认线程池,HTablePool每次创建HTable时均会创建一个直接提交的线程池(采用SynchronousQueue队列),该线程池的特点是不会缓存任务,有任务会直接执行,缺点是:如果并发大会导致同时存活大量的线程,优点是不会缓存任务,从而不会存在任务堆积过多导致jvm内存暴涨,不过开启过多的线程也会导致大量的消耗内存

private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {

    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);

    if (maxThreads == 0) {

      maxThreads = 1; // is there a better default?

    }

    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);

    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,

        keepAliveTime, TimeUnit.SECONDS,

        new SynchronousQueue<Runnable>(),

        Threads.newDaemonThreadFactory("hbase-table"));

    pool.allowCoreThreadTimeOut(true);

    return pool;

  }

 

// HConnectionImplementation的默认线程池

private ExecutorService getBatchPool() {

      if (batchPool == null) {

        // shared HTable thread executor not yet initialized

        synchronized (this) {

          if (batchPool == null) {

            int maxThreads = conf.getInt("hbase.hconnection.threads.max", Integer.MAX_VALUE);

            if (maxThreads == 0) {

              maxThreads = Runtime.getRuntime().availableProcessors();

            }

            long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);

            this.batchPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),

                maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),

                Threads.newDaemonThreadFactory("hbase-connection-shared-executor"));

            ((ThreadPoolExecutor) this.batchPool).allowCoreThreadTimeOut(true);

          }

          this.cleanupPool = true;

        }

      }

      return this.batchPool;

    }

由源码可以看出:

两个线程池均是采用的直接提交线程池,唯一区别是创建ThreadPoolExecutor对象时指定的corePoolSize不一样,HTablePool1HConnectionRuntime.getRuntime().availableProcessors()

 

  l  HBaseClient.ConnectionHBaseServer.Connection

这两个Connection链接类是对socket的封装,HBaseClient.Connection类的实例数默认等于1Master+ RS个数,可以通过hbase.client.ipc.pool.size配置,默认为1,如果是2clientMaster和每个RS均有两个Connection类的实例,也可理解为有2socket链接,也可配置hbase.client.ipc.pool.type(默认为轮询)修改socket连接池的类型。

总之:HConnection缓存HMasterInterfaceHRegionInterfaceRPC代理对象,HMasterInterfaceHRegionInterfaceRPC代理对象最终均是通过Connection类建立的socket与服务端通信, master和每个RS均只对应一个RPC代理对象,每个RPC代理对象默认对应一个Connection对象,一个Connection对象持有一个socket链接。

 

4、关于参数hbase.regionserver.lease.periodRS的租凭期,RS的租凭设计用于当client端持有RS资源的场景,主要用于scan操作,RS会为client端保留scanner对象,以便多次交互,默认60秒,客户端必须在该时间内向RS发送心跳信息,否则RS认为clientdeaded,超过该时间请求RS时,RS会抛出异常,对于scan操作一次next读取相当于一次心跳(参看:Leases类),在client端用该时间作为scan查询时每次next()的超时时间。

 

5、关于hbase.rpc.timeout配置:每次RPC的超时时间,默认为60000,如果没有超时则等待1s后再重试,直到超时或者重试成功,起三个作用:

  1)     Socket读数据的超时时间。如果超过超时值,将引发 java.net.SocketTimeoutException具体解释请参考:java.net.Socket.setSoTimeout()API

  2)       控制整个HBaseRPC.waitForProxy()方法的超时时间,在该方法中RPC远程执行HRegionServergetProtocolVersion()方法,检查clientserver端的协议版本,这个过程的总时间不能超过该配置时间,在RPC的过程中涉及建立socket链接和socket通信,因此该时间应该大于或者等于建立链接(ipc.socket.timeout)的时间(socket 通信时间=整体时间),如果小于建立链接的时间,则多导致无用的建立链接等待,就算建立成功但是也会因为整体超时后抛出异常。

  3)        在定位root表所在的regionserver地址时,会从zk中获取root表的regionserver信息,该过程的超时时间由该参数控制,当zk不可用等原因时,会返回null,这时就会阻塞直到超时,这个设计应该是有问题,阻塞没有任何意义。

HBaseRPCsetRpcTimeout()方式,在一些场景会通过该方法修改rpc超时时间,通过HBaseRPC的如下方法可以看出,hbase最终采用的时间是编程方式制定的超时时间与hbase.rpc.timeout配置的时间中的最小时间:

//HBaseRPC

public static void setRpcTimeout(int rpcTimeout) {

    HBaseRPC.rpcTimeout.set(rpcTimeout);

  }

//HBaseRPC

public static int getRpcTimeout(int defaultTimeout) {// defaultTimeouthbase.rpc.timeout配置的时间

    return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get());

  }

//WritableRpcEngine

public <T extends VersionedProtocol> T getProxy(

      Class<T> protocol, long clientVersion,

      InetSocketAddress addr, Configuration conf, int rpcTimeout)

    throws IOException {

    T proxy =

          (T) Proxy.newProxyInstance(

              protocol.getClassLoader(), new Class[] { protocol },

              new Invoker(client, protocol, addr, User.getCurrent(), conf,

                  HBaseRPC.getRpcTimeout(rpcTimeout)));

}

 

 

通过编程方式设置rpc超时时间的操作只来自ServerCallable类的beforeCall()方法:

public void beforeCall() {

    this.startTime = EnvironmentEdgeManager.currentTimeMillis();

    int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));

    if (remaining < MIN_RPC_TIMEOUT) {

      remaining = MIN_RPC_TIMEOUT;

    }

    HBaseRPC.setRpcTimeout(remaining);

  }

scangetdelete等上十个与server交互的操作类均是通过ServerCallable实现。

 

6HBaseclientserversocket连接是通过hadooporg.apache.hadoop.net.NetUtils类实现,在hadoop1.2.1版本中NetUtils类是基于nio实现socket通信的。

 

7、关于ipc.socket.timeout:默认20s,该参数控制java.nio.channels.Selectorselect(timeout)超时时间,如果20s通道没有就绪则抛出超时异常,也即是socket建立连接的超时时间,读数据的超时时间由hbase.rpc.timeout配置。

 

8HBaseClientcall方法

 

public Writable call(Writable param, InetSocketAddress addr,

                       Class<? extends VersionedProtocol> protocol,

                       User ticket, int rpcTimeout)  {

    Call call = new Call(param);

    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);

    connection.sendParam(call);                 // send the parameter

      while (!call.done) {

        try {

//如果远端调用没有执行完,则会每隔1s钟检查一次,可见是比较低效的。

          call.wait(1000);        

        } catch (InterruptedException ignored) {

          interrupted = true;

        }

      }

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183