在现代 Web 和移动应用程序中,API 是不同组件、服务和用户之间通信的支柱。但是,随着 API 使用量的增长,存在系统过载的风险,从而导致性能下降甚至服务中断。防止此类问题的最有效方法之一是通过 API 速率限制。
是指限制用户或系统在特定时间范围内可以向 API 发出的请求数的做法,该时间范围以每秒或每分钟的请求数来衡量。这可确保没有单个用户或客户端压垮 API,从而允许公平使用并保护后端免受过多流量的淹没。在本文中,我们将探讨可用的不同速率限制策略、它们的使用案例以及实施这些策略以保护 API 免受过载的最佳实践。
为什么 API 速率限制很重要?
API 速率限制对于以下方面至关重要:
防止恶意泛洪和拒绝服务 (DOS) 攻击。
保持 API 性能和可靠性。
确保用户之间的公平使用。
防止过度使用的云服务带来高成本。
常见的 API 速率限制策略
可以在 API 网关、负载均衡器等中实施多种速率限制策略。
1. 固定窗口限速
此策略涉及对固定时间窗口内允许的请求数量设置固定限制,例如每分钟 100 个请求。计数器在窗口结束时重置。主要的缺点是可能会出现 “雷霆万众”的问题。如果多个用户在窗口重置之前达到其限制,则系统可能会面临流量高峰,从而导致过载。
import time class FixedWindowRateLimiter: def __init__(self, limit, window_size): self.limit = limit self.window_size = window_size self.requests = [] def is_allowed(self): current_time = time.time() self.requests = [req for req in self.requests if req > current_time - self.window_size] # Check if the number of requests in the current window exceeds the limit if len(self.requests) < self.limit: self.requests.append(current_time) return True else: return False # Example usage limiter = FixedWindowRateLimiter(limit=5, window_size=60) # 5 requests per minute for _ in range(7): if limiter.is_allowed(): print("Request allowed") else: print("Rate limit exceeded") time.sleep(10) # Sleep for 10 seconds between requests
2. 滑动窗口限速
此策略试图通过根据请求时间戳动态移动窗口来解决 “雷霆群” 的问题。
在这种方法中,窗口不断向前移动,并根据最近的时间段对请求进行计数,从而实现更顺畅的流量分配,并且不太可能导致突然的突发。允许用户在任何 60 秒内发出 100 个请求。如果他们在 30 秒前发出请求,则在接下来的 30 秒内只能再发出 99 个请求。与固定窗口策略相比,它的实现和管理稍微复杂一些。
import time from collections import deque class SlidingWindowRateLimiter: def __init__(self, limit, window_size): self.limit = limit self.window_size = window_size self.requests = deque() def is_allowed(self): current_time = time.time() while self.requests and self.requests[0] < current_time - self.window_size: self.requests.popleft() if len(self.requests) < self.limit: self.requests.append(current_time) return True else: return False # Example usage limiter = SlidingWindowRateLimiter(limit=5, window_size=60) for _ in range(7): if limiter.is_allowed(): print("Request allowed") else: print("Rate limit exceeded") time.sleep(10) # Sleep for 10 seconds between requests
3. Token Bucket 限速
Token Bucket 是使用最广泛的算法之一。在这种方法中,令牌以固定速率生成并存储在存储桶中。每个请求都会从存储桶中删除一个令牌。如果存储桶为空,则请求将被拒绝,直到生成新令牌。
此算法需要仔细跟踪令牌和存储桶状态,并且可能会在实现过程中引入一些复杂性。它比固定窗口或滑动窗口更灵活,允许请求激增,同时随着时间的推移强制执行最大速率。
import time class TokenBucketRateLimiter: def __init__(self, rate, capacity): self.rate = rate self.capacity = capacity self.tokens = capacity self.last_checked = time.time() def is_allowed(self): current_time = time.time() elapsed = current_time - self.last_checked self.tokens += elapsed * self.rate if self.tokens > self.capacity: self.tokens = self.capacity self.last_checked = current_time if self.tokens >= 1: self.tokens -= 1 return True else: return False # Example usage limiter = TokenBucketRateLimiter(rate=1, capacity=5) for _ in range(7): if limiter.is_allowed(): print("Request allowed") else: print("Rate limit exceeded") time.sleep(1) # Sleep for 1 second between requests
4. 漏桶限速
与令牌桶算法类似,泄漏桶模型通过控制进入系统的请求流来强制实施最大速率。
在此模型中,请求以不同的速率添加到“存储桶”中,但存储桶以固定速率泄漏。如果存储桶溢出,则拒绝进一步的请求。此策略有助于消除突发流量,同时确保以恒定速率处理请求。与令牌桶类似,实现起来可能很复杂,尤其是对于请求流量变化较大的系统。
import time class LeakyBucketRateLimiter: def __init__(self, rate, capacity): self.rate = rate self.capacity = capacity self.water_level = 0 self.last_checked = time.time() def is_allowed(self): current_time = time.time() elapsed = current_time - self.last_checked self.water_level -= elapsed * self.rate if self.water_level < 0: self.water_level = 0 self.last_checked = current_time if self.water_level < self.capacity: self.water_level += 1 return True else: return False # Example usage limiter = LeakyBucketRateLimiter(rate=1, capacity=5) for _ in range(7): if limiter.is_allowed(): print("Request allowed") else: print("Rate limit exceeded") time.sleep(1) # Sleep for 1 second between requests
5. 基于 IP 的速率限制
在此策略中,根据用户的 IP 地址应用速率限制。这可确保来自单个 IP 地址的请求被限制在特定阈值内。使用 VPN 或代理的用户可以绕过此方法。此外,它可能会不公平地影响共享 IP 地址的用户。
import time class IpRateLimiter: def __init__(self, limit, window_size): self.limit = limit self.window_size = window_size self.ip_requests = {} def is_allowed(self, ip): current_time = time.time() if ip not in self.ip_requests: self.ip_requests[ip] = [] self.ip_requests[ip] = [req for req in self.ip_requests[ip] if req > current_time - self.window_size] if len(self.ip_requests[ip]) < self.limit: self.ip_requests[ip].append(current_time) return True else: return False # Example usage limiter = IpRateLimiter(limit=5, window_size=60) for ip in ['192.168.1.1', '192.168.1.2']: for _ in range(7): if limiter.is_allowed(ip): print(f"Request from {ip} allowed") else: print(f"Rate limit exceeded for {ip}") time.sleep(10) # Sleep for 10 seconds between requests
6. 基于用户的速率限制
这是一种更加个性化的速率限制策略,其中限制应用于每个单独的用户或经过身份验证的账户,而不是他们的 IP 地址。对于,可以根据其帐户(例如,通过 API 密钥或 OAuth 令牌)进行速率限制。
import time class UserRateLimiter: def __init__(self, limit, window_size): self.limit = limit self.window_size = window_size self.user_requests = {} def is_allowed(self, user_id): current_time = time.time() if user_id not in self.user_requests: self.user_requests[user_id] = [] self.user_requests[user_id] = [req for req in self.user_requests[user_id] if req > current_time - self.window_size] if len(self.user_requests[user_id]) < self.limit: self.user_requests[user_id].append(current_time) return True else: return False # Example usage limiter = UserRateLimiter(limit=5, window_size=60) for user_id in ['user1', 'user2']: for _ in range(7): if limiter.is_allowed(user_id): print(f"Request from {user_id} allowed") else: print(f"Rate limit exceeded for {user_id}") time.sleep(10) # Sleep for 10 seconds between requests
实施速率限制的最佳实践
使用明确的错误响应,通常为“429 Too Many Requests”。
基于上下文和因素(如用户角色、API 终端节点或订阅层级)的速率限制。
根据 API 的需求,在不同级别(例如,全局、每用户、每 IP)进行精细限制。
记录和监控速率限制,以识别潜在的滥用或误用模式。
对高度分布式系统使用 Redis 或类似的缓存解决方案。
使用指数回退以递增的延迟间隔重试。
结论
API 速率限制是 的一个关键方面,可确保性能、可靠性和安全性。通过根据系统需求选择适当的策略并有效监控使用模式,即使在流量较重的情况下,也可以保持 API 的运行状况和性能。速率限制不仅仅是一种防御措施;它是构建可扩展且强大的 Web 服务不可或缺的一部分。