LiteLLM Customization
LiteLLM 에 대한 이야기를 이어서 써보기로 한다. LiteLLM 이외의 다른 대안을 찾아봤는데 딱히 괜찮아 보이는 오픈소스는 찾기 힘들었다. 심플한데 주요기능이 없거나 또 괜찮다 싶으면 핵심기능이 유료구독인 경우가 많았다.
그에 비해 LiteLLM 은 꽤나 후한편이었고 부족한 부분은 만들면 그만이었다. 다음은 LiteLLM 의 Enterprise 버전에서만 제공하는 기능들이다.
1. 보안 & 접근 제어
- Admin UI SSO/SAML (Okta, Azure AD 등)
- JWT 기반 인증 (enable_jwt_auth, 커스텀 JWT 검증 함수 연동)
- Audit Logs — 키/팀/유저/모델의 생성·수정·삭제·재발급 이력 및 보존 정책 (Enterprise License 필수)
- RBAC(역할 기반 접근 제어)
- Public/Private 라우트 제어
- IP 화이트리스트 (allowed_ips)
- Virtual key 자동 로테이션
- Secret Manager 연동 (AWS KMS, HashiCorp Vault 등)
- AI Hub — 사용 가능한 모델/에이전트를 브랜딩된 공개 페이지로 공유
2. Guardrails (일부 Enterprise 전용/유료 연동)
- Prompt Injection 탐지 (Pillar Security, HiddenLayer 등 서드파티 연동)
- Secret Detection (hide-secrets — GitHub PAT, Stripe 키 등 탐지, detect-secrets 플러그인 지정 가능)
- Aporia, Lakera 등 서드파티 가드레일 통합
3. 운영/관리
- Spend Tracking, Multi-team 관리
- Managed Files with Finetuning — /fine_tuning/jobs 엔드포인트를 통한 배치 파인튜닝 관리 (문서상 "free Enterprise feature"로 별도 표기)
Call Hooks
Enterprise 기능요약을 보면 "Virtual Key 자동 로테이션" 이 우리가 필요했던 기능이고 특정기간(사규는 60일 기준)만 유효한 키가 만료가 되면 자동으로 키를 갱신해주는 기능일 것이다.
개발 문서를 파던중에 아래 Call Hooks 문서를 발견했고 해당 기능이라면 원하는 어떤 기능이든 만들 수 있지 않을까 싶었다. Hook 을 걸면 어떤 요청이든 LLM 에 전달하기 전이든 후든 파이썬 스크립트를 돌릴 수 있었고 이미 디비와 Redis 에 접속할 수 있는 권한이 있으니 그야말로 뭐든지 할 수 있는 상황이다.

1) async_pre_call_hook
이 Hook은 LLM 를 호출하기 "전에" 실행되는 프록시 Hook이다.
class MyCustomHandler(CustomLogger):
async def async_pre_call_hook(self, user_api_key_dict, cache, data, call_type):
data["model"] = "my-new-model"
return data
proxy_handler_instance = MyCustomHandler()위 예제를 보면 "data(요청 바디 dict)" 가 있는데 이게 LLM 에 전달하는 data 인자값이고 예제는 그 data 값에서 모델을 my-new-model 로 강제하는 것을 볼 수 있다.
다시 요약하면 LLM 에 전달하기 전에 이 Hook 으로 프로세스를 제어할 수 있는 기능을 한다고 보면 된다.
다른 예제를 하나 더 보자. 아래 예제는 SYSTEM_PROMPTS 에 설정된 모델 (marketer-v1) 이라면 SYSTEM_PROMPTS에서 정해둔 프롬프트를 강제적으로 적용하게 하는 기능을 한다.
SYSTEM_PROMPTS = {
"marketer-v1": """
당신은 마케팅본부의 수석 데이터 사이언티스트입니다.
모든 분석은 정량 근거 기반 + 통계적 타당성 + 불확실성 명시 + 허위 수치 생성 금지 원칙을 따릅니다.
"""
}
class SystemPromptInjector(CustomLogger):
async def async_pre_call_hook(self, user_api_key_dict, cache, data, call_type):
model = data.get("model", "")
system_prompt = SYSTEM_PROMPTS.get(model)
if not system_prompt:
return data # 해당 모델용 프롬프트가 없으면 그대로 통과
messages = data.get("messages", [])
if messages and messages[0].get("role") == "system":
messages[0]["content"] += "\n\n" + system_prompt
else:
data["messages"] = [{"role": "system", "content": system_prompt}] + messages
return data아마도 config.yaml 에는 다음과 같이 설정해야 할 것이다
model_list:
- model_name: marketer-v1
litellm_params:
model: global.anthropic.claude-sonnet-4-6
litellm_settings:
timezone: "Asia/Seoul"
callbacks: ["smtp_email", proxy_handler_instance] # 이 부분 참고config.yaml
이로서 litellm 을 사용하는 claude code 계열의 에이전트는 claude-sonnet-4-6 를 베이스로 하는 marketer-v1 라는 모델을 사용할 수 있고 해당 모델을 사용할때는 모든 프롬프트에 SYSTEM_PROMPTS 에 설정한 프롬프트가 적용된다.
2) async_post_call_failure_hook
이 Hook은 프록시계층(litellm) 에서 "오류가 발생" 한뒤 클라이언트에게 응답을 주기 전에 실행된다. 아마도 전체 구조를 그리면 다음과 같을 것이다.
클라이언트 -> 프록시계층(litellm) -> LLM -> 프록시계층(litellm) -> 클라이언트
위 Hook 의 이벤트는 클라이언트가 프록시계층(litellm) 에 LLM 에 특정 메시지를 전달해 달라고 요청했는데 litellm 가 오류를 발생하며 거절한 것이다. 키가 만료 됐다거나 budget 이 부족하다거나 하는 이유일 것이다.
예제를 하나 보자.
class MyCustomHandler(CustomLogger):
async def async_post_call_failure_hook(
self, request_data, original_exception, user_api_key_dict, traceback_str=None
):
if isinstance(original_exception, litellm.ContextWindowExceededError):
return HTTPException(
status_code=400,
detail="프롬프트 길이가 너무 길어요. 길이를 줄여서 다시 시도하세요."
)
return Noneoriginal_exception 로 전달받은 에러가 litellm.ContextWindowExceededError 처럼 이미 정의된 컨텍스트 길이 초과 에러라면 그에 맞는 예외문구로 클라이언트에게 다시 전달 하는 예시이다.
이 Hook 을 사용하면 만료키에 대한 대응은 에러유형이 키만료 상태이면 만료키를 삭제하고 생성하고 넘어온 사용자 정보로 메일을 보내는 프로세스를 작성하면 된다.
사용자 경험은 아마도, LLM 을 사용 하다가 키가 만료됐다는 메시지를 클로드 코드든 클로드 앱에서든 "만료가 됐고 이메일로 메일을 보냈어" 라는 취지의 메시지를 받을 것이다.
조금 더 편한 사용자 경험을 제공할 수 있겠지만 내가 구현한 것은 여기까지다. 도움이 될까 싶어 코드를 정리해서 첨부한다.
import asyncio
import json
import os
import re
import smtplib
import ssl
import urllib.request
from datetime import datetime
from email.message import EmailMessage
from litellm.integrations.custom_logger import CustomLogger
# 모델 이름 -> 강제로 주입할 시스템 프롬프트 매핑
SYSTEM_PROMPTS = {
"marketer": """
당신은 수석 데이터 사이언티스트입니다.
모든 분석은 정량 근거 기반 + 통계적 타당성 + 불확실성 명시 + 허위 수치 생성 금지 원칙을 따릅니다.
""",
}
# 신규 키에 그대로 물려줄 설정 필드. key_alias는 유니크 제약 때문에 신규 발급 시점에는
# 일부러 뺀다(만료 키가 아직 그 alias를 쥐고 있어 충돌 나므로, alias는 나중에 별도로 옮긴다).
_KEY_COPY_FIELDS = (
"user_id", "team_id",
"models", "max_budget", "budget_duration", "metadata",
"max_parallel_requests", "tpm_limit", "rpm_limit",
"model_max_budget", "allowed_cache_controls",
"permissions", "soft_budget_cooldown", "allowed_routes",
)
_EXPIRED_KEY_PATTERNS = ("expired key", "key expired", "expired api key", "api key expired")
class SystemPromptInjector(CustomLogger):
"""모델별 시스템 프롬프트 주입 + API 키 만료 시 자동 재발급/메일 안내."""
def __init__(self):
super().__init__()
# 같은 만료 키로 재시도가 여러 번 들어와도 새 키를 중복 발급/발송하지 않기 위한 표시.
# 신규 키 발급까지 "성공"한 뒤에만 채워 넣는다(아래 _handle_expired_key 참고).
# 프로세스 로컬이라 워커가 여러 개면 완벽히 막지는 못하지만, 최악의 경우도 "새 키가
# 두 번 발급되어 메일이 중복 발송"되는 수준이라(잠긴 채로 방치되는 것보다 훨씬 낫다) 이 정도로 충분하다.
self._alerted_keys = set()
async def async_pre_call_hook(self, user_api_key_dict, cache, data, call_type):
model = data.get("model", "")
system_prompt = SYSTEM_PROMPTS.get(model)
if not system_prompt:
return data
if call_type == "anthropic_messages":
# Anthropic API는 system이 messages 안이 아니라 별도 최상위 필드라 병합 방식이 다르다
existing = data.get("system")
if not existing:
data["system"] = system_prompt
elif isinstance(existing, list):
data["system"] = existing + [{"type": "text", "text": system_prompt}]
else:
data["system"] = existing + "\n\n" + system_prompt
else:
# OpenAI 스타일은 system이 messages 리스트의 첫 항목이다
messages = data.get("messages", [])
if messages and messages[0].get("role") == "system":
content = messages[0]["content"]
# content가 문자열이 아니라 content-block 리스트로 오는 클라이언트도 있다
if isinstance(content, list):
messages[0]["content"] = content + [{"type": "text", "text": system_prompt}]
else:
messages[0]["content"] = content + "\n\n" + system_prompt
else:
data["messages"] = [{"role": "system", "content": system_prompt}] + messages
return data
async def async_post_call_failure_hook(self, request_data, original_exception, user_api_key_dict, **kwargs):
"""인증 단계에서 거부된 요청(만료 키 등)을 가로채는 훅.
키 만료는 실제 LLM 호출 전에 프록시가 막기 때문에 이 훅에서만 처리하면 된다.
"""
try:
await self._handle_expired_key(original_exception, user_api_key_dict)
except Exception as e:
print(f"[ERROR] expired-key alert failed: {e}", flush=True)
async def _handle_expired_key(self, exception, user_api_key_dict):
message = str(getattr(exception, "message", exception))
if not any(pattern in message.lower() for pattern in _EXPIRED_KEY_PATTERNS):
return # 만료 키 에러가 아니면 처리하지 않음
old_identifier = self._extract_key_identifier(exception, user_api_key_dict)
if not old_identifier or old_identifier in self._alerted_keys:
return
# 1. 조회 — 만료된 키라도 마스터 키로 /key/info를 호출하면 정보를 돌려준다.
# (거절되는 건 "만료된 키 자신"으로 인증할 때뿐이라, DB 직접 접근이 필요 없다)
key_info = await asyncio.to_thread(self._get_key_info, old_identifier)
if not key_info:
print(f"[WARN] expired key {old_identifier[:12]}...: 키 정보를 찾지 못해 알림을 건너뜀", flush=True)
return
recipient = await asyncio.to_thread(self._resolve_recipient, key_info)
if not recipient:
print(f"[WARN] expired key {old_identifier[:12]}...: 수신자 이메일을 찾지 못해 알림을 건너뜀", flush=True)
return
# 2. 생성 — 신규 키를 먼저 확보한다. 만료 키 삭제/alias 이전은 성공한 뒤에만 진행하므로,
# 여기서 실패해도 사용자는 (원래부터 못 쓰던) 만료 키 상태 그대로일 뿐 추가로 잃는 게 없다.
new_key = await asyncio.to_thread(self._generate_new_key, key_info)
if not new_key:
print(f"[ERROR] expired key {old_identifier[:12]}...: 신규 키 발급 실패", flush=True)
return
# 신규 키 확보에 성공한 시점부터만 dedup 마킹 — 그래야 위 단계 실패 시 다음 이벤트에서 재시도된다
self._alerted_keys.add(old_identifier)
# 3. 메일 발송
sent = await asyncio.to_thread(self._send_email, recipient, new_key)
print(f"[INFO] expired key alert: recipient={recipient} email_sent={sent}", flush=True)
# 4. 뒷정리(best-effort) — 만료 키 삭제 + 신규 키에 alias 이전. 실패해도 이미 신규 키는
# 발급되고 메일도 나갔으니 사용자 입장에서는 문제없다.
await asyncio.to_thread(self._finalize_old_key, old_identifier, new_key, key_info.get("key_alias") or "")
def _extract_key_identifier(self, exception, user_api_key_dict) -> str:
"""/key/info, /key/delete는 raw sk-... 든 해시든 둘 다 알아서 정규화해서 받아준다."""
uak = user_api_key_dict.model_dump() if hasattr(user_api_key_dict, "model_dump") else (user_api_key_dict or {})
if isinstance(uak, dict) and uak.get("token"):
return uak["token"]
# 인증 단계에서 막힌 요청은 uak가 비어 있을 수 있어, 에러 메시지에 찍힌 키를 정규식으로 뽑아낸다
message = str(getattr(exception, "message", exception))
match = re.search(r"Received API Key\s*=\s*(sk-\S+)", message)
return match.group(1).rstrip(",") if match else ""
def _call_admin_api(self, method: str, path: str, payload: dict = None) -> dict:
"""LiteLLM 프록시 관리자 API(/key/*, /user/*)를 마스터 키로 호출한다."""
master_key = os.environ.get("LITELLM_MASTER_KEY")
base_url = os.environ.get("LITELLM_PROXY_BASE_URL", "http://localhost:8080").rstrip("/")
headers = {"Authorization": f"Bearer {master_key}"}
data = None
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(f"{base_url}{path}", data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.load(resp)
def _get_key_info(self, identifier: str) -> dict:
try:
result = self._call_admin_api("GET", f"/key/info?key={identifier}")
return result.get("info") or result or {}
except Exception as e:
print(f"[WARN] /key/info 조회 실패: {e}", flush=True)
return {}
def _get_user_email(self, user_id: str) -> str:
if not user_id:
return ""
try:
result = self._call_admin_api("GET", f"/user/info?user_id={user_id}")
user_info = result.get("user_info") if isinstance(result, dict) else None
if isinstance(user_info, dict) and user_info.get("user_email"):
return user_info["user_email"]
return result.get("user_email", "") if isinstance(result, dict) else ""
except Exception as e:
print(f"[WARN] /user/info 조회 실패: {e}", flush=True)
return ""
def _resolve_recipient(self, key_info: dict) -> str:
"""key_alias -> user_email(/user/info) -> 관리자 알림용 env 순으로 이메일을 찾는다."""
key_alias = key_info.get("key_alias") or ""
if "@" in key_alias:
return key_alias
user_email = self._get_user_email(key_info.get("user_id") or "")
if "@" in user_email:
return user_email
return os.environ.get("ALERT_EMAIL", "")
def _generate_new_key(self, key_info: dict) -> str:
"""만료된 키와 동일한 예산/모델/팀 설정으로 신규 키를 발급한다."""
payload = {
"duration": "60d",
**{k: key_info[k] for k in _KEY_COPY_FIELDS if key_info.get(k) is not None},
}
try:
result = self._call_admin_api("POST", "/key/generate", payload)
return result.get("key", "")
except Exception as e:
print(f"[ERROR] /key/generate 호출 실패: {e}", flush=True)
return ""
def _finalize_old_key(self, old_identifier: str, new_key: str, key_alias: str):
"""만료 키 삭제 + 신규 키로 alias 이전. 신규 키가 이미 발급/발송된 뒤에만 호출된다."""
try:
self._call_admin_api("POST", "/key/delete", {"keys": [old_identifier]})
except Exception as e:
print(f"[WARN] 만료 키 삭제 실패(무시): {e}", flush=True)
if key_alias:
try:
self._call_admin_api("POST", "/key/update", {"key": new_key, "key_alias": key_alias})
except Exception as e:
print(f"[WARN] 신규 키 alias 이전 실패(무시): {e}", flush=True)
def _send_email(self, recipient: str, new_key: str) -> bool:
"""SMTP로 신규 키 발급 안내 메일을 보낸다."""
smtp_host = os.environ.get("SMTP_HOST")
smtp_port = int(os.environ.get("SMTP_PORT", 587))
smtp_user = os.environ.get("SMTP_USERNAME")
smtp_pass = os.environ.get("SMTP_PASSWORD")
sender = os.environ.get("SMTP_SENDER_EMAIL", smtp_user)
msg = EmailMessage()
msg["Subject"] = "[AX] API Key 만료 안내 - 신규 키 발급됨"
msg["From"] = sender
msg["To"] = recipient
msg.set_content(
"안녕하세요, LLM PROXY 시스템 알림입니다.\n\n"
"사용 중인 API Key가 만료되어 신규 키가 자동 발급되었습니다.\n\n"
f"신규 API Key: {new_key}\n\n"
f"발급 시각: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
"감사합니다."
)
try:
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.ehlo()
server.starttls(context=ssl.create_default_context())
server.ehlo()
if smtp_user and smtp_pass:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
return True
except Exception as e:
print(f"[ERROR] SMTP 발송 실패: {e}", flush=True)
return False
# config.yaml의 callbacks 목록에서 참조하는 진입점
proxy_handler_instance = SystemPromptInjector()