[Curator] Node Cache 的使用与分析

[Curator] Node Cache 的使用与分析

Node Cache

使用节点数据作为本地缓存使用。这个类可以对节点进行监听,能够处理节点的增删改事件,数据同步等。 还可以通过注册自定义监听器来更细节的控制这些数据变动操作。

1. 关键 API

org.apache.curator.framework.recipes.cache.NodeCache

org.apache.curator.framework.recipes.cache.NodeCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. 机制说明

  • 仅仅是单个数据的缓存
  • 内部使用状态机作为不同操作的处理控制

3. 用法

3.1 创建

public NodeCache(CuratorFramework client,                         String path)

3.2 使用

还是一样的套路,在使用前需要调用start();用完之后需要调用close()方法。

随时都可以调用getCurrentData()获取当前缓存的状态和数据。

也可以通过getListenable()获取监听器容器,并在此基础上增加自定义监听器:

public void addListener(NodeCacheListener listener)

4. 错误处理

NodeCache实例已经自带一个ConnectionStateListener处理链接状态的变化。

5. 源码分析

5.1 类定义

public class NodeCache implements Closeable{}
  • 实现了java.io.Closeable

5.2 成员变量

public class NodeCache implements Closeable{    private final Logger log = LoggerFactory.getLogger(getClass());    private final CuratorFramework client;    private final String path;    private final boolean dataIsCompressed;    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();    private final AtomicBoolean isConnected = new AtomicBoolean(true);    private ConnectionStateListener connectionStateListener = new ConnectionStateListener()    {        @Override        public void stateChanged(CuratorFramework client, ConnectionState newState)        {            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )            {                if ( isConnected.compareAndSet(false, true) )                {                    try                    {                        reset();                    }                    catch ( Exception e )                    {                        ThreadUtils.checkInterrupted(e);                        log.error("Trying to reset after reconnection", e);                    }                }            }            else            {                isConnected.set(false);            }        }    };    private Watcher watcher = new Watcher()    {        @Override        public void process(WatchedEvent event)        {            try            {                reset();            }            catch(Exception e)            {                ThreadUtils.checkInterrupted(e);                handleException(e);            }        }    };    private enum State    {        LATENT,        STARTED,        CLOSED    }    private final BackgroundCallback backgroundCallback = new BackgroundCallback()    {        @Override        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception        {            processBackgroundResult(event);        }    };}
  • log
  • client
  • path
    • 节点路径
  • dataIsCompressed
    • 数据是否压缩
  • data
    • AtomicReference
    • 存放着本地缓存数据
    • 缓存数据被封装成ChildData
  • state
    • AtomicReference
    • 状态
    • 内部枚举
      • LATENT (默认)
      • STARTED
      • CLOSED
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • 监听器容器
  • isConnected
    • 是否已连接ZK
    • AtomicBoolean
  • connectionStateListener
    • 自带的链接状态监听器
  • watcher
    • 自带的节点监听器
    • 一旦节点变动,则调用reset()重置
  • backgroundCallback
    • 节点数据回调操作
    • 避免线程阻塞

5.3 构造器

public NodeCache(CuratorFramework client, String path){    this(client, path, false);}public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){    this.client = client;    this.path = PathUtils.validatePath(path);    this.dataIsCompressed = dataIsCompressed;}

构造器很简单,就是赋值处理。 所以,大部分逻辑还是在start()中。

5.4 启动

public void     start() throws Exception{    start(false);}public void     start(boolean buildInitial) throws Exception{    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");    client.getConnectionStateListenable().addListener(connectionStateListener);    if ( buildInitial )    {        client.checkExists().creatingParentContainersIfNeeded().forPath(path);        internalRebuild();    }    reset();}private void     internalRebuild() throws Exception{    try    {        Stat    stat = new Stat();        byte[]  bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);        data.set(new ChildData(path, stat, bytes));    }    catch ( KeeperException.NoNodeException e )    {        data.set(null);    }}private void     reset() throws Exception{    if ( (state.get() == State.STARTED) && isConnected.get() )    {        client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);    }}

可以看到

  • 默认是不进行缓存数据的初始化的

再来看看启动的过程

  1. 原子操作更新启动状态
  2. 为链接添加connectionStateListener监听器
  3. 如果需要初始化缓存
    1. 创建节点
    2. 调用internalRebuild(),初始数据
      1. 同步节点数据与状态
      2. 写入本地缓存
  4. 调用reset()
    1. 在正常状态时检查缓存节点是否存在
      • 第3步可能是不错初始化动作的
      • 为节点添加了watcher
      • 回调触发backgroundCallback
        • 调用processBackgroundResult()方法(状态机)

5.4.1 processBackgroundResult方法

启动逻辑很大一部分在processBackgroundResult方法中。所以,这里再来看看这个方法:

private void processBackgroundResult(CuratorEvent event) throws Exception{    switch ( event.getType() )    {        case GET_DATA:        {            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )            {                ChildData childData = new ChildData(path, event.getStat(), event.getData());                setNewData(childData);            }            break;        }        case EXISTS:        {            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )            {                setNewData(null);            }            else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )            {                if ( dataIsCompressed )                {                    client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);                }                else                {                    client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);                }            }            break;        }    }}

由于启动在先,所以先来看看EXISTS事件:

  1. 如果节点不存在
    1. 调用setNewData(null),设置数据为null
  2. 如果节点存在
    1. 如果数据需要压缩处理
      1. 则解压获取值
    2. 否则直接获取数据
  • 好吧,这里又是通过backgroundCallback来回调获取值,所以,又会以新的新的状态回到processBackgroundResult
    • GET_DATA
    • 状态机

再来看看GET_DATA事件:

  1. 如果读取正常
    1. 构建childData
    2. 调用setNewData,赋值

5.4.2 setNewData方法

如此看来,还需要看看setNewData方法:

private void setNewData(ChildData newData) throws InterruptedException{    ChildData   previousData = data.getAndSet(newData);    if ( !Objects.equal(previousData, newData) )    {        listeners.forEach        (            new Function<NodeCacheListener, Void>()            {                @Override                public Void apply(NodeCacheListener listener)                {                    try                    {                        listener.nodeChanged();                    }                    catch ( Exception e )                    {                        ThreadUtils.checkInterrupted(e);                        log.error("Calling listener", e);                    }                    return null;                }            }        );        if ( rebuildTestExchanger != null )        {            try            {                rebuildTestExchanger.exchange(new Object());            }            catch ( InterruptedException e )            {                Thread.currentThread().interrupt();            }        }    }}
  1. 本地缓存data赋新值
  2. 如果发现缓存有更新
    1. 触发监听容器中的监听器(同步调用)
    2. rebuildTestExchanger
      • 这个可以基本略过
      • 测试时向其他现场发送一个信号对象

5.4.3 小结

启动过程大致可以分为

  1. 添加链接监听器
  2. 如果需要初始化节点,则创建节点,并拉取缓存数据到本地
  3. 为节点加上节点监听器,并挂载回调方法
  4. 通过回调状态机来同步缓存数据

5.5 获取缓存数据

public ChildData getCurrentData(){    return data.get();}

直接读取缓存数据。 对使用者来说,只需要操作本地缓存。 而本地缓存与ZK节点数据,通过监听器回调状态机来完成同步动作。

5.6 关闭

缓存用完,需要调用close()

public void close() throws IOException{    if ( state.compareAndSet(State.STARTED, State.CLOSED) )    {        listeners.clear();        client.clearWatcherReferences(watcher);        client.getConnectionStateListenable().removeListener(connectionStateListener);        // TODO        // From PathChildrenCache        // This seems to enable even more GC - I'm not sure why yet - it        // has something to do with Guava's cache and circular references        connectionStateListener = null;        watcher = null;    }        }
  • 原子操作更新状态
  • 清理监听器容器
  • 清理掉节点上监听器
  • 清理掉链接上的监听器

并没有制空本地缓存数据

6. 小结

与Path Cache比起来,Node Cache要简单很多。

  • Path Cache
    • 更像是一个Cache Manager
    • 在path下管理着多个cache
    • 由于多个cache的存在
      • 同步逻辑复杂
      • 并发问题更为严重
        • 所以内部使用了
          • 命令模式
          • 异步按序执行操作
  • Node Cache
    • 仅仅是单个数据的缓存
    • 而且缓存数据的特性,也无需严格控制并发(脏数据也可以接受)
    • 使用一个回调状态机来处理不同的数据状态

推荐阅读