Redis集群批量操作

Redis集群批量操作

Redis在3.0版正式引入了集群这个特性,扩展变得非常简单。然而当你开心的升级到3.0后,却发现有些很好用的功能现在工作不了了, 比如我们今天要聊的pipeline功能等批量操作。

Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。

/** * 根据key计算slot, * 再根据slot计算node, * 获取pipeline * 进行批量操作  */public class BatchUtil {    public static Map<String, String> mget(JedisCluster jc, String... keys){        Map<String, String> resMap = new HashMap<>();        if(keys == null || keys.length == 0){            return resMap;        }        //如果只有一条,直接使用get即可        if(keys.length == 1){            resMap.put(keys[0], jc.get(keys[0]));            return resMap;        }                //JedisCluster继承了BinaryJedisCluster        //BinaryJedisCluster的JedisClusterConnectionHandler属性        //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache        //从而获取slot和JedisPool直接的映射        MetaObject metaObject = SystemMetaObject.forObject(jc);        JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");        //保存地址+端口和命令的映射        Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();        List<String> keyList = null;        JedisPool currentJedisPool = null;        Pipeline currentPipeline = null;                for(String key : keys){            //计算哈希槽            int crc = JedisClusterCRC16.getSlot(key);            //通过哈希槽获取节点的连接            currentJedisPool = cache.getSlotPool(crc);            //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的            //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样            //的,可以作为map的key            if(jedisPoolMap.containsKey(currentJedisPool)){                jedisPoolMap.get(currentJedisPool).add(key);            }else{                keyList = new ArrayList<>();                keyList.add(key);                jedisPoolMap.put(currentJedisPool, keyList);            }        }                //保存结果        List<Object> res = new ArrayList<>();        //执行        for(Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()){            try {                currentJedisPool = entry.getKey();                keyList = entry.getValue();                //获取pipeline                currentPipeline = currentJedisPool.getResource().pipelined();                for(String key : keyList){                    currentPipeline.get(key);                }                //从pipeline中获取结果                res = currentPipeline.syncAndReturnAll();                currentPipeline.close();                for(int i=0; i<keyList.size(); i++){                    resMap.put(keyList.get(i), res.get(i)==null ? null : res.get(i).toString());                }            } catch (Exception e) {                e.printStackTrace();                return new HashMap<>();            }        }        return resMap;    }}

推荐阅读