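"""FastAPI authentication dependencies.

Extracts a bearer token from the request, verifies it as an HS256-signed JWT
using the application secret, and resolves the token subject to a stored
OAuth record.
"""
from fastapi import Depends, HTTPException, status, Header, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

from config import SECRET_KEY

# Prefix of the Authorization header value for the bearer scheme.
_BEARER_PREFIX = "Bearer "

security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Return the subject (`sub` claim) of the caller's bearer token.

    Args:
        credentials: Parsed ``Authorization: Bearer <token>`` header, supplied
            by the ``HTTPBearer`` dependency.

    Returns:
        The value of the token's ``sub`` claim. A custom object carrying more
        user information could be returned here instead.

    Raises:
        HTTPException: 403 if the token is expired, malformed, badly signed or
            has no subject; 401 if no credentials were supplied.
    """
    if not credentials:
        # HTTPBearer with its default auto_error=True rejects a missing or
        # non-bearer header before this function runs, so this branch is only
        # reached when the scheme is configured with auto_error=False.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        # The token must be passed explicitly: calling verify_jwt_token() with
        # no argument hands jwt.decode the Security(...) default object rather
        # than the request's token, so every request would be rejected.
        return await verify_jwt_token(credentials.credentials)
    except HTTPException:
        # Keep the specific reason (expired vs. invalid) set by the verifier.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc


async def get_token_from_header(authorization: str = Header(None)) -> str:
    """Extract the raw token from an ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Value of the ``Authorization`` request header, or
            ``None`` when the header is absent.

    Returns:
        The header value with the leading ``"Bearer "`` prefix removed.

    Raises:
        HTTPException: 403 if the header is missing or does not start with
            ``"Bearer "``.
    """
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authenticated",
        )
    if not authorization.startswith(_BEARER_PREFIX):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication scheme",
        )
    # Strip only the leading prefix. str.replace() would also delete any later
    # occurrence of "Bearer " inside the header value and so alter the token.
    return authorization[len(_BEARER_PREFIX):]


async def verify_jwt_token(token: str = Security(get_token_from_header)):
    """Verify a JWT and return its subject.

    Args:
        token: Encoded JWT, normally injected from the Authorization header.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 403 if the token is expired, fails verification, or
            carries no ``sub`` claim.
    """
    try:
        # The accepted algorithm is pinned to HS256, so the token's own header
        # cannot select a different (or no) signature algorithm.
        # NOTE(review): a token issued without an "exp" claim is accepted and
        # never expires; confirm the issuer always sets one, or require it.
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token is expired",
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
        )

    subject = payload.get("sub")
    if not subject:
        # A correctly signed token without a subject identifies nobody; reject
        # it rather than returning None as if it were a user identity.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid token",
        )
    return subject


from db.user import UserOAuthRepository,UserOAuthToken


def get_uer_oauth_and_verify(open_id: str = Depends(verify_jwt_token)):
    """Look up the stored OAuth record for the authenticated subject.

    Args:
        open_id: Subject of the verified JWT, used as the lookup key.

    Returns:
        The record returned by ``UserOAuthRepository().get_by_open_id``.

    Raises:
        HTTPException: 401 if no record exists for ``open_id``.
    """
    db_oauth: UserOAuthToken = UserOAuthRepository().get_by_open_id(open_id)
    # No stored credential for this user: they must log in again by scanning
    # the QR code.
    if not db_oauth:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="need login")
    return db_oauth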