Java实现分布式系统限流

Java实现分布式系统限流

为何使用分布式系统限流:

在分布式环境中,我们的系统都是集群化部署,那么使用了单机版的限流策略,比如我们对某一个接口的限流方案是每秒钟最多10次请求,那么因为各个实例都会自己维护一份请求次数,所以真实每秒的请求数是:
节点数 * 每秒最多请求数,这样的话就超出了我们的预期;

分布式限流解决方案:

● 可以基于redis,做分布式限流
● 可以基于nginx做分布式限流
● 可以使用阿里开源的 sentinel 中间件

本次介绍使用 redis 做分布式限流

实现思路:

设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中根据该用户IP创建一个键,并此时我们就设置这个键的过期时间为60秒,当用户请求到来的时候,先去redis中根据用户ip获取这个用户当前分钟请求了多少次,如果获取不到,则说明这个用户当前分钟第一次访问,就创建这个健,并+1,如果获取到了就判断当前有没有超过我们限制的次数,如果到了我们限制的次数则禁止访问。

使用技术:使用redis提供的:incr命令 实现

先引入redis的依赖:

<dependency>             <groupId>redis.clients</groupId>             <artifactId>jedis</artifactId>             <version>2.9.0</version>         </dependency>         <dependency>             <groupId>com.alibaba</groupId>             <artifactId>fastjson</artifactId>             <version>1.2.70</version> </dependency>

redis配置类:

package org.xhs.redis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /**  * @Author: hu.chen  * @Description:  **/ public class RedisConfig {     // 服务器IP地址     private static String ADDR = "127.0.0.1";     // 端口     private static int PORT = 6379;     // 密码     private static String AUTH = null;     // 连接实例的最大连接数     private static int MAX_ACTIVE = 1024;     // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。     private static int MAX_IDLE = 200;     // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException     private static int MAX_WAIT = 10000;     // 连接超时的时间     private static int TIMEOUT = 10000;     // 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;     private static boolean TEST_ON_BORROW = true;         private static JedisPool jedisPool = null;     // 数据库模式是16个数据库 0~15     public static final int DEFAULT_DATABASE = 0;     /**      * 初始化Redis连接池      */     static {         try {             JedisPoolConfig config = new JedisPoolConfig();             config.setMaxTotal(MAX_ACTIVE);             config.setMaxIdle(MAX_IDLE);             config.setMaxWaitMillis(MAX_WAIT);             config.setTestOnBorrow(TEST_ON_BORROW);             jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT, AUTH, DEFAULT_DATABASE);         } catch (Exception e) {             e.printStackTrace();         }     }     /**      * 获取Jedis实例      */     public static Jedis getJedis() {         try {             if (jedisPool != null) {                 Jedis resource = jedisPool.getResource();                 return resource;             } else {                 return null;             }         } catch (Exception e) {             e.printStackTrace();             return null;         }     } }

redis工具类:

package org.xhs.redis; import redis.clients.jedis.Jedis; /**  * @Author: hu.chen  * @Description:  * @DateTime: 2022/1/21 1:06 PM  **/ public class RedisUtils {     /**      * 将指定的key递增1(可用于乐观锁)      *      * @param key      * @return      */     public static Long incr(final String key) {         Jedis jedis = RedisConfig.getJedis();         Long  incr = jedis.incr(key);         returnJedis(jedis);         return incr;     }     /**      * 给指定key设置过期时间      *      * @param key      * @param seconds      * @author ruan 2013-4-11      */     public static void expire(String key, int seconds) {         if (seconds <= 0) {             return;         }         Jedis jedis = RedisConfig.getJedis();         jedis.expire(key, seconds);         // 将连接还回连接池         returnJedis(jedis);     }     /**      * 回收jedis      *      * @param jedis      */     private static void returnJedis(Jedis jedis) {         if (jedis != null) {             jedis.close();         }     } }

实现:

package org.xhs.redis; import java.util.ArrayList; import java.util.List; /**  * @Author: hu.chen  * @Description:  **/ public class TestRedis {     /**      * 超时时间(单位秒)      */     private static int TIMEOUT = 30;     /**      * 每分钟的请求次数限制      */     private static int COUNT = 10;     public static void main(String[] args) {         List<UserRequest> tasks = new ArrayList();         // 准备工作,先初始化 10个线程(用户),这10个用户同时访问一个接口         for (int i = 1; i <= 12; i++) {             String ip = "127.0.0." + i;             String userName = "chenhu_";             String interfaceName = "user/find_" + i;             tasks.add(new UserRequest(ip, userName, interfaceName));         }         for (UserRequest request : tasks) {             // 以用户名为键             if (isAccess(request.getUserName(), COUNT)) {                 System.err.println("用户:"+request.getUserName()+" 当前时间访问次数还未达到上限,可以访问");             } else {                 System.err.println("当前时间访问失败,"+request.getUserName()+"无法获取令牌");             }         }     }     /**      * 是否可以访问      *      * @return      */     private static boolean isAccess(String userName, long count) {         Long incr = RedisUtils.incr(userName);         if (incr == 1) {             RedisUtils.expire(userName, TIMEOUT);         }         if (count < incr) {             return false;         }         return true;     }     /**      * 实体对象      */     private static class UserRequest {         /**          * 请求用户ip          */         private String ip;         /**          * 用户名          */         private String userName;         /**          * 请求的接口名          */         private String interfaceName;         public UserRequest(String ip, String userName, String interfaceName) {             this.ip = ip;             this.userName = userName;             this.interfaceName = interfaceName;         }         public String getIp() {return ip;}         public String getUserName() { return userName;}         public String getInterfaceName() {return interfaceName;}     } }

推荐阅读