| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- import httpx
- from config import logger
- from typing import Optional
- from pydantic import BaseModel
class DouyinAccessTokenResponse(BaseModel):
    """Validated envelope for a Douyin access-token response.

    All three fields are required; constructing the model without any of
    them fails validation.

    NOTE(review): the sample payloads documented in ``get_access_token``
    show ``error_code`` nested inside ``data`` and no ``message`` key on
    failure. Confirm that this top-level layout matches what callers
    actually pass in.
    """

    # Numeric status code; ``check_access_token_response`` treats 0 as success.
    error_code: int

    # Payload returned to the caller once the status code has been checked.
    data: dict

    # Human-readable status text accompanying the response.
    message: str
async def check_access_token_response(response_json: dict):
    """Validate a Douyin access-token response and return its ``data`` payload.

    ``DouyinAccessTokenResponse`` requires ``error_code`` and ``message`` at
    the top level, whereas the sample payloads documented in this file carry
    ``error_code`` inside ``data`` and omit ``message`` on failure. To let
    such payloads reach the error-code check instead of failing validation,
    missing top-level fields are filled in before the model is built. Fields
    that are already present at the top level are never overwritten, so
    payloads that validated before behave exactly as they did.

    Args:
        response_json: Decoded JSON body of the token response.

    Returns:
        The ``data`` dictionary of the response.

    Raises:
        Exception: If the response reports a non-zero ``error_code``.
            Validation errors from ``DouyinAccessTokenResponse`` propagate
            unchanged when required fields are still missing or mistyped.
    """
    # Work on a copy so the caller's dictionary is not mutated.
    payload = dict(response_json)

    nested = payload.get("data")
    if (
        "error_code" not in payload
        and isinstance(nested, dict)
        and "error_code" in nested
    ):
        # Lift the nested status code to where the model expects it.
        payload["error_code"] = nested["error_code"]

    # The documented error payload has no ``message`` key.
    payload.setdefault("message", "")

    model_data = DouyinAccessTokenResponse(**payload)
    if model_data.error_code != 0:
        raise Exception(f"获取 access token 失败,错误码:{model_data.error_code}")
    return model_data.data
# Keys under ``data`` whose values must never reach the logs in clear text.
_SENSITIVE_FIELDS = ("access_token", "refresh_token")


def _mask_sensitive(payload):
    """Return a log-safe copy of *payload* with token values under ``data`` masked."""
    if not isinstance(payload, dict):
        return payload
    data = payload.get("data")
    if not isinstance(data, dict):
        return payload
    masked = {
        key: "***" if key in _SENSITIVE_FIELDS and value else value
        for key, value in data.items()
    }
    return {**payload, "data": masked}


async def get_access_token(code: str) -> Optional[dict]:
    """Exchange an OAuth authorization code for a Douyin access token.

    ``CLIENT_KEY`` and ``CLIENT_SECRET`` are read from the environment and
    posted together with *code* to the Douyin ``/oauth/access_token/``
    endpoint. The ``data`` member of the JSON response is returned as-is;
    the caller is responsible for inspecting ``data["error_code"]``.

    Sample success response::

        {
            "data": {
                "access_token": "act.f7094fbffab2ecbfc45e9af9c32bc241oYdckvBKe82BPx8T******",
                "captcha": "",
                "desc_url": "",
                "description": "",
                "error_code": 0,
                "expires_in": 1296000,
                "log_id": "20230525105733ED3ED7AC56A******",
                "open_id": "b9b71865-7fea-44cc-******",
                "refresh_expires_in": 2592000,
                "refresh_token": "rft.713900b74edde9f30ec4e246b706da30t******",
                "scope": "user_info"
            },
            "message": "success"
        }

    Sample error response::

        {
            "data": {
                "description": "Parameter error",
                "error_code": 2100005
            },
            "extra": {
                "logid": "2020070614111601022506808001045D59",
                "now": 1594015876138
            }
        }

    Args:
        code: Authorization code obtained from the Douyin OAuth redirect.

    Returns:
        The ``data`` dictionary of the response, or ``None`` when the
        response has no ``data`` key.
    """
    # Credentials come from the environment rather than from source code.
    client_key = os.environ.get("CLIENT_KEY")
    client_secret = os.environ.get("CLIENT_SECRET")
    if not client_key or not client_secret:
        # Keep the original best-effort behaviour (the request is still
        # sent) but make the misconfiguration visible.
        logger.warning(
            "CLIENT_KEY and/or CLIENT_SECRET is not set; "
            "the token request will be sent without them"
        )

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://open.douyin.com/oauth/access_token/",
            headers={"Content-Type": "application/json"},
            json={
                "grant_type": "authorization_code",
                "client_key": client_key,
                "client_secret": client_secret,
                "code": code,
            },
        )

    # Decode the body once and reuse it for both logging and the result.
    payload = response.json()
    # Never write live access/refresh tokens to the log.
    logger.debug(_mask_sensitive(payload))
    return payload.get("data")
# Manual smoke test.
def main():
    """Exchange a hard-coded authorization code for a token and print the result.

    How to obtain a fresh authorization code:
      1. Visit https://swl-8l9.pages.dev/ and click "立即体验" ("Try it now").
      2. Open the browser developer tools (F12) and scan the QR code to log in.
      3. In the Network tab, take the first request, which looks like
         /verify?code=936c3671e073703c25Ze7zlgwxcOjvsYJ6Iz&state=&scope,
         and copy the value of ``code``.
    """
    import asyncio

    auth_code = "936c3671e073703c25Ze7zlgwxcOjvsYJ6Iz"
    token_data = asyncio.run(get_access_token(auth_code))
    print("res:", token_data)
    # Sample output (token values truncated):
    # {'access_token': 'act.3.wl8L3DFQ...', 'captcha': '', 'desc_url': '',
    #  'description': '', 'error_code': 0, 'expires_in': 1296000,
    #  'log_id': '20240129123749239735B0529965BC6D93',
    #  'open_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy',
    #  'refresh_expires_in': 2592000, 'refresh_token': 'rft.c29d6445...',
    #  'scope': 'user_info,trial.whitelist'}


if __name__ == "__main__":
    main()
|