超大用户多租户 LLM 并发解决方案
本文档针对海量租户(数千到数万个租户,每个租户 5-15 用户)场景,提供完整的多租户 LLM 并发管理方案。
文档版本:v1.0
创建日期:2026-05-05
方案状态:规划中
负责人:技术团队
一、设计理念与核心目标
1.1 背景与挑战
| 挑战维度 | 现状 | 目标 |
|---|---|---|
| 租户规模 | 单租户 10 用户 | 扩展到 10,000+ 租户,每租户 5-15 用户 |
| LLM 并发 | 单 credential 15-30 并发 | 多 credential 聚合,支持 1000+ 并发 |
| 模型种类 | 单模型(MiniMax) | 主模型 + Fallback 模型 + 辅助模型 |
| 可用性 | 无容灾 | 多层级熔断降级,99.9% 可用性 |
| 成本 | 不可预测 | 精确配额控制和成本追踪 |
1.2 核心设计原则
- 分层架构:接入层 → 路由层 → 模型层 → 隔离层
- Credential 池化:多个 API Key 统一管理,动态负载均衡
- 模型路由:主模型优先,fallback 兜底,辅助模型分流
- 租户隔离:资源配额硬隔离 + 公平调度软隔离
- 可观测性:全链路 metrics + tracing + logging
1.3 名词定义
| 术语 | 说明 |
|---|---|
| Credential | 一组 LLM API 认证信息(API Key + 对应模型) |
| Credential Pool | 同一模型的多个 Credential 组成的池子 |
| 模型组 | 主模型 + N 个 Fallback 模型 + 辅助模型的集合 |
| 租户组 | 按优先级/套餐划分的租户集合 |
| 调度单元 | 一次 LLM 调用请求的基本单位 |
二、Credential Pool 管理体系
2.1 Credential 池化架构
什么是 Credential Pool
Credential Pool(凭证池)是管理多个 LLM API 凭证(API Key)的核心组件。在超大用户多租户场景下,单个 API Key 的并发能力和请求配额远远无法满足需求。通过池化技术,我们将多个同模型或跨模型的 API Key 统一管理、动态调度,实现并发聚合与高可用保障。
Credential Pool 本质上是一个有状态的资源调度器,它维护着一组 Credential 实例的状态(可用/繁忙/熔断/下线),并根据负载均衡策略对外提供统一的获取(acquire)和释放(release)接口。每次 LLM 调用请求从池中获取一个可用的 Credential,使用完毕后再归还到池中,形成完整的生命周期管理。
为什么需要池化
在超大规模多租户场景下,池化是解决以下痛点的必由之路:
| 痛点维度 | 问题描述 | 池化解决思路 |
|---|---|---|
| 并发瓶颈 | 单个 API Key 并发上限 15-30,远低于实际需求 | 多 Key 并发聚合,线性扩展吞吐 |
| 配额限制 | 云厂商对单 Key 有 RPM/TPM 限制 | 分散请求到多个 Key,均摊配额压力 |
| 单点故障 | 单个 Key 异常或限流导致服务不可用 | 自动熔断摘除 + 故障 Key 隔离 |
| 成本不透明 | 无法精细控制各租户/模型的费用 | 按 Key 维度统计使用量,精确计费 |
| 扩展性差 | 新增 Key 需要修改业务代码 | 配置化声明,动态感知无需重启 |
池化的核心优势
1. 聚合并发能力
通过池化 N 个各带 20 并发的 Credential,理论上可支持 N×20 的并发吞吐。在实际压测中,8 个 MiniMax API Key 聚合后可稳定支撑 160 并发请求,P99 延迟保持在 800ms 以内,相比单 Key 场景吞吐量提升约 8 倍。
2. 提升可用性
池化架构天然支持高可用设计:
- 冗余备份:当某个 Key 被限流或异常时,其他 Key 继续服务
- 自动熔断:连续失败触发熔断,新请求不再分配到故障 Key
- 渐进恢复:故障 Key 恢复后逐步引入流量,避免瞬间压垮
3. 成本优化
通过精细化的 Key 管理和配额控制,可以:
- 识别长期闲置或低效使用的 Key,及时释放资源
- 按模型/租户维度统计用量,实现精准成本分摊
- 结合业务峰谷特征,在低峰期减少活跃 Key 数量
池子的分类
Credential Pool 按模型类型进行分组隔离,不同模型组之间相互独立:
| 池子类型 | 模型示例 | 典型配置 |
|---|---|---|
| 主模型池 | MiniMax-M2.7-highspeed | 每个 Key 20 并发,活跃 8 个 |
| Fallback 池 | DeepSeek-V3、Qwen-Max | 每个 Key 15 并发,活跃 4 个 |
| 辅助模型池 | Embedding、 Whisper | 每个 Key 50 并发,活跃 2 个 |
| 测试/预发池 | 各模型 dev 环境 | 每个 Key 5 并发,按需启用 |
同一种模型可配置多个独立的池子以应对不同租户组,例如面向大客户的专属池(独占资源)与面向普通用户的共享池(复用资源)可以并存。
2.2 Credential 管理接口
核心数据结构
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
import threading
class CredentialStatus(Enum):
ACTIVE = "active" # 正常运行,可分配
BUSY = "busy" # 达到并发上限,暂停分配
CIRCUIT_BROKEN = "circuit_broken" # 熔断中,不分配
RECOVERING = "recovering" # 恢复中,逐步放量
RETIRED = "retired" # 已下线,不再使用
@dataclass
class Credential:
"""
单个 LLM API 凭证的抽象封装。
包含认证信息、状态信息、运行时统计三大类属性。
"""
# === 认证信息(创建时确定,运行时不变)===
api_key: str # API Key 明文(仅在创建时传入)
model: str # 对应模型名称,如 "MiniMax-M2.7-highspeed"
base_url: str # API 端点,如 "https://api.minimax.chat"
max_concurrency: int = 20 # 该 credential 的最大并发上限
weight: float = 1.0 # 权重因子,用于加权负载均衡
# === 运行时状态(线程安全访问)===
current_load: int = 0 # 当前正在使用的数量(原子操作)
status: CredentialStatus = CredentialStatus.ACTIVE
last_used: Optional[datetime] = None # 上一次使用时间
acquired_count: int = 0 # 累计获取次数
released_count: int = 0 # 累计释放次数
# === 健康相关统计 ===
health_score: float = 1.0 # 健康分 0.0-1.0,由 HealthChecker 更新
consecutive_errors: int = 0 # 连续错误次数(成功一次清零)
consecutive_timeouts: int = 0 # 连续超时次数(成功一次清零)
recent_success_count: int = 0 # 窗口内成功次数
recent_failure_count: int = 0 # 窗口内失败次数
recent_request_count: int = 0 # 窗口内总请求数
recent_latency_sum: float = 0.0 # 窗口内延迟总和(ms)
recent_latency_count: int = # 窗口内请求计数
# === 熔断恢复相关 ===
circuit_break_time: Optional[datetime] = None # 熔断触发时间
recovery_attempts: int = 0 # 恢复尝试次数
last_health_check: Optional[datetime] = None # 上次健康检查时间
# === 内部锁 ===
_lock: threading.RLock = field(default_factory=threading.RLock)
@property
def recent_success_rate(self) -> float:
"""计算窗口内成功率"""
total = self.recent_success_count + self.recent_failure_count
if total == 0:
return 1.0
return self.recent_success_count / total
@property
def recent_avg_latency(self) -> float:
"""计算窗口内平均延迟"""
if self.recent_latency_count == 0:
return 0.0
return self.recent_latency_sum / self.recent_latency_count
@property
def is_available(self) -> bool:
"""判断当前是否可分配"""
return (
self.status in (CredentialStatus.ACTIVE, CredentialStatus.RECOVERING)
and self.current_load < self.max_concurrency
)
def try_acquire(self) -> bool:
"""
尝试获取该 credential。
成功返回 True 并递增 current_load;失败返回 False。
"""
with self._lock:
if not self.is_available:
return False
self.current_load += 1
self.last_used = datetime.now()
self.acquired_count += 1
return True
def release(self):
"""释放该 credential,递减 current_load"""
with self._lock:
if self.current_load > 0:
self.current_load -= 1
self.released_count += 1
def record_success(self, latency_ms: float):
"""记录一次成功的请求"""
with self._lock:
self.recent_success_count += 1
self.recent_request_count += 1
self.recent_latency_sum += latency_ms
self.recent_latency_count += 1
self.consecutive_errors = 0
self.consecutive_timeouts = 0
def record_failure(self, is_timeout: bool = False):
"""记录一次失败的请求"""
with self._lock:
self.recent_failure_count += 1
self.recent_request_count += 1
self.consecutive_errors += 1
if is_timeout:
self.consecutive_timeouts += 1
else:
self.consecutive_timeouts = 0 # 非超时错误不计入连续超时
def reset_window_stats(self):
"""重置窗口统计(由定时任务调用)"""
with self._lock:
self.recent_success_count = 0
self.recent_failure_count = 0
self.recent_request_count = 0
self.recent_latency_sum = 0.0
self.recent_latency_count = 0
class CredentialPool:
"""
同一模型的多个 Credential 的统一调度器。
提供 acquire / release / health_check / rotate 四个核心接口。
"""
def __init__(
self,
model: str,
base_url: str,
strategy: str = "least_load", # round_robin | least_load | weighted_random | sticky
sticky_tenant_window_seconds: int = 300,
):
self.model = model
self.base_url = base_url
self.strategy = strategy
self.sticky_tenant_window = sticky_tenant_window_seconds
self._credentials: list[Credential] = []
self._lock = threading.RLock()
self._round_robin_index = 0 # 轮询策略的当前位置
# 粘性会话:tenant_id -> (credential, expire_time)
self._sticky_map: dict[str, tuple[Credential, datetime]] = {}
# === 核心管理接口 ===
def register(self, api_key: str, max_concurrency: int = 20, weight: float = 1.0):
"""向池中注册一个新的 Credential"""
cred = Credential(
api_key=api_key,
model=self.model,
base_url=self.base_url,
max_concurrency=max_concurrency,
weight=weight,
)
with self._lock:
self._credentials.append(cred)
return cred
def acquire(self, tenant_id: str = None, priority: int = 0) -> Optional[Credential]:
"""
从池中获取一个可用的 Credential。
参数:
tenant_id: 租户 ID,用于粘性会话策略
priority: 优先级(暂未使用,预留)
返回:
可用的 Credential 实例,或 None(池中没有可用 Credential)
"""
with self._lock:
candidates = [c for c in self._credentials if c.is_available]
if not candidates:
return None
chosen = self._select_one(candidates, tenant_id)
if chosen and chosen.try_acquire():
return chosen
return None
def release(self, credential: Credential):
"""将 Credential 归还到池中"""
credential.release()
def health_check(self, credential: Credential) -> bool:
"""
对指定 Credential 执行主动健康检查。
返回 True 表示健康,False 表示应熔断。
"""
# 构造一个最小化的 test 请求
import time
start = time.time()
try:
import httpx
resp = httpx.get(
credential.base_url.rstrip("/") + "/health",
headers={"Authorization": f"Bearer {credential.api_key}"},
timeout=5.0,
)
latency = (time.time() - start) * 1000
if resp.status_code == 200:
credential.record_success(latency)
return True
credential.record_failure()
return False
except Exception:
credential.record_failure(is_timeout=True)
return False
def rotate(self) -> list[Credential]:
"""
负载均衡策略触发时调用,返回当前所有 active credential。
"""
with self._lock:
# 清理过期粘性映射
now = datetime.now()
expired = [
tid for tid, (_, exp) in self._sticky_map.items() if now > exp
]
for tid in expired:
del self._sticky_map[tid]
return [
c for c in self._credentials
if c.status in (CredentialStatus.ACTIVE, CredentialStatus.RECOVERING)
]
# === 负载均衡策略实现 ===
def _select_one(
self, candidates: list[Credential], tenant_id: str = None
) -> Optional[Credential]:
if not candidates:
return None
if self.strategy == "round_robin":
return self._round_robin(candidates)
elif self.strategy == "least_load":
return self._least_load(candidates)
elif self.strategy == "weighted_random":
return self._weighted_random(candidates)
elif self.strategy == "sticky":
return self._sticky(candidates, tenant_id)
else:
return self._least_load(candidates)
def _round_robin(self, candidates: list[Credential]) -> Credential:
"""轮询策略:按注册顺序均匀分发请求"""
n = len(candidates)
idx = self._round_robin_index % n
self._round_robin_index = idx + 1
return candidates[idx]
def _least_load(self, candidates: list[Credential]) -> Credential:
"""最少负载策略:优先选择当前 load 最低的"""
return min(candidates, key=lambda c: (c.current_load, -c.health_score))
def _weighted_random(self, candidates: list[Credential]) -> Credential:
"""加权随机策略:根据健康分和权重综合计算概率"""
import random
weights = [c.health_score * c.weight for c in candidates]
total = sum(weights)
if total <= 0:
return random.choice(candidates)
normalized = [w / total for w in weights]
return random.choices(candidates, weights=normalized, k=1)[0]
def _sticky(
self, candidates: list[Credential], tenant_id: str = None
) -> Optional[Credential]:
"""粘性会话策略:同一租户尽量复用同一个 Credential"""
if not tenant_id:
return self._least_load(candidates)
now = datetime.now()
if tenant_id in self._sticky_map:
cached_cred, expire_time = self._sticky_map[tenant_id]
# 如果缓存的 credential 仍然可用,直接复用
if cached_cred.is_available:
return cached_cred
else:
# 已不可用,删除缓存,fallback 到 least_load
del self._sticky_map[tenant_id]
# 选择一个可用的 credential 并建立粘性关联
chosen = self._least_load(candidates)
self._sticky_map[tenant_id] = (
chosen,
now + timedelta(seconds=self.sticky_tenant_window),
)
return chosen
def get_stats(self) -> dict:
"""获取池子的整体统计信息"""
with self._lock:
return {
"model": self.model,
"total_credentials": len(self._credentials),
"active_count": sum(
1 for c in self._credentials
if c.status == CredentialStatus.ACTIVE
),
"busy_count": sum(
1 for c in self._credentials
if c.status == CredentialStatus.BUSY
),
"circuit_broken_count": sum(
1 for c in self._credentials
if c.status == CredentialStatus.CIRCUIT_BROKEN
),
"total_load": sum(c.current_load for c in self._credentials),
"total_acquired": sum(c.acquired_count for c in self._credentials),
}
接口说明
| 接口 | 签名 | 说明 |
|---|---|---|
register |
(api_key, max_concurrency, weight) |
动态注册新的 Credential 到池中 |
acquire |
(tenant_id, priority) -> Credential |
获取可用 Credential,阻塞至超时或成功 |
release |
(credential) |
归还 Credential,更新 load 计数 |
health_check |
(credential) -> bool |
主动探测Credential 可用性 |
rotate |
() -> list[Credential] |
触发负载均衡,返回当前有效 Credential 列表 |
get_stats |
() -> dict |
获取池子整体状态快照 |
2.3 负载均衡策略
四种策略详解
Credential Pool 支持四种负载均衡策略,可通过配置切换。策略的选择直接影响请求分发效率、延迟表现和资源利用率。
1. 轮询(Round Robin)
轮询策略按顺序逐个分发请求到池中的每个 Credential,循环往复。其实现最为简单,开销极低,适用于所有 Credential 规格一致、负载相对稳定的场景。
优势:实现简单,无状态,请求分布可预测
劣势:无法感知各 Credential 的实际负载差异,可能导致"快的被慢的拖累"
适用场景:Credential 规格完全一致,业务负载平稳
2. 最少负载(Least Load)
最少负载策略每次选择当前 current_load 最小的 Credential,实现真正的"能者多劳"。这是生产环境最常用的策略,能有效利用空闲资源。
实现逻辑:
def _least_load(self, candidates):
return min(candidates, key=lambda c: (c.current_load, -c.health_score))
当多个 Credential load 相同时,退化为按健康分排序,选择更健康的。
优势:充分利用空闲资源请求分配均匀
劣势:高负载时可能产生"惊群效应"(所有请求同时打到同一个 Credential)
适用场景:大部分生产环境场景,推荐作为默认策略
3. 加权随机(Weighted Random)
根据各 Credential 的 weight × health_score 计算选择概率,权重越高的被选中概率越大。相比轮询更加平滑,适合对延迟敏感但可接受轻微分布不均的场景。
实现逻辑:
def _weighted_random(self, candidates):
weights = [c.health_score * c.weight for c in candidates]
return random.choices(candidates, weights=weights, k=1)[0]
优势:实现简单,天然实现优雅降级(不健康Credential 被选中概率降低)
劣势:短时间内的分布不如轮询均匀
适用场景:Credential 规格不一致,或需要按权重分配流量的场景
4. 粘性会话(Sticky)
同一租户的请求在一定时间窗口内尽量路由到同一个 Credential,以保证会话一致性(如上下文缓存、Token 限制等)。这是多租户场景下保障用户体验的重要手段。
工作原理:
- 租户首次请求时,通过 Least Load 选择一个 Credential
- 建立
tenant_id → Credential的映射,设置过期时间(默认 5 分钟) - 后续请求优先使用映射中的 Credential;若已不可用则重新选择
- 过期后重新走选择逻辑
def _sticky(self, candidates, tenant_id):
if tenant_id in self._sticky_map:
cached_cred, expire_time = self._sticky_map[tenant_id]
if cached_cred.is_available:
return cached_cred
chosen = self._least_load(candidates)
self._sticky_map[tenant_id] = (chosen, now + timedelta(seconds=self.sticky_window))
return chosen
优势:保证同一租户的会话一致性,利于上下文复用
劣势:可能造成负载不均(热门租户独占某 Credential)
适用场景:需要保持会话上下文的场景,或对延迟稳定性要求高的场景
策略对比与选择建议
| 策略 | 实现复杂度 | 延迟波动 | 资源利用率 | 推荐场景 |
|---|---|---|---|---|
| Round Robin | ★☆☆☆☆ | 低 | 中 | 规格一致的固定场景 |
| Least Load | ★★☆☆☆ | 低 | 高 | 通用生产环境首选 |
| Weighted Random | ★★☆☆☆ | 中 | 高 | 规格不一致的异构环境 |
| Sticky | ★★★☆☆ | 高 | 中 | 会话一致性敏感场景 |
选择建议:
- 通用场景推荐
least_load,兼顾实现简单性和资源利用率 - 若业务需要会话粘性,可在
least_load基础上叠加 sticky 功能 - 若 Credential 规格差异大(如有些 Key 并发上限 20,有些只有 10),使用
weighted_random round_robin仅推荐在 Credential 完全同构、负载稳定的小规模场景使用
2.4 健康检查机制
健康检查是 Credential Pool 高可用保障的核心环节,分为主动探测和被动检测两条路径,两者相互补充。
主动探测
主动探测由 HealthChecker 定时执行,通过发送真实的 API 请求来验证 Credential 的可用性。相比被动检测,主动探测能更早发现问题(在用户请求失败之前就发现问题 Credential)。
配置参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
interval_seconds |
30 | 健康检查执行间隔 |
test_request_timeout |
5.0 | 单次探测超时时间(秒) |
enabled |
true | 是否启用主动探测 |
探测逻辑:
class HealthChecker:
def __init__(self, pool: CredentialPool, config: dict):
self.pool = pool
self.interval = config.get("interval_seconds", 30)
self.timeout = config.get("test_request_timeout", 5.0)
self._running = False
async def run(self):
"""定时任务入口"""
while self._running:
for cred in self.pool.rotate():
is_healthy = self.pool.health_check(cred)
self._update_status(cred, is_healthy)
await asyncio.sleep(self.interval)
def _update_status(self, cred: Credential, is_healthy: bool):
if is_healthy:
if cred.status == CredentialStatus.RECOVERING:
# 恢复中,逐步放量
cred.recovery_attempts += 1
# 恢复进度达到阈值后转为 ACTIVE
if cred.recovery_attempts >= 3:
cred.status = CredentialStatus.ACTIVE
cred.recovery_attempts = 0
elif cred.status == CredentialStatus.CIRCUIT_BROKEN:
# 从熔断中恢复,先进入 RECOVERING 状态
cred.status = CredentialStatus.RECOVERING
cred.recovery_attempts = 0
else:
if cred.status in (CredentialStatus.ACTIVE, CredentialStatus.RECOVERING):
# 触发熔断
cred.status = CredentialStatus.CIRCUIT_BROKEN
cred.circuit_break_time = datetime.now()
被动检测
被动检测依赖每次实际请求的结果来更新 Credential 的健康状态,无需额外的探测流量。被动检测数据是触发熔断的主要依据。
记录时机:
record_success(latency_ms):每次请求成功(HTTP 2xx)时调用record_failure(is_timeout):每次请求失败(HTTP 4xx/5xx 或异常)时调用
统计窗口:默认使用 100 次请求为一个滑动窗口,计算近期成功率、平均延迟等指标。
熔断器模式
熔断器采用三状态机模型:CLOSED(正常)→ OPEN(熔断)→ HALF_OPEN(半开试探)。
触发熔断的条件(满足任一即触发):
class HealthChecker:
def should_circuit_break(self, cred: Credential) -> bool:
"""
连续3次超时 或 成功率<80% → 熔断
"""
# 条件1:连续超时达到阈值
if cred.consecutive_timeouts >= 3:
return True
# 条件2:滑动窗口内成功率低于阈值
if cred.recent_request_count >= 10: # 至少积累10次请求才判断
if cred.recent_success_rate < 0.8:
return True
return False
熔断后的恢复流程:
- OPEN 状态:立即摘除,所有请求不再分配到该 Credential
-
HALF_OPEN 状态(触发条件:熔断超过
recovery_timeout后首次探测): -
允许少量请求通过(逐步放量)
-
成功则进入 RECOVERING;失败则重新进入 OPEN
-
RECOVERING 状态:
-
连续 N 次探测成功(N=3)后转为 ACTIVE
- 恢复期间允许递增比例的流量通过(如 25% → 50% → 100%)
健康分计算
健康分 health_score 是一个 0.0-1.0 的浮点数,由 HealthChecker 定期更新:
def compute_health_score(cred: Credential) -> float:
"""
综合考虑:成功率、延迟、连续错误次数
"""
# 基础分 1.0
score = 1.0
# 成功率扣分(权重 50%)
success_rate = cred.recent_success_rate
score -= (1.0 - success_rate) * 0.5
# 延迟扣分(权重 30%)
# 以 500ms 为基准,每超过 100ms 扣 0.05
avg_latency = cred.recent_avg_latency
if avg_latency > 500:
penalty = ((avg_latency - 500) / 100) * 0.05
score -= min(penalty * 0.3, 0.3)
# 连续错误扣分(权重 20%)
# 每连续错误 1 次扣 0.05
error_penalty = min(cred.consecutive_errors * 0.05, 0.2)
score -= error_penalty
return max(0.0, min(1.0, score))
2.5 Credential 的动态伸缩
自动增减策略
Credential Pool 支持根据实际负载动态调整活跃 Credential 数量,避免资源浪费或资源不足。
扩容触发条件:
- 所有活跃 Credential 的平均负载率超过 80%(即
total_load / total_max_concurrency > 0.8) - 扩容后最多不超过配置的上限
缩容触发条件:
- 所有活跃 Credential 的平均负载率低于 30% 持续 N 分钟(N 可配置)
- 缩容后至少保留最小活跃数量(通常为 1-2 个)
class AutoScaler:
def __init__(self, pool: CredentialPool, config: dict):
self.pool = pool
self.max_active = config.get("max_active", 8)
self.min_active = config.get("min_active", 1)
self.scale_up_threshold = config.get("scale_up_threshold", 0.8)
self.scale_down_threshold = config.get("scale_down_threshold", 0.3)
self.scale_down_cooldown_minutes = config.get("scale_down_cooldown", 10)
self._last_scale_down_time = None
def maybe_scale(self):
stats = self.pool.get_stats()
total_max = sum(c.max_concurrency for c in self.pool._credentials
if c.status != CredentialStatus.RETIRED)
total_load = stats["total_load"]
load_ratio = total_load / total_max if total_max > 0 else 0
active_count = stats["active_count"] + stats["busy_count"]
if load_ratio > self.scale_up_threshold and active_count < self.max_active:
self._scale_up()
elif load_ratio < self.scale_down_threshold and active_count > self.min_active:
if self._can_scale_down():
self._scale_down()
def _scale_up(self):
"""激活一个备用 Credential"""
for cred in self.pool._credentials:
if cred.status == CredentialStatus.RETIRED:
cred.status = CredentialStatus.ACTIVE
logger.info(f"Scaled up credential: {cred.api_key[:8]}...")
break
def _scale_down(self):
"""将一个低负载 Credential 置为休眠态"""
for cred in self.pool._credentials:
if cred.status == CredentialStatus.ACTIVE and cred.current_load == 0:
cred.status = CredentialStatus.RETIRED
self._last_scale_down_time = datetime.now()
logger.info(f"Scaled down credential: {cred.api_key[:8]}...")
break
云厂商配额监控联动
现代云厂商(MiniMax、DeepSeek 等)对 API Key 有 RPM(每分钟请求数)和 TPM(每分钟 Token 数)限制。当接近配额时需要提前预警或自动扩容。
配额监控方案:
| 监控指标 | 数据来源 | 告警阈值 | 自动处置 |
|---|---|---|---|
| Key 级 RPM | 云厂商 API / 自身统计 | > 80% 配额 | 扩容或切换 Key |
| Key 级 TPM | 云厂商 API / 自身统计 | > 80% 配额 | 扩容或切换 Key |
| QPM(每秒查询数) | 应用层 metrics | > 上游限速 90% | 触发限流 |
配额数据获取通常有三种途径:
1. 云厂商 Console API:实时查询 Key 的使用量和配额
2. 429 响应码识别:检测到限流后自动切换 Key
3. 自身统计:在应用层累计每个 Key 的请求次数
冷启动延迟规避
新增 Credential 初始化时存在冷启动问题——新 Key 在首次接受大流量时可能出现性能抖动。采用预热(Warmup)机制解决:
def warmup_credential(cred: Credential, target_qps: float = 5.0, duration_seconds: int = 60):
"""
新 Credential 激活后,先以低 QPS 预热,逐渐提升到目标 QPS
"""
import time
interval = 1.0 / target_qps
elapsed = 0
while elapsed < duration_seconds:
# 发送少量预热请求
try:
httpx.get(
cred.base_url + "/chat/completions",
headers={"Authorization": f"Bearer {cred.api_key}"},
json={"model": cred.model, "messages": [{"role": "user", "content": "hi"}]},
timeout=5.0,
)
except Exception:
pass
time.sleep(interval)
elapsed += interval
# 线性提升 QPS
interval = max(0.2, interval * 0.95)
2.6 安全与合规
API Key 存储
API Key 是敏感凭证,必须加密存储,严禁明文出现在配置文件或代码中。
推荐方案:使用专业密钥管理服务
| 方案 | 特点 | 适用场景 |
|---|---|---|
| AWS Secrets Manager | 完全托管,按调用次数计费,支持自动轮换 | AWS 环境 |
| HashiCorp Vault | 开源,支持多种后端,可自部署 | 跨云 / 混合云 |
| Azure Key Vault | 微软生态集成,原生支持 AAD | Azure 环境 |
| Kubernetes Secrets + Encryption | K8s 原生,需要额外配置静态加密 | K8s 部署 |
配置化注入流程(以 Vault 为例):
# docker-compose.yml 或 K8s Deployment
environment:
- MINIMAX_KEY_1: ${VAULT_SECRET_PATH:path=secret/data/minimax,key=api_key_1}
- MINIMAX_KEY_2: ${VAULT_SECRET_PATH:path=secret/data/minimax,key=api_key_2}
应用启动时从 Vault 拉取 Key 注入环境变量,运行时 Key 不落盘、不打印日志。
轮转机制
为降低 Key 泄露风险和满足合规要求,应定期更换 API Key。轮转过程需要不影响线上服务,采用"先增后删"策略:
轮转步骤:
1. 向池中新增新 Key,状态为 ACTIVE,开始接受流量
2. 观察新 Key 运行稳定(建议至少 1 个健康检查周期)
3. 逐步将旧 Key 的流量降为零(可通过权重调整为 0 或手动下线)
4. 旧 Key 下线后,在云厂商控制台删除或禁用
灰度轮转配置:
credential_pools:
minimax:
credentials:
# 旧 Key,权重逐渐降低
- api_key: "${MINIMAX_KEY_OLD}"
max_concurrency: 20
weight: 0.3 # 逐步降低
draining: true # 开启优雅下线,不再接收新请求
# 新 Key,全量接收
- api_key: "${MINIMAX_KEY_NEW}"
max_concurrency: 20
weight: 1.0
审计日志
每次 Credential 的获取(acquire)和释放(release)都应记录审计日志,用于安全分析和故障排查。
审计日志字段:
| 字段 | 类型 | 说明 |
|---|---|---|
timestamp |
ISO8601 | 操作时间 |
event_type |
string | acquire / release / circuit_break / rotate |
credential_id |
string | Key 的唯一标识(哈希值,不记录明文) |
tenant_id |
string | 发起操作的租户 ID |
model |
string | 模型名称 |
current_load |
int | 操作后的负载值 |
latency_ms |
float | 请求耗时(仅 release 时记录) |
success |
bool | 请求是否成功(仅 release 时记录) |
日志示例:
{
"timestamp": "2026-05-05T20:15:30.123Z",
"event_type": "acquire",
"credential_id": "minimax_key_a1b2c3d4",
"tenant_id": "tenant_001",
"model": "MiniMax-M2.7-highspeed",
"current_load": 12
}
隔离原则
不同租户组之间的 Credential 必须严格隔离,防止相互影响。
隔离层级:
| 隔离级别 | 说明 | 实现方式 |
|---|---|---|
| 池级隔离 | 不同租户组使用完全独立的 Credential Pool | 不同 Pool 实例 |
| 配额隔离 | 租户组间有独立的 RPM/TPM 配额限制 | 限流系统独立计数 |
| ** Key 级隔离** | 高优先级租户独占高质量 Key | 专属 Pool + 共享 Pool 分级 |
| 审计隔离 | 每个租户的操作日志独立存储 | 多租户日志 namespace |
# 示例:租户组配置
tenant_groups:
enterprise:
pool_name: "minimax_enterprise" # 专属池
priority: 100
standard:
pool_name: "minimax_shared" # 共享池
priority: 50
trial:
pool_name: "minimax_trial" # 试用池
priority: 10
2.7 配置示例
以下是一个完整的 credential_pools 配置示例,涵盖 MiniMax 主模型池和 DeepSeek Fallback 池两大类:
# ============================================================
# Credential Pool 配置
# ============================================================
credential_pools:
# 主模型池:MiniMax 系列
minimax:
model: "MiniMax-M2.7-highspeed"
base_url: "https://api.minimax.chat/v1"
strategy: "least_load" # 默认负载均衡策略
sticky_tenant_window_seconds: 300 # 粘性会话窗口 5 分钟
credentials:
# Key 1:标准配置
- api_key: "${MINIMAX_KEY_1}"
max_concurrency: 20
weight: 1.0
# Key 2:标准配置
- api_key: "${MINIMAX_KEY_2}"
max_concurrency: 20
weight: 1.0
# Key 3:低权重,备用
- api_key: "${MINIMAX_KEY_3}"
max_concurrency: 15
weight: 0.8
# Key 4:仅在扩容时启用
- api_key: "${MINIMAX_KEY_4}"
max_concurrency: 20
weight: 0.5
enabled: false # 默认不启用,由 AutoScaler 控制
health_check:
enabled: true
interval_seconds: 30 # 每 30 秒执行一次健康检查
test_request_timeout: 5 # 探测超时 5 秒
circuit_break_threshold:
consecutive_errors: 3 # 连续 3 次错误触发熔断
success_rate_window: 100 # 滑动窗口 100 次请求
min_success_rate: 0.8 # 成功率低于 80% 触发熔断
recovery:
recovery_timeout_seconds: 60 # 熔断 60 秒后尝试恢复
probes_required: 3 # 连续 3 次探测成功才完全恢复
autoscaler:
enabled: true
min_active: 2 # 最少保持 2 个活跃 Key
max_active: 8 # 最多 8 个活跃 Key
scale_up_threshold: 0.8 # 负载 > 80% 时扩容
scale_down_threshold: 0.3 # 负载 < 30% 时缩容
scale_down_cooldown_minutes: 10
# Fallback 池:DeepSeek 系列
deepseek:
model: "deepseek-chat"
base_url: "https://api.deepseek.com/v1"
strategy: "weighted_random" # DeepSeek Key 规格不一致,用加权随机
sticky_tenant_window_seconds: 300
credentials:
- api_key: "${DEEPSEEK_KEY_1}"
max_concurrency: 15
weight: 1.0
- api_key: "${DEEPSEEK_KEY_2}"
max_concurrency: 10
weight: 0.7
health_check:
enabled: true
interval_seconds: 30
test_request_timeout: 5
circuit_break_threshold:
consecutive_errors: 3
success_rate_window: 100
min_success_rate: 0.75 # Fallback 池成功率阈值稍低
autoscaler:
enabled: true
min_active: 1
max_active: 4
scale_up_threshold: 0.75
scale_down_threshold: 0.3
scale_down_cooldown_minutes: 15
# 辅助模型池:Embedding
embedding:
model: "text-embedding-3-small"
base_url: "https://api.openai.com/v1"
strategy: "least_load"
sticky_tenant_window_seconds: 600 # Embedding 会话窗口可以更长
credentials:
- api_key: "${OPENAI_KEY_1}"
max_concurrency: 50
weight: 1.0
health_check:
enabled: true
interval_seconds: 60 # Embedding 请求轻量,降低检查频率
test_request_timeout: 3
circuit_break_threshold:
consecutive_errors: 5
success_rate_window: 50
min_success_rate: 0.9
autoscaler:
enabled: false # Embedding 池规模小,不需要自动伸缩
2.8 与限流系统的联动
Credential Pool 与限流系统(Limiter)是两套互补的保护机制,需要紧密联动才能保障系统在高并发下的稳定性。
两级限流模型
| 限流维度 | 作用对象 | 触发条件 | 处置方式 |
|---|---|---|---|
| 租户级限流 | 单个租户的请求 | 该租户配额耗尽 | 拒绝请求,返回 429 |
| Credential 级限流 | 单个 API Key | 该 Key 配额耗尽或达到并发上限 | 切换到其他可用 Key |
层级关系:租户级限流是第一道防线,Credential 级限流是第二道防线。请求首先检查租户配额;通过后分配到某个 Credential;Credential 内部再受自身并发上限保护。
class TwoLevelRateLimiter:
def __init__(self, pool: CredentialPool, tenant_limiter: TenantLimiter):
self.pool = pool
self.tenant_limiter = tenant_limiter
def acquire_slot(self, tenant_id: str) -> Optional[tuple[Credential, str]]:
"""
尝试获取一个请求槽位。
返回 (credential, request_id) 或 None(被限流)。
"""
# 第一层:检查租户配额
if not self.tenant_limiter.try_acquire(tenant_id):
return None # 租户级限流触发
# 第二层:从池中获取可用 Credential
cred = self.pool.acquire(tenant_id=tenant_id)
if not cred:
# Credential 池耗尽,释放租户配额
self.tenant_limiter.release(tenant_id)
return None # 降级到下一种模型或返回限流
return cred
def release_slot(self, tenant_id: str, cred: Credential):
"""释放 slot"""
self.pool.release(cred)
self.tenant_limiter.release(tenant_id)
Credential 限流状态同步
当某个 Credential 达到并发上限(current_load >= max_concurrency)或被熔断时,限流系统需要实时感知这些状态变化,以调整对该 Key 的流量分发。
状态同步机制:
| 事件 | 触发时机 | 限流系统响应 |
|---|---|---|
BUSY |
current_load >= max_concurrency |
从分发候选中移除该 Key |
CIRCUIT_BROKEN |
连续失败触发熔断 | 从分发候选中移除,等待恢复 |
RECOVERING |
熔断恢复中 | 以较低权重逐步引入流量 |
ACTIVE |
完全恢复 | 恢复正常分发权重 |
# 限流系统订阅 Credential 状态变更事件
class CredentialEventSubscriber:
def __init__(self, limiter):
self.limiter = limiter
self.pool = None # 注入 pool 引用
def on_credential_status_change(self, cred: Credential, old_status, new_status):
if new_status == CredentialStatus.CIRCUIT_BROKEN:
self.limiter.set_key_weight(cred.api_key, 0) # 完全移除
elif new_status == CredentialStatus.RECOVERING:
self.limiter.set_key_weight(cred.api_key, 0.25) # 25% 权重
elif new_status == CredentialStatus.ACTIVE:
self.limiter.set_key_weight(cred.api_key, cred.weight) # 恢复
协同保护策略
在极端高并发场景下,两级限流需要协同工作,防止系统被压垮:
保护原则:
- 先到先得:租户配额和 Credential 并发配额均为先占式
- 快速失败:限流响应要快,避免线程阻塞
- 分级降级:优先保证高优先级租户,牺牲低优先级租户
- 优雅积累:被限流的请求应进入队列等待,而非直接丢弃
协同流程:
请求进入
↓
检查租户配额(TenantLimiter) → 耗尽 → 进入租户级等待队列
↓(通过)
从池中获取 Credential → 无可用 → 进入 Credential 级等待队列
↓(获取成功)
执行业务逻辑
↓
记录结果 → 更新 Credential 健康分和负载
↓
释放租户配额和 Credential slot
关键监控指标:
| 指标 | 告警阈值 | 说明 |
|---|---|---|
tenant_limiter_rejection_rate |
> 5% | 租户级限流拒绝率 |
credential_pool_exhausted_rate |
> 10% | Credential 池耗尽频率 |
acquire_latency_p99 |
> 100ms | 获取 Credential 的 P99 延迟 |
credential_busy_rate |
> 80% | Credential 繁忙率 |
三、多模型路由架构
在超大用户多租户场景下,LLM 调用的可靠性、稳定性和成本效率是企业面临的核心挑战。单模型架构无法同时满足质量、可用性和成本三个维度的需求,因此我们设计了一套完整的多模型路由架构,通过分层模型体系、智能路由策略和统一的模型接口,实现对多模型资源的高效调度与精细化管理。
3.1 三层模型体系
在超大规模并发场景下,单一模型架构存在明显的局限性:主模型承担全部流量时容易成为性能瓶颈和单点故障源;缺乏降级预案时任何模型异常都直接导致服务不可用;所有请求不论难易都使用高端模型则造成严重的成本浪费。因此,我们设计了主模型 + Fallback 模型 + 辅助模型的三层模型体系,各层各司其职、协同配合。
主模型:承载 80% 流量的质量担当
主模型是整个三层体系的核心,通常选择能力最强、效果最好的旗舰模型(如 MiniMax-M2.7-highspeed)。主模型承担系统中约 80% 的常规请求,这些请求对响应质量有较高要求,不能容忍明显的效果降级。主模型通常配置较高的并发上限和充足的 API Key 资源,配有独立的 Credential Pool 和健康监控体系,确保在高负载下依然保持稳定的服务质量。
主模型的设计原则是质量优先,即在能力范围内追求最好的生成效果,而非刻意降低成本。当主模型可用且负载未达上限时,所有请求都会优先路由到主模型,只有当主模型不可用或负载过高时,才会触发降级策略。这种设计确保了绝大多数用户能够享受到最佳的服务体验。
Fallback 模型:主模型不可用时的兜底保障
Fallback 模型是主模型的备用方案,当主模型出现以下情况时自动承接流量:主模型响应超时(通常设置为 30 秒阈值)、返回 429 限流错误、返回 5xx 服务器错误、连续失败触发熔断、或主模型所在池的并发已满无法获取资源。Fallback 模型的存在,使得系统在任何单点故障情况下都能继续提供服务,是保障高可用性的关键设计。
Fallback 模型通常选择与主模型能力接近但不完全相同的模型作为第一备选,再选择能力稍弱但成本更低的模型作为第二、第三备选,形成多级降级链。例如主模型为 MiniMax-M2.7-highspeed 时,第一 Fallback 可选 DeepSeek-V3(能力接近,成本更低),第二 Fallback 可选 GPT-4o-mini(生态成熟,稳定性高)。多级 Fallback 链确保了系统在多种故障场景下都能找到可用的模型进行响应。
辅助模型:轻量任务的专用处理器
辅助模型承担三类轻量级任务:Embedding 向量化任务(如文本相似度计算、向量检索)、文本摘要任务(如长文档的摘要提取)、翻译和关键词提取任务。这些任务的特点是输入输出相对固定、计算量小、对模型能力要求不高,使用高端旗舰模型会造成明显的资源浪费。
辅助模型采用专门的模型系列,如 OpenAI 的 text-embedding-3-small 用于向量嵌入、GPT-3.5-turbo 用于摘要和翻译。这些模型的单 Token 成本通常只有主模型的十分之一甚至更低,但完全能够满足上述轻量任务的质量需求。通过将辅助任务分流到专用模型,主模型的并发资源得以释放给真正需要高质量生成的核心任务,实现了资源的最优配置。
3.2 模型路由策略
多模型路由的核心问题是:在什么情况下,应该把请求路由到哪一个模型。我们设计了一套多维度综合决策的路由策略体系,包括优先级路由、任务路由、成本路由和负载路由四个维度,根据不同的业务场景灵活组合使用。
基于优先级的路由:主模型优先,失败则 Fallback
优先级路由是最基本也是最重要的路由策略,其核心原则是:尽可能使用最好的模型,只有当好模型不可用时才降级。实现上,优先级路由维护一个有序的模型列表(通常为 [主模型, Fallback_1, Fallback_2, ...]),按顺序尝试每个模型直到成功。
优先级路由的优势在于简单可靠,能够保证最好的服务质量下限。当主模型正常工作时,所有请求都使用主模型,用户体验最优;当主模型出现故障时,系统自动无缝切换到 Fallback,用户无感知。优先级路由的缺点是在主模型正常时无法利用低成本的 Fallback 模型,但由于主模型通常只占总流量的小部分(考虑到还有大量辅助任务),整体成本依然可控。
基于任务的路由:按任务类型选择模型
不同类型的任务对模型能力有不同要求,任务路由策略根据请求的类型特征选择最适合的模型。对于对话生成类任务(chat/completion),通常使用主模型以获得最佳对话体验;对于向量嵌入类任务(embedding/similarity),使用专用的 embedding 模型效果更好、成本更低;对于简单问答或分类任务(classification/keyword),可以降级到轻量模型。
任务路由的实现依赖于对请求的意图识别和类型标注。在请求入口处,系统会根据请求特征(如任务描述、选择的工具、API 调用参数等)自动判断任务类型,然后路由到对应类型的专用模型池。例如,当用户请求"帮我总结这段文字"时,系统识别为摘要任务,路由到 auxiliary 模型池中的 GPT-3.5-turbo;当用户请求"回答这个问题"时,识别为对话任务,路由到主模型池。
基于成本的路由:简单任务优先使用低单价模型
成本路由策略的核心思想是让每一分钱的投入都用在刀刃上。对于简单明确的任务,如果低单价模型能够同样胜任,就优先使用低单价模型。成本路由通常与任务路由配合使用,对于可以被辅助模型处理的简单任务,优先选择辅助模型而非主模型。
成本路由需要建立清晰的成本评估模型,包括每个模型的单价(元/千 Token)、预估输入 Token 数、预估输出 Token 数,从而计算出单次请求的预估成本。当某个请求的预估成本超过阈值(如 0.05 元)且可以降级时,系统会尝试将请求路由到更便宜的模型。成本路由的典型应用场景包括:大批量文档处理、客服机器人对话、每日报告生成等对单次效果不敏感但总量巨大的场景。
基于负载的路由:根据模型当前负载动态分配
负载路由策略根据各模型池的当前负载状态动态调整流量分配比例,避免某些模型过载而其他模型闲置。负载路由通常与优先级路由配合使用,在优先级相近的模型之间进行负载均衡。
负载感知的路由实现依赖于对每个模型池实时负载率的监控。当某个模型的负载率超过阈值(如 80%)时,新的请求会被部分或全部路由到负载较低的备选模型。例如,当主模型池的平均负载率超过 80% 时,新的请求会被优先路由到 Fallback 模型,直到主模型负载降至 60% 以下。负载路由策略确保了系统在高峰期的稳定性,避免因单模型过载导致的服务降级。
3.3 路由决策算法
路由决策是整个多模型路由架构的核心,它接收一个 ModelRequest 请求,经过一系列判断和决策,最终返回一个 ModelResponse 响应。下面是路由决策的完整算法实现:
def route(model_request: ModelRequest) -> ModelResponse:
"""
多模型路由决策算法
决策流程:
1. 解析请求,提取任务类型、租户信息、成本约束
2. 按优先级尝试每个模型
3. 每个模型检查:可用性、负载、配额、熔断状态
4. 成功则返回响应;失败则尝试下一个模型或降级
"""
# Step 1: 解析请求上下文
tenant_id = model_request.tenant_id
task_type = model_request.task_type # chat | completion | embedding | summarize
cost_budget = model_request.cost_budget # 可选的最高成本限制
# Step 2: 确定候选模型列表(按优先级排序)
candidates = get_candidate_models(task_type, tenant_id)
# Step 3: 依次尝试每个候选模型
last_error = None
for model_group, model_name, credential_pool in candidates:
# 3.1: 检查模型是否启用
if not is_model_enabled(model_group, model_name):
continue
# 3.2: 检查租户配额是否允许使用该模型
if not check_tenant_quota(tenant_id, model_group, model_name):
continue
# 3.3: 从模型对应的 Credential Pool 获取可用 Credential
credential = credential_pool.acquire(tenant_id=tenant_id)
if credential is None:
continue # 该模型池无可用资源,尝试下一个
# 3.4: 尝试调用模型
try:
response = call_model_with_timeout(credential, model_request)
credential_pool.release(credential)
return response # 成功,直接返回
except ModelTimeoutError:
credential_pool.release(credential)
last_error = "timeout"
continue # 超时,尝试下一个模型
except ModelRateLimitError:
credential_pool.release(credential)
# 429 错误,记录并尝试下一个模型
record_rate_limit(model_group, model_name)
continue
except ModelServerError:
credential_pool.release(credential)
last_error = "server_error"
# 5xx 错误,触发该模型的熔断检查
maybe_circuit_break(model_group, model_name, credential)
continue
except Exception as e:
credential_pool.release(credential)
last_error = str(e)
continue
# Step 4: 所有模型都失败,进入降级或排队
return enqueue_or_degrade(model_request, last_error)
def get_candidate_models(task_type: str, tenant_id: str) -> list:
"""
根据任务类型和租户信息获取候选模型列表
返回格式:[(model_group, model_name, credential_pool), ...]
"""
# 基于任务类型确定路由目标
if task_type in ("embedding", "similarity"):
# 向量嵌入任务,使用辅助模型池
return [("auxiliary", "text-embedding-3-small", embedding_pool)]
elif task_type in ("summarize", "translate", "keyword"):
# 摘要翻译任务,使用辅助模型池
return [("auxiliary", "gpt-3.5-turbo", auxiliary_pool),
("fallback", "deepseek-chat", fallback_pool_1)]
else:
# 对话生成任务,使用主模型 + Fallback 链
return [("primary", "minimax-m2.7-highspeed", primary_pool),
("fallback", "deepseek-chat", fallback_pool_1),
("fallback", "gpt-4o-mini", fallback_pool_2)]
上述算法确保了请求在多个模型之间的有序流转。在实际生产环境中,还需要考虑更多边界情况,如请求队列满、租户被禁用、模型全部熔断等场景。路由决策模块应当与监控告警系统联动,当发现异常路由模式(如大量请求同时涌入 Fallback)时及时发出告警。
3.4 Fallback 模型管理
Fallback 模型管理是多模型路由架构中高可用保障的关键环节。一套完善的 Fallback 管理机制需要解决四个核心问题:如何组织 Fallback 链、如何判断触发 Fallback 的条件、如何防止 Fallback 被滥用导致雪崩、以及主模型恢复后如何平滑切回。
Fallback 链:按优先级顺序尝试,直到成功或耗尽
Fallback 链是一组按优先级排序的模型列表,当主模型不可用时,系统按顺序尝试链上的每个模型,直到某个模型成功响应或所有模型都失败。Fallback 链的设计需要考虑两个因素:模型能力梯度和成本梯度。
典型的 Fallback 链设计为三到四级:第一级是能力最接近主模型的模型(如 DeepSeek-V3),用于主模型短暂不可用时的快速切换;第二级是能力稍弱但生态成熟的模型(如 GPT-4o-mini),用于较长停机时间的承接;第三级是成本极低的轻量模型(如 GPT-3.5-turbo),用于极端情况下的基本服务保障。每一级之间应当有明确的切换条件,确保不会轻易跨级降级。
Fallback 触发条件:超时 / 429 / 5xx 错误
系统会在以下几种情况下自动触发 Fallback:响应超时(通常为主模型超时阈值的 1.5-2 倍)、HTTP 429 限流错误(表明该模型的 API 配额已耗尽)、HTTP 5xx 服务器错误(表明模型服务提供商内部故障)、连续 N 次请求失败(N 可配置,通常为 3)。
触发条件的设置需要在灵敏度和稳定性之间取得平衡。条件设置过于宽松会导致本可以成功的请求被不必要地降级;条件设置过于严格则可能让用户等待过长时间。推荐的做法是设置双重条件:既设置超时阈值(如 30 秒),也设置错误计数阈值(如连续 3 次错误),只有同时满足才触发 Fallback。
Fallback 频率限制:防止全部涌向 Fallback 造成雪崩
当主模型出现故障时,如果所有请求同时涌入 Fallback 模型,可能导致 Fallback 模型也因过载而崩溃,形成"雪崩效应"。为防止这种情况,系统需要实现 Fallback 频率限制机制,控制在单位时间内允许触发 Fallback 的请求比例。
典型的频率限制策略是渐进式降级:当检测到主模型异常时,首先只将 10-20% 的新请求降级到 Fallback,观察 Fallback 的承载情况;如果 Fallback 运行稳定,则逐步提升降级比例(30%、50%、80%),直到主模型恢复或全部流量完成降级。这种渐进式策略能够有效吸收主模型的故障冲击,同时保护 Fallback 模型不被瞬间压垮。
Fallback 冷却期:主模型恢复后需要稳定观察期再切回
主模型从故障中恢复后,不能立即将所有流量切回,否则如果主模型尚未完全恢复就再次崩溃,会造成二次故障。Fallback 冷却期机制要求主模型恢复后必须经过一段稳定观察期,才能重新作为主模型承接流量。
冷却期的长度通常根据主模型的故障时长确定:故障时间越短,冷却期越短(最短可为 1 分钟);故障时间越长,冷却期越长(最长可达 10-15 分钟)。冷却期内,主模型会逐步承接少量流量进行"试探",只有试探请求全部成功才会完全恢复。在冷却期内,系统会持续监控主模型的健康指标(成功率、延迟),如果指标恶化则立即重新触发 Fallback。
3.5 辅助模型分流
辅助模型分流是将轻量级任务从主模型转移到专用辅助模型的过程,是降低成本、提升系统整体吞吐能力的重要手段。辅助模型分流需要解决三个问题:哪些任务适合分流、如何控制分流比例、以及如何保障分流后的结果质量。
哪些任务适合辅助模型:embedding、文本摘要、翻译、关键词提取
辅助模型适合处理输入输出明确、计算量小、对模型能力要求不高的任务,主要包括以下几类:
向量嵌入任务(Embedding):这类任务将文本转换为向量表示,用于相似度计算、向量检索、RAG(检索增强生成)等场景。向量嵌入任务对模型的要求相对简单,主要考察向量的语义表达能力,不需要复杂的推理和生成能力。专用的 embedding 模型(如 text-embedding-3-small)在这些任务上表现优秀,成本却只有主模型的百分之一。
文本摘要任务(Summarize):对长文档或对话记录进行摘要提取,生成简短的总结。摘要任务虽然需要理解文档内容,但输出格式相对固定,通常是几句话的总结,不需要复杂的推理链条。GPT-3.5-turbo 等中等规模模型完全能够胜任,且成本显著低于旗舰模型。
翻译任务(Translate):将文本从一种语言翻译成另一种语言。翻译任务的输入输出结构清晰明确,GPT-3.5-turbo 等模型已经能够提供高质量的翻译效果,成本只有主模型的三分之一到五分之一。
关键词提取任务(Keyword):从文本中提取关键信息、实体或标签。这类任务本质上是一个分类或抽取任务,不需要生成能力,GPT-3.5-turbo 等模型即可很好完成。
分流比例控制:避免辅助模型过载
辅助模型虽然成本低、速度快,但同样有并发上限和 API 配额限制。如果大量请求同时涌入辅助模型,可能导致辅助模型过载,反而影响系统稳定性。因此,需要对辅助模型的分流比例进行控制。
分流比例控制的核心思想是设定辅助模型的负载上限:当辅助模型的负载率超过阈值(如 70%)时,超出部分的任务会被自动路由到主模型;当负载率降至安全范围(如 50%)以下时,恢复向辅助模型分流。负载率的计算基于辅助模型池的当前并发数和最大并发数,实时更新。
另一种分流控制策略是按任务难度分流:对于简单明确的任务(如提取关键词)优先使用辅助模型,对于复杂或有歧义的任务(如复杂文档摘要)使用主模型。任务难度可以通过分析输入长度、结构复杂度、关键词密度等特征自动判断。
结果质量校验:辅助模型结果异常时自动重试主模型
辅助模型的分流虽然成本低、速度快,但也存在结果质量不如主模型的风险。为保障服务质量,系统需要对辅助模型的结果进行质量校验,当发现结果异常时自动切换到主模型重试。
质量校验的方法包括:格式校验(检查输出是否符合预期的格式,如摘要长度是否在合理范围内)、语义校验(使用 embedding 模型计算输入和输出的语义相似度,相似度过低说明可能存在问题)、规则校验(检查输出是否包含敏感词、是否满足业务规则等)。
当校验发现异常时,系统会自动将同一请求路由到主模型进行重试,并将异常记录到日志中用于后续分析。如果主模型重试成功,则返回主模型的结果给用户,同时更新辅助模型的健康分;如果主模型也失败,则返回错误信息给用户。
3.6 多模型统一接口
多租户场景下,系统需要同时接入多个不同提供商的多种模型(MiniMax、DeepSeek、OpenAI、Claude、本地模型等),每个模型的 API 格式、认证方式、响应结构都不尽相同。为了简化上层调用、屏蔽底层差异,我们设计了统一的 ModelAdapter 接口层。
抽象 ModelAdapter 接口,统一不同模型的调用方式
ModelAdapter 是一个抽象接口,定义了所有模型适配器必须实现的方法:
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator
from dataclasses import dataclass
@dataclass
class ModelResponse:
"""统一的模型响应格式"""
model: str # 模型名称
content: str # 生成的文本内容
prompt_tokens: int # 输入 token 数
completion_tokens: int # 输出 token 数
total_tokens: int # 总 token 数
finish_reason: str # 结束原因(stop / length / error)
latency_ms: float # 请求耗时
raw_response: Any # 原始响应(用于调试)
class ModelAdapter(ABC):
"""模型适配器抽象基类"""
@property
@abstractmethod
def provider(self) -> str:
"""返回模型提供商名称"""
pass
@property
@abstractmethod
def model_name(self) -> str:
"""返回模型名称"""
pass
@abstractmethod
async def chat_complete(
self,
messages: list[dict],
temperature: float = 0.7,
max_tokens: int = 2048,
**kwargs
) -> ModelResponse:
"""
发送对话补全请求
参数:
messages: 对话消息列表,格式为 [{"role": "user", "content": "..."}]
temperature: 温度参数,控制随机性
max_tokens: 最大输出 token 数
**kwargs: 其他提供商特定参数
返回:
ModelResponse: 统一的响应格式
"""
pass
@abstractmethod
async def embedding(
self,
text: str,
**kwargs
) -> list[float]:
"""
获取文本的向量嵌入
参数:
text: 输入文本
**kwargs: 其他参数
返回:
list[float]: 嵌入向量
"""
pass
所有具体的模型适配器(如 MiniMaxAdapter、DeepSeekAdapter、OpenAIAdapter 等)都继承自 ModelAdapter 基类,实现其抽象方法。这种设计使得上层路由逻辑无需关心底层模型的具体差异,只需调用统一的接口即可。
支持的模型列表:MiniMax / DeepSeek / OpenAI / Claude / 本地模型
当前系统支持接入的模型提供商和模型列表如下:
| 提供商 | 模型系列 | 支持的任务类型 | 备注 |
|---|---|---|---|
| MiniMax | MiniMax-M2.7-highspeed、MiniMax-Text-01 | chat、completion | 主模型首选 |
| DeepSeek | DeepSeek-V3、DeepSeek-Coder | chat、completion | Fallback 主选 |
| OpenAI | GPT-4o、GPT-4o-mini、GPT-3.5-turbo、text-embedding-3-small | chat、completion、embedding | 生态成熟 |
| Claude | Claude 3.5 Sonnet、Claude 3 Haiku | chat、completion | 高端场景 |
| 本地模型 | Llama 3、Qwen 2、ChatGLM | chat、completion | 私有部署场景 |
每个模型都需要一个对应的 Adapter 实现。对于 MiniMax 和 DeepSeek 等国内厂商,通常需要适配其各自的 API 格式(如消息格式、错误码定义等);对于 OpenAI 和 Claude,可以直接使用官方提供的 SDK。
请求/响应标准化:统一 chatglm / openai-compatible / anthropic 格式
不同模型的 API 格式存在显著差异,适配器层需要完成格式转换工作:
请求格式标准化:将统一的内部请求格式转换为目标模型的 API 格式。例如,内部使用的消息格式为 {"role": "user", "content": "..."},转换为 MiniMax 格式时可能需要调整为 {"role": "user", "content": [{"type": "text", "text": "..."}]};转换为 OpenAI 格式时保持不变;转换为 Claude 格式时可能需要将 system 消息单独提取。
响应格式标准化:将各模型返回的原始响应转换为统一的 ModelResponse 格式。转换工作包括:提取生成的文本内容、计算或解析 token 使用量、映射 finish_reason(如 OpenAI 的 stop 对应 Claude 的 end_turn)、记录请求耗时等。
错误处理标准化:将各模型的错误响应(如 429 限流、500 服务器错误、模型特定错误码)统一映射为内部定义的异常类型,便于上层进行统一的错误处理和路由决策。
3.7 路由配置示例
以下是一个完整的路由配置示例,展示了如何定义三层模型体系、配置各层的模型参数、以及设置路由策略:
# ============================================================
# 多模型路由配置
# ============================================================
model_groups:
# 主模型组:承担 80% 流量的核心模型
primary:
- model: "MiniMax-M2.7-highspeed"
provider: "minimax"
base_url: "https://api.minimax.chat/v1"
max_concurrency: 30 # 单个 Credential 最大并发
timeout: 30 # 请求超时时间(秒)
cost_per_1k_tokens: 0.1 # 输入 + 输出单价(元/千 Token)
priority: 100 # 优先级,越高越优先
warmup_requests: 50 # 冷启动预热请求数
# Fallback 模型组:主模型不可用时的兜底
fallback:
- model: "DeepSeek-V3"
provider: "deepseek"
base_url: "https://api.deepseek.com/v1"
max_concurrency: 50
timeout: 45
cost_per_1k_tokens: 0.05 # Fallback 通常成本更低
priority: 80
fallback_for: ["minimax"] # 声明为哪些主模型的 Fallback
- model: "gpt-4o-mini"
provider: "openai"
base_url: "https://api.openai.com/v1"
max_concurrency: 100
timeout: 30
cost_per_1k_tokens: 0.15
priority: 60
fallback_for: ["minimax", "deepseek"]
# 辅助模型组:轻量任务的专用处理
auxiliary:
- model: "text-embedding-3-small"
provider: "openai"
base_url: "https://api.openai.com/v1"
max_concurrency: 200
timeout: 10
cost_per_1k_tokens: 0.002 # Embedding 成本极低
priority: 50
task_types: ["embedding", "similarity", "vector_search"]
description: "向量嵌入专用模型"
- model: "gpt-3.5-turbo"
provider: "openai"
base_url: "https://api.openai.com/v1"
max_concurrency: 150
timeout: 20
cost_per_1k_tokens: 0.03
priority: 40
task_types: ["summarize", "translate", "keyword", "classify"]
description: "轻量任务处理模型"
# ============================================================
# 路由策略配置
# ============================================================
routing:
# 默认路由策略
strategy: "priority_with_load_balancing"
# Fallback 链配置:按顺序尝试,直到成功
fallback_chain:
- "primary" # 首先尝试主模型
- "fallback_1" # 然后尝试 DeepSeek
- "fallback_2" # 最后尝试 GPT-4o-mini
# 辅助模型分流条件
auxiliary分流threshold:
# 当延迟超过 2 秒或预估成本超过 0.05 元时,评估是否分流
latency_threshold_ms: 2000
cost_threshold_yuan: 0.05
# 哪些任务类型可以分流
eligible_tasks: ["embedding", "similarity", "summarize", "translate", "keyword"]
# 负载均衡配置
load_balancing:
# 负载率超过多少时开始向 Fallback 分流
scale_up_fallback_threshold: 0.8
# 负载率降到多少以下时恢复主模型
scale_down_fallback_threshold: 0.6
# 负载计算方式:avg_load(平均负载)或 max_load(最大负载)
load_calculation: "avg_load"
# Fallback 频率限制
fallback_throttle:
# 单位时间内允许 Fallback 的最大比例
max_fallback_rate: 0.5 # 最多 50% 请求可降级
window_seconds: 60 # 时间窗口
# 渐进式降级配置
gradual_degrade:
enabled: true
initial_rate: 0.1 # 初始降级比例 10%
increment_rate: 0.1 # 每分钟增加 10%
max_rate: 0.8 # 最高降级比例 80%
# Fallback 冷却期配置
fallback_cooldown:
# 根据主模型故障时长计算冷却期
# 故障时长 < 5 分钟,冷却期 1 分钟
# 故障时长 5-30 分钟,冷却期 5 分钟
# 故障时长 > 30 分钟,冷却期 15 分钟
min_cooldown_seconds: 60
max_cooldown_seconds: 900
# 质量校验配置
quality_check:
enabled: true
# 辅助模型结果校验
auxiliary_result_check:
# 摘要长度检查:输出长度应在输入的 5%-30% 之间
summary_length_ratio: [0.05, 0.3]
# Embedding 相似度检查:输入输出语义相似度应 > 0.7
embedding_similarity_threshold: 0.7
# 校验失败时自动重试主模型
auto_retry_on_failure: true
max_retry_attempts: 1
# ============================================================
# 路由规则:按任务类型和场景的精确路由
# ============================================================
routing_rules:
# 任务类型 -> 模型组映射
task_type_routing:
chat: "primary"
completion: "primary"
embedding: "auxiliary"
similarity: "auxiliary"
summarize: "auxiliary"
translate: "auxiliary"
keyword: "auxiliary"
# 租户等级 -> 可用的模型范围
tenant_tier_routing:
enterprise: # 企业版租户
allowed_models: ["primary", "fallback", "auxiliary"]
default_model: "primary"
standard: # 标准版租户
allowed_models: ["primary", "fallback", "auxiliary"]
default_model: "primary"
# 标准版在主模型负载高时会被降级到 Fallback
allow_degrade: true
trial: # 试用版租户
allowed_models: ["fallback", "auxiliary"]
default_model: "fallback_1"
# 试用版不提供主模型访问
priority_access: false
# 特殊场景路由
scene_routing:
# 高并发场景:超过 500 QPS 时启用降级策略
high_concurrency:
enabled: true
threshold_qps: 500
strategy: "prefer_auxiliary"
# 成本敏感场景:启用成本优先路由
cost_sensitive:
enabled: true
max_cost_per_request: 0.02
strategy: "lowest_cost_first"
# 质量优先场景:禁用辅助模型分流
quality_first:
enabled: true
allow_auxiliary: false
上述配置实现了完整的多模型路由体系,涵盖了从基础的模型定义、到路由策略选择、再到精细化的场景化配置,能够满足超大规模多租户场景下的各种复杂需求。
四、超大规模租户隔离机制
4.1 租户隔离的分层设计
在超大规模多租户场景下(数千到数万个租户),租户隔离是保障系统稳定性、服务质量和公平性的核心机制。如果没有有效的隔离机制,单个租户的异常流量(例如突发的大量请求、恶意刷接口、资源耗尽式攻击)将迅速蔓延至整个系统,导致其他无辜租户的服务质量下降,甚至引发系统级的雪崩。因此,租户隔离不是可选项,而是多租户系统的必修课。
为什么需要分层隔离:防止单个租户耗尽全局资源
分层隔离的核心思想是将系统资源划分为多个层级,在每个层级实施不同粒度的隔离控制。单一维度的隔离(如仅在应用层做限流)无法应对所有风险场景。例如,应用层限流可以控制请求频率,但如果某个租户的所有请求都堆积在队列中等待模型调用,仍会耗尽模型层的并发资源。分层隔离通过在多个层级分别设置隔离屏障,确保任何单一租户都无法突破任一层级的资源上限。
三层隔离:命名空间隔离 → 资源配额隔离 → 模型层隔离
我们的租户隔离体系分为三个层次,自上而下形成纵深防御:
第一层:命名空间隔离(Namespace Isolation) 是最基础的隔离维度。通过为每个租户分配独立的命名空间,实现数据、资源标识符的逻辑分离。租户 A 的 Redis Key、文件系统路径、数据库 Schema 与租户 B 完全独立,互不干扰。这一层主要解决的是数据隔离问题,防止租户间的数据泄露和资源命名冲突。
第二层:资源配额隔离(Quota Isolation) 是核心的资源控制维度。在命名空间隔离的基础上,为每个租户设置硬性的资源配额上限,包括最大并发请求数、日/周/月请求量、Token 消耗上限等。配额隔离在请求入口处生效,超出配额直接拒绝,不占用任何下游资源。这一层主要解决的是资源公平性问题,防止单个租户耗尽全局资源。
第三层:模型层隔离(Model Isolation) 是精细化的服务保障维度。高优先级租户可以获得专属的模型 Credential 池,确保其请求在任何情况下都能得到及时响应;普通租户则共享公共池,通过公平调度算法分配资源。这一层主要解决的是服务质量保障问题,确保高价值租户获得与其付费等级匹配的服务体验。
隔离粒度:租户级 vs 用户级 vs 会话级
隔离粒度决定了资源分配的精细程度。我们支持三种粒度的隔离控制:
租户级(Tenant-level)隔离 是最常用的粒度,以租户为单位进行资源配额控制。同一租户下的所有用户共享该租户的配额总和,租户管理员可以在内部自行分配。这种粒度实现简单、开销较低,适合大部分场景。
用户级(User-level)隔离 是更精细的粒度,以租户内的单个用户为单位进行配额控制。用户级隔离能够更精确地识别和控制资源消耗源头,防止租户内个别用户的异常行为影响同租户的其他用户。这种粒度需要更多的存储和计算开销,适合对内部公平性要求较高的场景。
会话级(Session-level)隔离 是最精细的粒度,以单个对话会话为单位进行配额控制。会话级隔离主要用于防止单次会话内的异常消耗(如超长上下文导致的 Token 爆炸),适合对单次请求有严格限制的场景。
4.2 命名空间隔离
命名空间隔离是租户隔离体系的基础层,通过在系统的各个存储层面为每个租户分配独立的命名空间,实现租户间的数据、资源和标识符的逻辑分离。命名空间隔离不仅能够防止数据泄露和命名冲突,还能简化问题排查——当某个租户出现异常时,通过其命名空间可以快速定位和隔离相关资源。
租户 ID 编码规范:UUID / 有意义前缀 / 哈希
租户 ID 是租户在系统中的唯一标识符,其编码规范直接影响存储效率和查询性能。我们支持三种租户 ID 编码方案:
UUID 方案 是最通用的标识符生成方式。UUID 具有全球唯一性,不依赖中心化的 ID 生成服务,适合大规模分布式系统。缺点是存储空间较大(36 字符),且无业务含义,难以直观辨识。推荐在对租户 ID 无特殊业务含义需求的场景使用。
有意义前缀方案 通过特定前缀编码租户的业务属性。例如,ent_1001 表示企业套餐的 1001 号租户,basic_2005 表示基础套餐的 2005 号租户。有意义前缀便于人工辨识和问题排查,但需要注意前缀与数字 ID 的组合规范,避免冲突。这种方案适合租户 ID 有业务管理需求的场景。
哈希方案 通过哈希函数将业务标识(如租户名称、企业邮箱域名等)转换为固定长度的哈希值。哈希方案兼具标识唯一性和业务无关性,但存在哈希碰撞的理论风险(可通过增加哈希位数降低碰撞概率)。这种方案适合需要将业务标识映射为系统标识但又不想直接暴露业务信息的场景。
无论采用哪种方案,租户 ID 的设计都应满足以下原则:唯一性(全局唯一)、不可变性(创建后不应变更)、可索引性(支持高效查询)。我们推荐在数据库内部使用 UUID 作为主键,在业务接口层面支持租户名称或邮箱域名等有业务含义的标识。
Redis Key 隔离:tenant:{tenant_id}:{resource} 前缀
Redis 是多租户系统中常用的缓存和存储组件,租户间的 Redis Key 隔离至关重要。我们采用 tenant:{tenant_id}:{resource_type}:{resource_id} 的四级键名规范:
tenant:tnt_1001:quota:daily # 租户 tnt_1001 的日配额信息
tenant:tnt_1001:counter:requests # 租户 tnt_1001 的请求计数
tenant:tnt_1001:session:abc123 # 租户 tnt_1001 的会话 abc123
tenant:tnt_1001:credential:pool_main # 租户 tnt_1001 的 Credential 池配置
这种键名规范的优势在于:前缀层级清晰,便于按租户维度批量操作(使用 SCAN 或 KEYS 的前缀匹配);与 Redis Cluster 的哈希槽机制兼容,便于分片扩展;键名长度可控,不会超出 Redis 的键名限制(1024 字节)。
对于需要跨租户共享的数据(如全局配置、公共缓存等),我们使用 global: 前缀明确区分,避免误操作导致的租户数据泄露。
文件系统隔离:/data/tenant/{tenant_id}/
对于需要持久化存储租户数据的场景(如文件上传、日志归档、模型缓存等),我们采用层级化的目录结构隔离:
/data/tenant/tnt_1001/uploads/ # 租户 tnt_1001 的上传文件
/data/tenant/tnt_1001/logs/ # 租户 tnt_1001 的日志归档
/data/tenant/tnt_1001/cache/ # 租户 tnt_1001 的模型缓存
/data/tenant/tnt_1001/exports/ # 租户 tnt_1001 的数据导出
目录隔离的实现要点包括:创建租户目录时设置正确的权限(700,确保仅对应租户可访问);定期清理过期文件,释放存储空间;对共享存储(如 NFS)实施配额限制,防止单一租户耗尽磁盘空间。
数据库 Schema 隔离:每个租户独立 Schema vs 共享 Schema + tenant_id 列
数据库层面的租户隔离有两种主要方案:
方案一:每个租户独立 Schema 为每个租户创建一个独立的数据库 Schema,租户的所有数据存储在自己的 Schema 下。这种方案的隔离性最强,一个租户的查询不会影响其他租户;但随着租户数量增加,数据库的 Schema 数量线性增长,管理复杂度急剧上升。通常仅在租户数量较少(< 100)且隔离性要求极高的场景使用。
方案二:共享 Schema + tenant_id 列 是更常用的方案。所有租户的数据存储在同一组表中,通过 tenant_id 列进行逻辑分离。这种方案管理简单,扩展性强;但需要注意索引设计(tenant_id 应作为复合索引的首列)、查询优化(避免跨租户的全局扫描)、以及数据安全(防止 SQL 注入导致的跨租户数据泄露)。
在超大规模多租户场景下(数万个租户),共享 Schema + tenant_id 列是更实际的选择。我们通常会将 tenant_id 作为必填字段在数据库层面强制约束,并在应用层实施行级安全策略(Row-Level Security),进一步降低数据泄露风险。
4.3 资源配额硬隔离
资源配额硬隔离是租户隔离体系的核心机制,通过为每个租户设置不可逾越的资源上限,确保所有租户都能获得公平的服务质量。硬隔离的实现需要在入口处进行严格检查,配额耗尽时直接拒绝请求,不消耗任何下游资源。
租户资源配额表
租户配额信息是硬隔离的数据基础,我们通过以下数据结构管理:
class TenantQuota:
"""租户资源配额模型"""
tenant_id: str # 租户唯一标识
plan_type: str # 套餐类型:basic/premium/enterprise
# === 并发控制 ===
max_concurrent_requests: int # 最大并发请求数,超过此数将排队等待
# === 请求量控制 ===
max_daily_requests: int # 日请求上限,-1 表示无限制
max_weekly_requests: int # 周请求上限,-1 表示无限制
max_monthly_requests: int # 月请求上限,-1 表示无限制
# === Token 消耗控制 ===
max_tokens_per_day: int # 日 Token 上限,-1 表示无限制
max_tokens_per_month: int # 月 Token 上限,-1 表示无限制
# === Credential 池访问控制 ===
credential_pool_access: list[str] # 该租户可用的 Credential 池列表
exclusive_pools: list[str] # 该租户的独占 Credential 池
# === 配额使用统计 ===
used_daily_requests: int # 当日已使用请求数
used_monthly_requests: int # 当月已使用请求数
used_daily_tokens: int # 当日已使用 Token 数
current_concurrent: int # 当前并发请求数
# === 告警阈值配置 ===
daily_alert_threshold: float # 日配额告警阈值(如 0.8 表示 80%)
monthly_alert_threshold: float # 月配额告警阈值
# === 配额元数据 ===
quota_updated_at: datetime # 配额最后更新时间
quota_updated_by: str # 配额最后更新操作者
配额表的设计要点:
并发控制通过 max_concurrent_requests 和 current_concurrent 实现。当 current_concurrent >= max_concurrent_requests 时,新请求进入排队队列而非直接拒绝,排队时间超过阈值后才拒绝。这是一种"软拒绝"策略,在系统有余量时允许一定程度的突发流量。
请求量控制通过多时间维度的配额字段实现。日/周/月三级配额形成嵌套约束,任一维度超限都会触发限流。这种设计既防止了短期突发(如单日请求量暴增 10 倍),又保证了长期稳定需求(如月度平均使用量达标但某日超标)。
Token 消耗控制是成本控制的关键。LLM 按 Token 计费,Token 配额直接关联运营成本。我们同时控制日 Token 和月 Token 消耗,并设置 max_tokens_per_request 单次请求上限,防止单次请求消耗过多 Token(如超长上下文的恶意请求)。
Credential 池访问控制实现了租户与 Credential 池的绑定关系。通过 credential_pool_access 控制普通共享池的访问权限;通过 exclusive_pools 配置高优先级租户的独占资源池。Enterprise 级别租户可配置独占池,实现物理隔离。
硬隔离实现:配额耗尽 → 直接拒绝,不进入队列
硬隔离的核心原则是:配额耗尽时直接拒绝请求,不占用任何下游资源。拒绝发生在请求处理的最早阶段——通常是在接入层的限流中间件中完成,此时请求尚未到达模型层或队列系统。
硬隔离的实现逻辑:
def check_hard_quota(tenant_id: str, request: LLMRequest) -> QuotaResult:
"""
检查硬配额,返回配额检查结果。
超配额时立即拒绝,不进入队列。
"""
quota = get_tenant_quota(tenant_id)
# 1. 检查并发配额
if quota.current_concurrent >= quota.max_concurrent_requests:
return QuotaResult(
allowed=False,
reason="CONCURRENT_LIMIT_EXCEEDED",
retry_after=calculate_queue_time(quota),
message=f"并发数超限,当前{quota.current_concurrent},上限{quota.max_concurrent_requests}"
)
# 2. 检查日请求配额
if quota.max_daily_requests > 0 and quota.used_daily_requests >= quota.max_daily_requests:
return QuotaResult(
allowed=False,
reason="DAILY_REQUEST_LIMIT_EXCEEDED",
retry_after=seconds_until_midnight(),
message=f"日请求量超限,请明日重试"
)
# 3. 检查月请求配额
if quota.max_monthly_requests > 0 and quota.used_monthly_requests >= quota.max_monthly_requests:
return QuotaResult(
allowed=False,
reason="MONTHLY_REQUEST_LIMIT_EXCEEDED",
retry_after=seconds_until_month_start(),
message=f"月请求量超限,请次月重试"
)
# 4. 检查日 Token 配额
if quota.max_tokens_per_day > 0 and quota.used_daily_tokens >= quota.max_tokens_per_day:
return QuotaResult(
allowed=False,
reason="DAILY_TOKEN_LIMIT_EXCEEDED",
retry_after=seconds_until_midnight(),
message=f"日 Token 消耗超限,请明日重试"
)
# 5. 检查单次请求 Token 上限
estimated_tokens = estimate_tokens(request)
if quota.max_tokens_per_request > 0 and estimated_tokens > quota.max_tokens_per_request:
return QuotaResult(
allowed=False,
reason="SINGLE_REQUEST_TOKEN_LIMIT_EXCEEDED",
retry_after=0,
message=f"单次请求 Token 数超限,最大支持{quota.max_tokens_per_request}"
)
return QuotaResult(allowed=True)
硬隔离的优势:零下游资源消耗(拒绝发生在入口)、零排队延迟(直接返回结果)、零雪崩风险(不会因下游排队导致级联失败)。
配额告警:达到 80% / 95% 时通知
配额告警是软配额机制的实现,目的是在配额即将耗尽前通知租户管理员,以便其及时处理(如升级套餐、排查异常流量等)。我们采用两级告警阈值:
一级告警(80% 阈值):当配额使用达到 80% 时触发,提醒租户关注用量情况。告警方式包括企业微信消息、邮件通知等。租户管理员收到告警后可以主动排查是否有异常消耗,或提前准备扩容方案。
二级告警(95% 阈值):当配额使用达到 95% 时触发,表明配额即将耗尽。告警升级为高优先级,并附带紧急处理建议(如临时扩容、暂停非核心业务等)。
告警的实现采用异步模式,避免影响正常请求处理流程:
async def check_quota_alert(tenant_id: str, quota: TenantQuota):
"""检查配额告警,异步发送通知"""
# 检查日请求配额告警
if quota.max_daily_requests > 0:
daily_ratio = quota.used_daily_requests / quota.max_daily_requests
if daily_ratio >= 0.95:
await send_critical_alert(tenant_id, "daily_requests", daily_ratio)
elif daily_ratio >= 0.80:
await send_warning_alert(tenant_id, "daily_requests", daily_ratio)
# 检查月请求配额告警
if quota.max_monthly_requests > 0:
monthly_ratio = quota.used_monthly_requests / quota.max_monthly_requests
if monthly_ratio >= 0.95:
await send_critical_alert(tenant_id, "monthly_requests", monthly_ratio)
elif monthly_ratio >= 0.80:
await send_warning_alert(tenant_id, "monthly_requests", monthly_ratio)
# 检查并发配额告警
if quota.max_concurrent_requests > 0:
concurrent_ratio = quota.current_concurrent / quota.max_concurrent_requests
if concurrent_ratio >= 0.95:
await send_critical_alert(tenant_id, "concurrent", concurrent_ratio)
elif concurrent_ratio >= 0.80:
await send_warning_alert(tenant_id, "concurrent", concurrent_ratio)
4.4 数万租户的性能优化
当租户规模扩展到数万户时,租户隔离机制本身的管理和查询开销会成为显著的性能瓶颈。如果每次请求都需要查询数据库验证配额,或者在内存中遍历数万个租户对象,全系统的吞吐能力将大打折扣。因此,数万租户场景下的性能优化是租户隔离机制能否支撑大规模系统的关键。
缓存策略:租户配额信息缓存到 Redis(TTL 60s)
租户配额信息属于读多写少的数据类型(配额变更远少于请求次数),非常适合采用缓存策略。我们在 Redis 中缓存租户的完整配额信息,缓存键名为 tenant:{tenant_id}:quota:cache,缓存 TTL 设置为 60 秒。
# 配额缓存读写逻辑
async def get_tenant_quota_cached(tenant_id: str) -> TenantQuota:
cache_key = f"tenant:{tenant_id}:quota:cache"
# 尝试从缓存获取
cached = await redis.get(cache_key)
if cached:
return deserialize_quota(cached)
# 缓存未命中,从数据库加载
quota = await db.tenant_quotas.find_one(tenant_id=tenant_id)
if quota:
# 写入缓存,TTL 60 秒
await redis.setex(cache_key, 60, serialize_quota(quota))
return quota
60 秒的 TTL 是在数据新鲜度和性能收益之间的平衡:足够短的刷新周期(保证配额变更能在分钟级内生效),又足够长的缓存时间(将数据库查询频率降低 99% 以上)。
批量加载:启动时批量加载所有租户配额到内存
对于需要频繁访问的热点租户(如日活租户),我们可以采用启动时批量加载的策略。系统启动时从数据库一次性加载所有租户的配额信息到内存字典,后续请求直接读取内存,避免频繁的 Redis 缓存访问。
# 启动时批量加载配额
async def preload_all_quotas():
all_quotas = await db.tenant_quotas.find_many()
quota_cache = {q.tenant_id: q for q in all_quotas}
logger.info(f"Loaded {len(quota_cache)} tenant quotas into memory")
return quota_cache
# 内存缓存 + 异步更新
class InMemoryQuotaCache:
def __init__(self):
self._cache: dict[str, TenantQuota] = {}
self._lock = asyncio.Lock()
self._last_sync = datetime.min
async def get(self, tenant_id: str) -> Optional[TenantQuota]:
async with self._lock:
return self._cache.get(tenant_id)
async def reload_if_needed(self):
"""检查是否需要全量刷新"""
if datetime.now() - self._last_sync > timedelta(minutes=5):
await self._full_sync()
async def _full_sync(self):
"""全量同步数据库最新配额"""
async with self._lock:
all_quotas = await db.tenant_quotas.find_many()
self._cache = {q.tenant_id: q for q in all_quotas}
self._last_sync = datetime.now()
批量加载策略的适用场景是租户数量适度(< 10 万)且内存充足的情况。对于超大规模(> 10 万租户),全量加载会占用过多内存,此时应依赖 Redis 缓存或分层缓存策略。
增量更新:配额变更通过消息队列异步更新缓存
当管理员修改某个租户的配额时,我们需要更新缓存。如果立即更新所有缓存层(Redis + 内存),会面临一致性和性能的双重挑战。我们采用消息队列实现增量更新的最终一致性:
# 配额变更发布到消息队列
async def update_tenant_quota(tenant_id: str, updates: dict):
# 1. 更新数据库
await db.tenant_quotas.update_one(tenant_id, updates)
# 2. 发布配额变更事件
await mq.publish("quota_updates", {
"tenant_id": tenant_id,
"updates": updates,
"timestamp": datetime.now().isoformat()
})
# 消费者异步更新缓存
async def consume_quota_updates():
async for msg in mq.subscribe("quota_updates"):
tenant_id = msg["tenant_id"]
updates = msg["updates"]
# 更新 Redis 缓存
cache_key = f"tenant:{tenant_id}:quota:cache"
existing = await redis.get(cache_key)
if existing:
quota = deserialize_quota(existing)
quota.update(updates)
await redis.setex(cache_key, 60, serialize_quota(quota))
# 更新内存缓存(如果存在)
if tenant_id in in_memory_cache._cache:
in_memory_cache._cache[tenant_id].update(updates)
这种异步更新机制的优势:数据库写操作不阻塞缓存更新;消息队列提供重试机制,保证最终一致性;多个缓存层(Redis + 内存)可以渐进式更新。
Trie 树优化:租户 ID 前缀匹配查找
在某些场景下,我们需要根据租户 ID 前缀进行批量查找。例如,查找所有 ent_* 前缀的租户配额,或者根据域名后缀(如 example.com)查找对应的租户。传统的线性扫描效率低下,我们可以使用 Trie 树(前缀树)优化:
class TrieNode:
def __init__(self):
self.children: dict[str, TrieNode] = {}
self.tenant_ids: list[str] = [] # 存储该前缀下的所有租户 ID
class TenantIdTrie:
"""租户 ID 前缀 Trie 树,用于高效的前缀匹配查询"""
def __init__(self):
self.root = TrieNode()
def insert(self, tenant_id: str):
node = self.root
for char in tenant_id:
if char not in node.children:
node.children[char] = TrieNode()
node = node.children[char]
# 在每个节点存储经过它的租户 ID(用于前缀匹配)
if tenant_id not in node.tenant_ids:
node.tenant_ids.append(tenant_id)
def search_prefix(self, prefix: str) -> list[str]:
"""查找所有以 prefix 开头的租户 ID"""
node = self.root
for char in prefix:
if char not in node.children:
return []
node = node.children[char]
return node.tenant_ids
def batch_load(self, tenant_ids: list[str]):
"""批量加载租户 ID 到 Trie 树"""
for tid in tenant_ids:
self.insert(tid)
# 使用示例
trie = TenantIdTrie()
trie.batch_load([t.tenant_id for t in all_tenants])
# 查找所有 ent_ 前缀的租户
enterprise_tenants = trie.search_prefix("ent_")
Trie 树的查询时间复杂度为 O(k),其中 k 是前缀长度,与租户总数无关。这使得前缀匹配查询在超大规模租户场景下仍能保持稳定性能。
4.5 租户间的公平调度
在共享公共资源的场景下,公平调度是保障所有租户都能获得合理服务的关键。公平调度不仅要防止高优先级租户过度占用资源,还要确保低优先级租户不会因为资源不足而完全无法服务。同时,需要设计防饥饿机制,确保长期未获得服务的租户能够得到优先处理。
时间片轮转:每个租户分配固定时间片
时间片轮转(Time Slice Round Robin)是最基本的公平调度算法。其核心思想是将时间划分为固定长度的时间片,每个租户在每个时间片内只能消耗一定量的资源,时间片用完后切换到下一个租户。
class TimeSliceScheduler:
def __init__(self, slice_duration_ms: int = 100):
self.slice_duration = slice_duration_ms / 1000.0 # 转换为秒
self.tenant_queue = deque() # 租户轮转队列
self.tenant_last_slice = {} # 租户上次获得时间片的时间
def enqueue(self, tenant_id: str):
if tenant_id not in self.tenant_last_slice:
self.tenant_queue.append(tenant_id)
self.tenant_last_slice[tenant_id] = 0
def select_next(self) -> Optional[str]:
if not self.tenant_queue:
return None
tenant_id = self.tenant_queue.popleft()
now = time.time()
# 检查该租户是否在冷却期
elapsed = now - self.tenant_last_slice[tenant_id]
if elapsed < self.slice_duration:
# まだ時間切れではない場合、キューの末尾に回す
self.tenant_queue.append(tenant_id)
return None
# 選択された租户を更新
self.tenant_last_slice[tenant_id] = now
self.tenant_queue.append(tenant_id) # 下一轮继续轮转
return tenant_id
时间片轮转的优点是实现简单、公平性高;缺点是无法区分租户优先级,高优先级租户无法获得更多资源。我们通常将时间片轮转用于同优先级租户间的调度。
信用桶算法:每个租户有信用,用完需等待补充
信用桶算法(Token Bucket)是一种更精细的流量控制算法。每个租户持有一个"信用桶",桶中装有定量"信用",每次请求消耗一定量信用,信用耗尽后需要等待补充。
class CreditBucket:
def __init__(
self,
max_credits: int,
refill_rate: float, # 每秒补充的信用量
refill_interval: float = 1.0 # 补充间隔(秒)
):
self.max_credits = max_credits
self.credits = max_credits
self.refill_rate = refill_rate
self.refill_interval = refill_interval
self.last_refill = time.time()
def try_consume(self, cost: int = 1) -> bool:
"""尝试消耗信用,返回是否成功"""
self._refill()
if self.credits >= cost:
self.credits -= cost
return True
return False
def _refill(self):
"""补充信用"""
now = time.time()
elapsed = now - self.last_refill
if elapsed >= self.refill_interval:
# 按时间比例补充信用
refill_amount = elapsed * self.refill_rate
self.credits = min(self.max_credits, self.credits + refill_amount)
self.last_refill = now
@property
def available_credits(self) -> float:
self._refill()
return self.credits
class TenantCreditManager:
"""租户信用管理器"""
def __init__(self, default_max_credits: int = 100, default_refill_rate: float = 10.0):
self.default_max = default_max_credits
self.default_refill = default_refill_rate
self.buckets: dict[str, CreditBucket] = {}
self._lock = threading.Lock()
def get_bucket(self, tenant_id: str) -> CreditBucket:
with self._lock:
if tenant_id not in self.buckets:
self.buckets[tenant_id] = CreditBucket(self.default_max, self.default_refill)
return self.buckets[tenant_id]
def try_request(self, tenant_id: str, cost: int = 1) -> bool:
bucket = self.get_bucket(tenant_id)
return bucket.try_consume(cost)
信用桶算法的优势:允许一定程度的突发流量(桶中有信用时),同时通过补充机制防止长期透支;可以按租户等级设置不同的桶容量和补充速率。缺点是实现相对复杂,需要维护每个租户的信用状态。
租户优先级:高优先级租户(如付费用户)获得更多资源
在公平调度的基础上,我们引入优先级机制确保高价值租户获得更好的服务体验。优先级调度不意味着高优先级租户可以完全抢占低优先级租户的资源,而是通过以下方式实现差异化:
优先级权重:为每个租户分配调度权重,高优先级租户的权重更高。例如,Enterprise 租户权重为 10,Premium 租户权重为 5,Basic 租户权重为 1。在轮转调度中,高权重租户每隔 1 次就能获得一次调度机会。
独立队列:为高优先级租户创建独立的请求队列,与普通队列分离。高优先级队列享有优先调度权,但系统仍为普通队列保留最小资源保障(Guaranteed Minimum)。
资源预留:为高优先级租户预留一定比例的模型并发资源,确保即使在系统繁忙时也能及时响应。
class PriorityScheduler:
"""多优先级调度器"""
PRIORITY_LEVELS = {
"enterprise": 0, # 最高优先级
"premium": 1,
"basic": 2,
"trial": 3, # 试用用户,最低优先级
}
def __init__(self):
self.queues: dict[int, deque] = {
level: deque() for level in set(self.PRIORITY_LEVELS.values())
}
self.current_priority = 0 # 当前轮询到的优先级
def enqueue(self, tenant_id: str, request: Request):
priority = self._get_tenant_priority(tenant_id)
self.queues[priority].append((tenant_id, request))
def select_next(self) -> Optional[tuple]:
# 从高优先级到低优先级遍历
for level in sorted(self.queues.keys()):
if self.queues[level]:
return self.queues[level].popleft()
return None
def _get_tenant_priority(self, tenant_id: str) -> int:
# 从租户配置获取优先级
plan = get_tenant_plan(tenant_id)
return self.PRIORITY_LEVELS.get(plan, 2)
饥饿预防:长时间未调度的租户优先处理
公平调度算法的一个潜在问题是"饥饿"(Starvation):低优先级租户可能因为持续到来的高优先级请求而长时间无法获得调度机会。为了防止饥饿,我们引入等待时间作为调度优先级的影响因素:
class FairSchedulerWithAging:
"""
带老化机制的公平调度器。
等待时间越长的租户,其有效优先级越高。
"""
def __init__(self, aging_factor: float = 0.1):
self.aging_factor = aging_factor # 老化系数
self.tenant_queue: dict[str, tuple] = {} # tenant_id -> (priority, enqueue_time, request)
self._lock = threading.Lock()
def enqueue(self, tenant_id: str, request: Request, base_priority: int):
with self._lock:
self.tenant_queue[tenant_id] = (base_priority, time.time(), request)
def select_next(self) -> Optional[tuple]:
with self._lock:
if not self.tenant_queue:
return None
now = time.time()
# 计算每个租户的有效优先级(考虑等待时间)
def effective_priority(item):
tenant_id, (base_priority, enqueue_time, _) = item
wait_time = now - enqueue_time
# 等待时间越长,有效优先级越高(数值越小越优先)
age_bonus = wait_time * self.aging_factor
return base_priority - age_bonus
# 选择有效优先级最高的租户
selected_id, (_, _, request) = min(
self.tenant_queue.items(),
key=effective_priority
)
del self.tenant_queue[selected_id]
return (selected_id, request)
老化机制的核心思想:随着等待时间增加,租户的有效优先级逐渐提升,最终必然会获得调度机会。这种设计在保证调度的公平性的同时,也满足了低优先级租户的最低服务保障要求。
4.6 租户隔离配置示例
以下是一个完整的租户隔离配置示例,展示了如何为不同套餐等级的租户设置差异化的隔离策略:
tenant_isolation:
# 命名空间配置
namespace_prefix: "tnt"
# Redis Key 隔离前缀
redis_key_pattern: "tenant:{tenant_id}:{resource}"
# 文件系统隔离根目录
filesystem_root: "/data/tenant/{tenant_id}/"
# 数据库 Schema 隔离策略:shared(共享 Schema + tenant_id 列)
database_strategy: "shared"
# 配额配置
quotas:
# === 基础套餐 ===
basic:
max_concurrent_requests: 5 # 最多 5 个并发请求
max_daily_requests: 1000 # 日请求上限 1000
max_monthly_requests: 10000 # 月请求上限 10000
max_tokens_per_day: 500000 # 日 Token 上限 50 万
max_tokens_per_request: 8192 # 单次请求最多 8K Token
credential_pools: ["minimax_basic"] # 仅可使用基础 Credential 池
exclusive_pools: [] # 无独占池
priority: 2 # 调度优先级(数值越小越高)
# 告警阈值
alert_thresholds:
daily_requests: 0.8 # 日请求量 80% 时告警
monthly_requests: 0.8 # 月请求量 80% 时告警
concurrent: 0.8 # 并发数 80% 时告警
# === 高级套餐 ===
premium:
max_concurrent_requests: 20 # 最多 20 个并发请求
max_daily_requests: 10000 # 日请求上限 1 万
max_monthly_requests: 200000 # 月请求上限 20 万
max_tokens_per_day: 5000000 # 日 Token 上限 500 万
max_tokens_per_request: 32768 # 单次请求最多 32K Token
credential_pools:
- "minimax_basic" # 可使用基础池
- "minimax_premium" # 可使用高级池
exclusive_pools: [] # 无独占池
priority: 1 # 较高调度优先级
alert_thresholds:
daily_requests: 0.85
monthly_requests: 0.85
concurrent: 0.85
# === 企业套餐 ===
enterprise:
max_concurrent_requests: 100 # 最多 100 个并发请求
max_daily_requests: -1 # 不限制日请求量
max_monthly_requests: -1 # 不限制月请求量
max_tokens_per_day: -1 # 不限制日 Token
max_tokens_per_request: 131072 # 单次请求最多 128K Token
credential_pools:
- "minimax_basic"
- "minimax_premium"
- "deepseek"
exclusive_pools: ["enterprise_dedicated"] # 独享企业专属池
priority: 0 # 最高调度优先级
alert_thresholds:
daily_requests: 0.9
monthly_requests: 0.9
concurrent: 0.9
# 企业套餐额外保障
guarantees:
min_concurrent_reserved: 10 # 保留最少 10 个并发槽位
fallback_timeout_ms: 1000 # Fallback 超时时间(毫秒)
dedicated_sla: true # 启用专属 SLA 保障
# 公平调度配置
fair_scheduling:
# 调度策略:priority_fair(优先级 + 公平)
strategy: "priority_fair"
# 时间片轮转配置
time_slice:
enabled: true
duration_ms: 100 # 每个时间片 100ms
# 信用桶配置
credit_bucket:
enabled: true
default_max_credits: 100 # 默认桶容量
default_refill_rate: 10 # 每秒补充 10 信用
# 按套餐调整
plan_overrides:
basic:
max_credits: 50
refill_rate: 5
premium:
max_credits: 150
refill_rate: 15
enterprise:
max_credits: 500
refill_rate: 50
# 饥饿预防配置
starvation_prevention:
enabled: true
max_wait_time_seconds: 300 # 等待超过 5 分钟提升优先级
aging_factor: 0.1 # 老化系数
# 配额缓存配置
quota_cache:
redis:
ttl_seconds: 60 # 缓存 TTL 60 秒
key_prefix: "tenant:quota:cache"
in_memory:
enabled: true
max_entries: 100000 # 内存缓存最多 10 万租户
sync_interval_seconds: 300 # 全量同步间隔 5 分钟
# 告警渠道配置
alert_channels:
warning:
- type: "wechat_work"
mention: ["tenant_admin"]
- type: "email"
to: ["ops@example.com"]
critical:
- type: "wechat_work"
mention: ["tenant_admin", "ops_oncall"]
- type: "sms"
to: ["+86-138xxxx"]
- type: "phone"
to: ["+86-139xxxx"]
五、优先级与公平调度算法
5.1 调度架构设计
在超大用户多租户场景下,调度器是整个系统的核心协调组件。不同租户、不同优先级的请求竞争有限的 LLM 资源,如果没有有效的调度机制,低优先级请求可能抢占高优先级请求的资源,导致核心用户的体验急剧下降。此外,公平性也是多租户系统的生命线——不能让某些租户长期"饥饿",也不能让某些租户无限抢占资源。因此,调度架构的设计直接决定了系统的服务质量和资源利用效率。
我们的调度架构采用四层流水线设计:请求入口 → 优先级评估 → 队列排序 → 调度执行。请求首先到达入口层,入口层负责解析请求元数据(租户 ID、用户 ID、模型选择等),并提取或计算请求的优先级;随后进入优先级评估层,该层根据租户套餐、业务规则、当前负载等因素综合计算请求的实际优先级;接着请求进入对应的优先级队列,队列内部按等待时间、权重等因素进行排序;最后调度执行器按照调度策略从队列中取出请求,交给模型路由层执行。
调度架构的核心设计理念是延迟可控 + 资源高效利用 + 多租户公平三个目标的平衡。延迟可控意味着高优先级请求的 P99 延迟必须维持在可接受范围内;资源高效利用要求不能让任何资源处于空闲状态;多租户公平则要求调度器不能偏向任何租户,即使是高付费租户也不能无限抢占资源。这三个目标有时会相互冲突,例如严格优先可能导致低优先级饿死,因此调度器需要内置多种保护机制。
5.2 优先级体系
优先级体系是调度的基础,它决定了不同类型请求的调度顺序。我们设计了一个五级优先级体系,从 CRITICAL(最高)到 BATCH(最低),覆盖了所有业务场景:
class Priority:
CRITICAL = 0 # 系统管理员 / 紧急故障处理
HIGH = 1 # 付费企业用户
NORMAL = 2 # 普通付费用户
LOW = 3 # 免费/试用用户
BATCH = 4 # 批量处理/非实时任务
CRITICAL 级别是最高优先级的特殊级别,仅供系统管理员操作或紧急故障处理使用。典型场景包括:紧急修复线上问题、手动触发大规模数据处理、系统级健康检查等。CRITICAL 级别享有绝对优先权,其他所有请求都必须为其让路。为了防止滥用,CRITICAL 级别的调用需要进行额外审计,并限制单次调用的最大 Token 数。
HIGH 级别面向付费企业用户,这是系统的核心收入来源。企业用户通常对延迟敏感、业务关键、付费意愿强,因此需要保障其服务质量。HIGH 级别的请求在调度时享有优先权,队列等待时间过长时还会触发预警机制。
NORMAL 级别面向普通付费用户,这是用户基数最大的群体。NORMAL 级别保障基本的服务质量,但不享有特殊优先权。在资源紧张时,NORMAL 级别可能需要等待更高优先级请求完成后才能被调度。
LOW 级别面向免费或试用用户。免费用户是潜在付费用户的重要来源,但系统资源有限,因此只能为其提供尽力而为的服务。LOW 级别在资源紧张时可能长时间等待甚至被降级。
BATCH 级别是最低优先级,专门为批量处理和非实时任务设计。典型场景包括:日志分析、数据导出、定期报告生成等。 BATCH 任务的显著特点是容许较高延迟,适合在系统空闲时段执行。调度器会为 BATCH 级别设置专门的时间窗口,通常是业务低峰期(如凌晨时段)。
5.3 公平调度算法
公平调度是多租户系统的核心挑战之一。我们实现了三种主流的公平调度算法,以适应不同的业务场景:
加权公平队列(Weighted Fair Queueing, WFQ) 是一种精确权重控制的调度算法。每个租户被分配一个权重值,调度器按权重比例分配时间片。例如,租户 A 权重为 3,租户 B 权重为 1,则在相同时间内,租户 A 获得 75% 的调度机会,租户 B 获得 25%。WFQ 的优点是能够精确控制各租户的资源比例,适用于对服务质量有差异化要求的场景;缺点是计算开销较大,需要维护每个租户的虚拟时间。
令牌桶调度 是最简单直观的调度算法。每个租户按固定速率获取令牌,令牌桶满了之后不再累积。请求只有获得令牌才能被调度。令牌桶的优点是实现简单、易于理解,适用于吞吐量优先的场景;缺点是不能保证延迟,因为即使令牌充足,请求仍可能因其他因素等待。
多级反馈队列(Multi-Level Feedback Queue, MLFQ) 是一种动态调整优先级的调度算法。初始时所有请求进入最高优先级队列;如果请求等待时间过长或占用资源过多,则降低其优先级;如果高优先级队列为空,则调度低优先级队列。MLFQ 的优点是能够动态适应负载变化,防止低优先级请求饿死;缺点是参数众多(队列数量、优先级转换阈值、时间片长度等),调优复杂。
| 算法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| WFQ | 精确权重控制,公平性高 | 计算开销大,需要维护状态 | 差异化服务,需要严格公平 |
| 令牌桶 | 实现简单,无状态,开销低 | 不保证延迟,可能短时突发 | 吞吐量优先,灵活限制速率 |
| MLFQ | 动态适应,防止饿死 | 参数调优复杂,行为难预测 | 混合负载,高峰低谷变化大 |
在实际生产环境中,我们推荐混合使用三种算法:高优先级队列使用严格优先级确保关键业务;同优先级队列使用 WFQ 保证公平;BATCH 队列使用令牌桶限制总体吞吐量。这种组合能够在保障核心业务的同时,实现资源的高效利用。
5.4 调度器实现
调度器的核心实现需要考虑线程安全、高性能和可扩展性。以下是一个简化但完整的调度器实现:
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import threading
import time
@dataclass
class Request:
"""调度请求单元"""
request_id: str
tenant_id: str
user_id: str
priority: int # 0-4,越小越高
created_at: float = field(default_factory=time.time)
wait_time: float = 0.0 # 累计等待时间
model: str = "default"
payload: dict = field(default_factory=dict)
class Scheduler:
"""
多级优先级调度器
支持:
- 严格优先级调度(高优先级先于低优先级)
- 饥饿预防(等待过长的低优先级请求提升)
- 租户级公平(同一优先级内按权重轮询)
"""
def __init__(self):
# priority -> deque of requests
self.queues: dict[int, deque] = {
0: deque(), # CRITICAL
1: deque(), # HIGH
2: deque(), # NORMAL
3: deque(), # LOW
4: deque(), # BATCH
}
self._lock = threading.RLock()
self._tenant_last_served: dict[str, float] = {} # 租户上次服务时间
self._current_priority = 0
def enqueue(self, request: Request) -> bool:
"""将请求加入调度队列"""
with self._lock:
if request.priority not in self.queues:
request.priority = 2 # 未知优先级默认为 NORMAL
# 饥饿预防:等待超时的请求提升优先级
now = time.time()
request.wait_time = now - request.created_at
self._boost_priority_if_starved(request)
self.queues[request.priority].append(request)
return True
def next(self) -> Optional[Request]:
"""
获取下一个待调度的请求
调度策略:
1. 严格按优先级顺序遍历
2. 同优先级内按租户轮询,防止单租户饥饿
"""
with self._lock:
# 严格优先级:始终取最高优先级非空队列
for priority in sorted(self.queues.keys()):
queue = self.queues[priority]
if not queue:
continue
# 同优先级内轮询
request = self._fair_select(queue)
if request:
return request
return None # 所有队列为空
def _fair_select(self, queue: deque) -> Optional[Request]:
"""
同优先级内公平选择
使用租户轮询:每个租户按时间顺序轮流服务
"""
if not queue:
return None
# 找到最近最少被服务的租户的请求
best_request = None
best_last_served = float('inf')
for request in queue:
last_served = self._tenant_last_served.get(request.tenant_id, 0)
if last_served < best_last_served:
best_last_served = last_served
best_request = request
if best_request:
queue.remove(best_request)
self._tenant_last_served[best_request.tenant_id] = time.time()
return best_request
def _boost_priority_if_starved(self, request: Request):
"""
饥饿预防:等待时间过长则提升优先级
规则:
- BATCH 等待 > 5分钟 -> LOW
- LOW 等待 > 3分钟 -> NORMAL
- NORMAL 等待 > 2分钟 -> HIGH
- HIGH 等待 > 1分钟 -> CRITICAL
"""
boost_rules = {
4: (180, 3), # BATCH > 3min -> LOW
3: (120, 2), # LOW > 2min -> NORMAL
2: (60, 1), # NORMAL > 1min -> HIGH
1: (30, 0), # HIGH > 30s -> CRITICAL
}
if request.priority in boost_rules:
threshold, new_priority = boost_rules[request.priority]
if request.wait_time > threshold:
old_priority = request.priority
request.priority = new_priority
# 从原队列移除(简化实现,实际需要索引结构加速)
def get_queue_stats(self) -> dict:
"""获取队列统计信息"""
with self._lock:
return {
priority: len(queue)
for priority, queue in self.queues.items()
}
def clear(self, tenant_id: str = None):
"""清空队列,可指定租户"""
with self._lock:
if tenant_id is None:
for q in self.queues.values():
q.clear()
else:
for q in self.queues.values():
q[:] = [r for r in q if r.tenant_id != tenant_id]
调度器的核心是 next() 方法,它决定了下一个被调度的请求。实现中采用了严格优先级加租户级公平的选择策略:首先按优先级高低遍历队列,找到非空队列后,使用 _fair_select() 在该队列内选择最近最少被服务的租户的请求。这种设计既保证了高优先级请求的绝对优先权,又防止了同一优先级内单租户独占的问题。
5.5 饥饿预防机制
饥饿(Starvation)是调度系统中经典的问题,指低优先级请求长时间无法获得调度资源。在多租户场景下,如果高优先级请求持续涌入,低优先级请求可能永远无法被执行。此外,即使没有高优先级请求,如果某些租户的请求特别多,也可能导致同队列内其他租户的请求饿死。为此,我们实现了三重饥饿预防机制:
年龄增幅机制(Age Boost) 是最直接的饥饿预防方式。每个请求有一个"年龄"属性,随着等待时间增长而增加。当年龄超过特定阈值时,调度器自动提升该请求的优先级。具体规则如 5.4 节所示:BATCH 队列请求等待超过 3 分钟自动升级为 LOW,LOW 队列请求等待超过 2 分钟自动升级为 NORMAL,以此类推。通过年龄增幅,长时间等待的低优先级请求最终会获得调度机会。
资源预留机制(Resource Reservation) 是另一种重要的保护手段。调度器为每个优先级保留最低资源份额,即使高优先级队列持续繁忙,也会预留一定比例的资源给低优先级。例如,设定 BATCH 队列最低保障 5% 的调度份额,则在任何情况下,BATCH 请求都能获得至少 5% 的资源。资源预留确保了低优先级业务的基本存活能力,不会完全被饿死。
动态调整机制(Dynamic Adjustment) 是一种更智能的防护方式。调度器持续监控每个租户的调度历史,如果发现某个租户在连续 N 次调度中都是"被抢占"方(每次轮到自己时发现队列为空或被更高优先级抢占),则动态提升该租户请求的优先级。例如,如果租户 A 的请求连续 3 次被租户 B 的更高优先级请求抢占,则租户 A 的下一次请求临时获得 +1 优先级提升。
三种机制相互配合:年龄增幅防止请求本身等待过久;资源预留防止低优先级队列完全饿死;动态调整防止特定租户被其他租户持续抢占。通过这些机制,系统能够在保证高优先级服务质量的同时,为低优先级请求提供基本的存活保障。
5.6 调度与模型路由的协同
调度器和模型路由层是多租户 LLM 系统的两大核心引擎,它们虽然职责不同,但紧密协作共同保障系统的整体效率和稳定性。理解两者的关系和协同方式,是设计高效系统的关键。
调度器决定"谁先调用"。当多个租户的请求同时到达时,调度器根据优先级、等待时间、租户权重等因素决定请求的调度顺序。调度器不关心请求会被哪个模型处理,只关心谁应该先获得处理机会。例如,租户 A 的 HIGH 优先级请求和租户 B 的 NORMAL 优先级请求同时到达,调度器会让租户 A 的请求先被调度。
路由层决定"用哪个模型调用"。当调度器将请求交给路由层时,路由层根据模型选择策略(主模型优先、Fallback 链、成本策略等)决定使用哪个模型处理。例如,一个简单问答请求可能被路由到轻量的 Fallback 模型,而一个复杂推理请求则必须使用主模型。
两者结合:优先级引导资源分配。高优先级请求不仅在调度顺序上优先,还会优先获得优质资源。在模型路由层面,高优先级请求通常被配置为优先使用主模型,而低优先级请求可能被降级到 Fallback 模型甚至直接返回降级响应。这种优先级引导的资源分配确保了核心用户获得最佳体验。
协同工作的流程如下:请求到达后首先进入调度队列等待;当请求被调度时,调度器将其交给路由层;路由层根据该请求的优先级和策略选择模型;模型调用完成后,请求被释放并记录完成时间;调度器更新队列统计,准备调度下一个请求。在这个流程中,调度器和路由层通过请求对象传递信息,紧密配合完成整个请求的生命周期管理。
六、熔断、降级与限流体系
6.1 多层级保护架构
在超大用户多租户场景下,系统的稳定性面临着多重挑战:LLM API 的不稳定性和速率限制、突发流量导致的资源争抢、单一租户异常导致的级联故障、以及系统自身的容量瓶颈。面对这些挑战,单一的限流机制远远不够,我们需要构建一套多层级、相互配合的保护体系,在每个层级设置专门的防护手段,确保即使某一层防护失效,也能被下一层兜底。
我们的多层级保护架构分为四层,从外到内依次是:接入限流层、调度限流层、模型限流层、系统限流层。这四层形成层层递进的保护体系,每一层都有明确的职责范围和防护目标,互不干扰但又相互配合。
接入限流层(Access Rate Limiting) 是系统的最外层大门,负责控制所有进入系统的请求。接入限流的目的是防止突发流量冲垮系统,常见的限流维度包括全局请求数、单个 IP 的请求频率、单个租户的并发连接数等。接入限流在请求进入系统的最早阶段进行拦截,开销最小,能够快速丢弃明显恶意的过量请求。
调度限流层(Scheduling Rate Limiting) 位于调度器和队列之间,负责控制进入调度队列的请求速率。调度限流的目的是在调度层面实现更精细的流量控制,结合优先级和配额制度,对不同类型的请求进行差异化的速率限制。调度限流还负责维护各租户的配额余额,防止单一租户过度消耗调度资源。
模型限流层(Model Rate Limiting) 位于模型路由层和 Credential Pool 之间,负责控制对各个模型和 Credential 的请求速率。模型限流是保障 LLM API 稳定性的关键环节,需要根据各模型的 RPM/TPM 限制和当前负载,动态调整请求的分发速率。当某个模型的负载过高或可用 Credential 不足时,模型限流层会触发排队或降级机制。
系统限流层(System Rate Limiting) 是最后一层防线,负责监控系统的整体健康状态。当系统 CPU、内存、网络、连接数等资源接近上限时,系统限流层会主动拒绝新的请求或将其放入等待队列。系统限流是保护服务器本身不发生过载的最后屏障。
请求 → [接入限流] → [调度限流] → [模型限流] → [系统限流] → LLM
↓ ↓ ↓ ↓
全局限流 租户配额 Credential 资源保护
IP 限流 优先级调度 限流池
6.2 接入层限流
接入层限流是多租户系统的第一道防线,它在请求进入系统的最早阶段进行流量控制。接入层限流的核心目标是防止突发流量、恶意攻击和资源滥用,同时保障合法请求的公平接入。
我们采用令牌桶算法实现接入层限流,令牌桶算法相比滑动窗口算法具有更好的突发处理能力。令牌桶的基本思想是:系统以固定速率向桶中添加令牌,桶有最大容量,当桶满时新令牌被丢弃;请求需要获取令牌才能通过,没有令牌则被拒绝或排队。
以下是接入层限流的核心实现:
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
class TokenBucket:
"""
令牌桶实现
参数:
rate: 每秒添加的令牌数
capacity: 桶的最大容量
"""
def __init__(self, rate: float, capacity: int):
self.rate = rate
self.capacity = capacity
self._tokens = capacity
self._last_refill = time.time()
self._lock = threading.Lock()
def _refill(self):
"""补充令牌"""
now = time.time()
elapsed = now - self._last_refill
new_tokens = elapsed * self.rate
self._tokens = min(self.capacity, self._tokens + new_tokens)
self._last_refill = now
def try_consume(self, tokens: int = 1) -> bool:
"""尝试消耗令牌,返回是否成功"""
with self._lock:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
class AccessRateLimiter:
"""
接入层限流器
实现三层限流:
1. 全局限流:限制系统总请求量
2. 租户级限流:限制每个租户的请求量
3. 用户级限流:限制每个用户的并发数
"""
def __init__(
self,
global_rate: float = 10000,
global_capacity: int = 15000,
tenant_default_rate: float = 1000,
tenant_default_capacity: int = 1500,
):
# 全局限流器
self.global_limiter = TokenBucket(
rate=global_rate,
capacity=global_capacity
)
# 租户级限流器字典
self.tenant_limiters: dict[str, TokenBucket] = {}
self.tenant_default_rate = tenant_default_rate
self.tenant_default_capacity = tenant_default_capacity
# 用户级并发控制(使用计数器而非令牌桶)
self.user_counters: dict[str, int] = {}
self.user_max_concurrency = 10
self._lock = threading.RLock()
def allow(self, tenant_id: str, user_id: str, tokens: int = 1) -> tuple[bool, str]:
"""
检查请求是否允许通过
返回:
(是否允许, 拒绝原因) 元组
"""
# 第一层:全局限流
if not self.global_limiter.try_consume(tokens):
return False, "GLOBAL_RATE_LIMITED"
# 第二层:租户级限流
with self._lock:
if tenant_id not in self.tenant_limiters:
self.tenant_limiters[tenant_id] = TokenBucket(
rate=self.tenant_default_rate,
capacity=self.tenant_default_capacity,
)
tenant_limiter = self.tenant_limiters[tenant_id]
if not tenant_limiter.try_consume(tokens):
return False, "TENANT_RATE_LIMITED"
# 第三层:用户级并发限制
with self._lock:
user_key = f"{tenant_id}:{user_id}"
current = self.user_counters.get(user_key, 0)
if current >= self.user_max_concurrency:
return False, "USER_CONCURRENCY_LIMITED"
self.user_counters[user_key] = current + 1
return True, ""
def release(self, tenant_id: str, user_id: str):
"""释放用户并发计数"""
with self._lock:
user_key = f"{tenant_id}:{user_id}"
if user_key in self.user_counters:
self.user_counters[user_key] = max(
0, self.user_counters[user_key] - 1
)
def set_tenant_limit(
self, tenant_id: str, rate: float, capacity: int
):
"""为指定租户设置自定义限流参数"""
with self._lock:
self.tenant_limiters[tenant_id] = TokenBucket(
rate=rate,
capacity=capacity,
)
def get_stats(self) -> dict:
"""获取限流统计信息"""
with self._lock:
return {
"global_tokens": self.global_limiter._tokens,
"tenant_count": len(self.tenant_limiters),
"user_concurrency": self.user_counters.copy(),
}
这个实现提供了三层限流保护:全局限流保护系统不被突发流量冲垮;租户级限流防止单一租户消耗过多资源;用户级并发限制防止单个用户占用过多连接。每层限流都有明确的拒绝原因,便于后续的监控和问题排查。
6.3 模型层熔断器
模型层熔断器是保障 LLM 服务高可用的核心组件。在多租户场景下,LLM API 的不稳定性和速率限制是常态,而非异常。如果系统不能正确处理这些情况,轻则导致部分请求失败,重则导致整个系统雪崩。熔断器模式正是为了解决这类问题而设计的。
熔断器的基本原理类似电路保险丝:当电流过大时,保险丝熔断,切断电路,保护电器不被烧毁。在软件系统中,当某个组件(通常是远程服务)的错误率或延迟超过阈值时,熔断器"熔断",后续请求快速失败而不是等待超时;一段时间后,熔断器进入"半开"状态,允许少量请求通过试探,如果成功则恢复正常,否则继续熔断。
我们的熔断器实现支持三种状态:
CLOSE 状态(正常) 是熔断器的默认状态。此时请求正常调用后端服务,熔断器持续监控失败率、错误率和延迟。当任一指标超过阈值时,熔断器转换到 OPEN 状态。
OPEN 状态(熔断) 时,所有请求立即失败,不再调用后端服务。熔断器会启动一个超时计时器,当超时到期时,转换到 HALF_OPEN 状态。
HALF_OPEN 状态(探活) 时,熔断器允许少量请求通过(通常是 1-3 个),用于探测后端服务是否恢复。如果探活请求成功率达到阈值(如 50%),则转换回 CLOSE 状态;否则重新转换到 OPEN 状态。
以下是熔断器的核心实现:
import threading
import time
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any, Optional
import logging
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitOpenError(Exception):
"""熔断器打开时抛出的异常"""
def __init__(self, message: str = "Circuit is open"):
self.message = message
super().__init__(self.message)
@dataclass
class CircuitBreakerConfig:
"""熔断器配置"""
failure_threshold: int = 5 # 连续失败次数阈值
success_threshold: int = 2 # 半开状态下成功次数阈值
timeout: float = 60.0 # 熔断持续时间(秒)
half_max_calls: int = 3 # 半开状态允许的最大调用数
error_rate_threshold: float = 0.5 # 错误率阈值(0.0-1.0)
latency_threshold_ms: float = 5000 # 延迟阈值(毫秒)
class CircuitBreaker:
"""
熔断器实现
支持三种状态转换:
CLOSED -> OPEN: 连续失败达到阈值
OPEN -> HALF_OPEN: 超时到期
HALF_OPEN -> CLOSED: 成功次数达到阈值
HALF_OPEN -> OPEN: 探活失败
"""
def __init__(self, name: str, config: CircuitBreakerConfig = None):
self.name = name
self.config = config or CircuitBreakerConfig()
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: Optional[float] = None
self._half_open_calls = 0
self._lock = threading.RLock()
self._logger = logging.getLogger(f"CircuitBreaker.{name}")
@property
def state(self) -> CircuitState:
"""获取当前状态(带超时检查)"""
with self._lock:
if self._state == CircuitState.OPEN:
if self._last_failure_time is None:
return self._state
elapsed = time.time() - self._last_failure_time
if elapsed >= self.config.timeout:
self._logger.info(
f"Circuit {self.name}: OPEN -> HALF_OPEN (timeout)"
)
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
self._success_count = 0
return self._state
def call(self, func: Callable[[], Any]) -> Any:
"""
通过熔断器执行函数
Args:
func: 要执行的函数
Returns:
函数的返回值
Raises:
CircuitOpenError: 熔断器打开时
Exception: 函数执行失败时
"""
current_state = self.state
if current_state == CircuitState.OPEN:
self._logger.warning(
f"Circuit {self.name} is OPEN, rejecting call"
)
raise CircuitOpenError(f"Circuit {self.name} is open")
if current_state == CircuitState.HALF_OPEN:
with self._lock:
if self._half_open_calls >= self.config.half_max_calls:
raise CircuitOpenError(
f"Circuit {self.name} half_open max calls reached"
)
self._half_open_calls += 1
start_time = time.time()
try:
result = func()
latency = (time.time() - start_time) * 1000
self._on_success(latency)
return result
except Exception as e:
latency = (time.time() - start_time) * 1000
self._on_failure(latency)
raise
def _on_success(self, latency_ms: float):
"""记录成功调用"""
with self._lock:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
self._logger.info(
f"Circuit {self.name}: HALF_OPEN success "
f"({self._success_count}/{self.config.success_threshold})"
)
if self._success_count >= self.config.success_threshold:
self._logger.info(
f"Circuit {self.name}: HALF_OPEN -> CLOSED"
)
self._state = CircuitState.CLOSED
self._success_count = 0
def _on_failure(self, latency_ms: float):
"""记录失败调用"""
with self._lock:
self._failure_count += 1
self._last_failure_time = time.time()
self._logger.warning(
f"Circuit {self.name}: failure {self._failure_count} "
f"(latency={latency_ms:.0f}ms)"
)
if self._state == CircuitState.HALF_OPEN:
self._logger.warning(
f"Circuit {self.name}: HALF_OPEN -> OPEN (探活失败)"
)
self._state = CircuitState.OPEN
self._half_open_calls = 0
elif self._failure_count >= self.config.failure_threshold:
self._logger.warning(
f"Circuit {self.name}: CLOSED -> OPEN "
f"(连续{self._failure_count}次失败)"
)
self._state = CircuitState.OPEN
def reset(self):
"""手动重置熔断器"""
with self._lock:
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._half_open_calls = 0
self._logger.info(f"Circuit {self.name}: reset to CLOSED")
def get_stats(self) -> dict:
"""获取熔断器状态统计"""
with self._lock:
return {
"name": self.name,
"state": self.state.value,
"failure_count": self._failure_count,
"success_count": self._success_count,
"last_failure_time": self._last_failure_time,
}
熔断器通过持续监控调用结果来自动判断后端服务的健康状态。当后端服务出现故障(如连续失败、响应超时、错误率升高等)时,熔断器会迅速切断流量,防止故障扩散;当后端服务恢复后,熔断器会自动逐步恢复流量,确保后端服务不会被瞬间涌入的流量再次压垮。
6.4 降级策略
降级策略是系统在部分组件不可用时的兜底方案。降级的核心理念是"保核心、去非核心",在系统压力增大或部分组件故障时,主动放弃部分功能或质量,换取系统整体的可用性和稳定性。
我们的降级策略分为四个层次,从重到轻依次是:功能降级、模型降级、响应降级、数据降级。这四种降级策略层层递进,可以单独使用,也可以组合使用,形成完整的降级体系。
功能降级 是最激进的降级方式,直接关闭某些非核心功能。例如,在系统压力过大时,可以暂时关闭"历史对话摘要生成"、"知识库检索增强"等辅助功能,只保留核心的对话生成能力。功能降级通过配置开关实现,当系统恢复正常时,可以快速恢复功能。
模型降级 是最常用的降级方式,当主模型不可用或负载过高时,自动切换到轻量级的 Fallback 模型。例如,主模型 MiniMax-M2.7-highspeed 不可用时,切换到 DeepSeek-V3;DeepSeek 也不可用时,切换到更轻量的 text-davinci-003。模型降级会导致一定的质量下降,但能保障服务的连续性。
响应降级 是更轻量的降级方式,当所有模型都不可用时,返回预设的降级响应。例如,返回"当前服务繁忙,请稍后再试"的友好提示,或者返回之前缓存的相似问题的回答。响应降级不消耗 LLM 资源,但用户体验会有明显下降。
数据降级 是一种精细化的降级方式,通过减少返回数据的字段来降低 Token 消耗和响应时间。例如,在高峰期限制返回的上下文长度,或者只返回回答的核心内容而不包含详细解释。数据降级可以在一定程度上缓解资源压力,同时保持服务的可用性。
class DegradationManager:
"""
降级管理器
负责管理各降级策略的状态和切换
"""
def __init__(self):
self._degradation_levels = {
"function": "normal", # normal | degraded | disabled
"model": "primary", # primary | fallback | minimal
"response": "full", # full | cached | static
"data": "complete", # complete | reduced | minimal
}
self._lock = threading.RLock()
def set_level(self, category: str, level: str):
"""设置指定类别的降级级别"""
with self._lock:
if category in self._degradation_levels:
old_level = self._degradation_levels[category]
self._degradation_levels[category] = level
logging.info(
f"Degradation: {category} {old_level} -> {level}"
)
def get_level(self, category: str) -> str:
"""获取指定类别的当前降级级别"""
with self._lock:
return self._degradation_levels.get(category, "normal")
def should_use_fallback_model(self) -> bool:
"""判断是否应使用 Fallback 模型"""
return self.get_level("model") != "primary"
def should_use_cache(self) -> bool:
"""判断是否应使用缓存响应"""
return self.get_level("response") != "full"
def get_context_limit(self) -> int:
"""获取当前上下文长度限制"""
level = self.get_level("data")
limits = {
"complete": 128000,
"reduced": 32000,
"minimal": 8000,
}
return limits.get(level, 128000)
降级策略的触发条件通常与熔断器联动:当熔断器打开时,触发相应的降级策略;当熔断器恢复时,逐步退出降级。降级和熔断的联动形成了完整的"保护-降级-恢复"闭环,确保系统在各种异常情况下都能保持可用。
6.5 系统级限流
系统级限流是整个保护体系的最后一道防线,它监控的是服务器本身的资源使用状况,而不是业务层面的指标。当服务器的 CPU、内存、网络、连接数等资源接近上限时,系统限流会主动干预,防止服务器过载。
系统级限流的核心是资源监控和自保护机制。我们采用以下策略实现系统级限流:
基于负载的限流(Load-based Throttling) 监控系统的关键指标,包括 CPU 使用率、内存使用率、磁盘 I/O、网络带宽等。当这些指标超过预设阈值时,触发限流。例如,当 CPU 使用率超过 80% 时,开始限制新请求的进入;当 CPU 超过 90% 时,大幅减少新请求进入。负载限流可以有效防止系统过载导致的级联故障。
基于容量的限流(Capacity-based Throttling) 监控系统的整体处理能力,包括当前处理的请求数、队列长度、连接池使用率等。当容量指标接近上限时,触发限流。例如,当全局并发数超过 1000 时,开始限流;当队列长度超过 500 时,拒绝新请求。容量限流可以防止突发流量导致的瞬时过载。
自适应限流(Adaptive Throttling) 是一种更智能的限流方式,根据系统的实时状态动态调整限流参数。例如,在系统低负载时可以放宽限流阈值,允许更多请求进入;在系统高负载时收紧限流阈值,保护系统不被压垮。自适应限流通过持续监控系统指标,自动学习最优的限流参数。
限流触发后的处理策略有三种:排队(Queue) 将请求放入队列等待处理,适用于短时过载的情况;拒绝(Reject) 直接拒绝请求并返回错误,适用于持续过载的情况;降级(Degrade) 将请求引导到降级路径,返回降级响应,适用于极端情况。三种策略可以组合使用,形成完整的降级体系。
import psutil
import os
class SystemRateLimiter:
"""
系统级限流器
基于系统资源使用情况进行限流
"""
def __init__(
self,
cpu_threshold: float = 80.0,
memory_threshold: float = 85.0,
max_concurrent: int = 1000,
max_queue_length: int = 500,
):
self.cpu_threshold = cpu_threshold
self.memory_threshold = memory_threshold
self.max_concurrent = max_concurrent
self.max_queue_length = max_queue_length
self._current_concurrent = 0
self._current_queue_length = 0
self._lock = threading.Lock()
# 获取当前进程
self._process = psutil.Process(os.getpid())
def _get_system_metrics(self) -> dict:
"""获取系统指标"""
return {
"cpu_percent": self._process.cpu_percent(interval=0.1),
"memory_percent": self._process.memory_percent(),
"num_threads": self._process.num_threads(),
}
def should_accept(self, queue_length: int = 0) -> tuple[bool, str]:
"""
判断是否接受新请求
Returns:
(是否接受, 拒绝原因)
"""
metrics = self._get_system_metrics()
# 检查 CPU
if metrics["cpu_percent"] >= self.cpu_threshold:
return False, f"CPU_HIGH ({metrics['cpu_percent']:.1f}%)"
# 检查内存
if metrics["memory_percent"] >= self.memory_threshold:
return False, f"MEMORY_HIGH ({metrics['memory_percent']:.1f}%)"
# 检查并发数
with self._lock:
if self._current_concurrent >= self.max_concurrent:
return False, f"CONCURRENT_LIMIT ({self._current_concurrent})"
# 检查队列长度
if queue_length > self.max_queue_length:
return False, f"QUEUE_FULL ({queue_length})"
return True, ""
def increment_concurrent(self):
"""增加当前并发计数"""
with self._lock:
self._current_concurrent += 1
def decrement_concurrent(self):
"""减少当前并发计数"""
with self._lock:
self._current_concurrent = max(
0, self._current_concurrent - 1
)
def get_stats(self) -> dict:
"""获取限流统计"""
metrics = self._get_system_metrics()
with self._lock:
return {
"current_concurrent": self._current_concurrent,
"max_concurrent": self.max_concurrent,
**metrics,
}
6.6 限流配置示例
以下是完整的限流配置示例,展示了如何配置各层限流参数:
rate_limiting:
# 接入层全局限流
global:
rate: 10000 # 每秒10000请求
capacity: 15000 # 桶容量(允许一定突发)
# 租户级限流(按套餐等级)
per_tenant:
basic:
rate: 100 # 每秒100请求
capacity: 150 # 允许小规模突发
premium:
rate: 1000 # 每秒1000请求
capacity: 1500
enterprise:
rate: 5000 # 每秒5000请求
capacity: 7500
# 用户级并发限制
per_user:
max_concurrency: 10 # 单用户最大并发连接数
# 模型级限流
per_model:
MiniMax-M2.7-highspeed:
rate: 30 # 单模型最大并发
per_credential:
key_1: 20 # 各个 credential 的并发限制
key_2: 20
key_3: 15
DeepSeek-V3:
rate: 20
per_credential:
ds_key_1: 15
ds_key_2: 15
GPT-4:
rate: 10
per_credential:
openai_key_1: 10
# 系统级限流
system:
cpu_threshold: 80 # CPU 80% 开始限流
memory_threshold: 85 # 内存 85% 开始限流
max_concurrent: 1000 # 全局最大并发
max_queue_length: 500 # 最大队列长度
# 降级策略配置
degradation:
cpu_threshold_for_degrade: 90 # CPU 90% 触发降级
model_fallback_threshold: 3 # 连续失败3次切换模型
cache_ttl_seconds: 300 # 缓存响应有效期
这套配置覆盖了限流的各个层面,从全局到租户到用户,从模型到系统,形成完整的限流体系。配置参数可以根据实际负载情况进行调整,建议在生产环境部署前进行充分的压测和调优。
七、成本控制与配额管理
在超大规模多租户场景下,LLM 调用成本是企业运营的主要成本来源。LLM API 采用按 Token 计费的模式,成本与请求量、输入输出长度、模型选择直接相关。在数千租户、每日百万级请求的规模下,精细化的成本控制不仅能有效降低运营支出,更能通过配额机制保障公平性、避免单一租户耗尽全部资源。因此,我们设计了一套完整的成本控制与配额管理体系,覆盖从预算制定、配额分配、调用计费、到成本优化的全链路。
7.1 成本控制框架
成本控制是整个费用管理体系的总纲,它定义了成本控制的组织结构、控制层次和可视化维度。合理的成本控制框架能够帮助企业在保证服务质量的前提下,最大限度地降低 LLM 调用成本,提高投入产出比。
为什么成本控制重要:LLM 调用成本是主要成本来源
在 LLM 应用中,API 调用费用通常占据总成本的 60%-80%。不同于传统的 CPU 或存储成本,LLM 的成本具有以下显著特点:
与业务量线性相关:每一次对话、每一次生成都需要消耗 Token,业务量增长直接导致成本线性增长。在多租户场景下,如果缺乏有效控制,单个租户的异常流量可能迅速耗尽整个系统的预算。
单价差异巨大:不同模型的单价差异可达数十倍。MiniMax-M2.7-highspeed 的单价约为 0.1 元/千 Token,而 text-embedding-3-small 仅需 0.002 元/千 Token,相差 50 倍。通过合理的模型路由,可以将成本降低数倍而不影响核心体验。
隐性成本风险:超时重试、错误响应、缓存失效等因素可能导致额外的 Token 消耗。如果不加以控制,这些隐性成本可能占据总成本的 10%-20%。
因此,成本控制不是简单地"省钱",而是在保证服务质量和用户体验的前提下,实现成本与收益的最优平衡。
成本控制四层:预算层 → 配额层 → 路由层 → 计量层
我们的成本控制体系分为四个层次,自上而下形成完整的控制闭环:
第一层:预算层(Budget Layer) 是成本控制的上限约束。预算层在系统级别和租户级别分别设定成本上限。系统级预算防止整体支出超过财务承受范围;租户级预算确保各租户的费用在其套餐范围内,防止无限透支。预算层还负责月度、季度的预算分配和调整。
第二层:配额层(Quota Layer) 是成本控制的分配机制。配额层将预算拆分为可管理的配额单元,按租户、用户、时间等维度进行分配。配额层实现了软配额(警告)和硬配额(阻断)的双重机制,确保资源分配公平合理。配额还支持流转机制,某个租户未使用的配额可以共享给其他需要的租户。
第三层:路由层(Routing Layer) 是成本控制的执行环节。路由层根据成本策略决定请求应该使用哪个模型。对于简单任务,路由层会优先选择低单价模型;对于敏感任务,路由层会选择质量优先的高单价模型。路由层的成本感知能力是成本控制的关键。
第四层:计量层(Metering Layer) 是成本控制的数据基础。计量层负责精确记录每一次 LLM 调用的用量和费用,按租户、用户、模型、时间等维度进行聚合统计。计量数据是预算制定、配额调整、成本优化的数据依据,也是向租户提供账单和报告的基础。
预算层(上限约束)
↓ 分配预算
配额层(资源分配)
↓ 检查配额
路由层(智能选型)
↓ 选择模型
计量层(精确计量)
↓ 记录用量
报表层(成本分析)
成本可视化:按租户/用户/模型/时间维度统计
成本可视化的核心是建立多维度的成本分析体系,让运营人员能够从不同角度审视成本构成,发现异常和优化空间。
租户维度:查看每个租户的累计成本、当月成本、每日成本趋势。租户维度帮助识别高成本租户和异常消费模式。
用户维度:查看每个租户下各用户的 Token 消耗和费用分摊。用户维度支持租户内部的成本分摊和绩效考核。
模型维度:查看各模型(主模型、Fallback、辅助模型)的费用占比和趋势。模型维度帮助评估模型选择策略的效果,指导路由优化。
时间维度:按小时/日/周/月查看成本变化趋势,识别业务高峰和成本高峰。时间维度帮助进行成本预测和预算规划。
成本可视化通过数据仪表板呈现,核心指标包括:当日成本、月度累计成本、成本环比增长率、预算消耗进度、预测月末成本等。当成本指标出现异常(如单日成本环比增长超过 50%)时,系统自动触发告警,通知运营人员排查。
7.2 租户配额体系
租户配额体系是成本控制的核心机制,它决定了每个租户能够消耗多少资源,以及超配额时的处理策略。一个设计良好的配额体系既能防止资源滥用,保障公平性,又能提供足够的弹性满足正常业务需求。
配额设计原则:软配额(警告)+ 硬配额(阻断)
我们采用软配额和硬配额相结合的双重机制:
软配额(Soft Quota) 是警告阈值,当配额使用达到软配额时,系统会发出警告但不会阻断请求。软配额通常设置为硬配额的 80%-90%,给租户足够的时间调整用量或升级套餐。软配额触发时,系统会通过企业微信、邮件等渠道通知租户管理员,提醒其关注用量情况。
硬配额(Hard Quota) 是绝对的资源上限,当配额耗尽时,新的 LLM 请求会被拒绝或降级。硬配额确保了资源的严格隔离,防止单一租户耗尽全部资源。硬配额耗尽时,系统返回明确的错误码(如 QUOTA_EXCEEDED),租户管理员可以申请临时扩容或等待配额重置。
软硬双配额的设计在严格控制和用户体验之间取得平衡:硬配额保障了系统的稳定性和公平性;软配额提供了人性化的预警机制,避免租户因突发流量而措手不及。
配额维度:日配额、月配额、累计配额
配额按时间维度分为三个层次:
日配额(Daily Quota):单日允许消耗的最大 Token 数或费用。日配额防止租户在短时间内产生巨额费用,是应对异常流量的第一道防线。日配额每日零点重置(北京时间)。
月配额(Monthly Quota):单月允许消耗的最大 Token 数或费用。月配额是租户套餐的核心约束,决定了租户每月能够享受的服务量。月配额每月第一天重置。
累计配额(Total Quota):账户级别的总消耗上限,通常用于预付费场景。累计配额耗尽意味着账户余额不足,需要充值后才能继续使用。
三个维度的配额相互关联:日配额 ≤ 月配额 ÷ 天数 × 系数(通常为 2-3 倍,允许短期突发);月配额 ≤ 累计配额。系统在任何时刻都会同时检查三个维度的配额,任一配额耗尽都会触发限流。
配额分配算法:按套餐等级分配,剩余可流转
配额分配算法决定了每个租户能够获得的资源量,我们采用"套餐等级 + 动态调整"的分配模式:
套餐等级分配:根据租户的套餐等级(免费版、标准版、企业版、高级企业版等)确定基础配额。各等级的大致配额范围如下:
| 套餐等级 | 日 Token 配额 | 月 Token 配额 | 可用模型 |
|---|---|---|---|
| 免费试用 | 10,000 | 300,000 | Fallback 池 |
| 标准版 | 100,000 | 3,000,000 | 主模型 + Fallback |
| 企业版 | 1,000,000 | 30,000,000 | 全部模型 |
| 高级企业版 | 10,000,000 | 300,000,000 | 全部模型 + 优先调度 |
剩余流转机制:租户本月未使用的配额可以部分流转到下个月,但流转比例有限(通常为 50%),且流转配额优先于月配额使用。流转机制既奖励节约用量的租户,又防止无限累积。
动态调整:运营人员可以根据租户的实际消费情况手动调整配额。高增长租户可以获得临时扩容;异常消费租户可以降低配额进行约束。
超配额处理:排队等待 / 降级处理 / 通知用户
当租户请求时发现配额不足,系统有三种处理策略:
排队等待(Queue):将请求放入优先级队列,等待配额重置(如下一分钟日配额重置)或前序请求完成后处理。排队策略适用于短期配额紧张的情况,用户体验较好但响应时间不确定。
降级处理(Degrade):将请求降级到低成本的模型或返回降级响应。例如,对话请求降级为关键词提取,返回简洁的关键词列表而非完整对话。降级策略保持了服务的可用性,但牺牲了部分功能。
通知用户(Notify):直接向用户返回配额不足的错误提示,引导用户等待配额重置、升级套餐或联系销售。通知策略最为直接,适用于配额严重不足的场景。
三种策略可以组合使用:优先降级,降级不可行时排队,排队超时后通知。例如,当检测到配额不足时,首先尝试使用辅助模型处理;如果辅助模型也耗尽配额,则进入队列等待 30 秒;30 秒后仍无法处理则返回明确的配额不足提示。
7.3 Token 计量与计费
Token 计量是成本控制的数据基础,每一次 LLM 调用都必须被精确记录,才能实现准确计费和成本分析。我们设计了一套完整的 Token 计量和计费体系。
Token 计量架构
Token 计量在路由层执行,每次模型调用完成后,系统记录以下信息:
请求元数据:租户 ID、用户 ID、模型名称、提供商、请求 ID、调用时间。
用量数据:输入 Token 数(prompt_tokens)、输出 Token 数(completion_tokens)、总 Token 数(total_tokens)。
质量数据:响应延迟(latency_ms)、是否成功(success)、错误类型(error_type,如超时、限流、服务器错误)。
成本数据:根据用量和单价计算的本次调用费用。
计量数据首先写入内存缓冲区(用于实时聚合),然后定期持久化到数据库(用于历史分析和账单生成)。这种设计避免了频繁的数据库写入影响性能,同时保证了数据的持久化。
计费核心类实现
class TokenMeter:
"""
Token 用量计费核心类
负责:
1. 记录每次调用的用量和费用
2. 按租户、用户、模型等维度聚合统计数据
3. 生成计费记录和账单
"""
def __init__(self, price_table: dict[str, dict[str, float]]):
"""
初始化计量器
参数:
price_table: 价格表,格式为
{
"MiniMax-M2.7-highspeed": {"input": 0.1, "output": 0.1},
"DeepSeek-V3": {"input": 0.05, "output": 0.05},
"text-embedding-3-small": {"input": 0.002, "output": 0.002},
...
}
"""
self.price_table = price_table
self._cache = {} # 租户用量缓存
self._lock = threading.RLock()
def measure(self, model: str, usage: Usage) -> CostRecord:
"""
计量一次调用的用量并生成计费记录
参数:
model: 模型名称
usage: 用量信息,包含 prompt_tokens、completion_tokens 等
返回:
CostRecord: 计费记录
"""
# 获取模型单价
tier = getattr(usage, 'tier', 'standard') # 支持差异化定价
unit_price = self._get_unit_price(model, tier)
# 计算费用:输入和输出分别计费
prompt_cost = usage.prompt_tokens / 1000 * unit_price["input"]
completion_cost = usage.completion_tokens / 1000 * unit_price["output"]
total_cost = prompt_cost + completion_cost
# 生成计费记录
record = CostRecord(
tenant_id=usage.tenant_id,
user_id=usage.user_id,
model=model,
provider=usage.provider,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens,
total_tokens=usage.total_tokens,
prompt_cost=prompt_cost,
completion_cost=completion_cost,
total_cost=total_cost,
latency_ms=usage.latency_ms,
success=usage.success,
timestamp=now(),
)
# 更新缓存统计
self._update_cache(record)
return record
def _get_unit_price(self, model: str, tier: str) -> dict:
"""获取模型单价,支持模型和套餐双重维度"""
if model in self.price_table:
model_prices = self.price_table[model]
if tier in model_prices:
return model_prices[tier]
elif "standard" in model_prices:
return model_prices["standard"]
# 默认单价(未配置的模型使用较高单价防止逃费)
return {"input": 0.2, "output": 0.2}
def _update_cache(self, record: CostRecord):
"""更新租户用量缓存(用于实时聚合)"""
key = f"{record.tenant_id}:{record.model}:{record.timestamp.date()}"
with self._lock:
if key not in self._cache:
self._cache[key] = {
"tenant_id": record.tenant_id,
"model": record.model,
"date": record.timestamp.date(),
"prompt_tokens": 0,
"completion_tokens": 0,
"total_cost": 0.0,
"request_count": 0,
}
self._cache[key]["prompt_tokens"] += record.prompt_tokens
self._cache[key]["completion_tokens"] += record.completion_tokens
self._cache[key]["total_cost"] += record.total_cost
self._cache[key]["request_count"] += 1
def get_tenant_daily_usage(self, tenant_id: str, date: date) -> dict:
"""获取指定租户指定日期的用量汇总"""
result = {
"tenant_id": tenant_id,
"date": date,
"total_prompt_tokens": 0,
"total_completion_tokens": 0,
"total_cost": 0.0,
"total_requests": 0,
"by_model": {},
}
prefix = f"{tenant_id}:"
with self._lock:
for key, data in self._cache.items():
if key.startswith(prefix) and data["date"] == date:
result["total_prompt_tokens"] += data["prompt_tokens"]
result["total_completion_tokens"] += data["completion_tokens"]
result["total_cost"] += data["total_cost"]
result["total_requests"] += data["request_count"]
model = data["model"]
if model not in result["by_model"]:
result["by_model"][model] = {
"prompt_tokens": 0,
"completion_tokens": 0,
"cost": 0.0,
"requests": 0,
}
result["by_model"][model]["prompt_tokens"] += data["prompt_tokens"]
result["by_model"][model]["completion_tokens"] += data["completion_tokens"]
result["by_model"][model]["cost"] += data["total_cost"]
result["by_model"][model]["requests"] += data["request_count"]
return result
计费数据持久化
计量数据需要持久化到数据库以支持账单生成、成本分析和历史查询。我们采用以下持久化策略:
实时写入缓冲区:每次调用后,计量数据写入内存缓冲区(ConcurrentHashMap)。
定期批量持久化:每分钟将缓冲区数据批量写入数据库,避免频繁写入影响性能。
日终全量聚合:每日凌晨对前一天的用量进行全量聚合计算,生成日统计记录,减少实时查询压力。
长时间归档:超过 90 天的明细数据归档到冷存储(如对象存储),降低热数据存储成本。
7.4 成本优化策略
成本控制不仅依赖配额约束,更需要从源头优化消耗。我们设计了一系列成本优化策略,帮助企业在不降低服务质量的前提下显著降低成本。
Prompt 压缩:减少 prompt token 数量,降低调用成本
Prompt 是输入 Token 的主要来源,优化 Prompt 可以直接降低输入成本。Prompt 压缩的主要方法包括:
模板精简:去除 Prompt 中不必要的修饰词、重复说明、示例等。例如,将"请根据以下内容回答问题,答案要准确、完整、详细"简化为"根据内容回答"。研究表明,精简 Prompt 通常能将输入 Token 减少 15%-30%,而不影响输出质量。
Few-shot 压缩:在使用 Few-shot Learning 时,通过精选示例而非堆砌示例来减少 Token 消耗。通常 2-3 个高质量示例效果优于 10 个低质量示例。
系统 Prompt 提取:将固定的系统配置(如角色设定、响应格式)提取到独立的系统 Prompt,通过缓存机制复用,而非每次请求都重复发送。
输入截断策略:对于超长输入,设置合理的截断策略。例如,保留最新的对话历史(保留最后 N 轮)或截断到最大 Token 限制,确保核心信息不丢失。
def compress_prompt(prompt: str, max_tokens: int, strategy: str = "smart") -> str:
"""
Prompt 压缩
参数:
prompt: 原始 prompt
max_tokens: 最大 token 数限制
strategy: 压缩策略
- "head": 保留开头
- "tail": 保留结尾(默认,用于对话场景)
- "smart": 智能压缩,优先保留关键信息
返回:
压缩后的 prompt
"""
current_tokens = estimate_tokens(prompt)
if current_tokens <= max_tokens:
return prompt
if strategy == "head":
return truncate_from_head(prompt, max_tokens)
elif strategy == "tail":
return truncate_from_tail(prompt, max_tokens)
elif strategy == "smart":
# 智能策略:识别关键段落,优先保留
return smart_truncate(prompt, max_tokens)
缓存复用:相同/相似请求返回缓存结果
对于相同或高度相似的请求,直接返回缓存结果可以完全规避 LLM 调用成本。我们设计了多级缓存体系:
精确缓存:基于请求内容的哈希值(如 SHA256)建立缓存键,适用于完全相同的请求。精确缓存命中率较低但结果绝对可靠,适用于模板化的请求(如固定格式的摘要、翻译)。
语义缓存:基于 embedding 向量相似度判断请求是否"足够相似"。当相似度超过阈值(如 0.95)时,返回缓存结果。语义缓存的命中率显著高于精确缓存,适用于对话场景的上下文复用。
TTL 控制:缓存结果设置有效期(如 5 分钟、1 小时),避免返回过期内容。
class SemanticCache:
"""语义缓存实现"""
def __init__(self, embedding_model: str, similarity_threshold: float = 0.95):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self._cache: dict[str, tuple[ModelResponse, list[float], float]] = {}
self._lock = threading.RLock()
def get_or_compute(
self,
request: ModelRequest,
compute_fn: callable
) -> ModelResponse:
"""
获取缓存或计算新结果
参数:
request: 模型请求
compute_fn: 计算函数,当缓存未命中时调用
返回:
ModelResponse: 响应结果
"""
# 获取请求的 embedding
query_vector = self._embed(request)
with self._lock:
# 查找相似请求
for key, (response, cached_vector, timestamp) in self._cache.items():
if self._is_expired(timestamp):
del self._cache[key]
continue
similarity = cosine_similarity(query_vector, cached_vector)
if similarity >= self.similarity_threshold:
# 缓存命中
response.from_cache = True
response.cache_similarity = similarity
return response
# 缓存未命中,计算新结果
response = compute_fn()
# 写入缓存
self._cache[self._make_key(request)] = (
response,
query_vector,
time.time()
)
return response
模型降级:简单任务自动降级到低单价模型
对于简单明确的任务,使用低单价模型能够显著降低成本而不影响用户体验。模型降级策略的关键是准确识别可降级的任务:
任务复杂度评估:根据请求特征(输入长度、是否包含复杂指令、是否需要推理)评估任务复杂度。简单任务(如简单问答、格式转换)可降级;复杂任务(如代码生成、创意写作)必须使用主模型。
降级实验:对可降级任务先尝试低成本模型,如果结果质量不合格则自动重试主模型。质量校验方法见 3.5 节。
def maybe_downgrade(model_request: ModelRequest) -> str:
"""
评估是否可以将请求降级到低成本模型
返回:
建议的模型名称,如果不应降级则返回 None
"""
# 不可降级的场景
if model_request.force_primary:
return None
# 简单任务识别
if is_simple_task(model_request):
# 检查当前主模型负载
if primary_model.load_factor > 0.7:
return "gpt-3.5-turbo" # 降级到辅助模型
return None
Batch 处理:积攒请求批量调用,享受批量折扣
部分 LLM 提供商(如 OpenAI)提供 Batch API,批量调用可享受折扣。Batch 处理策略将多个请求积攒后一次性提交:
积攒规则:积攒 N 个请求或等待 T 秒(哪个先到触发),然后一次性提交 Batch API。
折扣比例:OpenAI Batch API 通常提供 50% 的价格折扣,适合非即时性请求。
延迟容忍:Batch 请求的响应时间通常较长(几分钟到几十分钟),只适用于可以容忍延迟的场景。
class BatchProcessor:
def __init__(self, batch_size: int = 100, batch_timeout: float = 60.0):
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self._pending: list[ModelRequest] = []
self._lock = threading.Lock()
async def add(self, request: ModelRequest) -> ModelResponse:
"""添加请求,触发批量条件时自动提交"""
with self._lock:
self._pending.append(request)
if len(self._pending) >= self.batch_size:
return await self._submit_batch()
# 等待超时提交
await asyncio.sleep(self.batch_timeout)
return await self._submit_batch()
async def _submit_batch(self) -> list[ModelResponse]:
"""提交批量请求"""
with self._lock:
if not self._pending:
return []
batch = self._pending[:]
self._pending.clear()
# 调用 Batch API
return await openai.Batch.create(batch)
空闲时段利用:非高峰时段处理非紧急任务
LLM 调用存在波峰波谷,非高峰时段的 API 配额更加充裕、价格更优惠。空闲时段利用策略将非紧急任务安排到低峰时段:
任务分类:将 LLM 任务分为紧急(需要即时响应)和非紧急(可以延迟处理)。报告生成、数据分析、批量翻译等属于非紧急任务。
定时执行:对于非紧急任务,设置延迟执行策略,在低峰时段(如凌晨)批量处理。
价格预测:结合历史数据和当前负载,预测未来时段的 API 价格,自动将任务调度到最优时段。
7.5 预算告警机制
预算告警是成本控制的最后一道防线,当成本接近或超过预设阈值时及时通知相关人员,避免产生超出预期的费用。
告警阈值设计
我们设置了多级告警阈值,确保在不同成本消耗阶段能够及时响应:
BUDGET_ALERT_THRESHOLDS = [
# 级别1:日常提醒
("usage_rate > 50%", "INFO: 已消耗 50% 本期配额"),
# 级别2:警告
("usage_rate > 80%", "WARNING: 已消耗 80% 本期配额,请关注"),
# 级别3:严重警告
("usage_rate > 95%", "CRITICAL: 已消耗 95% 本期配额,即将耗尽"),
# 级别4:已超支
("usage_rate > 100%", "ALERT: 已耗尽本期配额,服务即将受限"),
# 级别5:超支预测
("projected_exceed", "ALERT: 预计本期将超支 ¥{amount},建议调整配额"),
]
告警触发条件
配额消耗率触发:当租户的日配额、月配额或累计配额消耗达到阈值时触发。
超支预测触发:基于历史用量趋势,预测本期(月度)总用量将超过配额时触发。预测算法考虑日均用量、增长率、季节性因素等。
异常用量触发:当日用量环比增长超过 100%、或单小时用量超过日配额的 50% 时触发异常告警。
告警通知渠道
告警信息通过多种渠道发送,确保及时触达:
| 渠道 | 适用场景 | 响应时效 |
|---|---|---|
| 企业微信 | 日常提醒、警告 | 即时 |
| 邮件 | 正式通知、汇总报告 | 5-15 分钟 |
| 短信 | 严重告警、超支预警 | 即时 |
| 电话 | 紧急故障、预算耗尽 | 即时 |
告警通知支持按角色和级别配置:日常提醒发送给租户管理员;严重告警同时发送给租户管理员和平台运营;紧急告警同时发送给平台高管。
告警抑制与去重
为防止告警风暴,同一租户的同类告警在短时间内(如 1 小时)不重复发送。告警抑制规则包括:
- 同一租户、同一告警类型,1 小时内不重复
- 配额从 80% 增长到 90% 期间,不再发送"已达 80%"的告警
- 已发送"即将超支"告警后,配额继续增长不再重复告警
7.6 成本报表
成本报表为运营人员和管理层提供全面的成本可视化和分析支持,帮助做出科学的成本决策。
报表类型
| 报表类型 | 维度 | 更新频率 | 主要用户 |
|---|---|---|---|
| 租户成本明细 | 租户 / 模型 / 时间 | 每日凌晨 | 运营人员、租户管理员 |
| 模型成本排行 | 模型 / 租户 | 每周 | 产品经理、成本分析 |
| 成本趋势图 | 月度 / 模型 | 每月 | 管理层、财务 |
| ROI 分析 | 租户 / 使用场景 | 每月 | 管理层、BD |
| 异常消费报告 | 租户 / 时间 | 每日 | 运营人员 |
租户成本明细报表
租户成本明细报表是日常运营最常用的报表,展示每个租户的成本构成和变化趋势:
核心指标:
- 当日 Token 消耗(输入 / 输出 / 总计)
- 当日费用(元)
- 月度累计消耗和费用
- 配额消耗进度(百分比)
- 环比昨日/上周变化
模型维度分解:
- 各模型的调用次数和费用占比
- 各模型的平均单次成本
- 主模型 vs Fallback vs 辅助模型的使用分布
用户维度分解:
- 租户下各用户的 Token 消耗排名
- 用户人均消耗和趋势
模型成本排行报表
模型成本排行帮助识别高成本来源,为模型选择和路由优化提供依据:
按模型汇总:统计各模型的总调用次数、总 Token 消耗、总费用、平均单次成本。
按租户排名:展示成本最高的前 N 个租户,以及它们在各模型的分布。
成本归因分析:拆解成本增量的来源(如业务量增长、模型单价上调、单次消耗增加)。
成本趋势图
成本趋势图展示成本的长期变化规律,支持月度环比、同比分析:
日趋势图:展示最近 30 天每日成本曲线,标注异常峰值。
月趋势图:展示最近 12 个月月度成本柱状图,标注季节性规律。
预测线:基于历史数据预测未来 3 个月的预期成本。
ROI 分析报表
ROI 分析报表将成本与业务价值关联,评估 LLM 投入的效益:
租户维度 ROI:计算各租户的收入与 LLM 成本比值,识别高价值/低价值租户。
使用场景 ROI:按使用场景(如客服机器人、文档处理、代码生成)统计成本和产出,帮助产品决策。
7.7 退款与争议处理
在 LLM API 调用中,由于网络异常、超时重试、API 提供商错误等原因,可能出现计费争议。完善的退款与争议处理机制是保障租户权益、维护平台信誉的重要环节。
LLM API 计费争议类型
常见的计费争议包括:
重复计费:同一请求因超时重试或客户端重试导致被计费多次。这是最常见的争议类型,通常发生在网络不稳定或客户端超时设置不合理的情况下。
错误计费:API 返回错误(如 429 限流、500 服务器错误),但仍被计费。部分 API 提供商的计费政策是"无论成功失败,只要发送了请求就计费"。
超额计费:实际 Token 数超出预估,导致费用超出预期。部分场景下,输入截断策略的变化也会导致 Token 数与预期不符。
未使用计费:租户认为自己没有使用服务,但收到了账单。可能原因包括凭证泄露被滥用、或系统错误。
争议处理流程
我们设计了标准化的争议处理流程:
租户提交争议申请
↓
客服初审(24小时内)
↓
判断争议类型
↓
|--- 重复计费 --> 核查请求日志,确认为重复后退款
|--- 错误计费 --> 核查 API 响应日志,确认后协调提供商退款
|--- 超额计费 --> 核查 Token 计数,确认后按实际用量调整
|--- 未使用计费 --> 核查凭证使用记录,确认为滥用后更换凭证并退款
↓
财务审批(争议金额 > ¥100 需要财务审批)
↓
执行退款或账单调整
↓
处理结果反馈给租户
争议金额的暂挂和追缴机制
对于争议金额,我们采用暂挂机制:
暂挂:争议金额在争议处理期间不计入租户的实际费用,也不计入平台收入。如果争议最终有利于租户,直接退款;如果有利于平台,则从暂挂金额中扣除。
证据保留:我们会保留完整的请求日志(包含请求时间、请求内容哈希、API 响应、Token 计数、费用计算过程),作为争议处理的依据。日志保留期限不少于 6 个月。
追缴:如果发现租户存在恶意行为(如故意制造超时骗取退款、伪造使用记录),平台有权追缴已退款金额,并可能暂停或终止该租户的服务。
典型争议案例:超时重试导致重复计费
案例描述:某租户反映某日账单金额异常增高,核查发现当日有大量重复请求。
根因分析:
1. 客户端设置了 10 秒超时
2. LLM 服务在高峰期响应时间 > 10 秒
3. 客户端超时后自动重试,导致同一请求被发送 2-3 次
4. API 提供商对每次请求都计费
处理方案:
1. 核查请求日志,确认存在大量重复请求(相同请求哈希)
2. 根据日志去重,仅保留首次成功请求的费用
3. 多计费部分退还租户
4. 建议租户优化超时设置或启用幂等机制
预防措施:
1. 在平台侧实现请求去重(基于请求哈希)
2. 提供客户端 SDK,内置超时和重试的最佳实践
3. 在控制台展示重试次数和因此产生的额外费用
八、监控指标体系
8.1 监控设计原则
可观测性是分布式系统的基石。在超大规模多租户 LLM 并发场景下,系统的复杂性导致问题定位困难、故障影响面广。如果没有完善的监控体系,运维团队将在故障发生时陷入被动,响应速度和准确性都将大打折扣。因此,我们设计了一套完整的监控指标体系,覆盖从基础设施到业务层的所有关键路径,确保系统运行状态可见、问题可溯、故障可控。
可观测性三支柱:Metrics / Tracing / Logging
现代可观测性体系基于三大支柱构建:
Metrics(指标) 是聚合后的定量数据,通过数值形式反映系统状态。指标具有可加和性(如请求总数可累加)、可测量性(如延迟可精确记录)、可报警性(如错误率超阈值可触发告警)。指标最适合用于监控告警和容量规划。在 LLM 并发场景中,核心指标包括 QPS、延迟分布、错误率、Token 消耗量等。
Tracing(链路追踪) 是请求在分布式系统中的完整路径记录。链路追踪通过唯一的 Trace ID 将一次请求在各个服务组件中的处理串联起来,展现请求的全流程耗时和状态。链路追踪最适合用于慢请求分析和故障根因定位。在 LLM 并发场景中,一次请求的完整链路可能涉及:API 网关 → 限流中间件 → 路由层 → 模型调用 → 响应返回。
Logging(日志) 是系统运行事件的原始记录。日志提供了最丰富的信息维度,包括事件详情、上下文数据、调试信息等。但日志的缺点是存储量大、查询效率低,因此通常只保留异常或关键事件的日志。日志最适合用于问题排查和审计追踪。
三者相互补充:指标用于监控告警,链路追踪用于性能分析,日志用于深度排查。只有同时具备三个维度的可观测性,才能实现真正的"黑盒监控"到"白盒监控"的转变。
告警优先:影响用户体验的问题优先告警
告警是监控体系的核心输出,其设计质量直接影响故障响应效率。我们遵循以下告警设计原则:
影响用户优先:与用户体验直接相关的指标(如请求成功率、延迟)拥有最高告警优先级。当错误率超过 1% 或 P99 延迟超过 5 秒时,无论其他指标如何,都应立即告警。
层层递进:告警按严重程度分为 Warning(警告)和 Critical(严重)两级。Warning 通知相关人员关注,Critical 则触发更广泛的告警渠道(如电话、短信)和应急响应流程。
去重聚合:避免同一问题产生大量重复告警。通过告警聚合和去重机制,将短时间内的重复告警合并为一条,减少告警疲劳。
告警收敛:相关的多个告警应能智能收敛为一个综合告警。例如,当模型不可用时,可能同时触发"模型错误率升高"、"Fallback 触发率升高"、"队列堆积"等多个告警,根因其实是模型服务异常,运维人员不需要分别处理每个告警。
分层监控:基础设施 → 模型层 → 业务层
监控指标按层次组织,每个层次关注不同的核心指标:
基础设施层 监控底层资源状态,包括 CPU、内存、磁盘、网络等。这些指标反映系统容量和健康度,是预防性监控的关键。当基础设施指标出现异常(如磁盘使用率超过 80%)时,应在故障发生前处理。
模型层 监控 LLM 调用的核心指标,包括 Token 消耗、延迟分布、错误类型分布、模型健康状态等。模型层直接反映 LLM 服务的质量,是用户感知的核心。
业务层 监控租户维度的指标,包括租户请求量、配额使用率、租户成功率等。业务层指标帮助识别异常租户(如配额滥用)并支持成本分析。
8.2 核心 Metrics 指标
核心 Metrics 是整个监控体系的骨架,定义了系统中最关键的量化指标及其采集方式。我们基于 Prometheus 规范定义指标,并使用 OpenTelemetry 进行埋点和上报。
Prometheus Metrics 定义
以下是系统中核心指标的 Prometheus 定义:
# ==================== 请求类指标 ====================
LLM_REQUESTS_TOTAL = Counter(
"llm_requests_total",
"LLM 请求总数,按租户、模型、状态分组",
labelnames=["tenant_id", "model", "status"]
)
# status 取值:success / error / timeout / fallback / rejected
LLM_REQUEST_DURATION = Histogram(
"llm_request_duration_seconds",
"LLM 请求从发送到接收完整响应的耗时分布",
labelnames=["tenant_id", "model", "credential_id"],
buckets=[0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0] # 单位:秒
)
LLM_REQUEST_INPUT_TOKENS = Histogram(
"llm_request_input_tokens",
"LLM 请求输入 Token 数分布",
labelnames=["tenant_id", "model"],
buckets=[100, 500, 1000, 2000, 4000, 8000, 16000, 32000]
)
LLM_REQUEST_OUTPUT_TOKENS = Histogram(
"llm_request_output_tokens",
"LLM 请求输出 Token 数分布",
labelnames=["tenant_id", "model"],
buckets=[100, 500, 1000, 2000, 4000, 8000, 16000, 32000]
)
# ==================== 并发类指标 ====================
LLM_CONCURRENT_REQUESTS = Gauge(
"llm_concurrent_requests",
"当前正在处理的 LLM 请求数,按 Credential 和模型分组",
labelnames=["credential_id", "model"]
)
LLM_QUEUE_LENGTH = Gauge(
"llm_queue_length",
"当前等待调度的请求队列长度",
labelnames=["model", "priority_class"]
)
LLM_QUEUE_WAIT_TIME = Histogram(
"llm_queue_wait_time_seconds",
"请求在队列中等待的时间分布",
labelnames=["model", "priority_class"],
buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)
# ==================== Credential 指标 ====================
CREDENTIAL_HEALTH_SCORE = Gauge(
"credential_health_score",
"Credential 健康评分 (0.0-1.0),按 Credential 和模型分组",
labelnames=["credential_id", "model"]
)
CREDENTIAL_LOAD_RATIO = Gauge(
"credential_load_ratio",
"Credential 负载率 (0.0-1.0),即当前并发 / 最大并发",
labelnames=["credential_id", "model"]
)
CREDENTIAL_CIRCUIT_BREAK_COUNT = Counter(
"credential_circuit_breaks_total",
"Credential 熔断次数累计",
labelnames=["credential_id", "model", "trigger_reason"]
)
CREDENTIAL_REQUESTS_TOTAL = Counter(
"credential_requests_total",
"Credential 请求数累计,按 Credential、模型、状态分组",
labelnames=["credential_id", "model", "status"]
)
# ==================== 租户配额指标 ====================
TENANT_QUOTA_USAGE_RATIO = Gauge(
"tenant_quota_usage_ratio",
"租户配额使用率,按租户、配额类型分组",
labelnames=["tenant_id", "quota_type"]
)
# quota_type 取值:daily_requests / monthly_requests / daily_tokens / concurrent
TENANT_CURRENT_CONCURRENT = Gauge(
"tenant_current_concurrent",
"租户当前并发请求数",
labelnames=["tenant_id"]
)
TENANT_REQUESTS_TOTAL = Counter(
"tenant_requests_total",
"租户请求数累计,按租户、状态分组",
labelnames=["tenant_id", "status"]
)
# ==================== 系统容量指标 ====================
SYSTEM_TOTAL_CONCURRENT = Gauge(
"system_total_concurrent",
"系统当前总并发请求数"
)
SYSTEM_MAX_CAPACITY = Gauge(
"system_max_capacity",
"系统设计的最大并发容量"
)
SYSTEM_TOKEN_RATE = Gauge(
"system_token_rate",
"系统当前 Token 处理速率(tokens/秒)",
labelnames=["model"]
)
# ==================== Fallback 指标 ====================
FALLBACK_TRIGGER_COUNT = Counter(
"fallback_trigger_total",
"Fallback 触发次数累计,按原始模型和 Fallback 模型分组",
labelnames=["original_model", "fallback_model", "trigger_reason"]
)
FALLBACK_SUCCESS_RATE = Gauge(
"fallback_success_rate",
"Fallback 成功率 (0.0-1.0)",
labelnames=["original_model", "fallback_model"]
)
指标采集与上报
指标通过客户端库自动采集,并通过 OpenTelemetry 协议上报到 Prometheus Pushgateway 或直接拉取。采集的关键要点:
Label 维度的选择:Label 用于对指标进行多维度切分,应选择具有高区分度且基数可控的字段。tenant_id 是高基数 Label(可能有数万个取值),需注意 Prometheus 的 Label 基数问题;model 是低基数 Label(通常只有几个取值),适合作为 Label。
Cardinality 控制:避免使用高基数 Label(如 user_id、session_id、具体请求内容等)作为指标 Label,这会导致 Prometheus 序列化和存储开销急剧增加。
Histogram 的 buckets 设置:Histogram 用于记录延迟、Token 数等连续值的分布,buckets 的设置应覆盖业务场景的实际范围。对于 LLM 请求,0.1s-30s 的延迟范围已覆盖大部分场景。
8.3 分层监控指标
分层监控指标按系统层次组织,每个层次包含一组相关的核心指标和对应的告警阈值。
接入层监控指标
接入层是请求进入系统的第一站,负责限流、鉴权、路由等处理。接入层指标反映系统的入口流量状况:
| 指标名称 | 说明 | 类型 | 告警阈值 | 采集方式 |
|---|---|---|---|---|
requests_total |
接入层收到的总请求数 | Counter | - | 每请求递增 |
requests_by_tenant |
各租户的请求数分布 | Counter(labels: tenant_id) | - | 按租户聚合 |
request_duration_p99 |
P99 请求处理耗时 | Histogram | > 5s | 按 Percentile 计算 |
request_duration_p95 |
P95 请求处理耗时 | Histogram | > 2s | 按 Percentile 计算 |
error_rate |
接入层错误率(4xx/5xx) | Gauge | > 1% | 错误数/总请求数 |
rate_limit_rejected |
因限流被拒绝的请求数 | Counter | > 100/分钟 | 拒绝时递增 |
auth_failures |
认证失败次数 | Counter | > 10/分钟 | 认证失败时递增 |
接入层告警的核心关注点:P99 延迟反映最坏情况下的用户体验;错误率反映系统稳定性;限流拒绝数反映是否存在异常流量。
模型层监控指标
模型层是 LLM 调用的核心处理层,指标反映模型服务的质量和效率:
| 指标名称 | 说明 | 类型 | 告警阈值 | 采集方式 |
|---|---|---|---|---|
llm_requests_total |
LLM 调用总请求数 | Counter | - | 每调用递增 |
llm_tokens_total |
Token 消耗总量 | Counter | 接近日限额 | 按 Token 数递增 |
llm_input_tokens |
输入 Token 总量 | Counter | - | 按输入 Token 递增 |
llm_output_tokens |
输出 Token 总量 | Counter | - | 按输出 Token 递增 |
llm_latency_p50 |
P50 响应延迟 | Histogram | > 1s | 按 Percentile 计算 |
llm_latency_p95 |
P95 响应延迟 | Histogram | > 3s | 按 Percentile 计算 |
llm_latency_p99 |
P99 响应延迟 | Histogram | > 5s | 按 Percentile 计算 |
llm_errors_by_type |
错误分类统计 | Counter(labels: error_type) | - | 按错误类型递增 |
llm_timeout_rate |
超时率 | Gauge | > 5% | 超时次数/总次数 |
llm_model_errors |
各模型错误数 | Counter(labels: model, error_type) | - | 按模型+错误类型 |
fallback_trigger_rate |
Fallback 触发率 | Gauge | > 10% | Fallback次数/总次数 |
fallback_success_rate |
Fallback 成功率 | Gauge | < 80% | Fallback成功数/Fallback总次数 |
模型层告警的核心关注点:延迟分布反映模型服务质量;Token 消耗反映成本状态;Fallback 触发率反映主模型可用性;错误类型分布帮助快速定位问题。
Credential 层监控指标
Credential 层是模型调用的资源池,指标反映各 Credential 的负载和健康状态:
| 指标名称 | 说明 | 类型 | 告警阈值 | 采集方式 |
|---|---|---|---|---|
credential_load_ratio |
Credential 负载率 | Gauge | > 80% | current_load/max_load |
credential_health_score |
Credential 健康评分 | Gauge | < 0.5 | 由健康检查计算 |
credential_circuit_breaks |
熔断次数 | Counter | > 3次/分钟 | 每次熔断递增 |
credential_current_load |
Credential 当前并发 | Gauge | - | 实时采集 |
credential_requests_success |
Credential 成功请求数 | Counter | - | 成功时递增 |
credential_requests_failed |
Credential 失败请求数 | Counter | - | 失败时递增 |
credential_avg_latency |
Credential 平均延迟 | Gauge | > 阈值 | 滑动窗口计算 |
credential_error_rate |
Credential 错误率 | Gauge | > 5% | 失败数/总请求数 |
Credential 层告警的核心关注点:负载率反映资源是否充足;健康评分反映 Credential 可用性;熔断次数反映故障频率;平均延迟反映性能表现。
8.4 Tracing 链路追踪
链路追踪提供了请求在分布式系统中端到端的完整视图,是慢请求分析和故障根因定位的关键工具。在超大规模多租户场景下,一次 LLM 请求可能涉及十几个服务组件,没有链路追踪,排查问题将如同大海捞针。
Trace ID 生成与传递
Trace ID 是链路追踪的核心,它将一次请求在各个服务中的处理串联起来。Trace ID 的生成和传递规则:
# Trace ID 生成
def generate_trace_id() -> str:
"""生成 32 位十六进制 Trace ID"""
import uuid
return uuid.uuid4().hex
# Trace ID 传递(以 HTTP 为例)
def inject_trace_context(headers: dict, trace_id: str, span_id: str):
"""将 Trace 上下文注入 HTTP Header"""
headers["X-Trace-ID"] = trace_id
headers["X-Span-ID"] = span_id
headers["X-Trace-Flags"] = "01" # 采样标志
def extract_trace_context(headers: dict) -> tuple:
"""从 HTTP Header 提取 Trace 上下文"""
trace_id = headers.get("X-Trace-ID", generate_trace_id())
span_id = headers.get("X-Span-ID", "0000000000000000")
return trace_id, span_id
Trace ID 应在请求入口处(API 网关)生成,并随请求在各个服务间传递。对于异步消息队列场景,Trace ID 应嵌入消息 Header 中。Trace ID 的传递必须保证全程透传,不能出现断链。
Span 划分:端到端的完整链路
Span 是链路追踪的基本单元,代表一次操作或一个代码段的执行。Span 记录了操作的开始时间、结束时间、标签(Tags)和日志(Logs)。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
# 初始化 Tracer
provider = TracerProvider()
processor = SimpleSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
def handle_llm_request(request: LLMRequest, trace_id: str):
"""处理 LLM 请求的完整链路"""
# 1. 入口 Span:API 网关接收请求
with tracer.start_as_current_span(
"api_gateway_receive",
attributes={
"http.method": "POST",
"http.url": request.url,
"tenant.id": request.tenant_id,
"trace.id": trace_id,
}
) as span:
# 认证鉴权
with tracer.start_as_current_span("auth_validate") as auth_span:
auth_result = validate_auth(request)
auth_span.set_attribute("auth.success", auth_result)
# 限流检查
with tracer.start_as_current_span("rate_limit_check") as limit_span:
quota_result = check_quota(request)
limit_span.set_attribute("quota.allowed", quota_result.allowed)
# 路由选择
with tracer.start_as_current_span("model_route") as route_span:
selected_model = route_request(request)
route_span.set_attribute("model.selected", selected_model)
# 模型调用
with tracer.start_as_current_span(
"llm_call",
attributes={
"llm.model": selected_model,
"llm.tenant": request.tenant_id,
}
) as llm_span:
start_time = time.time()
try:
response = call_llm_model(request, selected_model)
llm_span.set_attribute("llm.success", True)
llm_span.set_attribute("llm.latency_ms", (time.time() - start_time) * 1000)
except Exception as e:
llm_span.set_attribute("llm.success", False)
llm_span.record_exception(e)
raise
完整的 Span 链路应覆盖:API 网关 → 认证鉴权 → 限流检查 → 租户配额检查 → 请求排队 → 模型路由 → Credential 获取 → 模型调用 → 响应处理 → 日志记录。
关键路径追踪:慢请求端到端耗时分析
慢请求是影响用户体验的主要因素。通过链路追踪,我们可以定位慢请求的瓶颈所在:
# 慢请求判定逻辑
SLOW_REQUEST_THRESHOLD_MS = 5000 # 5 秒
def analyze_slow_request(span_data: dict):
"""分析慢请求的耗时分布"""
spans = span_data["spans"]
# 计算各阶段耗时
stage_durations = {}
for span in spans:
stage_name = span["name"]
duration_ms = (span["end_time"] - span["start_time"]) / 1000
stage_durations[stage_name] = duration_ms
# 找出耗时最长的阶段
slowest_stage = max(stage_durations, key=stage_durations.get)
slowest_duration = stage_durations[slowest_stage]
# 判断瓶颈类型
if "llm_call" == slowest_stage:
bottleneck_type = "MODEL_LATENCY"
elif "queue_wait" in slowest_stage:
bottleneck_type = "QUEUE_CONGESTION"
elif "rate_limit" in slowest_stage:
bottleneck_type = "RATE_LIMIT_THROTTLING"
else:
bottleneck_type = "UNKNOWN"
return {
"bottleneck_type": bottleneck_type,
"slowest_stage": slowest_stage,
"slowest_duration_ms": slowest_duration,
"total_duration_ms": sum(stage_durations.values()),
"stage_breakdown": stage_durations,
}
通过聚合分析所有慢请求的瓶颈类型分布,运维团队可以识别系统的主要性能瓶颈,是模型调用慢、还是队列堆积、还是限流过于严格。
错误追踪:异常请求的完整调用栈
当请求出错时,链路追踪记录的错误信息对于快速定位根因至关重要:
def record_error(span, error: Exception, context: dict):
"""记录错误到当前 Span"""
# 记录异常堆栈
span.record_exception(error)
# 设置错误标签
span.set_attribute("error", True)
span.set_attribute("error.type", type(error).__name__)
span.set_attribute("error.message", str(error))
# 记录上下文信息
for key, value in context.items():
span.set_attribute(f"error.context.{key}", value)
# 添加错误日志
span.add_event(
"exception",
attributes={
"exception.type": type(error).__name__,
"exception.message": str(error),
"exception.stacktrace": traceback.format_exc(),
}
)
# 在 except 块中使用
try:
response = call_llm_model(request)
except LLMAPIError as e:
record_error(span, e, {
"tenant_id": request.tenant_id,
"model": request.model,
"api_error_code": e.error_code,
})
raise
错误链路追踪的关键点:异常类型和消息应作为 Span 属性记录;异常堆栈应作为 Event 日志记录;与异常相关的业务上下文(如租户 ID、模型名称、API 错误码)应一并记录。
8.5 监控仪表盘
监控仪表盘是监控数据的可视化呈现,为运维团队提供系统运行状态的全局视图。我们设计了三个核心仪表盘,分别关注不同层面的监控需求。
LLM 并发总览仪表盘
该仪表盘展示 LLM 服务的整体运行状态,是运维人员的首要查看入口:
dashboard:
name: "LLM 并发总览"
layout:
- row:
- panel: "系统总览"
cols: 12
widgets:
- type: "stat"
title: "当前总并发"
expr: "system_total_concurrent"
unit: "req"
- type: "stat"
title: "系统容量余量"
expr: "(system_max_capacity - system_total_concurrent) / system_max_capacity * 100"
unit: "%"
- type: "stat"
title: "今日请求总量"
expr: "sum(increase(llm_requests_total[24h]))"
unit: "req"
- type: "stat"
title: "今日 Token 消耗"
expr: "sum(increase(llm_tokens_total[24h]))"
unit: "tokens"
- row:
- panel: "实时并发监控"
cols: 6
widgets:
- type: "timeseries"
title: "各模型并发请求数(实时)"
expr: "llm_concurrent_requests"
legend:
- "{model}"
- "{credential_id}"
draw_style: "line"
- type: "timeseries"
title: "Token 消耗速率(分钟级)"
expr: "rate(llm_tokens_total[1m])"
legend:
- "{model}"
draw_style: "area"
- panel: "延迟分布"
cols: 6
widgets:
- type: "timeseries"
title: "请求延迟 P50/P95/P99"
expr: |
histogram_quantile(0.50, rate(llm_request_duration_bucket[5m]))
histogram_quantile(0.95, rate(llm_request_duration_bucket[5m]))
histogram_quantile(0.99, rate(llm_request_duration_bucket[5m]))
legend:
- "P50"
- "P95"
- "P99"
draw_style: "line"
- type: "heatmap"
title: "Credential 负载率热力图"
expr: "credential_load_ratio"
y_axis: "{credential_id}"
color_scale: "inferno"
- row:
- panel: "错误与 Fallback"
cols: 12
widgets:
- type: "timeseries"
title: "错误率趋势"
expr: |
sum(rate(llm_requests_total{status="error"}[5m])) by (model)
/ sum(rate(llm_requests_total[5m])) by (model)
draw_style: "line"
- type: "timeseries"
title: "Fallback 触发率"
expr: "fallback_trigger_rate"
draw_style: "line"
alert_threshold: 0.1
租户健康仪表盘
该仪表盘展示各租户的运行状态,帮助识别异常租户:
dashboard:
name: "租户健康"
layout:
- row:
- panel: "Top 活跃租户"
cols: 6
widgets:
- type: "table"
title: "Top 10 活跃租户(按请求量)"
expr: |
topk(10, sum by (tenant_id) (rate(llm_requests_total[5m])))
columns:
- tenant_id
- requests_per_second
- avg_latency_ms
- type: "timeseries"
title: "各租户请求量分布"
expr: "sum by (tenant_id) (rate(llm_requests_total[5m]))"
draw_style: "area"
- panel: "配额使用"
cols: 6
widgets:
- type: "gauge"
title: "配额使用率分布"
expr: "tenant_quota_usage_ratio"
thresholds:
- color: "green"
value: 0
- color: "yellow"
value: 0.7
- color: "red"
value: 0.9
- type: "timeseries"
title: "Top 5 配额消耗租户"
expr: |
topk(5, max by (tenant_id) (tenant_quota_usage_ratio{quota_type="daily_requests"}))
draw_style: "line"
- row:
- panel: "租户成功率"
cols: 6
widgets:
- type: "timeseries"
title: "租户请求成功率"
expr: |
sum by (tenant_id) (rate(llm_requests_total{status="success"}[5m]))
/ sum by (tenant_id) (rate(llm_requests_total[5m]))
draw_style: "line"
legend:
- "{tenant_id}"
- type: "stat"
title: "异常租户数(成功率 < 95%)"
expr: |
count((
sum by (tenant_id) (rate(llm_requests_total{status="success"}[5m]))
/ sum by (tenant_id) (rate(llm_requests_total[5m]))
) < 0.95)
alert_threshold: 0
- panel: "租户告警"
cols: 6
widgets:
- type: "table"
title: "配额告警列表"
expr: |
tenant_quota_usage_ratio > 0.9
columns:
- tenant_id
- quota_type
- usage_ratio
- severity
- type: "log"
title: "租户异常日志"
expr: |
{app="llm-gateway"} |= "error" |= pattern践踏 tenant_id=%s
系统容量仪表盘
该仪表盘展示系统容量规划相关指标,支持容量预测和规划:
dashboard:
name: "系统容量"
layout:
- row:
- panel: "容量概览"
cols: 12
widgets:
- type: "gauge"
title: "全局并发数 vs 上限"
expr: "system_total_concurrent"
max: "system_max_capacity"
- type: "stat"
title: "各模型剩余容量"
expr: |
sum by (model) (llm_concurrent_requests{model=~".*"})
/ on(model)
group_left(max)
sum by (model) (llm_max_capacity{model=~".*"})
unit: "percent"
- type: "timeseries"
title: "等待队列长度"
expr: "llm_queue_length"
draw_style: "line"
legend:
- "{model}"
- "{priority_class}"
- type: "timeseries"
title: "系统吞吐率趋势"
expr: "sum(rate(llm_requests_total[5m]))"
draw_style: "line"
- row:
- panel: "容量预测"
cols: 12
widgets:
- type: "timeseries"
title: "Token 消耗趋势(7天)及预测"
expr: |
predict_linear(sum by (model) (rate(llm_tokens_total[1h]))[7d:1h], 24*7)
draw_style: "line"
legend:
- "{model} 预测"
- type: "stat"
title: "预计日限额耗尽时间"
expr: |
(llm_daily_token_limit - sum by (model) (increase(llm_tokens_total[24h])))
/ sum by (model) (rate(llm_tokens_total[1h]))
unit: "hours"
8.6 告警规则
告警规则定义了触发告警的条件和通知方式,是监控体系的核心输出。我们按严重程度分级告警,确保关键问题能被及时响应。
告警规则定义
alerts:
# ==================== Critical 级别告警 ====================
- name: "LLM 模型完全不可用"
identifier: "llm_model_unavailable"
condition: |
(
sum(rate(llm_requests_total{status="error"}[5m])) by (model)
/ sum(rate(llm_requests_total[5m])) by (model)
) > 0.5
AND
(
sum(rate(fallback_trigger_total[5m])) by (original_model)
/ sum(rate(llm_requests_total[5m])) by (original_model)
) > 0.5
severity: "critical"
description: "模型错误率超过 50% 且 Fallback 触发率超过 50%,主模型可能已完全不可用"
channels:
- "企业微信紧急告警"
- "短信"
- "电话"
auto_resolve_after: "30m"
- name: "系统容量耗尽"
identifier: "system_capacity_exhausted"
condition: |
system_total_concurrent > system_max_capacity * 0.95
severity: "critical"
description: "系统并发数接近或达到容量上限,新请求将无法处理"
channels:
- "企业微信紧急告警"
- "短信"
- "电话"
auto_resolve_after: "5m"
# ==================== Warning 级别告警 ====================
- name: "Credential 负载过高"
identifier: "credential_high_load"
condition: |
credential_load_ratio > 0.9
severity: "warning"
description: "Credential 负载率超过 90%,可能即将达到瓶颈"
channels:
- "企业微信"
for: "5m" # 持续 5 分钟才触发
auto_resolve_after: "10m"
- name: "Credential 健康分过低"
identifier: "credential_low_health"
condition: |
credential_health_score < 0.5
severity: "warning"
description: "Credential 健康评分低于 0.5,可能存在性能问题"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "10m"
- name: "Credential 熔断频繁"
identifier: "credential_circuit_breaking"
condition: |
increase(credential_circuit_breaks_total[1m]) > 3
severity: "warning"
description: "Credential 在 1 分钟内熔断超过 3 次,存在服务不稳定风险"
channels:
- "企业微信"
auto_resolve_after: "5m"
- name: "租户配额耗尽"
identifier: "tenant_quota_exhausted"
condition: |
tenant_quota_usage_ratio > 0.95
severity: "warning"
description: "租户配额使用率超过 95%,即将耗尽"
channels:
- "企业微信"
for: "1m"
auto_resolve_after: "1h"
- name: "租户配额告警(80%)"
identifier: "tenant_quota_warning"
condition: |
tenant_quota_usage_ratio > 0.8 AND tenant_quota_usage_ratio <= 0.95
severity: "warning"
description: "租户配额使用率超过 80%"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "1h"
- name: "系统接近容量上限"
identifier: "system_capacity_warning"
condition: |
system_total_concurrent > system_max_capacity * 0.8
severity: "warning"
description: "系统并发数超过容量上限的 80%,应关注扩容"
channels:
- "企业微信"
for: "10m"
auto_resolve_after: "30m"
- name: "请求延迟过高"
identifier: "high_request_latency"
condition: |
histogram_quantile(0.99, sum(rate(llm_request_duration_bucket[5m])) by (le)) > 5
severity: "warning"
description: "P99 请求延迟超过 5 秒"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "10m"
- name: "错误率升高"
identifier: "error_rate_elevated"
condition: |
(
sum(rate(llm_requests_total{status="error"}[5m]))
/ sum(rate(llm_requests_total[5m]))
) > 0.01
severity: "warning"
description: "系统整体错误率超过 1%"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "10m"
- name: "Fallback 触发率过高"
identifier: "fallback_rate_high"
condition: |
sum(rate(fallback_trigger_total[5m]))
/ sum(rate(llm_requests_total[5m])) > 0.1
severity: "warning"
description: "Fallback 触发率超过 10%,主模型可能存在性能问题"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "10m"
- name: "Token 消耗接近日限额"
identifier: "token_limit_warning"
condition: |
(
sum by (model) (increase(llm_tokens_total[24h]))
/ on(model) group_left
sum by (model) (llm_daily_token_limit)
) > 0.8
severity: "warning"
description: "各模型日 Token 消耗超过限额的 80%"
channels:
- "企业微信"
for: "5m"
auto_resolve_after: "1h"
# ==================== 告警渠道配置 ====================
notification_channels:
企业微信:
type: "webhook"
url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"
mention_rules:
critical:
- "@all"
warning:
- "tenant_admin"
短信:
type: "sms"
providers:
- "aliyun"
phone_numbers:
- "+86-138xxxx"
电话:
type: "voice_call"
phone_numbers:
- "+86-139xxxx"
retry_times: 3
retry_interval_seconds: 60
告警抑制与去重
为避免告警风暴,我们实现告警抑制和去重机制:
class Alert deduplication:
"""告警去重:相同告警在冷却期内不重复发送"""
def __init__(self, cooldown_seconds: int = 300):
self.cooldown = cooldown_seconds
self.last_alert_time: dict[str, datetime] = {}
self._lock = threading.Lock()
def should_send(self, alert_key: str) -> bool:
with self._lock:
now = datetime.now()
if alert_key in self.last_alert_time:
elapsed = (now - self.last_alert_time[alert_key]).total_seconds()
if elapsed < self.cooldown:
return False
self.last_alert_time[alert_key] = now
return True
class AlertSuppression:
"""告警抑制:高优先级告警抑制低优先级告警"""
# 告警优先级映射
ALERT_PRIORITY = {
"critical": 0,
"warning": 1,
}
def should_suppress(self, new_alert: dict, existing_alerts: list[dict]) -> bool:
new_priority = self.ALERT_PRIORITY.get(new_alert["severity"], 99)
for existing in existing_alerts:
existing_priority = self.ALERT_PRIORITY.get(existing["severity"], 99)
# 如果已有更高级别的告警,抑制当前告警
if existing_priority < new_priority:
# 检查是否相关(同租户、同模型等)
if self._is_related(new_alert, existing):
return True
return False
def _is_related(self, a: dict, b: dict) -> bool:
"""判断两个告警是否相关(应相互抑制)"""
# 相同租户
if a.get("tenant_id") and a["tenant_id"] == b.get("tenant_id"):
return True
# 相同模型
if a.get("model") and a["model"] == b.get("model"):
return True
return False
8.7 巡检与报告
巡检与报告是监控体系的延伸,通过定期检查和汇总分析,确保系统持续健康运行,并为容量规划和成本优化提供数据支持。
每日自动巡检
每日定时任务自动执行系统健康检查,生成巡检报告:
# 每日巡检任务
DAILY_CHECK_TASKS = [
{
"name": "model_health_check",
"description": "检查各模型健康状态",
"check_logic": """
检查过去 24 小时内:
1. 各模型的错误率是否超过 5%
2. 各模型的平均延迟是否超过阈值
3. 是否有模型完全不可用
""",
"alert_on_fail": True,
},
{
"name": "capacity_check",
"description": "检查系统容量余量",
"check_logic": """
检查当前:
1. 全局并发数是否超过上限的 80%
2. 各模型的剩余容量是否充足
3. 队列长度是否正常
""",
"alert_on_fail": True,
},
{
"name": "quota_exhaustion_check",
"description": "检查租户配额耗尽情况",
"check_logic": """
检查过去 24 小时:
1. 有多少租户的配额耗尽
2. 有多少租户的配额使用率超过 90%
3. 是否有租户异常消耗(如单日消耗超过月配额 50%)
""",
"alert_on_fail": False, # 汇总到报告中
},
{
"name": "credential_health_check",
"description": "检查 Credential 健康状态",
"check_logic": """
检查当前:
1. 是否有 Credential 健康分低于 0.3
2. 是否有 Credential 熔断次数异常
3. 是否有 Credential 长时间未使用
""",
"alert_on_fail": True,
},
{
"name": "cost_check",
"description": "检查当日成本消耗",
"check_logic": """
检查当日:
1. 各模型 Token 消耗量和费用
2. 与昨日/上周同期对比
3. 预测月末成本
""",
"alert_on_fail": False,
},
]
async def run_daily_inspection():
"""执行每日巡检"""
report = DailyInspectionReport()
report.date = datetime.now().date()
for task in DAILY_CHECK_TASKS:
logger.info(f"Running daily check: {task['name']}")
try:
result = await execute_check(task)
report.add_result(task["name"], result)
if task["alert_on_fail"] and not result["passed"]:
await send_inspection_alert(task, result)
except Exception as e:
logger.error(f"Check {task['name']} failed: {e}")
report.add_result(task["name"], {"passed": False, "error": str(e)})
# 生成并发送巡检报告
await send_inspection_report(report)
每周报告
每周报告汇总一周内的系统运行数据,分析趋势和异常:
weekly_report:
sections:
- name: "系统概览"
metrics:
- "本周总请求量"
- "本周总 Token 消耗"
- "本周总费用"
- "本周平均错误率"
- "本周平均 P99 延迟"
- "与上周环比变化"
- name: "租户排行"
content:
- "Top 10 活跃租户(按请求量)"
- "Top 10 高消费租户(按费用)"
- "配额使用率 Top 10"
- "异常租户列表(成功率 < 95%)"
- name: "模型分析"
content:
- "各模型调用量占比"
- "各模型平均延迟对比"
- "各模型错误率对比"
- "Fallback 触发分析"
- name: "容量分析"
content:
- "各模型容量使用趋势"
- "峰值并发发生时间"
- "容量预警事件汇总"
- "下周容量预测"
- name: "问题汇总"
content:
- "本周告警事件汇总"
- "重大故障复盘"
- "性能瓶颈分析"
- name: "优化建议"
content:
- "基于本周数据的优化建议"
- "成本优化建议"
- "容量扩容建议"
每月报告
每月报告进行更深入的趋势分析和规划:
monthly_report:
sections:
- name: "月度趋势分析"
content:
- "月请求量趋势(3 个月对比)"
- "月 Token 消耗趋势(3 个月对比)"
- "月费用趋势(3 个月对比)"
- "租户增长趋势"
- name: "成本分析"
content:
- "月费用构成(按模型、按租户)"
- "成本异常分析(识别超常消耗)"
- "成本优化效果评估"
- "下月成本预算预测"
- name: "SLA 达成情况"
content:
- "月可用性统计"
- "月延迟统计"
- "SLA 违约事件分析"
- name: "容量规划"
content:
- "当前容量评估"
- "下月容量需求预测"
- "扩容计划建议"
- "成本效益分析"
- name: "租户健康报告"
content:
- "租户活跃度分析"
- "高价值租户服务保障回顾"
- "问题租户分析(滥用、欠费等)"
- name: "技术总结"
content:
- "本月技术变更回顾"
- "重大故障复盘"
- "架构优化记录"
- "下月技术规划"
九、方案选型对比
9.1 核心架构选型:单体网关 vs. 微服务分层
在设计多租户 LLM 并发网关时,有两条主要技术路线可供选择。以下从多个维度进行对比分析。
方案 A:单体网关架构(LLMGateway Monolith)
架构描述:所有功能集中在一个进程中运行,通过内部模块划分实现 Credential Pool、多模型路由、租户隔离、限流熔断等能力。部署形态为单一服务实例或单节点集群,水平扩展通过多进程 + 负载均衡实现。
适用场景:租户规模 < 500,LLM 并发 < 200,日均调用量 < 500 万次。
| 维度 | 评估 |
|---|---|
| 开发效率 | ⭐⭐⭐⭐⭐ 高。代码内调用,无网络开销,开发调试简单 |
| 部署复杂度 | ⭐⭐⭐⭐ 低。单一进程/镜像,一条命令启动 |
| 水平扩展 | ⭐⭐ 中。需外部负载均衡器,无状态设计尚可,有状态模块(Pool)扩展困难 |
| 容错能力 | ⭐⭐ 中。进程崩溃全局影响,无内部熔断隔离 |
| 多租户隔离 | ⭐⭐ 中。进程内隔离,依赖代码层隔离,风险较高 |
| 运维成本 | ⭐⭐⭐⭐ 低。单点监控,故障定位简单 |
| 成本 | ⭐⭐⭐ 中等。扩展到一定规模后需较多机器 |
优势:
- 开发周期短,2-4 周可上线 MVP
- 调试方便,断点可覆盖全部链路
- 团队学习曲线平缓
劣势:
- 租户规模增长后,Pool 状态管理和调度竞争成为瓶颈
- 单点故障影响范围大
- 限流熔断在进程内相互影响,一个租户的故障容易级联到其他租户
方案 B:微服务分层架构(Multi-Tier Gateway)
架构描述:将网关拆分为接入层(API Gateway)、路由层(Router/Broker)、模型层(Model Adapter)三层,各层职责单一,通过消息队列或 RPC 通信。Credential Pool 作为独立服务运行,支持多实例分布式部署。
适用场景:租户规模 500-10,000+,LLM 并发 200-1000+,日均调用量 > 500 万次。
| 维度 | 评估 |
|---|---|
| 开发效率 | ⭐⭐⭐ 中。需定义服务间协议,多服务联调成本较高 |
| 部署复杂度 | ⭐⭐ 中。需编排工具(Docker Compose/K8s),多组件协调 |
| 水平扩展 | ⭐⭐⭐⭐ 高。各层独立扩展,Pool 服务可水平扩展 |
| 容错能力 | ⭐⭐⭐⭐ 高。服务间熔断隔离,单层故障不导致全局崩溃 |
| 多租户隔离 | ⭐⭐⭐⭐ 高。每层可独立配置租户配额,资源硬隔离 |
| 运维成本 | ⭐⭐ 中。多组件多指标,链路追踪复杂 |
| 成本 | ⭐⭐⭐ 中等。资源利用率高,扩展性好,长期成本更低 |
优势:
- 各层职责清晰,扩展性强,适合大规模场景
- 租户隔离彻底,故障不级联
- 可针对瓶颈层独立优化(扩展 Router 而非整个网关)
劣势:
- 建设周期长,4-8 周才能完成核心功能
- 引入消息队列或 RPC 后,延迟增加 5-15ms
- 运维复杂度高,需完善的链路追踪和监控体系
爱优的选型建议
结合爱优现状(当前 582 商户/700 门店,目标是 10,000+ 租户),推荐采用方案 B 微服务分层架构。理由如下:
- 规模预判:当前 582 商户已是方案 A 的上限边缘,未来扩展到 10,000+ 租户势必要重构,不如一次性设计到位
- 隔离需求:多商户场景下,一个商户的 LLM 调用异常不能影响其他商户,进程内隔离风险过高
- 成本可控:引入 Kafka/Redis 的成本远低于因故障导致的业务损失
- 团队成长:微服务架构是团队技术积累的机会,为未来更复杂业务打好基础
9.2 关键技术选型对比
9.2.1 消息队列选型
在路由层与模型层之间,推荐引入消息队列实现异步解耦。以下对比三种方案:
| 维度 | Redis Stream | RabbitMQ | Apache Kafka |
|---|---|---|---|
| 吞吐量 | 10-30 万/秒 | 5-10 万/秒 | 100 万+/秒 |
| 延迟 | < 1ms | 1-5ms | 3-10ms |
| 消息持久化 | 可配置 | 支持 | 支持 |
| 多消费者 | 消费组支持 | 交换机路由 | 消费组原生支持 |
| 运维复杂度 | 低(已有 Redis) | 中 | 高(需独立部署) |
| 爱优适配度 | ⭐⭐⭐⭐ 首选 | ⭐⭐⭐ 备选 | ⭐⭐ 过渡方案 |
推荐:采用 Redis Stream 作为首选方案,理由:
- 爱优已有 Redis 基础设施,无需新增组件
- Redis Stream 的消费组机制完美契合多 Worker 消费模型
- 延迟最低(< 1ms),适合 LLM 调用这种延迟敏感场景
- 支持消息确认和重试,可靠性有保障
9.2.2 限流算法选型
| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | 单位时间内计数 | 简单,实现成本低 | 边界瞬间两倍突发 | 小型系统,限流精度要求低 |
| 滑动窗口 | 连续时间窗口内计数 | 精度比固定窗口高 | 实现略复杂 | 租户级限流 |
| 令牌桶 | 按速率往桶中放令牌,取令牌处理 | 支持突发流量 | 实现复杂 | API Gateway 层入口限流 |
| 漏桶 | 按固定速率从桶中漏出,满了则拒绝 | 流量整形能力强 | 突发能力弱 | 模型层出口限流(保护下游) |
| 滑动日志 | 记录每个请求时间戳 | 最精确 | 内存消耗大 | 高价值请求精确限流 |
推荐:分层使用不同算法:
- 入口层(API Gateway):令牌桶,容忍突发流量
- 租户配额层:滑动窗口,精确控制租户消耗
- 模型层出口:漏桶,保护下游 LLM API不过载
9.2.3 熔断策略选型
| 策略 | 触发条件 | 处理方式 | 恢复方式 | 爱优适配度 |
|---|---|---|---|---|
| 错误率熔断 | 连续 N 个请求中,错误率 > X% | 快速失败,返回降级响应 | X 秒后半开,允许一个请求通过 | ⭐⭐⭐⭐⭐ 首选 |
| 超时熔断 | 请求响应时间 > X ms 持续 N 次 | 快速失败 | 超时后重置计数器 | ⭐⭐⭐⭐⭐ 首选 |
| 限流熔断 | RPM/TPM 达到阈值 | 快速失败,进入排队 | 配额刷新后自动恢复 | ⭐⭐⭐⭐⭐ 首选 |
| 舱壁隔离 | 单个租户错误率 > X% | 仅隔离该租户 | 租户级联恢复 | ⭐⭐⭐⭐ 重要补充 |
| 全量熔断 | 核心依赖(Redis/Kafka)不可达 | 全部降级,提供有限功能 | 依赖恢复后自动恢复 | ⭐⭐⭐ 兜底 |
推荐:采用五层熔断体系(见 6.3 节),以上所有策略配合使用,形成纵深防御。
9.3 方案选型决策矩阵
| 评估维度(权重) | 方案 A 单体网关 | 方案 B 微服务分层 |
|---|---|---|
| 开发效率(20%) | 5 分 | 3 分 |
| 水平扩展(20%) | 2 分 | 4 分 |
| 容错能力(20%) | 2 分 | 4 分 |
| 多租户隔离(20%) | 2 分 | 4 分 |
| 运维成本(10%) | 4 分 | 2 分 |
| 长期成本(10%) | 3 分 | 4 分 |
| 加权总分 | 3.0 | 3.6 |
结论:方案 B 微服务分层架构以 3.6 分胜出,建议作为最终架构选型。
十、部署架构建议
10.1 部署策略选择
针对爱优现状和未来扩展需求,提供两种部署策略:
方案一:渐进式部署(推荐)
适用于:团队初次构建多租户 LLM 系统,希望平滑过渡,不影响现有业务。
Phase 1:基础设施准备(第 1-2 周)
- 部署 Redis Cluster(3 主 3 从,支撑高可用)
- 部署 Kafka 集群(3 节点,用于路由层与模型层解耦)
- 部署基础设施监控(Prometheus + Grafana)
- 部署日志收集(ELK 或 Loki)
- 设计并评审网关微服务拆分方案
Phase 2:核心服务构建(第 3-6 周)
- 搭建 API Gateway 服务(接入层)
- 搭建 Router 服务(路由层,含租户识别 + 模型选择)
- 搭建 Credential Pool 服务(模型层,含 Pool 管理 + 负载均衡)
- 搭建限流熔断服务(独立 Sidecar 或内嵌)
- 串联测试:API Gateway → Router → Credential Pool 全链路
Phase 3:多租户隔离落地(第 7-8 周)
- 实现租户配额管理(硬隔离 + 软隔离)
- 实现 Token 计数与计费系统
- 实现分租户监控仪表盘
- 业务灰度切换:先接入 10 个租户试运行
Phase 4:全量迁移与优化(第 9-10 周)
- 全量商户迁移
- 性能压测与瓶颈定位
- 限流阈值调优
- 熔断策略参数优化
- 编写运维手册与应急预案
方案二:一步到位式部署
适用于:团队已有微服务经验,希望在更短时间内完成架构升级,愿意接受较高的一次性投入。
第 1-4 周:架构设计与基础设施
- 完整架构设计评审(API Gateway + Router + Pool + 限流熔断 + 监控)
- 基础设施一步到位(Redis Cluster + Kafka + K8s)
- 所有微服务并行开发
第 5-8 周:联调与部署
- 所有服务集成联调
- 蓝绿部署,灰度切换
- 全链路压测
风险提示:一步到位式部署风险集中,团队压力较大,不建议首次构建时采用。
10.2 部署拓扑图
以下为推荐架构的部署拓扑(方案一 Phase 2 完成后的目标状态):
┌─────────────────────────────────┐
│ 腾讯云重庆一区 │
│ │
用户请求 │ ┌─────────────────────────┐ │
──────────► HTTPS │ │ API Gateway(2节点) │ │
443 │ │ 限流 + 认证 + 租户识别 │ │
│ └──────────┬──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────┐ │
│ │ Router 服务(2节点) │ │
│ │ 模型路由 + 优先级调度 │ │
│ └──────────┬──────────────┘ │
│ │ │
│ ┌──────────▼──────────────┐ │
│ │ Kafka(3节点) │ │
│ │ 请求队列持久化 │ │
│ └──────────┬──────────────┘ │
│ │ │
│ ┌──────────▼──────────────┐ │
│ │ Credential Pool 服务 │ │
│ │ (2节点 + 多Key管理) │ │
│ └──────────┬──────────────┘ │
│ │ │
│ ┌──────────▼──────────────┐ │
│ │ MiniMax API │ │
│ │ DeepSeek API │ │
│ │ (多模型Fallback) │ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ │
│ │ Redis Cluster(3主3从) │ │
│ │ 会话存储 + 限流计数 │ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ │
│ │ Prometheus + Grafana │ │
│ │ 全链路监控告警 │ │
│ └─────────────────────────┘ │
│ │
└─────────────────────────────────┘
10.3 服务规格与扩容策略
| 服务 | 推荐规格 | 最小实例数 | 扩容触发条件 | 最大实例数 |
|---|---|---|---|---|
| API Gateway | 2C4G | 2 | CPU > 70% 持续 5min | 10 |
| Router | 4C8G | 2 | 请求队列积压 > 1000 | 8 |
| Credential Pool | 4C8G | 2 | 等待调度的请求 > 500 | 6 |
| Redis Cluster | 4C16G × 6 节点 | 6 | 内存使用 > 70% | - |
| Kafka | 4C8G × 3 节点 | 3 | 消费Lag > 10,000 | 9 |
| Prometheus | 4C8G | 1 | 查询延迟 > 2s | 3 |
10.4 高可用保障方案
原则:任意单个服务节点故障不影响整体服务,目标是 99.9% 可用性。
| 保障措施 | 实现方式 |
|---|---|
| API Gateway 高可用 | 2 节点 Active-Active,SLB 负载均衡,健康检查 |
| Router 高可用 | 2 节点 Active-Active,通过 Kafka 实现请求分发 |
| Credential Pool 高可用 | 2 节点 Active-Active,Redis 分布式锁竞争 |
| Redis 高可用 | 3 主 3 从 Cluster,自动主从切换 |
| Kafka 高可用 | 3 节点集群 + Replication Factor=3 |
| 多模型容灾 | 主模型故障自动切换 Fallback 模型 |
| 多地域容灾 | 未来扩展至其他地域(西南/华东),流量跨地域调度 |
10.5 部署检查清单
每次部署前,必须逐项确认以下内容:
部署前检查(Pre-Deploy)
- [ ] 所有配置已通过配置中心管理,无需硬编码
- [ ] 新版本已在预发环境完成全链路测试
- [ ] 数据库/Redis 迁移脚本已准备并评审
- [ ] 监控告警已配置,阈值已设定
- [ ] 回滚方案已准备(30 分钟内可回滚)
- [ ] 业务方已通知(变更窗口已沟通)
部署后检查(Post-Deploy)
- [ ] 所有服务进程正常,无 OOM 或 Crash
- [ ] 健康检查接口全部通过
- [ ] 关键指标无异常(QPS、延迟、错误率)
- [ ] 上下游依赖全部可用
- [ ] 告警无异常触发
- [ ] 业务日志无大量 ERROR
日常巡检(Daily)
- [ ] 各服务 CPU/内存使用率 < 70%
- [ ] Kafka 消费Lag < 1000
- [ ] Redis 内存使用率 < 70%
- [ ] 无未处理的 P1/P2 告警
- [ ] 限额消耗 < 80%
十一、服务器配置选型
11.1 爱优现有服务器资产梳理
在规划新增服务器之前,先梳理爱优现有的云服务器资产,确保新增资源与现有资产形成互补,避免重复建设。
现有服务器资产总览
| 序号 | 名称 | 实例ID | 地域 | 配置 | 到期时间 | 当前角色 | 承载能力 |
|---|---|---|---|---|---|---|---|
| 1 | 爱优零售系统 | ins-6pqum2dw | 重庆一区 | 8C16G 8Mbps | - | 零售业务主服务 | 支撑现有 582 商户/700 门店业务 |
| 2 | 智能体服务平台 | ins-6pq4nf9u | 重庆一区 | 16C32G 8Mbps | 2029-04-29 | AI 智能体核心平台 | 支撑 Agent 对话、工具调用 |
| 3 | 供应链云数据平台 | - | 重庆一区 | 待定 | - | 规划中 | 供应链数据处理 |
| 4 | TDSQL-C 读写实例 | cynosdbmysql-b1khdph3 | 重庆一区 | 8C16G 独享型 | - | 爱优新零售主库 | 377 表/370 万+订单/582 商户 |
| 5 | TDSQL-C 只读实例 | cynosdbmysql-ins-2ms4mknc | 重庆一区 | 1C2G 通用型 | - | 读库分流 | 报表查询、数据分析 |
| 6 | 供应链云数据平台 | cdb-n31vn1pv | 重庆一区 | 4C8G 200GB 双节点 | - | 需扩容 | 供应链业务数据 |
现有资产能力评估
| 资源 | 当前负载 | 空闲容量 | 可用于 LLM 网关? |
|---|---|---|---|
| 爱优零售系统(8C16G) | 零售业务 | CPU 约 40% 可用,内存约 8G 可用 | 可容器化部署 LLM 网关组件(建议复用) |
| 智能体服务平台(16C32G) | Agent 核心推理 | CPU 约 50% 可用,内存约 16G 可用 | 可部署 Router 服务(推理密集型) |
| TDSQL-C 只读实例(1C2G) | 报表查询 | 几乎满负荷 | 不可用,需扩容 |
| 供应链云数据平台(4C8G) | 规划中 | 全部空闲 | 可用于 Kafka 或监控组件 |
11.2 LLM 网关组件服务器需求分析
基于第十章的微服务分层架构,每个组件的资源需求分析如下:
| 组件 | 推荐规格 | 最低规格 | 推荐实例数 | 主要资源消耗 |
|---|---|---|---|---|
| API Gateway | 2C4G | 2C2G | 2 | CPU-bound(TLS termination + 限流计算) |
| Router 服务 | 4C8G | 2C4G | 2 | CPU-bound(路由查找 + 优先级调度) |
| Credential Pool 服务 | 4C8G | 2C4G | 2 | CPU-bound(Key 管理 + 负载均衡) |
| Redis Cluster 数据节点 | 4C16G | 4C8G | 6 节点(3主3从) | 内存密集(会话 + 限流计数) |
| Kafka Broker | 4C8G | 2C4G | 3 | CPU + 磁盘 IO 密集(消息持久化) |
| Prometheus | 4C8G | 2C4G | 1 | 磁盘密集(时序数据写入) |
11.3 新增服务器配置推荐方案
方案 A:最小化配置(支撑 500-1000 并发)
适用于:初期验证阶段,租户 < 500,并发 < 200。
| 组件 | 云厂商 | 规格 | 数量 | 月成本估算 |
|---|---|---|---|---|
| Redis Cluster 数据节点 | 腾讯云 Redis | 4C8G 性能型 | 3 节点(原有扩容) | ¥600/节点 × 3 |
| Kafka Broker | 自建 | 2C4G 云服务器 | 3 | ¥300/节点 × 3 |
| Prometheus | 自建 | 2C4G 云服务器 | 1 | ¥300 × 1 |
| 合计 | ¥3,300/月 |
注:API Gateway、Router、Credential Pool 服务建议优先复用现有爱优零售系统(8C16G),通过 Docker 容器化部署,利用剩余空闲资源。
方案 B:生产级配置(支撑 1000-5000 并发)
适用于:生产环境,租户 500-5000,并发 200-1000。
| 组件 | 云厂商 | 规格 | 数量 | 月成本估算 |
|---|---|---|---|---|
| Redis Cluster | 腾讯云 Redis | 4C16G 性能型 | 6 节点(3主3从) | ¥1,200/节点 × 6 |
| Kafka Broker | 自建 | 4C8G 云服务器 | 3 | ¥600/节点 × 3 |
| Prometheus + Grafana | 自建 | 4C8G 云服务器 | 1 | ¥600 × 1 |
| API Gateway 独立部署 | 自建 | 2C4G 云服务器 | 2 | ¥500/节点 × 2 |
| Router 服务独立部署 | 自建 | 4C8G 云服务器 | 2 | ¥800/节点 × 2 |
| Credential Pool 独立部署 | 自建 | 4C8G 云服务器 | 2 | ¥800/节点 × 2 |
| 合计 | ¥14,200/月 |
方案 C:弹性高可用配置(支撑 5000-10000+ 并发)
适用于:大规模生产环境,租户 5000+,并发 1000+,支持多活容灾。
| 组件 | 云厂商 | 规格 | 数量 | 月成本估算 |
|---|---|---|---|---|
| Redis Cluster | 腾讯云 Redis | 8C32G 性能型 | 6 节点(3主3从) | ¥3,000/节点 × 6 |
| Kafka 集群 | 自建 | 8C16G 云服务器 | 5(含 Observer) | ¥1,500/节点 × 5 |
| Prometheus 集群 | 自建 | 4C8G 云服务器 | 2 | ¥600 × 2 |
| API Gateway | 自建 | 4C8G 云服务器 | 3 | ¥800 × 3 |
| Router 服务 | 自建 | 8C16G 云服务器 | 3 | ¥1,500 × 3 |
| Credential Pool 服务 | 自建 | 8C16G 云服务器 | 3 | ¥1,500 × 3 |
| 合计 | ¥34,200/月 |
11.4 爱优实际情况的配置选型建议
结合爱优现有资产状况(爱优零售系统 8C16G 和智能体服务平台 16C32G),给出如下实操建议:
第一阶段(当前 ~500 租户)
零新增服务器——基于现有资产容器化部署:
| 现有资产 | 复用方式 | 部署组件 | 剩余容量 |
|---|---|---|---|
| 爱优零售系统(8C16G) | Docker Compose 部署 | API Gateway + Router + Pool + Kafka 单节点 | CPU 40%,内存 8G |
| 智能体服务平台(16C32G) | 独享命名空间 | Kafka Broker × 2 + Kafka Observer × 1 | CPU 30%,内存 20G |
| Redis | 腾讯云 Redis 新建 | Redis Cluster(3主3从)付费版 | 现有 |
| Prometheus | 复用监控服务器 | 新增容器 | 现有监控节点 |
新增成本:¥3,600/月(Redis Cluster 6节点性能型 + 少量云服务器)
第二阶段(500~2000 租户)
新增 2 台 4C8G 云服务器:
- 1 台:Router 服务(替代复用方案,独立部署)
- 1 台:Kafka Broker 扩展 + Credential Pool 备用节点
新增成本:¥1,600/月
第三阶段(2000~10000 租户)
全面切换至方案 B 生产级配置,新增独立服务器取代复用模式。
11.5 数据库扩容规划
当前爱优数据库资产中,与 LLM 网关相关的是只读实例(1C2G),该实例已接近满负荷。LLM 网关的 Token 计数、租户配额记录、调用日志等数据需要独立的分析型数据库。
ClickHouse / Elasticsearch / Milvus 采购建议
文档中规划的「数据分析中心平台」(ClickHouse + Elasticsearch + Milvus)尚未采购,这三个组件的选型建议:
| 组件 | 用途 | 最低配置 | 推荐配置 | 说明 |
|---|---|---|---|---|
| ClickHouse | 调用日志分析、成本统计、聚合报表 | 4C16G × 3 节点 | 8C32G × 3 节点 | 列式存储,适合大规模日志分析 |
| Elasticsearch | 商品检索、日志全文检索 | 4C8G × 3 节点 | 8C16G × 3 节点 | 爱优已有 ES 使用经验 |
| Milvus | 向量检索(相似商品推荐、RAG 知识库) | 4C8G × 3 节点 | 8C16G × 3 节点 | 需要较大内存用于向量索引 |
采购优先级:ClickHouse > Elasticsearch > Milvus。初期可先部署 ClickHouse 承载 LLM 调用日志分析,其他两个根据业务需求逐步采购。
11.6 服务器配置选型决策矩阵
| 评估维度(权重) | 方案 A 最小化 | 方案 B 生产级 | 方案 C 弹性高可用 |
|---|---|---|---|
| 成本(30%) | 5 分 | 3 分 | 1 分 |
| 并发支撑能力(25%) | 1 分 | 4 分 | 5 分 |
| 高可用性(20%) | 2 分 | 4 分 | 5 分 |
| 扩展性(15%) | 2 分 | 3 分 | 5 分 |
| 运维复杂度(10%) | 4 分 | 3 分 | 1 分 |
| 加权总分 | 2.85 | 3.50 | 3.30 |
爱优实际选型建议:
- 当前阶段(< 500 租户):方案 A 最小化 + 复用现有资产,成本最低,¥3,600/月
- 中期目标(500-2000 租户):方案 A 基础上逐步升级到方案 B
- 长期目标(2000-10000 租户):方案 B 全面部署,¥14,200/月
附录:技术参考
- 参考文档:企业微信多租户智能体技术方案