系统接口的速率限制各种方式实践,看完你就都懂了

清风孤客 2024-12-11 09:38:11

在现代 Web 和移动应用程序中,API 是不同组件、服务和用户之间通信的支柱。但是,随着 API 使用量的增长,存在系统过载的风险,从而导致性能下降甚至服务中断。防止此类问题的最有效方法之一是通过 API 速率限制。

是指限制用户或系统在特定时间范围内可以向 API 发出的请求数的做法,该时间范围以每秒或每分钟的请求数来衡量。这可确保没有单个用户或客户端压垮 API,从而允许公平使用并保护后端免受过多流量的淹没。

在本文中,我们将探讨可用的不同速率限制策略、它们的使用案例以及实施这些策略以保护 API 免受过载的最佳实践。

 

为什么 API 速率限制很重要?

API 速率限制对于以下方面至关重要:

防止恶意泛洪和拒绝服务 (DOS) 攻击。

保持 API 性能和可靠性。

确保用户之间的公平使用。

防止过度使用的云服务带来高成本。

常见的 API 速率限制策略

可以在 API 网关、负载均衡器等中实施多种速率限制策略。

1. 固定窗口限速

此策略涉及对固定时间窗口内允许的请求数量设置固定限制,例如每分钟 100 个请求。计数器在窗口结束时重置。主要的缺点是可能会出现 “雷霆万众”的问题。如果多个用户在窗口重置之前达到其限制,则系统可能会面临流量高峰,从而导致过载。

import time class FixedWindowRateLimiter:     def __init__(self, limit, window_size):         self.limit = limit         self.window_size = window_size         self.requests = []     def is_allowed(self):         current_time = time.time()         self.requests = [req for req in self.requests if req > current_time - self.window_size]         # Check if the number of requests in the current window exceeds the limit         if len(self.requests) < self.limit:             self.requests.append(current_time)             return True         else:             return False # Example usage limiter = FixedWindowRateLimiter(limit=5, window_size=60)  # 5 requests per minute for _ in range(7):     if limiter.is_allowed():         print("Request allowed")     else:         print("Rate limit exceeded")     time.sleep(10)  # Sleep for 10 seconds between requests

2. 滑动窗口限速

此策略试图通过根据请求时间戳动态移动窗口来解决 “雷霆群” 的问题。

在这种方法中,窗口不断向前移动,并根据最近的时间段对请求进行计数,从而实现更顺畅的流量分配,并且不太可能导致突然的突发。允许用户在任何 60 秒内发出 100 个请求。如果他们在 30 秒前发出请求,则在接下来的 30 秒内只能再发出 99 个请求。与固定窗口策略相比,它的实现和管理稍微复杂一些。

import time from collections import deque class SlidingWindowRateLimiter:     def __init__(self, limit, window_size):         self.limit = limit         self.window_size = window_size         self.requests = deque()     def is_allowed(self):         current_time = time.time()         while self.requests and self.requests[0] < current_time - self.window_size:             self.requests.popleft()         if len(self.requests) < self.limit:             self.requests.append(current_time)             return True         else:             return False # Example usage limiter = SlidingWindowRateLimiter(limit=5, window_size=60)   for _ in range(7):     if limiter.is_allowed():         print("Request allowed")     else:         print("Rate limit exceeded")     time.sleep(10)  # Sleep for 10 seconds between requests

3. Token Bucket 限速

Token Bucket 是使用最广泛的算法之一。在这种方法中,令牌以固定速率生成并存储在存储桶中。每个请求都会从存储桶中删除一个令牌。如果存储桶为空,则请求将被拒绝,直到生成新令牌。

此算法需要仔细跟踪令牌和存储桶状态,并且可能会在实现过程中引入一些复杂性。它比固定窗口或滑动窗口更灵活,允许请求激增,同时随着时间的推移强制执行最大速率。

import time class TokenBucketRateLimiter:     def __init__(self, rate, capacity):         self.rate = rate           self.capacity = capacity           self.tokens = capacity         self.last_checked = time.time()     def is_allowed(self):         current_time = time.time()         elapsed = current_time - self.last_checked         self.tokens += elapsed * self.rate         if self.tokens > self.capacity:             self.tokens = self.capacity         self.last_checked = current_time         if self.tokens >= 1:             self.tokens -= 1             return True         else:             return False # Example usage limiter = TokenBucketRateLimiter(rate=1, capacity=5)  for _ in range(7):     if limiter.is_allowed():         print("Request allowed")     else:         print("Rate limit exceeded")     time.sleep(1)  # Sleep for 1 second between requests

4. 漏桶限速

与令牌桶算法类似,泄漏桶模型通过控制进入系统的请求流来强制实施最大速率。

在此模型中,请求以不同的速率添加到“存储桶”中,但存储桶以固定速率泄漏。如果存储桶溢出,则拒绝进一步的请求。此策略有助于消除突发流量,同时确保以恒定速率处理请求。与令牌桶类似,实现起来可能很复杂,尤其是对于请求流量变化较大的系统。

import time class LeakyBucketRateLimiter:     def __init__(self, rate, capacity):         self.rate = rate           self.capacity = capacity          self.water_level = 0           self.last_checked = time.time()     def is_allowed(self):         current_time = time.time()         elapsed = current_time - self.last_checked         self.water_level -= elapsed * self.rate         if self.water_level < 0:             self.water_level = 0         self.last_checked = current_time         if self.water_level < self.capacity:             self.water_level += 1             return True         else:             return False # Example usage limiter = LeakyBucketRateLimiter(rate=1, capacity=5)  for _ in range(7):     if limiter.is_allowed():         print("Request allowed")     else:         print("Rate limit exceeded")     time.sleep(1)  # Sleep for 1 second between requests

5. 基于 IP 的速率限制

在此策略中,根据用户的 IP 地址应用速率限制。这可确保来自单个 IP 地址的请求被限制在特定阈值内。使用 VPN 或代理的用户可以绕过此方法。此外,它可能会不公平地影响共享 IP 地址的用户。

import time class IpRateLimiter:     def __init__(self, limit, window_size):         self.limit = limit         self.window_size = window_size         self.ip_requests = {}     def is_allowed(self, ip):         current_time = time.time()         if ip not in self.ip_requests:             self.ip_requests[ip] = []         self.ip_requests[ip] = [req for req in self.ip_requests[ip] if req > current_time - self.window_size]         if len(self.ip_requests[ip]) < self.limit:             self.ip_requests[ip].append(current_time)             return True         else:             return False # Example usage limiter = IpRateLimiter(limit=5, window_size=60)   for ip in ['192.168.1.1', '192.168.1.2']:     for _ in range(7):         if limiter.is_allowed(ip):             print(f"Request from {ip} allowed")         else:             print(f"Rate limit exceeded for {ip}")         time.sleep(10)  # Sleep for 10 seconds between requests

6. 基于用户的速率限制

这是一种更加个性化的速率限制策略,其中限制应用于每个单独的用户或经过身份验证的账户,而不是他们的 IP 地址。对于,可以根据其帐户(例如,通过 API 密钥或 OAuth 令牌)进行速率限制。

import time class UserRateLimiter:     def __init__(self, limit, window_size):         self.limit = limit         self.window_size = window_size         self.user_requests = {}     def is_allowed(self, user_id):         current_time = time.time()         if user_id not in self.user_requests:             self.user_requests[user_id] = []         self.user_requests[user_id] = [req for req in self.user_requests[user_id] if req > current_time - self.window_size]         if len(self.user_requests[user_id]) < self.limit:             self.user_requests[user_id].append(current_time)             return True         else:             return False # Example usage limiter = UserRateLimiter(limit=5, window_size=60)  for user_id in ['user1', 'user2']:     for _ in range(7):         if limiter.is_allowed(user_id):             print(f"Request from {user_id} allowed")         else:             print(f"Rate limit exceeded for {user_id}")         time.sleep(10)  # Sleep for 10 seconds between requests

实施速率限制的最佳实践

使用明确的错误响应,通常为“429 Too Many Requests”。

基于上下文和因素(如用户角色、API 终端节点或订阅层级)的速率限制。

根据 API 的需求,在不同级别(例如,全局、每用户、每 IP)进行精细限制。

记录和监控速率限制,以识别潜在的滥用或误用模式。

对高度分布式系统使用 Redis 或类似的缓存解决方案。

使用指数回退以递增的延迟间隔重试。

结论

API 速率限制是 的一个关键方面,可确保性能、可靠性和安全性。通过根据系统需求选择适当的策略并有效监控使用模式,即使在流量较重的情况下,也可以保持 API 的运行状况和性能。速率限制不仅仅是一种防御措施;它是构建可扩展且强大的 Web 服务不可或缺的一部分。

0 阅读:9