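import datetime

from fastapi import Depends, HTTPException, status, Header, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

from config import SECRET_KEY

# Extracts the "Authorization: Bearer <token>" credentials for routes that
# depend on get_current_user.
security = HTTPBearer()

_BEARER_PREFIX = "Bearer "


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Authenticate the request from its bearer credentials.

    Returns the value produced by verify_jwt_token (the token's "sub" claim).

    Raises:
        HTTPException: 401 when no credentials were supplied, 403 when the
            token fails verification for any reason.
    """
    if credentials:
        try:
            # The token must be passed explicitly: when verify_jwt_token is
            # called directly (not resolved by FastAPI), its default is the
            # Security(...) marker object rather than a token string, so every
            # request would otherwise be rejected as "Invalid token".
            subject = await verify_jwt_token(credentials.credentials)
            return subject  # or return a custom object containing user info
        except Exception:
            # Every verification failure is deliberately collapsed into one
            # generic 403 so the response does not reveal why the token failed.
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid token",
                headers={"WWW-Authenticate": "Bearer"},
            )
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )


async def get_token_from_header(authorization: str = Header(None)):
    """Return the raw token from an "Authorization: Bearer <token>" header.

    Raises:
        HTTPException: 403 when the header is missing or does not use the
            Bearer scheme.
    """
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authenticated",
        )
    if not authorization.startswith(_BEARER_PREFIX):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication scheme",
        )
    # Strip only the leading "Bearer " prefix; str.replace would also remove
    # any later occurrence of that text inside the header value.
    return authorization[len(_BEARER_PREFIX):]


async def verify_jwt_token(token: str = Security(get_token_from_header)):
    """Verify an HS256-signed JWT and return its "sub" claim.

    Raises:
        HTTPException: 403 when the token is expired, otherwise invalid, or
            carries no "sub" claim.
    """
    try:
        # The algorithm list is pinned so the token header cannot select a
        # different (or no) signature algorithm.
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token is expired",
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
        )
    subject = payload.get("sub")
    if subject is None:
        # A validly signed token without a subject identifies nobody; reject
        # it instead of handing None to callers, which would then look up a
        # user with open_id == None.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
        )
    return subject


from db.user_oauth import UserOAuthRepository,UserOAuthToken
from db.user import User,UserRepo


def verify_user(open_id: str = Depends(verify_jwt_token)):
    """Load the user for a verified token and check their stored OAuth token.

    Returns the User whose open_id matches the token subject, or None when no
    such user exists.

    Raises:
        HTTPException: 403 when the user's stored OAuth token has expired.
    """
    matches = UserRepo().select(User.open_id == open_id)
    user: User = matches.first()
    if not user:
        # NOTE(review): an unknown open_id yields None rather than an error,
        # so a route using this dependency must reject None itself; confirm
        # every caller does, or fail closed here.
        return None
    # NOTE(review): assumes user.oauth is always populated; a user without an
    # OAuth record would raise AttributeError below. Confirm against the model.
    oauth: UserOAuthToken = user.oauth
    # NOTE(review): datetime.now() is naive local time; presumably expires_at
    # is stored the same way. Confirm.
    remaining = (oauth.expires_at - datetime.datetime.now()).total_seconds()
    if remaining <= 0:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="open-douyin Token is expired",
        )
    return user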