# login.py — login and account endpoints (FastAPI router)
  1. import datetime
  2. import os
  3. import sys
  4. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  5. import jwt
  6. from fastapi import FastAPI,APIRouter, HTTPException, Depends, Request,Header
  7. from fastapi import Depends, FastAPI, HTTPException, status
  8. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  9. from pydantic import BaseModel
  10. from fastapi.responses import JSONResponse
  11. from config import *
  12. from douyin.access_token import get_access_token
  13. from douyin.user_info import get_user_info
  14. from db.user import UserOAuthRepository,UserOAuthToken
  15. from api.jwt import verify_jwt_token
  16. login_router = APIRouter()
  17. class ScanCode(BaseModel):
  18. code: str
  19. scopes: str
  20. class User(BaseModel):
  21. nickname: str
  22. avatar: str
  23. # 登录端点
  24. @login_router.post("/login")
  25. async def login(data: ScanCode):
  26. if PRODUCE_ENV:
  27. data = await get_access_token(data.code)
  28. else:
  29. # 测试环境使用。因为每次 get_access_token 的 code 只能使用一次就过期了,为了避免频繁扫码,直接模拟返回请求结果
  30. data = {'access_token': 'act.3.UCzqnMwbL7uUTH0PkWbvDvIHcpy417HnfMqymbvBSpo9b1MJ3jOdwCxw-UPstOOjsGDWIdNwTGev4oEp8eUR-vHbU24XU5K4BkhPeOKJW1CLrEUS3XFxpG6SHqoQtvL6qhEgINcvt4V3KQX6C2qTeTkgQ-KwPO6jWi5uoin3YXo5DqwuGk3bbQ9dZoY=', 'captcha': '', 'desc_url': '', 'description': '', 'error_code': 0, 'expires_in': 1296000, 'log_id': '2024012915260549B5ED1A675515CD573C', 'open_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy', 'refresh_expires_in': 2592000, 'refresh_token': 'rft.c29d64456ea3d5e4c932247ee93dd735aq5OhtcYNXNFAD70XHKrdntpE6U0', 'scope': 'user_info,trial.whitelist'}
  31. if data.get("error_code") != 0:
  32. return data
  33. db_manager = UserOAuthRepository()
  34. logger.debug(data)
  35. db_manager.add_token(data)
  36. # 计算过期时间戳(基于北京时间)
  37. expires_in = data.get("expires_in", 0) # 如果没有 expires_in 键,则默认过期时间为 0
  38. expires_in = 15
  39. expiration_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in)
  40. beijing_timezone_delta = datetime.timedelta(hours=8) # 北京时间是UTC+8
  41. expiration_time_beijing = expiration_time_utc + beijing_timezone_delta
  42. exp = int(expiration_time_beijing.timestamp())
  43. # 生成并返回 token,包含过期时间
  44. payload = {
  45. "aud": data["open_id"],
  46. "exp": exp # 添加过期时间戳(北京时间)到 payload
  47. }
  48. account_token = jwt.encode(payload, JWT_SECRET_KEY, algorithm="HS256")
  49. logger.info(f"login success, expires_time:{datetime.datetime.fromtimestamp(exp).strftime('%Y-%m-%d %H:%M:%S') }, token:{account_token}")
  50. return {"token": account_token}
  51. # 受保护资源示例
  52. @login_router.get("/account")
  53. async def read_account(user: dict = Depends(verify_jwt_token)):
  54. open_id = user.get("aud")
  55. UserOAuthRepository().display_all_records()
  56. logger.info(user.get("aud"))
  57. return {"message": "Account information", "open_id": user.get("aud")}
  58. # 在这里返回当前用户的信息
  59. return {"nickname": current_user.username, "avatar": "https://p26.douyinpic.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=4010531038"}
  60. # 其他受保护的资源...
  61. # 启动应用
  62. def main():
  63. pass
  64. if __name__ == "__main__":
  65. main()