| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import asyncio
- import os
- import random
- import re
- import time
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from DrissionPage import ChromiumPage
- from DrissionPage import ChromiumOptions
- from conf.config import logger,PAGE_OUTPUT,INI_PATH,chrome_options,db,USER_DATA,OUTPUT
- from DrissionPage import ChromiumOptions
- from DrissionPage.common import Settings
- from faker import Faker
- import DrissionPage
- from fake_useragent import UserAgent
- from dataset import Table
- table:Table = db['cookie']
- def get_browser_fake_info():
- # https://github.com/joke2k/faker/wiki
- fake = Faker()
- while True:
- user_agent = fake.user_agent()
- mac_platform_token = fake.mac_platform_token()
- if "Windows" in user_agent:
- break
- return user_agent
- page = None
- # https://dataset.readthedocs.io/en/latest/api.html
- def create_ua_header():
- ua = UserAgent(os=["windows", "macos"])
- res = ua.getRandom
- logger.info(f"{ua}")
- version = res['version']
- vnum = int(version)
- header = {
- "Sec-Ch-Ua": f'";Not A Brand";v="99", "Chromium";v="{vnum}"',
- "Sec-Ch-Ua-Platform": "Windows" if 'win' in res['os'] else "macOS",
- "User-Agent": res['useragent'],
- }
- # logger.info(f"{res}")
- # logger.info(f"{header}")
- return header
- def gen_douyin_cookies():
- # header = {'Sec-Ch-Ua': '";Not A Brand";v="99", "Chromium";v="122"', 'Sec-Ch-Ua-Platform': 'Windows', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Config/92.2.2788.20'}
- header = create_ua_header()
- logger.info(f"header {header}")
- chrome_options.set_user_data_path(USER_DATA + '1')
- page = ChromiumPage(chrome_options)
- try:
- page.set.cookies.clear()
- except Exception as e:
- logger.exception(f"{e}")
- page.quit()
- return
- page.set.load_mode.normal()
- page.set.headers(headers=header)
- logger.info(f"address {chrome_options.address}")
- logger.info(f"user_data_path {chrome_options.user_data_path}")
- logger.info(f"start '{page._chromium_options._browser_path}'")
- logger.info(f"process_id {page.process_id}")
- # page.get("edge://version/")
- url="https://www.douyin.com/"
- # tab = page.new_tab()
- tab = page
- retry = 3
- while retry:
- try:
- logger.info(f"start listen")
- tab.listen.start("www.douyin.com", method="GET")
- tab.get(url)
- i = 0
- for packet in tab.listen.steps(timeout=5):
- if not packet.url.startswith('https'):
- continue
- # logger.info(f"{packet.url} {packet.request.headers}")
- cookie = packet.request.extra_info.headers["cookie"]
- logger.debug(f"cookie {cookie}")
- # page.quit()
- if len(cookie) > 1600:
- ret = {
- "cookies": cookie,
- "user_agent": header['User-Agent'],
- "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
- "create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
- }
- logger.debug(f"ret {ret}")
- page.quit()
- return ret
- i += 1
- if i == 5:
- break
- # res = tab.listen.wait(1, timeout=5, raise_err=True)
- # logger.debug(f"res {res.request.headers}")
- # logger.debug(f"res.request.extra_info.headers {res.request.extra_info.headers}")
- except DrissionPage.errors.WaitTimeoutError as e:
- logger.error(f"{e}")
- except Exception as e:
- logger.exception(e)
- tab.listen.clear()
- retry -= 1
- logger.info(f"retry {retry}")
- page.quit()
- def save_to_db(data):
- table = db['cookie']
- table.insert(data)
- def try_gen_douyin_cookies():
- while True:
- try:
- ret = gen_douyin_cookies()
- # logger.info(f"ret {ret}")
- save_to_db(ret)
- break
- # Ctrl+c cancel_error
- except KeyboardInterrupt:
- logger.info("KeyboardInterrupt")
- page.quit()
- sys.exit(0)
- except DrissionPage.errors.WaitTimeoutError as e:
- logger.error(f"{e}")
- except Exception as e:
- logger.exception(e)
- def get_from_table():
- count = table.count()
- # random choise
- if count > 0:
- index = random.randint(0, count-1)
- row = table.find_one(id=index)
- # logger.info(f"{row['id']}")
- # logger.info(f"row {row}")
- return row
- else:
- logger.info("no cookie")
- return None
- def main():
-
- # ua = create_ua_header()
- # gen_douyin_cookies()
- # try_gen_douyin_cookies()
- get_from_table()
- # table.create_column('key')
- # table.delete(id=6)
- if __name__ == "__main__":
- main()
|