"""Generate Douyin request cookies with a DrissionPage browser and store them.

A Chromium instance is started with a randomised User-Agent header, the Douyin
home page is opened, and the ``cookie`` request header of the captured network
packets is saved to the ``cookie`` table of the project database.
"""
import asyncio
import os
import random
import re
import sys
import time
from collections import OrderedDict

# Make the parent of this file's directory importable so ``conf.config``
# resolves when the file is executed directly as a script.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

import DrissionPage
from dataset import Table
from DrissionPage import ChromiumOptions
from DrissionPage import ChromiumPage
from DrissionPage.common import Settings
from fake_useragent import UserAgent
from faker import Faker

from conf.config import logger, PAGE_OUTPUT, INI_PATH, chrome_options, db, USER_DATA, OUTPUT

# https://dataset.readthedocs.io/en/latest/api.html
table: Table = db['cookie']

# Kept for backward compatibility with code that may import ``page`` from this
# module. Nothing in this file assigns it; every browser instance is owned and
# closed by ``gen_douyin_cookies``.
page = None

# A captured ``cookie`` header must be longer than this to be accepted.
# NOTE(review): presumably shorter headers lack the cookies Douyin sets after
# the first page load - confirm the threshold against real traffic.
MIN_COOKIE_LENGTH = 1600

# Number of page loads attempted per browser session.
MAX_LISTEN_RETRIES = 3

# Number of HTTPS packets inspected per page load before giving up on it.
MAX_PACKETS_PER_TRY = 5


def get_browser_fake_info():
    """Return a random Faker user-agent string that contains "Windows".

    Returns:
        str: The first generated user agent whose text includes "Windows".
    """
    # https://github.com/joke2k/faker/wiki
    fake = Faker()
    while True:
        user_agent = fake.user_agent()
        if "Windows" in user_agent:
            return user_agent


def create_ua_header():
    """Build request headers describing a random Windows/macOS browser.

    Returns:
        dict: ``Sec-Ch-Ua``, ``Sec-Ch-Ua-Platform`` and ``User-Agent`` entries
        derived from one random ``fake_useragent`` record.
    """
    ua = UserAgent(os=["windows", "macos"])
    res = ua.getRandom
    logger.info(f"{ua}")
    vnum = int(res['version'])
    # Case-insensitive so that "win10"-style and "Windows"-style values of the
    # ``os`` field are both recognised as Windows.
    is_windows = 'win' in str(res['os']).lower()
    return {
        "Sec-Ch-Ua": f'";Not A Brand";v="99", "Chromium";v="{vnum}"',
        "Sec-Ch-Ua-Platform": "Windows" if is_windows else "macOS",
        "User-Agent": res['useragent'],
    }


def gen_douyin_cookies():
    """Open Douyin in a fresh browser and capture a usable cookie header.

    The page is loaded up to ``MAX_LISTEN_RETRIES`` times. For each load, up
    to ``MAX_PACKETS_PER_TRY`` captured HTTPS packets are inspected and the
    first ``cookie`` request header longer than ``MIN_COOKIE_LENGTH`` wins.

    Returns:
        dict | None: A record with ``cookies``, ``user_agent``,
        ``update_time`` and ``create_time`` keys, or ``None`` when the
        browser's cookies could not be cleared or no suitable cookie header
        was seen.
    """
    header = create_ua_header()
    logger.info(f"header {header}")
    chrome_options.set_user_data_path(USER_DATA + '1')
    browser = ChromiumPage(chrome_options)
    # ``finally`` closes the browser on every exit path, including
    # KeyboardInterrupt and errors raised while configuring the page.
    try:
        try:
            browser.set.cookies.clear()
        except Exception as e:
            logger.exception(f"{e}")
            return None

        browser.set.load_mode.normal()
        browser.set.headers(headers=header)
        logger.info(f"address {chrome_options.address}")
        logger.info(f"user_data_path {chrome_options.user_data_path}")
        logger.info(f"start '{browser._chromium_options._browser_path}'")
        logger.info(f"process_id {browser.process_id}")

        url = "https://www.douyin.com/"
        tab = browser
        retry = MAX_LISTEN_RETRIES
        while retry:
            try:
                logger.info(f"start listen")
                tab.listen.start("www.douyin.com", method="GET")
                tab.get(url)
                inspected = 0
                for packet in tab.listen.steps(timeout=5):
                    if not packet.url.startswith('https'):
                        continue
                    cookie = packet.request.extra_info.headers["cookie"]
                    logger.debug(f"cookie {cookie}")
                    if len(cookie) > MIN_COOKIE_LENGTH:
                        # One timestamp for both fields so they always agree.
                        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        ret = {
                            "cookies": cookie,
                            "user_agent": header['User-Agent'],
                            "update_time": now,
                            "create_time": now,
                        }
                        logger.debug(f"ret {ret}")
                        return ret
                    inspected += 1
                    if inspected == MAX_PACKETS_PER_TRY:
                        break
            except DrissionPage.errors.WaitTimeoutError as e:
                logger.error(f"{e}")
            except Exception as e:
                logger.exception(e)
            tab.listen.clear()
            retry -= 1
            logger.info(f"retry {retry}")
        return None
    finally:
        browser.quit()


def save_to_db(data):
    """Insert ``data`` as a new row of the ``cookie`` table."""
    cookie_table = db['cookie']
    cookie_table.insert(data)


def try_gen_douyin_cookies(save=True):
    """Call ``gen_douyin_cookies`` repeatedly until it yields a record.

    Args:
        save: When true (the default) the record is inserted into the
            ``cookie`` table before it is returned.

    Returns:
        dict: The generated cookie record.

    Exits the process with status 0 on ``KeyboardInterrupt`` (Ctrl+C).
    """
    while True:
        try:
            ret = gen_douyin_cookies()
            if not ret:
                # Nothing usable was captured; start over with a new browser.
                logger.error("no cookie generated, retrying")
                continue
            if save:
                save_to_db(ret)
            return ret
        except KeyboardInterrupt:
            # The browser has already been closed by ``gen_douyin_cookies``.
            logger.info("KeyboardInterrupt")
            sys.exit(0)
        except DrissionPage.errors.WaitTimeoutError as e:
            logger.error(f"{e}")
        except Exception as e:
            logger.exception(e)


def get_from_table(id=None):
    """Fetch one row from the ``cookie`` table.

    Args:
        id: Primary key of the wanted row. When falsy, a random row is
            returned instead.

    Returns:
        The matching row, or ``None`` when the table is empty or no row has
        the requested ``id``.
    """
    count = table.count()
    if count <= 0:
        logger.info("no cookie")
        return None
    if id:
        return table.find_one(id=id)
    # Select by position rather than by id value: ids are not guaranteed to be
    # the contiguous range 0..count-1.
    offset = random.randrange(count)
    return table.find_one(order_by='id', _offset=offset)


def delete_old_and_create_new(row: OrderedDict):
    """Replace the stored cookie of ``row`` with a freshly generated one.

    Args:
        row: An existing row of the ``cookie`` table; only ``row['id']`` is
            read.

    Returns:
        The row stored under ``row['id']`` after the update.
    """
    # ``save=False``: the record is written by the upsert below, so it must
    # not also be inserted as an additional row.
    res = try_gen_douyin_cookies(save=False)
    if res:
        res['id'] = row['id']
        upserted = table.upsert(res, ['id'])
        logger.info(f"upsert {upserted}")
    return table.find_one(id=row['id'])


def main():
    """Generate one cookie record and store it in the database."""
    try_gen_douyin_cookies()


if __name__ == "__main__":
    main()