swl_jwt.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import datetime
  2. from fastapi import Depends, HTTPException, status, Header, Security
  3. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  4. import jwt
  5. from config import SECRET_KEY
  6. security = HTTPBearer()
  7. async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
  8. if credentials:
  9. try:
  10. payload = await verify_jwt_token()
  11. return payload # 或者返回一个包含用户信息的自定义对象
  12. except Exception as e:
  13. raise HTTPException(
  14. status_code=status.HTTP_403_FORBIDDEN,
  15. detail="Invalid token",
  16. headers={"WWW-Authenticate": "Bearer"},
  17. )
  18. else:
  19. raise HTTPException(
  20. status_code=status.HTTP_401_UNAUTHORIZED,
  21. detail="Unauthorized",
  22. headers={"WWW-Authenticate": "Bearer"},
  23. )
  24. async def get_token_from_header(authorization: str = Header(None)):
  25. if not authorization:
  26. raise HTTPException(
  27. status_code=status.HTTP_403_FORBIDDEN,
  28. detail="Not authenticated",
  29. )
  30. # 去掉 "Bearer " 前缀
  31. if not authorization.startswith("Bearer "):
  32. raise HTTPException(
  33. status_code=status.HTTP_403_FORBIDDEN,
  34. detail="Invalid authentication scheme",
  35. )
  36. return authorization.replace("Bearer ", "")
  37. async def verify_jwt_token(token: str = Security(get_token_from_header)):
  38. try:
  39. payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
  40. return payload.get("sub")
  41. except jwt.ExpiredSignatureError:
  42. raise HTTPException(
  43. status_code=status.HTTP_403_FORBIDDEN,
  44. detail="Token is expired",
  45. )
  46. except jwt.InvalidTokenError:
  47. raise HTTPException(
  48. status_code=status.HTTP_403_FORBIDDEN,
  49. detail="Invalid token",
  50. )
  51. from db.user_oauth import UserOAuthRepository,UserOAuthToken
  52. from db.user import User,UserRepo
  53. def verify_user(open_id: str = Depends(verify_jwt_token)):
  54. res = UserRepo().select(User.open_id == open_id)
  55. user:User = res.first()
  56. if not user:
  57. return
  58. oauth:UserOAuthToken = user.oauth
  59. if (oauth.expires_at - datetime.datetime.now()).total_seconds() <= 0:
  60. raise HTTPException(
  61. status_code=status.HTTP_403_FORBIDDEN,
  62. detail="open-douyin Token is expired",
  63. )
  64. return user