| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- from datetime import datetime
- from typing import Optional
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from db.engine import engine
- from sqlmodel import Field, SQLModel,create_engine,Session,select,func
- import psycopg2
- from config import DB_URL,logger
- # from db.common import engine
- from sqlalchemy import UniqueConstraint, Index
- from sqlalchemy.dialects.postgresql import insert
- from db.base import BaseRepository
- # 定义数据库模型,不推荐使用 __tablename__ = '"UserInfo"' 来定义包含大写的表名字,
- # 因为可能会导致与其他数据库系统不兼容,而且表查询的时候需要额外注意表格名的大小写
- class UserOAuthToken(SQLModel, table=True):
- id: Optional[int] = Field(primary_key=True)
- access_token:str
- expires_in: Optional[int] = None
- open_id:str = Field(index=True)
- refresh_expires_in: Optional[int] = None
- refresh_token:str
- scope: str
- update_time: datetime = Field(default_factory=datetime.now) # 添加时间戳字段
- __table_args__ = (UniqueConstraint('open_id'),)
- class UserInfo(SQLModel, table=True):
- id: Optional[int] = Field(primary_key=True)
- avatar: str
- avatar_larger: str
- client_key: str
- e_account_role: str = Field(default="")
- nickname: str
- # 外键约束有助于:级联操作、避免冗余、数据完整性
- open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True)
- union_id: str
- update_time: datetime = Field(default_factory=datetime.now)
- __table_args__ = (UniqueConstraint('open_id'),)
- class DouyinBaseRepository(BaseRepository):
- def __init__(self, model: SQLModel, engine=engine):
- super().__init__(model, engine)
- def add_or_update(self, dict_data: dict) -> SQLModel:
- clean_data = {k: v for k, v in dict_data.items() if hasattr(self.model, k)}
- obj_model = self.model(**clean_data)
- with Session(bind=self.engine) as session:
- exist_obj = self.check_exist(obj_model, self.unique_constraint_fields, ex_session=session)
- logger.debug(f"check table '{self.model.__tablename__}' where {self.unique_constraint_fields}")
- if exist_obj:
- self.set_update_time(exist_obj)
- dict_data = self.model_dump_by_field(obj_model, self.non_unique_fields)
- self.set_obj_by_dict(exist_obj,dict_data)
- logger.info(f"modify table '{self.model.__tablename__}' id '{exist_obj.open_id}' from {dict_data}")
- session.commit()
- return exist_obj
- else:
- self.create(obj_model)
- logger.debug(f"on table '{self.model.__tablename__}' create {obj_model}")
- return obj_model
- def get_by_open_id(self, open_id):
- with Session(self.engine) as session:
- logger.debug(f"get {open_id}")
- base_statement = select(self.model).where(self.model.open_id == open_id)
- results = session.exec(base_statement)
- return results.first()
- class UserInfoRepository(DouyinBaseRepository):
- def __init__(self, engine=engine):
- super().__init__(UserInfo, engine)
- self.model:UserInfo
-
- def add_or_update_by_unique(self, obj_in: dict) -> SQLModel:
- clean_data = {k: v for k, v in obj_in.items() if hasattr(self.model, k)}
- obj_model = self.model(**clean_data)
- with Session(bind=self.engine) as session:
- exist_obj = self.check_exist(obj_model, self.unique_constraint_fields, ex_session=session)
- if exist_obj:
- self.set_update_time(exist_obj)
- dict_data = self.model_dump_by_field(obj_model, self.non_unique_fields)
- self.set_obj_by_dict(exist_obj,dict_data)
- session.commit()
- return exist_obj
- else:
- self.create(obj_model)
- session.commit()
- return obj_model
-
-
- # Database manager class
- class UserOAuthRepository(DouyinBaseRepository):
- def __init__(self, engine=engine):
- super().__init__(UserOAuthToken, engine)
- self.model:UserOAuthToken
- # def add_token(self, data: dict):
- # clean_data = {k: v for k, v in data.items() if hasattr(self.model, k)}
- # obj_model = self.model(**clean_data)
- # with Session(bind=self.engine) as session:
- # exist_obj = self.check_exist(obj_model, self.unique_constraint_fields, ex_session=session)
- # if exist_obj:
- # self.set_update_time(exist_obj)
- # dict_data = self.model_dump_by_field(obj_model, self.unique_constraint_fields)
- # self.set_obj_by_dict(exist_obj,dict_data)
- # session.commit()
- # return exist_obj
- # else:
- # self.create(obj_model)
- # session.commit()
- # return obj_model
- async def delete_token(self, token_id: int):
- async with self.session_factory() as session:
- statement = select(UserOAuthToken).where(UserOAuthToken.id == token_id)
- token = await session.execute(statement).scalars().first()
- if token:
- await session.delete(token)
- await session.commit()
- print(f"Record deleted: ID - {token_id}")
- else:
- print(f"Record with ID {token_id} not found")
-
- def test_add(open_id=None):
- SQLModel.metadata.create_all(engine)
-
- user_oauth = {'access_token': 'act.3.m3kiZmfxxIH95i1bHZ7Bq3Wkv_Xm5TtD3kpLGjtCr3G96WIINBKEvzlsaObrGcH4GaxTQeLZA13jkzoZhpAwPwMRqFxlVuIcxpge_-BpdFib1xHqkcFa4B-LX4zpd2YK3kDFTFfMcJXN_fZ2eByg6oqqa1OieUWcvlaVgw==', 'captcha': '', 'desc_url': '', 'description': '', 'error_code': 0, 'expires_in': 1296000, 'log_id': '202402171353461C33F969CEFB511B216F', 'open_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'refresh_expires_in': 2592000, 'refresh_token': 'rft.e4b3da8bd3ef01d880d827b11e826391OEGHiRrTLcp5zsGYP1dh6F9Bo7fg', 'scope': 'user_info,trial.whitelist'}
- user_info = {
- "avatar": "https://p26.douyinpic.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=4010531038",
- "avatar_larger": "https://p3.douyinpic.com/aweme/1080x1080/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=4010531038",
- "captcha": "",
- "city": "",
- "client_key": "55",
- "country": "",
- "desc_url": "",
- "description": "",
- "district": "",
- "e_account_role": "",
- "error_code": 0,
- "gender": 0,
- "log_id": "202401261424326FE877A6CAB03910C553",
- "nickname": "程序员马工",
- "open_id": "_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy",
- "province": "",
- "union_id": "123-01ae-59bd-978a-1de8566186a8"
- }
- if open_id:
- user_oauth["open_id"] = open_id
- user_info["open_id"] = open_id
- user_info["nickname"] = "user" + open_id[:5]
- else:
- open_id = user_oauth["open_id"]
- db_manager = UserOAuthRepository()
- res = db_manager.add_or_update(user_oauth)
- # logger.debug(res)
- db_user_info = UserInfoRepository()
- res = db_user_info.add_or_update(user_info)
- logger.debug(db_manager.get_by_open_id(open_id))
- return user_oauth["open_id"]
- if __name__ == "__main__":
- test_add()
|