当前位置:首页 > PHP教程 > php高级应用 > 列表

阿里改造后的memcached客户端源码详解

发布:smiling 来源: PHP粉丝网  添加日期:2015-04-04 16:26:19 浏览: 评论:0 

最近项目需要用应用缓存解决方案,选择了目前比较流行的memcached作为分布式缓存,客户端我们选择了阿里改造后的memchaced-client-forjava,因为该客户端经过阿里内部大量实际项目的线上运行,表现给力.

源码分析

memcached本身是一个集中式的内存缓存系统,对于分布式的支持服务端并没有实现,只有通过客户端实现;再者,memcached是基于TCP/UDP进行通信,只要客户端语言支持TCP/UDP即可实现客户端,并且可以根据需要进行功能扩展,memchaced-client-forjava 既是使用java语言实现的客户端,并且实现了自己的功能扩展.

几个重要类的说明:

MemcachedCacheManager:管理类,负责缓存服务端,客户端,以及相关资源池的初始化工作,获取客户端等等

MemcachedCache:memcached缓存实体类,实现了所有的缓存API,实际上也会调用MemcachedClient进行操作

MemcachedClient:memcached缓存客户端,一个逻辑概念,负责与服务端实例的实际交互,通过调用sockiopool中的socket

SockIOPool:socket连接资源池,负责与memcached服务端进行交互

ClusterProcessor:集群内数据异步操作工具类

客户端可配置化

MemcachedCacheManager是入口,其start方法读取配置文件memcached.xml,初始化各个组建,包括memcached客户端,socket连接池以及集群节点.

memcached客户端是个逻辑概念,并不是和memcached服务端实例一一对应的,可以认为其是一个逻辑环上的某个节点,后面会讲到hash一致性算法时涉及,该配置文件中,可配置一个或多个客户端,每个客户端可配置一个socketPool连接池,如下:

  1. <client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0”> //开源软件:phpfensi.com 
  2.     <errorHandler>com.alisoft.xplatform.asf.cache.memcached.MemcachedErrorHandler</errorHandler> 
  3. </client> 

扩容

socketpool连接池配置的才是真正连接的memcached服务实例,当然,你可以连接多个memcached服务实例,多个实例可以分布在一台或者多台物理机器上。这样,随着实际业务数据量的增加,可以对现有缓存容量进行扩容,只需在servers中增加memcached实例即可,或者增加多个socketpool配置项,配置如下:

  1. <socketpool name="pool0" failover="true" initConn="5" minConn="5" maxConn="250" maintSleep="5000" nagle="false" socketTO="3000" aliveCheck="true"
  2.     <servers>192.168.1.66:11211,192.168.1.68:11211</servers> 
  3. </socketpool> 

初始化过程

上文提及的MemcachedCacheManager,该类功能包括有初始化各种资源池,获取所有客户端,重新加载配置文件以及集群复制等。我们重点分析方法start,该方法首先加载配置文件,然后初始化资源池,即方法initMemCacheClientPool,该方法中定义了三个资源池,即socket连接资源池socketpool,memcachedcache资源池cachepool,以及由客户端组成的集群资源池clusterpool,这些资源池的数据结构都是线程安全的ConcurrentHashMap,保证了并发效率,将配置信息分别实例化后,再分别放入对应的资源池容器中,socket连接放入socketpool中,memcached客户端放入cachepool中,定义的集群节点放入clusterpool中。

注意,在实例化socket连接池资源socketpool时,会调用每个pool的初始化方法pool.initialize(),来映射memcached实例到HASH环上,以及初始化socket连接.

单点问题

memcached的分布式,解决了容量水平扩容的问题,但是当某个节点失效时,还是会丢失一部分数据,单点故障依然存在,分布式只是解决了数据整体失效问题,而在实际项目中,特别是GAP平台适应的企业级项目中,是不允许数据不一致的,所以对每一份保存的数据都需要进行容灾处理,那么对于定义的每个memcached客户端,都至少增加一个新客户端与其组成一个cluster集群,当更新或者查找数据时,会先定位到该集群中某个节点,如果该节点失效,就去另外一个节点进行操作。在实际项目中,通过合理规划配置cluster和client(memcached客户端),可以最大限度的避免单点故障,当所有client都失效时还会丢失数据,在配置文件中,集群配置如下:

  1. <cluster name="cluster1" mode="active"
  2.     <memCachedClients>mclient1,mclient2</memCachedClients> 
  3. </cluster> 

HASH一致性算法

在memcached支持分布式部署场景下,如何获取一个memcached实例?如何平均分配memcached实例的存储?这些需要一个算法来实现,我们选择的是HASH一致性算法,具体就体现在客户端如何获取一个连接memcached服务端的socket上,也就是如何定位memcached实例的问题?算法要求能够根据每次提供的同一个key获得同一个实例.

HASH闭环的初始化

本质上,hash一致性算法是需要实现一个逻辑环,如图所示,环上所有的节点即为一个memcached实例,如何实现?其实是根据每个memcached实例所在的ip地址,将所有的实例映射到hash数值空间中,构成一个闭合的圆环。

HASH环映射的初始化的代码位于SocketIOPool.populateConsistentBuckets方法中,主要代码如下:

  1. private void populateConsistentBuckets() 
  2.      { 
  3.          ……... 
  4.           for (int i = 0; i < servers.length; i++) 
  5.           { 
  6.                int thisWeight = 1; 
  7.                if (this.weights != null && this.weights[i] != null) 
  8.                     thisWeight = this.weights[i]; 
  9.                     double factor = Math .floor(((double) (40 * this.servers.length * thisWeight)) / (double                                      ) this.totalWeight); 
  10.                for (long j = 0; j < factor; j++) 
  11.                { 
  12.                     byte[] d = md5.digest((servers[i] + "-" + j).getBytes()); 
  13.                     for (int h = 0; h < 4; h++) 
  14.                     { 
  15.                          // k 的值使用MD5hash算法计算获得 
  16.                          Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) 
  17.                                    | ((long) (d[2 + h * 4] & 0xFF) << 16) 
  18.                                    | ((long) (d[1 + h * 4] & 0xFF) << 8) 
  19.                                    | ((long) (d[0 + h * 4] & 0xFF)); 
  20.                          // 用treemap来存储memcached实例所在的ip地址, 
  21.                          // 也就是将每个缓存实例所在的ip地址映射到由k组成的hash环上 
  22.                         consistentBuckets.put(k, servers[i]); 
  23.                          if (log.isDebugEnabled()) 
  24.                               log.debug("++++ added " + servers[i] 
  25.                                         + " to server bucket"); 
  26.                     } 
  27.                } 
  28.            ……... 
  29.           } 
  30.      } 

获取socket连接

在实际获取memcahced实例所在服务器的soket时,只要使用基于同一个存储对象的key的MD5Hash算法,就可以获得相同的memcached实例所在的ip地址,也就是可以准确定位到hash环上相同的节点,代码位于SocketIOPool.getSock方法中,主要代码如下:

  1.  public SockIO getSock(String key, Integer hashCode){ 
  2.       …………. 
  3.           
  4.       // from here on, we are working w/ multiple servers 
  5.       // keep trying different servers until we find one 
  6.       // making sure we only try each server one time 
  7.       Set<String> tryServers = new HashSet<String>(Arrays.asList(servers)); 
  8.       // get initial bucket 
  9.       // 通过key值计算hash值,使用的是基于MD5的算法 
  10.       long bucket = getBucket(key, hashCode); 
  11.       String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets .get(bucket)  : buckets.g                          et((int) bucket); 
  12.        …………...   
  13.  } 
  14.  private long getBucket(String key, Integer hashCode) 
  15.  { 
  16.       / / 通过key值计算hash值,使用的是基于MD5的算法 
  17.       long hc = getHash(key, hashCode); 
  18.       if (this.hashingAlg == CONSISTENT_HASH) 
  19.       { 
  20.            return findPointFor(hc); 
  21.       } else 
  22.       { 
  23.            long bucket = hc % buckets.size(); 
  24.            if (bucket < 0) 
  25.                 bucket *= -1; 
  26.            return bucket; 
  27.       } 
  28.  } 
  29.  /** 
  30.   * Gets the first available key equal or above the given one, if none found, 
  31.   * returns the first k in the bucket 
  32.   * 
  33.   * @param k 
  34.   *            key 
  35.   * @return 
  36.   */ 
  37.  private Long findPointFor(Long hv) 
  38.  { 
  39.       // this works in java 6, but still want to release support for java5 
  40.       // Long k = this.consistentBuckets.ceilingKey( hv ); 
  41.       // return ( k == null ) ? this.consistentBuckets.firstKey() : k; 
  42.       // 该consistentBuckets中存储的是HASH结构初始化时,存入的所有memcahced实例节点,也就是整个hash环 
  43.       // tailMap方法是取出大于等于hv的所有节点,并且是递增有序的 
  44.       SortedMap<Long, String> tmap = this.consistentBuckets.tailMap(hv); 
  45.       // 如果tmap为空,就默认返回hash环上的第一个值,否则就返回最接近hv值的那个节点 
  46.       return (tmap.isEmpty()) ? this.consistentBuckets.firstKey() : tmap .firstKey(); 
  47.  } 
  48. /** 
  49.   * Returns a bucket to check for a given key. 
  50.   * 
  51.   * @param key 
  52.   *            String key cache is stored under 
  53.   * @return int bucket 
  54.   */ 
  55.  private long getHash(String key, Integer hashCode) 
  56.  { 
  57.       if (hashCode != null) 
  58.       { 
  59.            if (hashingAlg == CONSISTENT_HASH) 
  60.                 return hashCode.longValue() & 0xffffffffL; 
  61.            else 
  62.                 return hashCode.longValue(); 
  63.       } else 
  64.       { 
  65.            switch (hashingAlg) 
  66.            { 
  67.            case NATIVE_HASH: 
  68.                 return (long) key.hashCode(); 
  69.            case OLD_COMPAT_HASH: 
  70.                 return origCompatHashingAlg(key); 
  71.            case NEW_COMPAT_HASH: 
  72.                 return newCompatHashingAlg(key); 
  73.            case CONSISTENT_HASH: 
  74.                 return md5HashingAlg(key); 
  75.            default
  76.                 // use the native hash as a default 
  77.                 hashingAlg = NATIVE_HASH; 
  78.                 return (long) key.hashCode(); 
  79.            } 
  80.       } 
  81.  } 
  82. /** 
  83.   * Internal private hashing method. 
  84.   * 
  85.   * MD5 based hash algorithm for use in the consistent hashing approach. 
  86.   * 
  87.   * @param key 
  88.   * @return 
  89.   */ 
  90.  private static long md5HashingAlg(String key) 
  91.  { 
  92.       / /通过key值计算hash值,使用的是基于MD5的算法 
  93.       MessageDigest md5 = MD5.get(); 
  94.       md5.reset(); 
  95.       md5.update(key.getBytes()); 
  96.       byte[] bKey = md5.digest(); 
  97.       long res = ((long) (bKey[3] & 0xFF) << 24) 
  98.                 | ((long) (bKey[2] & 0xFF) << 16) 
  99.                 | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); 
  100.       return res; 
  101.  } 

通过以上代码的分析,整个memcahced服务端实例HASH环的初始化,以及数据更新和查找使用的算法都是基于同一种算法,这就保证了通过同一个key获得的memcahced实例为同一个.

socket连接池

这部分单独介绍,请猛烈地戳这里。

容灾、故障转移以及性能

衡量系统的稳定性,很大程度上是对各种异常情况的处理,充分考虑异常情况,以及合理处理异常是对系统设计人员的要求,下面看看在故障处理和容灾方面系统都做了那些工作。

定位memcached实例时,当第一次定位失败,会对所有其他的属于同一个socketpool中的memcahced实例进行定位,找到一个可用的,代码如下:

  1. // log that we tried 
  2. // 先删除定位失败的实例 
  3. tryServers.remove(server); 
  4. if (tryServers.isEmpty()) 
  5.     break
  6. // if we failed to get a socket from this server 
  7. // then we try again by adding an incrementer to the 
  8. // current key and then rehashing 
  9. int rehashTries = 0; 
  10. while (!tryServers.contains(server)) 
  11.   // 重新计算key值 
  12.   String newKey = new StringBuilder().append(rehashTries).append(key).toString(); 
  13.   // String.format( "%s%s", rehashTries, key ); 
  14.   if (log.isDebugEnabled()) 
  15.       log.debug("rehashing with: " + newKey); 
  16.   // 去HASH环上定位实例节点 
  17.   bucket = getBucket(newKey, null); 
  18.   server=(this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket); 
  19.   rehashTries++; 
  20.  } 

查找数据时,当前节点获取不到,会尝试到所在集群中其他的节点查找,成功后,会将数据复制到原先失效的节点中,代码如下:

  1. public Object get(String key) 
  2.      { 
  3.           Object result = null; 
  4.           boolean isError = false;   
  5.        ……....... 
  6.           if (result == null && helper.hasCluster()) 
  7.            if (isError || helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE)) 
  8.           { 
  9.                List<MemCachedClient> caches = helper.getClusterCache(); 
  10.                for(MemCachedClient cache : caches) 
  11.                { 
  12.                     if (getCacheClient(key).equals(cache)) 
  13.                          continue
  14.                     try{ try 
  15.                          { 
  16.                               result = cache.get(key); 
  17.                          } 
  18.                          catch(MemcachedException ex) 
  19.                          { 
  20.                               Logger.error(new StringBuilder(helper.getCacheName()) 
  21.                                    .append(" cluster get error"),ex); 
  22.                               continue
  23.                          } 
  24.                          //仅仅判断另一台备份机器,不多次判断,防止效率低下 
  25.                          if (helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE                                                      ) && result != null) 
  26.                          { 
  27.                               Object[] commands = new Object[]{CacheCommand.RECOVER,key,result}; 
  28.                              // 加入队列,异步执行复制数据 
  29.                               addCommandToQueue(commands); 
  30.                          } 
  31.                          break;     
  32.                     } 
  33.                     catch(Exception e) 
  34.                     { 
  35.                      Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster get error"),e); 
  36.                     } 
  37.                } 
  38.           } 
  39.           return result; 
  40.      } 

更新数据时,异步更新到集群内其他节点,示例代码如下:

  1. public boolean add(String key, Object value) 
  2.      { 
  3.           boolean result = getCacheClient(key).add(key,value); 
  4.           if (helper.hasCluster()) 
  5.           { 
  6.                Object[] commands = new Object[]{CacheCommand.ADD,key,value}; 
  7.                // 加入队列,异步执行 
  8.                addCommandToQueue(commands); 
  9.           } 
  10.           return result; 
  11.      } 

删除数据时,需要同步执行,如果异步的话,会产生脏数据,代码如下:

  1. public Object remove(String key) 
  2.      { 
  3.           Object result = getCacheClient(key).delete(key); 
  4.           //异步删除由于集群会导致无法被删除,因此需要一次性全部清除 
  5.           if (helper.hasCluster()) 
  6.           { 
  7.                List<MemCachedClient> caches = helper.getClusterCache(); 
  8.                
  9.                for(MemCachedClient cache : caches) 
  10.                { 
  11.                     if (getCacheClient(key).equals(cache)) 
  12.                          continue
  13.                     try 
  14.                     { 
  15.                          cache.delete(key); 
  16.                     } 
  17.                     catch(Exception ex) 
  18.                     { 
  19.                          Logger.error(new StringBuilder(helper.getCacheName()) 
  20.                                         .append(" cluster remove error"),ex); 
  21.                     } 
  22.                } 
  23.           } 
  24.           return result; 
  25.      } 

异步执行集群内数据同步,因为不可能每次数据都要同步执行到集群内每个节点,这样会降低系统性能,所以在构造MemcachedCache对象时,会建立一个队列,线程安全的linked阻塞队列LinkedBlockingQueue,将所有需要异步执行的命令放入队列中,异步执行,具体异步执行由ClusterProcessor类负责,代码如下:

  1. public MemcachedCache(MemCachedClientHelper helper,int statisticsInterval) 
  2.      { 
  3.           this.helper = helper;  
  4.           dataQueue = new LinkedBlockingQueue<Object[]>(); 
  5.          ……… 
  6.           processor = new ClusterProcessor(dataQueue,helper); 
  7.           processor.setDaemon(true); 
  8.           processor.start(); 
  9.      } 

本地缓存的使用是为了降低连接服务端的IO开销,当有些数据变化频率很低时,完全可以放在应用服务器本地,同时可以设置有效时间,直接获取,DefaultCacheImpl类为本地缓存的实现类,在构造MemcachedCache对象时,即初始化.

每次查找数据时,会先查找本地缓存,如果没有再去查缓存,结束后将数据让如本地缓存中,代码如下:

  1. public Object get(String key, int localTTL) 
  2.      { 
  3.           Object result = null; 
  4.          // 本地缓存中查找 
  5.           result = localCache.get(key); 
  6.           if (result == null) 
  7.           { 
  8.                result = get(key); 
  9.                if (result != null) 
  10.                { 
  11.                     Calendar calendar = Calendar.getInstance(); 
  12.                     calendar.add(Calendar.SECOND, localTTL); 
  13.                     // 放入本地缓存 
  14.                     localCache.put(key, result,calendar.getTime()); 
  15.                } 
  16.           } 
  17.           return result; 
  18.      } 

增加缓存数据时,会删除本地缓存中对应的数据,代码如下:

  1. public Object put(String key, Object value, Date expiry) 
  2.      { 
  3.           boolean result = getCacheClient(key).set(key,value,expiry); 
  4.           //移除本地缓存的内容 
  5.           if (result) localCache.remove(key); 
  6.         …….. 
  7.           return value; 
  8.      } 

改造部分

据以上分析,我们通过封装,做到了客户端的可配置化,memcached实例的水平扩展,通过集群解决了单点故障问题,并且保证了应用程序只要每次使用相同的数据对象的key值即可获取相同的memcached实例进行操作。但是,为了使缓存的使用对于应用程序来说完全透明,我们对cluster部分进行了再次封装,即把cluster看做一个node,根据cluster名称属性,进行HASH数值空间计算(同样基于MD5算法),映射到一个HASH环上.

这部分逻辑放在初始化资源池clusterpool时进行(即放在MemcahedCacheManager.initMemCacheClientPool方法中),与上文中所描述的memcached实例HASH环映射的逻辑一致,部分代码如下.

  1. //populate cluster node to hash consistent Buckets 
  2. MessageDigest md5 = MD5.get(); 
  3. // 使用cluster的名称计算HASH数值空间 
  4. byte[] d = md5.digest((node.getName()).getBytes()); 
  5. for (int h = 0; h < 4; h++) 
  6.    Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) 
  7.                        | ((long) (d[2 + h * 4] & 0xFF) << 16) 
  8.                        | ((long) (d[1 + h * 4] & 0xFF) << 8) 
  9.                        | ((long) (d[0 + h * 4] & 0xFF)); 
  10.   consistentClusterBuckets.put(k, node.getName()); 
  11.   if (log.isDebugEnabled()) 
  12.        log.debug("++++ added " + node.getName() + " to cluster bucket"); 
  13. }  

在进行缓存操作时,仍然使用数据对象的key值获取到某个cluster节点,然后再使用取余算法(这种算法也是经常用到的分布式定位算法,但是有局限性,即随着节点数的增减,定位越来越不准确),拿到cluster中的某个节点,在进行缓存的操作;定位hash环上cluster节点的逻辑也与上文一样,这里不在赘述。部分定位cluster中节点的取余算法代码如下:

  1. public IMemcachedCache getCacheClient(String key){ 
  2.        …………. 
  3.        String clusterNode = getClusterNode(key); 
  4.         MemcachedClientCluster mcc = clusterpool.get(clusterNode); 
  5.         List<IMemcachedCache> memcachedCachesClients = mcc.getCaches(); 
  6.         //根据取余算法获取集群中的某一个缓存节点 
  7.         if (!memcachedCachesClients.isEmpty()) 
  8.         { 
  9.             long keyhash = key.hashCode(); 
  10.             int index = (int)keyhash % memcachedCachesClients.size(); 
  11.             if (index < 0 ) 
  12.                 index *= -1; 
  13.             return memcachedCachesClients.get(index); 
  14.         } 
  15.         return null; 
  16.     } 

这样,对于应用来说,配置好资源池以后,无需关心那个集群或者客户端节点,直接通过MemcachedCacheManager获取到某个memcachedcache,然后进行缓存操作即可.

最后,使用GAP平台分布式缓存组件,需要提前做好容量规划,集群和客户端事先配置好,另外,缓存组件没有提供数据持久化功能.

Tags: memcached 客户端源码

分享到: