| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import asyncio
- import datetime
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- import jwt
- from fastapi import FastAPI,APIRouter, HTTPException, Depends, Request,Header
- from fastapi import Depends, FastAPI, HTTPException, status
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from fastapi.responses import JSONResponse
- from config import *
- from douyin.access_token import get_access_token
- from douyin.user_info import get_user_info
- from db.user_oauth import UserOAuthRepository,UserOAuthToken
- from db.user_info import UserInfoRepository,UserInfo
- from db.user import User,UserRepo
- from db.base import update_from_model
- from api.swl_jwt import verify_jwt_token,verify_user
- from sqlmodel import Session,select
- from db.engine import engine,create_all_table
- from pydantic import BaseModel
# Router that the endpoint decorators in this module register their routes on.
login_router: APIRouter = APIRouter()
class ScanCode(BaseModel):
    """Request body accepted by the ``/login`` endpoint.

    Example values: ``code=676a1101ea02bc5dTaUVtKg8c5enYaGqB4dT``,
    ``scopes=user_info,trial.whitelist``.
    """

    # Authorization code; it can be used only once and is invalid afterwards.
    code: str
    # Scopes the user granted, e.g. "user_info,trial.whitelist".
    scopes: str
async def save_login_data(data: dict):
    """Store a successful OAuth token response and issue an account JWT.

    Args:
        data: Token payload (as produced by ``get_access_token``). It is
            converted with ``UserOAuthRepository().dict_to_model`` and must
            contain ``open_id``, which becomes the JWT subject.

    Returns:
        ``{"token": <jwt>}``. The JWT is signed with ``SECRET_KEY`` using
        HS256 and expires 90 days after the call.

    Raises:
        KeyError: If ``data`` has no ``open_id`` entry.
    """
    oauth_model: UserOAuthToken = UserOAuthRepository().dict_to_model(data)
    user_info_data = await get_user_info(oauth_model.open_id, oauth_model.access_token)
    # A missing or zero ``error_code`` is treated as success. On failure an
    # empty profile is stored so that the login itself still goes through.
    if not user_info_data.get("error_code"):
        info_model = UserInfoRepository().dict_to_model(user_info_data)
    else:
        info_model = UserInfo()

    # Credentials must never reach the logs: drop the OAuth tokens before
    # dumping the payload.
    loggable = {
        key: value
        for key, value in data.items()
        if key not in ("access_token", "refresh_token")
    }
    logger.debug(f"get oauth: {loggable}")
    logger.debug(f"get info: {user_info_data}")
    UserRepo().add_or_update(oauth_model, info_model)

    # Build the account token; ``exp`` is a Unix timestamp 90 days from now.
    expiration_time_local = datetime.datetime.now() + datetime.timedelta(days=90)
    exp = int(expiration_time_local.timestamp())
    payload = {
        "sub": data["open_id"],
        "exp": exp,
    }
    account_token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    # The signed token is a bearer credential, so only its subject and expiry
    # are logged, never the token itself.
    expires_text = datetime.datetime.fromtimestamp(exp).strftime('%Y-%m-%d %H:%M:%S')
    logger.info(f"login success, open_id:{data['open_id']}, expires_time:{expires_text}")
    return {"token": account_token}
-
# Login endpoint
@login_router.post("/login")
async def login(data: ScanCode):
    """Exchange an authorization code for an account token.

    Calls ``get_access_token`` with ``data.code``. When the response carries
    ``error_code == 0`` it is handed to ``save_login_data`` and that result is
    returned.

    Raises:
        HTTPException: 400 with the whole response as ``detail`` when
            ``error_code`` is anything other than ``0`` (including absent).
    """
    logger.info(data)
    token_response = await get_access_token(data.code)
    if token_response.get("error_code") == 0:
        return await save_login_data(token_response)
    raise HTTPException(status_code=400, detail=token_response)
@login_router.get("/user_info")
async def user_info(user: User = Depends(verify_user)) -> UserInfo:
    """Return the current user's stored profile, syncing it when the nickname changed.

    Fetches the profile via ``get_user_info`` using the user's ``open_id`` and
    stored OAuth access token. If that response reports no error and its
    nickname differs from ``user.info.nickname``, the stored record is updated
    and committed. A failed fetch is ignored and the stored profile is
    returned as is.

    Args:
        user: The authenticated user, resolved by the ``verify_user`` dependency.

    Returns:
        ``user.info``.
    """
    user_info_data = await get_user_info(user.open_id, user.oauth.access_token)
    if not user_info_data.get("error_code"):
        info_model: UserInfo = UserInfoRepository().dict_to_model(user_info_data)
        # If the user changed their nickname, sync the open-douyin user data.
        if info_model.nickname != user.info.nickname:
            # ``user.info`` is returned (and serialized) after this session is
            # closed. With the default ``expire_on_commit=True`` the commit
            # would expire its attributes, and reading them on the detached
            # instance can raise DetachedInstanceError, so expiry is disabled.
            with Session(engine, expire_on_commit=False) as session:
                # presumably copies the fetched fields onto the stored record;
                # verify against db.base.update_from_model
                update_from_model(user.info, info_model)
                session.add(user)
                session.commit()
    return user.info
@login_router.get("/verify_callback")
async def verify_callback(code: str, scopes: str):
    """GET variant of the login flow: takes ``code`` and ``scopes`` as query parameters.

    Wraps both values in a ``ScanCode`` and delegates to ``login``, returning
    whatever ``login`` returns (or raising what it raises).
    """
    scan = ScanCode(code=code, scopes=scopes)
    return await login(scan)
-
@login_router.get("/token")
async def read_account(open_id: str = Depends(verify_jwt_token)):
    """Endpoint whose only work is running the ``verify_jwt_token`` dependency.

    The handler itself does nothing with ``open_id`` and returns ``None``.
    NOTE(review): presumably used by clients to check that their token is
    still accepted; confirm against ``verify_jwt_token``.
    """
    return None
# Manual check, run only when this file is executed as a script.
async def main():
    """Create the tables and run ``save_login_data`` on a hard-coded OAuth payload.

    The result (the ``{"token": ...}`` dict) is written to the log.
    """
    create_all_table()
    # NOTE(review): this looks like a recorded real token response. Credentials
    # should not live in source control; consider revoking these values and
    # loading test data from outside the repository.
    user_oauth = {
        'access_token': 'act.3.m3kiZmfxxIH95i1bHZ7Bq3Wkv_Xm5TtD3kpLGjtCr3G96WIINBKEvzlsaObrGcH4GaxTQeLZA13jkzoZhpAwPwMRqFxlVuIcxpge_-BpdFib1xHqkcFa4B-LX4zpd2YK3kDFTFfMcJXN_fZ2eByg6oqqa1OieUWcvlaVgw==',
        'captcha': '',
        'desc_url': '',
        'description': '',
        'error_code': 0,
        'expires_in': 1296000,
        'log_id': '202402171353461C33F969CEFB511B216F',
        'open_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk',
        'refresh_expires_in': 2592000,
        'refresh_token': 'rft.e4b3da8bd3ef01d880d827b11e826391OEGHiRrTLcp5zsGYP1dh6F9Bo7fg',
        'scope': 'user_info,trial.whitelist',
    }
    res = await save_login_data(user_oauth)
    logger.info(f"{res}")
- # import jwt
# Run the manual check in main() on direct execution only, never on import.
if __name__ == "__main__":
    asyncio.run(main())
|