在分布式系统中,随着业务量的增长,如何保护核心资源、防止系统过载、保证系统的稳定性成为了一个重要的问题。限流算法作为一种有效的流量控制手段,被广泛应用于各类系统中。本文将详细介绍四种常见的限流算法、两种常用的限流器工具,从原理、源码的角度进行分析。
这种算法的基本思想是通过维护一个计数器,在特定的时间窗口内累计接收到的请求次数,当请求次数达到预设的阈值时,后续的请求会被限流或直接拒绝。
工作原理:
public interface RateLimiter {
? ?boolean acquire();
}
public class CounterBasedRateLimiter implements RateLimiter {
? ?/**
? ? * 窗口时长,毫秒级
? ? */
? ?private final long windowSize;
? ?/**
? ? * 限流大小
? ? */
? ?private final int limit;
? ?/**
? ? * 窗口开始时间,毫秒级
? ? */
? ?private long windowStartTime;
? ?/**
? ? * 计数器
? ? */
? ?private final AtomicInteger counter;
? ?public CounterBasedRateLimiter(long windowSize, int limit) {
? ? ? ?this.windowSize = windowSize;
? ? ? ?this.limit = limit;
? ? ? ?this.counter = new AtomicInteger();
? ? ? ?reset();
? }
? ?private synchronized void reset() {
? ? ? ?if (checkIfWindowValid()) {
? ? ? ? ? ?return;
? ? ? }
? ? ? ?windowStartTime = System.currentTimeMillis();
? ? ? ?counter.set(0);
? }
? ?private boolean checkIfWindowValid() {
? ? ? ?return System.currentTimeMillis() - windowStartTime <= windowSize;
? }
? ?@Override
? ?public boolean acquire() {
? ? ? ?// 检查窗口是否有效,若已无效则重置
? ? ? ?if (!checkIfWindowValid()) {
? ? ? ? ? ?reset();
? ? ? }
? ? ? ?return counter.incrementAndGet() <= limit;
? }
}
基于滑动窗口的限流算法是一种较为先进且灵活的流量控制技术,用于限制在一定时间窗口内某个资源的访问次数或流量。相较于简单的固定窗口计数器限流,滑动窗口算法能更好地处理请求的均匀分布和平滑限流,减少因为窗口切换带来的不连续性和峰值问题。
工作原理:
public class SlidingWindowRateLimiter implements RateLimiter {
? ?/**
? ? * 窗口时长,秒级
? ? */
? ?private final long windowSize;
? ?/**
? ? * 限流大小
? ? */
? ?private final int limit;
? ?/**
? ? * 滑动窗口
? ? */
? ?private final ConcurrentHashMap<Long, AtomicInteger> window;
? ?/**
? ? * 当前滑动窗口的起始区间
? ? */
? ?private long windowStart;
? ?/**
? ? * 当前总数
? ? */
? ?private final AtomicInteger total;
? ?public SlidingWindowRateLimiter(long windowSize, int limit) {
? ? ? ?this.windowSize = windowSize;
? ? ? ?this.limit = limit;
? ? ? ?this.window = new ConcurrentHashMap<>();
? ? ? ?this.total = new AtomicInteger();
? ? ? ?this.windowStart = System.currentTimeMillis() / 1000;
? }
? ?private synchronized void refresh(long cur) {
? ? ? ?// 再次检查
? ? ? ?if (window.containsKey(cur)) {
? ? ? ? ? ?return;
? ? ? }
? ? ? ?// 清理过期区间数据
? ? ? ?long newWindowStart = cur - windowSize + 1;
? ? ? ?for (long i = windowStart; i < newWindowStart; i++) {
? ? ? ? ? ?AtomicInteger removed = window.remove(i);
? ? ? ? ? ?if (removed != null) {
? ? ? ? ? ? ? ?total.addAndGet(-removed.intValue());
? ? ? ? ? }
? ? ? }
? ? ? ?windowStart = newWindowStart;
? ? ? ?// 最后加上新的时间区间
? ? ? ?window.putIfAbsent(cur, new AtomicInteger(0));
? }
? ?@Override
? ?public boolean acquire() {
? ? ? ?long cur = System.currentTimeMillis() / 1000;
? ? ? ?// 检查当前时间区间是否已初始化,若未初始化,则进行初始化
? ? ? ?if (!window.containsKey(cur)) {
? ? ? ? ? ?refresh(cur);
? ? ? }
? ? ? ?// 尝试从滑动窗口获取元素
? ? ? ?while (!Thread.interrupted()) {
? ? ? ? ? ?int curTotal = total.get();
? ? ? ? ? ?if (curTotal + 1 > limit) {
? ? ? ? ? ? ? ?return false;
? ? ? ? ? }
? ? ? ? ? ?if (total.compareAndSet(curTotal, curTotal + 1)) {
? ? ? ? ? ? ? ?window.get(cur).incrementAndGet();
? ? ? ? ? ? ? ?return true;
? ? ? ? ? }
? ? ? }
? ? ? ?return false;
? }
}
基于漏桶(Leaky Bucket)的限流算法是一种在网络传输和系统资源管理中广泛应用的流量整形和控制技术。该算法的核心理念是模拟一个带有小孔的桶,其中水代表流入系统的请求或数据包,桶则象征系统的处理能力。
工作原理:
public class LeakyBucketRateLimiter implements RateLimiter {
? ?/**
? ? * 流速,每秒漏rate个
? ? */
? ?private final int rate;
? ?/**
? ? * 桶大小
? ? */
? ?private final int bucketSize;
? ?/**
? ? * 漏桶
? ? */
? ?private final BlockingQueue<Object> bucket;
? ?public LeakyBucketRateLimiter(int bucketSize, int rate) {
? ? ? ?this.bucketSize = bucketSize;
? ? ? ?this.rate = rate;
? ? ? ?bucket = new ArrayBlockingQueue<>(bucketSize);
? ? ? ?new Thread(this::leaky).start();
? }
? ?public void leaky() {
? ? ? ?// 按照规定速度漏
? ? ? ?while (!Thread.interrupted()) {
? ? ? ? ? ?bucket.poll();
? ? ? ? ? ?sleep();
? ? ? }
? }
? ?@Override
? ?public boolean acquire() {
? ? ? ?// 尝试向桶插入元素,若有空间能插入返回true,否则返回false
? ? ? ?return bucket.offer(1);
? }
? ?public void sleep() {
? ? ? ?try {
? ? ? ? ? ?Thread.sleep(1000 / rate);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?Thread.currentThread().interrupt();
? ? ? }
? }
}
基于令牌桶(Token Bucket)的限流算法是一种在网络传输、系统资源调度和API调用限速等领域广泛应用的流量控制策略。该算法通过模拟一个不断填充令牌的桶来决定哪些请求可以被执行。
工作原理:
public class TokenBucketRateLimiter implements RateLimiter {
? ?/**
? ? * 每秒补充的令牌数
? ? */
? ?private final int rate;
? ?/**
? ? * 最大令牌数量
? ? */
? ?private final int bucketSize;
? ?/**
? ? * 令牌桶
? ? */
? ?private final AtomicInteger bucket;
? ?public TokenBucketRateLimiter(int rate, int bucketSize) {
? ? ? ?this.rate = rate;
? ? ? ?this.bucketSize = bucketSize;
? ? ? ?bucket = new AtomicInteger(0);
? ? ? ?new Thread(this::refill).start();
? }
? ?public void refill() {
? ? ? ?// 定时补充令牌
? ? ? ?while (!Thread.interrupted()) {
? ? ? ? ? ?if (bucket.get() < bucketSize) {
? ? ? ? ? ? ? ?bucket.incrementAndGet();
? ? ? ? ? }
? ? ? ? ? ?sleep();
? ? ? }
? }
? ?@Override
? ?public boolean acquire() {
? ? ? ?// 尝试获取令牌
? ? ? ?while (!Thread.interrupted()) {
? ? ? ? ? ?int cur = bucket.get();
? ? ? ? ? ?if (cur == 0) {
? ? ? ? ? ? ? ?return false;
? ? ? ? ? }
? ? ? ? ? ?if (bucket.compareAndSet(cur, cur - 1)) {
? ? ? ? ? ? ? ?return true;
? ? ? ? ? }
? ? ? }
? ? ? ?return false;
? }
? ?public void sleep() {
? ? ? ?try {
? ? ? ? ? ?Thread.sleep(1000 / rate);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?Thread.currentThread().interrupt();
? ? ? }
? }
}
对比基于计数的限流算法(这里指固定窗口计数器算法)、滑动窗口算法、漏桶算法和令牌桶算法:
基于计数的限流算法(固定窗口) | 滑动窗口算法 | 漏桶算法 | 令牌桶算法 | |
---|---|---|---|---|
原理 | 固定时间段内计数,超限则限流 | 时间窗口内细分计数,逐个窗口检查 | 请求进入队列并按恒定速率流出 | 按固定速率填充令牌,请求需消耗令牌才能处理 |
特点 | 粗粒度限流,易实现 | 精细化限流,更平滑 | 控制流出速率,无视突发请求 | 允许一定突发流量,同时保持平均速率 |
灵活性 | 较差,窗口切换时可能出现突刺 | 较好,连续性和稳定性较好 | 良好,恒定处理速率 | 最佳,可调节限流速率和突发处理能力 |
突发处理 | 不允许 | 有一定处理能力 | 不允许 | 允许 |
平滑性 | 较差 | 较好 | 很好 | 很好 |
实现难度 | 简单 | 中等 | 简单到中等 | 中等到复杂 |
适用场景 | 对简单限流需求,如基础并发控制 | 需要更精细流量控制的场景 | 稳定性要求高,流量整形 | 网络传输、接口限流、既要限速又要允许突发流量 |
Guava RateLimiter 是 Google Guava 库中提供的一个强大的限流工具类,主要用于控制系统的吞吐量或请求频率,防止服务因短时间内接收到过多请求而过载。RateLimiter 实现了令牌桶算法,可以按指定的速率发放令牌,请求到来时只有拿到令牌才能继续执行。
依赖:
<dependencies>
? ?<!-- Google Guava -->
? ?<dependency>
? ? ? ?<groupId>com.google.guava</groupId>
? ? ? ?<artifactId>guava</artifactId>
? ? ? ?<version>31.1-jre</version>
? ?</dependency>
</dependencies>
RateLimiter rateLimiter = RateLimiter.create(10);
double acquire = rateLimiter.acquire(); // 返回等待时间
Guava RateLimiter 采用的是令牌桶算法。采用了非常巧妙地方式实现,和上述介绍令牌桶算法小节的代码有所差异。Guava RateLimiter采用时间差计算令牌+提前消费令牌+睡眠等待的机制实现令牌桶算法。
主要逻辑如下:
storedPermits
:表示当前令牌桶中存储的令牌数量。maxPermits
:表示令牌桶的最大容量,即最多能存储多少令牌。stableIntervalMicros
:稳定状态下,生成一个令牌所需的固定时间间隔,单位是微秒(microsecond)。nextFreeTicketMicros
:下一次请求能够获取令牌的时间点,这个时间会被不断推后以保证稳定的速率。requiredPermits
)和当前时间(nowMicros
)。resync(nowMicros)
来同步令牌桶的状态,确保令牌生成逻辑与当前时间一致。storedPermitsToSpend
)。freshPermits
)以及为此需要等待的时间(waitMicros
)。nextFreeTicketMicros
,将其推进到下一个可供发放令牌的时间点。storedPermits
)中,但不超过最大令牌数(maxPermits
)。nextFreeTicketMicros
更新为当前时间,从而恢复正常的令牌发放节奏。resync过程:
获取令牌:
整体来看,这段代码通过reserveEarliestAvailable()
方法实现了动态调整令牌发放策略,确保限流器在不同的请求情况下都能维持预期的稳定速率,同时允许在令牌充足时快速响应请求,在令牌不足时则合理安排等待时间。
public abstract class RateLimiter {
?@CanIgnoreReturnValue
?public double acquire(int permits) {
? ?// 预留permits个令牌,返回需要等待的时长 microsToWait
? ?long microsToWait = reserve(permits);
? ?// 睡眠 microsToWait
? ?stopwatch.sleepMicrosUninterruptibly(microsToWait);
? ?return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
? ?
?final long reserve(int permits) {
? ?// 检查参数
? ?checkPermits(permits);
? ?synchronized (mutex()) {
? ? ?// 预留permits个令牌,并且获取需要等待的时长。并且传入当前时间
? ? ?return reserveAndGetWaitLength(permits, stopwatch.readMicros());
? }
}
? ?
?final long reserveAndGetWaitLength(int permits, long nowMicros) {
? ?long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
? ?return max(momentAvailable - nowMicros, 0);
}
}
abstract class SmoothRateLimiter extends RateLimiter {
/** The currently stored permits. 当前剩余令牌数*/
double storedPermits;
/** The maximum number of stored permits. 最大令牌数*/
double maxPermits;
/**
* 生成1个令牌的时间间隔
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;
/**
* 下一次释放令牌的时间
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
// 用于返回的时间,用于睡眠。如果当前有令牌,返回nowMicros;否则返回下一次有令牌的时间
long returnValue = nextFreeTicketMicros;
// 算出当前可以被消费的令牌数storedPermitsToSpend、提前消费的令牌数freshPermits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
// 算出提前消费的令牌导致的等待时间waitMicros、下一次释放令牌的时间nextFreeTicketMicros
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 消费令牌
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
void resync(long nowMicros) {
// 如果当前时间已经超过了释放令牌的时间,则需要更新
if (nowMicros > nextFreeTicketMicros) {
// 计算当前时间 - 释放令牌时间,算出时间间隔。除以1个时间间隔,算出在此时间间隔会生成多少个令牌。最大不得超过maxPermits个令牌
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
// 更新令牌释放时间
nextFreeTicketMicros = nowMicros;
}
}
}
Redisson RateLimiter 是 Redisson 客户端库提供的一种分布式限流器实现,它基于 Redis 的强大数据结构和 Lua 脚本支持,能够在分布式环境下实现高效的限流功能。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
RRateLimiter rateLimiter = redissonClient.getRateLimiter("rateLimiter");
// 全局限流,每小时不超过100个请求
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
// 单客户端限流,每分钟不超过5个请求
//rateLimiter.trySetRate(RateType.PER_CLIENT, 5, 1, RateIntervalUnit.MINUTES);
// 尝试获取一个令牌,如果获取成功则返回true,失败则返回false
boolean permitted = rateLimiter.tryAcquire();
// 或者尝试获取多个令牌,指定最长等待时间
//boolean permitted = rateLimiter.tryAcquire(3, 1000, TimeUnit.MILLISECONDS); // 尝试获取3个令牌,最多等待1秒
Redisson的RateLimiter通过lua脚本保证执行的原子性。主要采用固定窗口的算法。
限流器主要由两个key组成:
主要的方法包括trySetRateAsync,用于初始化限流器;tryAcquireAsync用于获取令牌。
从下列lua脚本可以看出,方法的主要作用就是设置元数据。
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
// 异步执行一个Redis命令,尝试设置限流器的速率、间隔和类型
return commandExecutor.evalWriteAsync(
// 设置Redis key为限流器的名字
getName(),
// 使用LongCodec实例进行序列化和反序列化
LongCodec.INSTANCE,
// 使用EVAL_BOOLEAN命令,表示执行Lua脚本并期望返回一个布尔值
RedisCommands.EVAL_BOOLEAN,
// Lua脚本内容:
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); " // 1. 尝试设置哈希表中key为'rate'的字段,其值为ARGV[1](即限流速率rate)
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); " // 2. 尝试设置哈希表中key为'interval'的字段,其值为ARGV[2](即限流间隔转换成毫秒后的值)
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);", // 3. 尝试设置哈希表中key为'type'的字段,其值为ARGV[3](即限流类型的枚举序号)
// Lua脚本参数,第一个元素是keys数组,这里是限流器名字
Collections.<Object>singletonList(getName()),
// Lua脚本的额外参数,分别对应限流速率、限流间隔(转换为毫秒)和限流类型(转换为枚举序号)
rate, unit.toMillis(rateInterval), type.ordinal());
}
这段代码实现了一个异步方法,用于尝试从Redis存储的限流器中获取指定数量的令牌。
Lua脚本首先读取限流器的相关配置,然后根据令牌计数器当前的值判断是否可以发放令牌,并进行相应的增减操作。
如果令牌发放成功,返回nil;如果不成功,则返回令牌计数器的剩余生存时间。
整个过程都在Redis中完成,实现了高效的分布式限流控制。
// Java方法定义,异步尝试获取令牌(Try acquiring asynchronously)
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
// 使用Redis命令执行器执行Lua脚本,并返回异步结果
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// Lua脚本开始
"local rate = redis.call('hget', KEYS[1], 'rate'); " // 1. 从哈希表中获取限流速率(rate)
+ "local interval = redis.call('hget', KEYS[1], 'interval'); " // 2. 从哈希表中获取限流间隔(interval)
+ "local type = redis.call('hget', KEYS[1], 'type'); " // 3. 从哈希表中获取限流类型(type)
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized') " // 4. 断言限流器已初始化
+ "local valueName = KEYS[2]; " // 5. 初始化变量valueName,指向全局令牌计数器的键名
+ "if type == '1' then " // 6. 如果限流类型为某种特定类型(此处可能是客户端级别的限流)
+ "valueName = KEYS[3]; " // 7. 将valueName指向客户端令牌计数器的键名
+ "end; "
+ "local currentValue = redis.call('get', valueName); " // 8. 获取当前令牌计数器的值
+ "if currentValue ~= false then " // 9. 如果当前令牌计数器存在值
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then " // 10. 如果当前令牌数量小于请求的令牌数
+ "return redis.call('pttl', valueName); " // 11. 返回令牌计数器剩余的生存时间(毫秒)
+ "else " // 12. 否则,当前令牌数量足够
+ "redis.call('decrby', valueName, ARGV[1]); " // 13. 从令牌计数器中减去请求的令牌数
+ "return nil; " // 14. 返回nil,表示成功获取令牌(异步方法中,nil通常表示正常执行)
+ "end; "
+ "else " // 15. 如果当前令牌计数器不存在值
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); " // 16. 断言请求令牌数不超过限流速率
+ "redis.call('set', valueName, rate, 'px', interval); " // 17. 重新设置令牌计数器的值,并设置过期时间
+ "redis.call('decrby', valueName, ARGV[1]); " // 18. 从令牌计数器中减去请求的令牌数
+ "return nil; " // 19. 返回nil,表示成功获取令牌
+ "end; ",
// Lua脚本参数列表,包含限流器的名字、全局令牌计数器的键名和客户端令牌计数器的键名
Arrays.<Object>asList(getName(), getValueName(), getClientValueName()),
// Lua脚本中的额外参数,分别是请求的令牌数量和当前Redis连接管理器的ID
value, commandExecutor.getConnectionManager().getId());
}
除了Guava RateLimiter 和 Redisson RateLimiter,还有许多其他的限流工具和技术,下面是其中一部分:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。