import asyncio
import datetime
import os
import sys
# Make the project root importable before the project-local imports below.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import jwt
from fastapi import FastAPI,APIRouter, HTTPException, Depends, Request,Header
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import JSONResponse
# NOTE(review): `logger` and `SECRET_KEY` are presumably provided by this
# star import -- confirm against config.py.
from config import *
from douyin.access_token import get_access_token
from douyin.user_info import get_user_info
from db.user_oauth import UserOAuthRepository,UserOAuthToken
from db.user_info import UserInfoRepository,UserInfo
from db.user import User,UserRepo
from db.base import update_from_model
from api.swl_jwt import verify_jwt_token,verify_user
from sqlmodel import Session,select
from db.engine import engine,create_all_table
from pydantic import BaseModel

login_router = APIRouter()

# Keys of the OAuth payload whose values must never be written to logs.
_SENSITIVE_KEYS = frozenset({"access_token", "refresh_token"})


def _redact_tokens(payload: dict) -> dict:
    """Return a shallow copy of *payload* with credential values masked."""
    return {
        key: ("***" if key in _SENSITIVE_KEYS else value)
        for key, value in payload.items()
    }


class ScanCode(BaseModel):
    """Request body carrying the result of a QR-code authorization scan."""

    # Example: code=676a1101ea02bc5dTaUVtKg8c5enYaGqB4dT
    # The code can be used only once; it is invalid after use.
    code: str
    # Example: scopes=user_info,trial.whitelist
    # The scopes the user authorized.
    scopes: str


async def save_login_data(data: dict) -> dict:
    """Persist an OAuth login and issue an account JWT for it.

    Converts *data* into an OAuth model, fetches the user's profile with
    ``get_user_info``, stores both through ``UserRepo().add_or_update`` and
    signs an HS256 JWT whose ``sub`` is ``data["open_id"]``.

    Args:
        data: OAuth token payload; must contain ``"open_id"``.

    Returns:
        ``{"token": <encoded JWT>}``.

    Raises:
        KeyError: if *data* has no ``"open_id"`` entry.
    """
    oauth_model: UserOAuthToken = UserOAuthRepository().dict_to_model(data)
    user_info_data = await get_user_info(oauth_model.open_id, oauth_model.access_token)
    if not user_info_data.get("error_code"):
        info_model = UserInfoRepository().dict_to_model(user_info_data)
    else:
        # Profile lookup failed: store an empty profile rather than abort
        # the login.
        info_model = UserInfo()
    # Access/refresh tokens are credentials; mask them before logging.
    logger.debug(f"get oauth: {_redact_tokens(data)}")
    logger.debug(f"get info: {user_info_data}")
    UserRepo().add_or_update(oauth_model, info_model)

    # Build and return the token, including its expiry. The lifetime is a
    # fixed 90 days and does not use the provider's `refresh_expires_in`.
    expiration_time_local = datetime.datetime.now() + datetime.timedelta(days=90)
    exp = int(expiration_time_local.timestamp())
    payload = {
        "sub": data["open_id"],
        "exp": exp,
    }
    account_token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    # The issued JWT is a bearer credential, so it is deliberately not logged.
    logger.info(f"login success, expires_time:{datetime.datetime.fromtimestamp(exp).strftime('%Y-%m-%d %H:%M:%S')}")
    return {"token": account_token}


# Login endpoint
@login_router.post("/login")
async def login(data: ScanCode):
    """Exchange a one-time authorization code for an account token.

    Raises:
        HTTPException: status 400, with the provider response as detail,
            when the response's ``error_code`` is anything other than 0.
    """
    logger.info(data)
    token_data = await get_access_token(data.code)
    if token_data.get("error_code") != 0:
        raise HTTPException(status_code=400, detail=token_data)
    return await save_login_data(token_data)


@login_router.get("/user_info")
async def user_info(user: User = Depends(verify_user)) -> UserInfo:
    """Return the authenticated user's stored profile.

    The profile is re-fetched with ``get_user_info`` first; when that
    succeeds and the nickname differs from the stored one, the stored
    profile is updated and committed before being returned.
    """
    user_info_data = await get_user_info(user.open_id, user.oauth.access_token)
    if user_info_data.get("error_code"):
        # Upstream lookup failed: fall back to the stored profile.
        return user.info

    info_model: UserInfo = UserInfoRepository().dict_to_model(user_info_data)
    # If the user changed their nickname, sync the open-douyin user data.
    if info_model.nickname != user.info.nickname:
        with Session(engine) as session:
            # presumably copies the fetched fields onto user.info -- verify
            # against db.base.update_from_model
            update_from_model(user.info, info_model)
            session.add(user)
            session.commit()
    return user.info


@login_router.get("/verify_callback")
async def verify_callback(code:str, scopes:str):
    """GET variant of the login flow: forwards the query parameters to ``login``."""
    return await login(ScanCode(code=code, scopes=scopes))


@login_router.get("/token")
async def read_account(open_id: str = Depends(verify_jwt_token)):
    """Run the ``verify_jwt_token`` dependency and return nothing.

    The handler has no body of its own, so the response is ``None``.
    """
    return None


# Application entry point
async def main():
    """Manual smoke test: create the tables and run one login through
    ``save_login_data`` with a fixed OAuth payload."""
    create_all_table()
    # NOTE(review): this looks like a real credential committed to source --
    # confirm it is revoked, or load it from the environment instead.
    user_oauth = {
        'access_token': 'act.3.m3kiZmfxxIH95i1bHZ7Bq3Wkv_Xm5TtD3kpLGjtCr3G96WIINBKEvzlsaObrGcH4GaxTQeLZA13jkzoZhpAwPwMRqFxlVuIcxpge_-BpdFib1xHqkcFa4B-LX4zpd2YK3kDFTFfMcJXN_fZ2eByg6oqqa1OieUWcvlaVgw==',
        'captcha': '',
        'desc_url': '',
        'description': '',
        'error_code': 0,
        'expires_in': 1296000,
        'log_id': '202402171353461C33F969CEFB511B216F',
        'open_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk',
        'refresh_expires_in': 2592000,
        'refresh_token': 'rft.e4b3da8bd3ef01d880d827b11e826391OEGHiRrTLcp5zsGYP1dh6F9Bo7fg',
        'scope': 'user_info,trial.whitelist',
    }
    res = await save_login_data(user_oauth)
    logger.info(f"{res}")


if __name__ == "__main__":
    asyncio.run(main())