jwt.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334
  1. from fastapi import Depends, HTTPException, status, Header, Security
  2. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  3. import jwt
  4. from config import JWT_SECRET_KEY
  async def get_token_from_header(authorization: str = Header(None)) -> str:
      """Return the bearer token carried by the ``authorization`` header.

      FastAPI injects the header value through ``Header(None)``; the value is
      ``None`` when the request does not send the header.

      Args:
          authorization: Raw header value, expected to look like
              ``"Bearer <token>"``.

      Returns:
          The header value with the leading ``"Bearer "`` scheme removed.

      Raises:
          HTTPException: 403 with detail ``"Not authenticated"`` when the header
              is missing or empty, or 403 with detail
              ``"Invalid authentication scheme"`` when it does not start with
              ``"Bearer "``.
      """
      bearer_prefix = "Bearer "

      if not authorization:
          raise HTTPException(
              status_code=status.HTTP_403_FORBIDDEN,
              detail="Not authenticated",
          )

      if not authorization.startswith(bearer_prefix):
          raise HTTPException(
              status_code=status.HTTP_403_FORBIDDEN,
              detail="Invalid authentication scheme",
          )

      # Strip only the leading "Bearer " prefix. A plain str.replace() would
      # also delete any later occurrence of "Bearer " inside the credential.
      return authorization[len(bearer_prefix):]
  async def verify_jwt_token(token: str = Security(get_token_from_header)) -> dict:
      """Decode an HS256-signed JWT and return its subject claim.

      The token is supplied by the ``get_token_from_header`` dependency and is
      decoded with ``JWT_SECRET_KEY``.

      Args:
          token: Encoded JWT string.

      Returns:
          A dict ``{"sub": <value>}`` where the value is the payload's ``sub``
          claim, or ``None`` when the payload has no such claim.

      Raises:
          HTTPException: 403 with detail ``"Token is expired"`` when decoding
              raises ``jwt.ExpiredSignatureError``, or 403 with detail
              ``"Invalid token"`` for any other ``jwt.InvalidTokenError``.
      """
      try:
          # Only the decode call can raise the JWT errors handled below.
          payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=["HS256"])
      except jwt.ExpiredSignatureError as exc:
          # Handled before InvalidTokenError so expiry keeps its own message.
          raise HTTPException(
              status_code=status.HTTP_403_FORBIDDEN,
              detail="Token is expired",
          ) from exc
      except jwt.InvalidTokenError as exc:
          raise HTTPException(
              status_code=status.HTTP_403_FORBIDDEN,
              detail="Invalid token",
          ) from exc

      return {"sub": payload.get("sub")}