| 12345678910111213141516171819202122232425262728293031323334 |
- from fastapi import Depends, HTTPException, status, Header, Security
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- import jwt
- from config import JWT_SECRET_KEY
-
- async def get_token_from_header(authorization: str = Header(None)):
- if not authorization:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authenticated",
- )
- # 去掉 "Bearer " 前缀
- if not authorization.startswith("Bearer "):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid authentication scheme",
- )
- return authorization.replace("Bearer ", "")
-
- async def verify_jwt_token(token: str = Security(get_token_from_header)):
- try:
- payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=["HS256"])
- return {"sub": payload.get("sub")}
- except jwt.ExpiredSignatureError:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Token is expired",
- )
- except jwt.InvalidTokenError:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid token",
- )
-
|