| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import datetime
- from fastapi import Depends, HTTPException, status, Header, Security
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- import jwt
- from config import SECRET_KEY
- security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the current caller from the HTTP Bearer credentials.

    Args:
        credentials: Bearer credentials produced by the module-level
            ``security`` (``HTTPBearer``) dependency.

    Returns:
        Whatever ``verify_jwt_token`` returns for the presented token.
        (Original note: this could instead return a custom object that
        carries the user's information.)

    Raises:
        HTTPException: 401 when no credentials are supplied; 403 when the
            token cannot be verified for any reason.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )
    try:
        # Pass the raw token explicitly. Calling verify_jwt_token() with no
        # argument leaves ``token`` bound to its ``Security(...)`` default,
        # because FastAPI only resolves that default when it invokes the
        # function as a dependency itself.
        return await verify_jwt_token(credentials.credentials)
    except Exception as exc:
        # Deliberately collapse every verification failure into one uniform
        # response; the original cause stays attached for logging/debugging.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
-
async def get_token_from_header(authorization: str = Header(None)):
    """Extract the bearer token from the ``Authorization`` request header.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when
            the header is absent.

    Returns:
        The header value with the leading ``"Bearer "`` scheme removed.

    Raises:
        HTTPException: 403 when the header is missing or empty, or when it
            does not start with ``"Bearer "``.
    """
    scheme_prefix = "Bearer "
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authenticated",
        )
    if not authorization.startswith(scheme_prefix):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication scheme",
        )
    # Strip only the leading scheme. str.replace() would also delete any
    # later "Bearer " occurrence inside the credential text itself.
    return authorization[len(scheme_prefix):]
async def verify_jwt_token(token: str = Security(get_token_from_header)):
    """Decode an HS256-signed JWT and return its ``sub`` claim.

    Args:
        token: Encoded JWT; as a dependency it is supplied by
            ``get_token_from_header``.

    Returns:
        The value of the ``sub`` claim, or ``None`` if the payload has none.

    Raises:
        HTTPException: 403 with detail "Token is expired" for an expired
            signature, or "Invalid token" for any other invalid-token error.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError as exc:
        # ExpiredSignatureError is a subclass of InvalidTokenError, so one
        # handler covers both and only the reported detail differs.
        if isinstance(exc, jwt.ExpiredSignatureError):
            reason = "Token is expired"
        else:
            reason = "Invalid token"
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=reason,
        )
    return claims.get("sub")
-
- from db.user_oauth import UserOAuthRepository,UserOAuthToken
- from db.user import User,UserRepo
def verify_user(open_id: str = Depends(verify_jwt_token)):
    """Load the user for ``open_id`` and check its OAuth token expiry.

    Args:
        open_id: Identifier matched against ``User.open_id``; as a
            dependency it is the value returned by ``verify_jwt_token``.

    Returns:
        The matching ``User`` when its OAuth token has not expired, or
        ``None`` when no user matches.

    Raises:
        HTTPException: 403 when the user's OAuth token has expired.
    """
    matches = UserRepo().select(User.open_id == open_id)
    found: User = matches.first()
    if not found:
        # NOTE(review): an unknown open_id yields None rather than an error;
        # confirm that callers handle a missing user.
        return
    token_record: UserOAuthToken = found.oauth
    # NOTE(review): compared against naive local time; presumably
    # ``expires_at`` is stored as a naive local datetime too. Confirm.
    remaining = token_record.expires_at - datetime.datetime.now()
    if remaining.total_seconds() > 0:
        return found
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="open-douyin Token is expired",
    )
|