| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import socket
- from fastapi import FastAPI, Request
- from fastapi.responses import HTMLResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.templating import Jinja2Templates
- import requests
- import uvicorn
- from fastapi.responses import FileResponse
- from fastapi import FastAPI, Depends, HTTPException, Form
- import httpx
- import os
- # from db.user import UserOAuthToken
- from config import HOST, PORT
- app = FastAPI()
-
- @app.get("/")
- async def read_root(request: Request):
- return FileResponse(os.path.join("static", "index.html"))
- # 授权码获取 access_token 的路由
- @app.post("/get_access_token")
- async def get_access_token(request: Request):
- client_key = os.environ.get("CLIENT_KEY") # 从环境变量中获取 client_key
- client_secret = os.environ.get("CLIENT_SECRET") # 从环境变量中获取 client_secret
- try:
- code = request.query_params["code"] # 从查询参数中获取 code
- except KeyError:
- raise HTTPException(status_code=400, detail="Missing 'code' parameter")
-
- # 发送请求获取 access_token
- async with httpx.AsyncClient() as client:
- response = await client.post(
- "https://open.douyin.com/oauth/access_token/",
- headers={"Content-Type": "application/json"},
- json={
- "grant_type": "authorization_code",
- "client_key": client_key,
- "client_secret": client_secret,
- "code": code,
- },
- )
-
- if response.status_code != 200:
- raise HTTPException(status_code=response.status_code, detail=response.text)
- '''
- {
- "data": {
- "access_token": "act.f7094fbffab2ecbfc45e9af9c32bc241oYdckvBKe82BPx8T******",
- "captcha": "",
- "desc_url": "",
- "description": "",
- "error_code": 0,
- "expires_in": 1296000,
- "log_id": "20230525105733ED3ED7AC56A******",
- "open_id": "b9b71865-7fea-44cc-******",
- "refresh_expires_in": 2592000,
- "refresh_token": "rft.713900b74edde9f30ec4e246b706da30t******",
- "scope": "user_info"
- },
- "message": "success"
- }
- '''
- data = response.json()["data"]
-
- print(data)
- # magong_user = UserOAuthToken(access_token=data['access_token'], open_id=data['open_id'])
- return {"message": "Access token stored successfully"}
- @app.get("/verify_callback")
- async def verify_callback(request: Request):
- # 打印请求方法
- print(f"Method: {request.method}")
- # open-douyin.magong.site/verify_callback?code=676a1101ea02bc5dTaUVtKg8c5enYaGqB4dT&state=&scopes=user_info,trial.whitelist
- print(request.url)
- # 打印请求头
- print(f"Headers:")
- for key, value in request.headers.items():
- print(f"{key}: {value}")
- # 打印查询参数
- print(f"Query Parameters:")
- for key, value in request.query_params.items():
- print(f"{key}: {value}")
- return HTMLResponse("<h1>Callback Received! Verification Successful.</h1>")
- def main():
- print(f"https://open-douyin-cf.magong.site 公网代理地址,cloudflare dns proxy ,由 caddy 转发到 8600 端口")
- print(f"https://open-douyin-wk.magong.site 公网代理地址,cloudflare workers 转发到 8600 端口")
- print(f"http://sv-v2.magong.site:{PORT} ⭐ 推荐,仅支持 ipv6 ,直连、满速、无延迟。缺点是不支持 https 协议,因为不经过 Caddy 代理,直达 Fastapi 没有配置 https")
- print(f"https://open-douyin.magong.site 内网穿透隧道,cloudflare tunnel ,经常访问不了")
- uvicorn.run(app, host=None, port=PORT, log_level="info")
- if __name__ == "__main__":
- main()
|