jwt.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from fastapi import Depends, HTTPException, status, Header, Security
  2. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  3. import jwt
  4. from config import SECRET_KEY
  5. security = HTTPBearer()
  6. async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
  7. if credentials:
  8. try:
  9. payload = await verify_jwt_token()
  10. return payload # 或者返回一个包含用户信息的自定义对象
  11. except Exception as e:
  12. raise HTTPException(
  13. status_code=status.HTTP_403_FORBIDDEN,
  14. detail="Invalid token",
  15. headers={"WWW-Authenticate": "Bearer"},
  16. )
  17. else:
  18. raise HTTPException(
  19. status_code=status.HTTP_401_UNAUTHORIZED,
  20. detail="Unauthorized",
  21. headers={"WWW-Authenticate": "Bearer"},
  22. )
  23. async def get_token_from_header(authorization: str = Header(None)):
  24. if not authorization:
  25. raise HTTPException(
  26. status_code=status.HTTP_403_FORBIDDEN,
  27. detail="Not authenticated",
  28. )
  29. # 去掉 "Bearer " 前缀
  30. if not authorization.startswith("Bearer "):
  31. raise HTTPException(
  32. status_code=status.HTTP_403_FORBIDDEN,
  33. detail="Invalid authentication scheme",
  34. )
  35. return authorization.replace("Bearer ", "")
  36. async def verify_jwt_token(token: str = Security(get_token_from_header)):
  37. try:
  38. payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
  39. return payload.get("sub")
  40. except jwt.ExpiredSignatureError:
  41. raise HTTPException(
  42. status_code=status.HTTP_403_FORBIDDEN,
  43. detail="Token is expired",
  44. )
  45. except jwt.InvalidTokenError:
  46. raise HTTPException(
  47. status_code=status.HTTP_403_FORBIDDEN,
  48. detail="Invalid token",
  49. )
  50. from db.user import UserOAuthRepository,UserOAuthToken
  51. def get_uer_oauth_and_verify(open_id: str = Depends(verify_jwt_token)):
  52. db_oauth:UserOAuthToken = UserOAuthRepository().get_by_open_id(open_id)
  53. # 没有用户凭证,需要扫码登陆
  54. if not db_oauth:
  55. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="need login")
  56. return db_oauth