| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- from fastapi import Depends, HTTPException, status, Header, Security
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- import jwt
- from config import SECRET_KEY
- security = HTTPBearer()
- async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
- if credentials:
- try:
- payload = await verify_jwt_token()
- return payload # 或者返回一个包含用户信息的自定义对象
- except Exception as e:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid token",
- headers={"WWW-Authenticate": "Bearer"},
- )
- else:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Unauthorized",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- async def get_token_from_header(authorization: str = Header(None)):
- if not authorization:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authenticated",
- )
- # 去掉 "Bearer " 前缀
- if not authorization.startswith("Bearer "):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid authentication scheme",
- )
- return authorization.replace("Bearer ", "")
- async def verify_jwt_token(token: str = Security(get_token_from_header)):
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
- return payload.get("sub")
- except jwt.ExpiredSignatureError:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Token is expired",
- )
- except jwt.InvalidTokenError:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid token",
- )
-
- from db.user import UserOAuthRepository,UserOAuthToken
- def get_uer_oauth_and_verify(open_id: str = Depends(verify_jwt_token)):
- db_oauth:UserOAuthToken = UserOAuthRepository().get_by_open_id(open_id)
- # 没有用户凭证,需要扫码登陆
- if not db_oauth:
- raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="need login")
- return db_oauth
|