import datetime import os import sys sys.path.append(os.path.dirname(os.path.dirname(__file__))) import jwt from fastapi import FastAPI,APIRouter, HTTPException, Depends, Request,Header from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel from fastapi.responses import JSONResponse from config import * from douyin.access_token import get_access_token from douyin.user_info import get_user_info from db.user import UserOAuthRepository,UserOAuthToken from api.jwt import verify_jwt_token login_router = APIRouter() class ScanCode(BaseModel): code: str scopes: str class User(BaseModel): nickname: str avatar: str # 登录端点 @login_router.post("/login") async def login(data: ScanCode): if PRODUCE_ENV: data = await get_access_token(data.code) else: # 测试环境使用。因为每次 get_access_token 的 code 只能使用一次就过期了,为了避免频繁扫码,直接模拟返回请求结果 data = {'access_token': 'act.3.UCzqnMwbL7uUTH0PkWbvDvIHcpy417HnfMqymbvBSpo9b1MJ3jOdwCxw-UPstOOjsGDWIdNwTGev4oEp8eUR-vHbU24XU5K4BkhPeOKJW1CLrEUS3XFxpG6SHqoQtvL6qhEgINcvt4V3KQX6C2qTeTkgQ-KwPO6jWi5uoin3YXo5DqwuGk3bbQ9dZoY=', 'captcha': '', 'desc_url': '', 'description': '', 'error_code': 0, 'expires_in': 1296000, 'log_id': '2024012915260549B5ED1A675515CD573C', 'open_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy', 'refresh_expires_in': 2592000, 'refresh_token': 'rft.c29d64456ea3d5e4c932247ee93dd735aq5OhtcYNXNFAD70XHKrdntpE6U0', 'scope': 'user_info,trial.whitelist'} if data.get("error_code") != 0: return data db_manager = UserOAuthRepository() logger.debug(data) db_manager.add_token(data) # 计算过期时间戳(基于北京时间) expires_in = data.get("expires_in", 0) # 如果没有 expires_in 键,则默认过期时间为 0 expires_in = 15 expiration_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in) beijing_timezone_delta = datetime.timedelta(hours=8) # 北京时间是UTC+8 expiration_time_beijing = expiration_time_utc + beijing_timezone_delta exp = int(expiration_time_beijing.timestamp()) # 生成并返回 token,包含过期时间 payload = { "aud": data["open_id"], "exp": exp # 添加过期时间戳(北京时间)到 payload } account_token = jwt.encode(payload, JWT_SECRET_KEY, algorithm="HS256") logger.info(f"login success, expires_time:{datetime.datetime.fromtimestamp(exp).strftime('%Y-%m-%d %H:%M:%S') }, token:{account_token}") return {"token": account_token} # 受保护资源示例 @login_router.get("/account") async def read_account(user: dict = Depends(verify_jwt_token)): open_id = user.get("aud") UserOAuthRepository().display_all_records() logger.info(user.get("aud")) return {"message": "Account information", "open_id": user.get("aud")} # 在这里返回当前用户的信息 return {"nickname": current_user.username, "avatar": "https://p26.douyinpic.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=4010531038"} # 其他受保护的资源... # 启动应用 def main(): pass if __name__ == "__main__": main()