
Finish the last-page check for dp

mrh 10 months ago
parent
commit
0a0b65e876

+ 14 - 15
database/excel_import.py

@@ -9,7 +9,7 @@ from database.sqlite_engine import engine
 from typing import Optional
 from database.sqlite_engine import create_db_and_tables, drop_table
 
-class Keyword(SQLModel, table=True):
+class KeywordModel(SQLModel, table=True):
     id: int = Field(default=None, primary_key=True)
     key_word: str = Field(unique=True)
     total_pages: Optional[int] = Field(default=None)
@@ -38,7 +38,7 @@ class ExcelDatabaseManager:
 
     def insert_or_update_record(self, record):
         with Session(self.engine) as session:
-            statement = select(Keyword).where(Keyword.key_word == record["key_word"])
+            statement = select(KeywordModel).where(KeywordModel.key_word == record["key_word"])
             existing = session.exec(statement).first()
             
             if existing:
@@ -47,7 +47,7 @@ class ExcelDatabaseManager:
                 existing.done = record.get("done", existing.done)
             else:
                 # Insert a new record
-                new_record = Keyword(**record)
+                new_record = KeywordModel(**record)
                 session.add(new_record)
             session.commit()
 
@@ -61,17 +61,16 @@ class ExcelDatabaseManager:
     def mark_keyword_done(self, keyword: str):
         """标记关键词为已完成"""
         with Session(self.engine) as session:
-            statement = select(Keyword).where(Keyword.key_word == keyword)
+            statement = select(KeywordModel).where(KeywordModel.key_word == keyword)
             keyword_record = session.exec(statement).first()
             if keyword_record:
                 keyword_record.done = True
-                keyword_record.last_updated = datetime.now()
                 session.commit()
 
     def update_keyword_progress(self, keyword: str, current_page: int):
         """更新关键词的当前进度"""
         with Session(self.engine) as session:
-            statement = select(Keyword).where(Keyword.key_word == keyword)
+            statement = select(KeywordModel).where(KeywordModel.key_word == keyword)
             keyword_record = session.exec(statement).first()
             if keyword_record:
                 keyword_record.current_page = current_page
@@ -81,29 +80,29 @@ class ExcelDatabaseManager:
     def get_next_keyword(self):
         """获取下一个未完成的关键词"""
         with Session(self.engine) as session:
-            statement = select(Keyword).where(Keyword.done == False)
-            keyword = session.exec(statement.order_by(Keyword.last_updated)).first()
+            statement = select(KeywordModel).where(KeywordModel.done == False)
+            keyword = session.exec(statement.order_by(KeywordModel.last_updated)).first()
             return keyword
 
-    def get_all_keywords(self) -> list[Keyword]:
+    def get_all_keywords(self) -> list[KeywordModel]:
         """获取所有关键词"""
         with Session(self.engine) as session:
-            statement = select(Keyword)
+            statement = select(KeywordModel)
             return session.exec(statement).all()
 
-    def get_keywords_by_status(self, done: bool) -> list[Keyword]:
+    def get_keywords_by_status(self, done: bool = False) -> list[KeywordModel]:
         """根据完成状态获取关键词"""
         with Session(self.engine) as session:
-            statement = select(Keyword).where(Keyword.done == done)
+            statement = select(KeywordModel).where(KeywordModel.done == done)
             return session.exec(statement).all()
 
-    def get_keywords_by_page(self, limit: int = 50, offset: int = 0) -> list[Keyword]:
+    def get_keywords_by_page(self, limit: int = 50, offset: int = 0) -> list[KeywordModel]:
         """分页获取关键词"""
         with Session(self.engine) as session:
-            statement = select(Keyword).limit(limit).offset(offset)
+            statement = select(KeywordModel).limit(limit).offset(offset)
             return session.exec(statement).all()
 
-    def get_keywords(self) -> list[Keyword]:
+    def get_keywords(self) -> list[KeywordModel]:
         """获取所有关键词(兼容旧接口)"""
         return self.get_all_keywords()
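
A minimal usage sketch of the renamed model and manager (assuming the tables have already been created via create_db_and_tables() from database.sqlite_engine; the keyword value below is only illustrative):

# Upsert a keyword, read keywords back by status, record progress, then mark it done.
from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.sqlite_engine import create_db_and_tables

create_db_and_tables()
manager = ExcelDatabaseManager()

manager.insert_or_update_record({"key_word": "Acalypha matsudae essential oil", "done": False})

for kw in manager.get_keywords_by_status(done=False):    # unfinished keywords
    print(kw.id, kw.key_word, kw.current_page)

manager.update_keyword_progress("Acalypha matsudae essential oil", current_page=10)
manager.mark_keyword_done("Acalypha matsudae essential oil")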
 

+ 0 - 1
mylib/base.py

@@ -4,7 +4,6 @@ from crawl4ai import *
 from pathlib import Path
 import json
 import pickle
-OUTPUT_DIR = Path("output").absolute()
 from crawl4ai.async_configs import BrowserConfig
 
 # Using proxy URL

+ 41 - 0
mylib/crawl_lib_func.py

@@ -0,0 +1,41 @@
+
+import re
+
+
+def filter_links(links):
+    '''
+    input: {
+        'internal': [{}],
+        'external': [
+            {
+                "href": "xx",
+                "text": "xxm",
+                "title": "",
+                "base_domain": "benlcollins.com"
+            }
+        ],
+    }
+    '''
+    external_links = links["external"]
+    filtered_links = [link for link in external_links if "google" not in link["base_domain"]]
+    return filtered_links
+
+def is_valid_domain(domain):
+    # Regex pattern matching a valid domain name format
+    pattern = r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$'
+    return re.match(pattern, domain) is not None
+
+def filter_local_domain(links):
+    '''
+    input: [{
+            "href": "xx",
+            "text": "xxm",
+            "title": "",
+            "base_domain": "benlcollins.com"
+    }]
+    '''
+    filtered_links = []
+    for link in links:
+        if 'base_domain' in link and is_valid_domain(link['base_domain']):
+            filtered_links.append(link)
+    return filtered_links
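
A quick sketch of how these helpers chain together on a crawl4ai result.links dict (the sample entries below are made up for illustration):

# filter_links() drops external links whose base_domain contains "google";
# filter_local_domain() then keeps only entries whose base_domain looks like a real domain.
from mylib.crawl_lib_func import filter_links, filter_local_domain, is_valid_domain

links = {
    "internal": [],
    "external": [
        {"href": "https://benlcollins.com/post", "text": "post", "title": "", "base_domain": "benlcollins.com"},
        {"href": "https://maps.google.com/xyz", "text": "maps", "title": "", "base_domain": "maps.google.com"},
        {"href": "http://localhost/cache", "text": "", "title": "", "base_domain": "localhost"},
    ],
}

external = filter_links(links)             # drops the maps.google.com entry
cleaned = filter_local_domain(external)    # drops "localhost" (no dot-separated TLD)
print(is_valid_domain("benlcollins.com"))  # True
print([link["base_domain"] for link in cleaned])  # ['benlcollins.com']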

+ 45 - 14
mylib/search_manager.py

@@ -4,19 +4,23 @@ from DrissionPage import ChromiumPage
 from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
 from mylib.base import (
     save_to_file,
-    OUTPUT_DIR,
     ensure_output_dir,
     replace_space,
     browser_config
 )
 from database.search_model import SearchDatabaseManager,SearchResult
+from database.excel_import import ExcelDatabaseManager,KeywordModel
 from database.sqlite_engine import create_db_and_tables, drop_table
+from mylib.drission_page import load_chrome_from_ini
 from lxml import html
+from mylib.settings import GOOGLE_SEARCH_DIR
+page = load_chrome_from_ini()
 
 class SearchManager:
     def __init__(self, page: ChromiumPage):
         self.page = page
         self.search_db_manager = SearchDatabaseManager()
+        self.excel_db_manager = ExcelDatabaseManager()
         
     def search_keyword(self, keyword: str, start: int = 0, cache: bool = True) -> SearchResult:
         """搜索关键词并返回结果链接和保存的HTML文件路径
@@ -44,11 +48,34 @@ class SearchManager:
         html_path = self.save_page(keyword, start)
         
         # Check whether this is the last page
-        is_last_page = not self.go_to_next_page()
+        is_last_page = self.check_last_page()
         
         # Save the result to the database
         return self.save_search_result(keyword, start, url, html_path, is_last_page)
+
+    def check_last_page(self):
+        """Return True if the current result page is the last one."""
+        tree = html.fromstring(self.page.html)
         
+        # Check whether an element with id="search" exists
+        search_element = tree.xpath('//*[@id="search"]')
+        
+        # Check whether an element with id="pnnext" (the "Next" link) exists
+        pnnext_element = tree.xpath('//*[@id="pnnext"]')
+        
+        # If id="search" exists and id="pnnext" does not, this is the last page
+        if pnnext_element:
+            return False  # a "Next" link exists, so more pages follow
+        elif search_element and not pnnext_element:
+            return True  # this is the last page
+        else:
+            raise ValueError("Unexpected page: cannot determine whether this is the last page.")
+
+    def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0, pages_num: int = 250, cache=True, skip_exist=True):
+        """Walk the result pages of one keyword and mark it done once the last page is reached."""
+        keyword = keyword_model.key_word
+        for current_start in range(start, pages_num, 10):
+            search_result: SearchResult = self.search_keyword(keyword, current_start, cache)
+            if search_result.is_last_page:
+                print(f"Reached last page for {keyword} at start={current_start}")
+                self.excel_db_manager.mark_keyword_done(keyword)
+                break
     def save_search_result(self, keyword: str, start: int, url: str, html_path: str, is_last_page: bool = False) -> SearchResult:
         """保存搜索结果到数据库
         
@@ -91,7 +118,7 @@ class SearchManager:
         
     def save_page(self, keyword: str, start: int) -> Path:
         """保存当前页面"""
-        save_dir = OUTPUT_DIR / keyword
+        save_dir = GOOGLE_SEARCH_DIR / keyword
         ensure_output_dir(save_dir)
         
         save_path = save_dir / f"{start}.html"
@@ -147,20 +174,24 @@ class SearchManager:
                 links.append(href)
         return links
 
+def test_one():
+    global page
+    # Initialize the search manager with the module-level browser page
+    manager = SearchManager(page)
+    key_model_list = manager.excel_db_manager.get_keywords_by_status()
+    key_model = key_model_list.pop(0)
+    print(key_model)
+    manager.walk_search_one_keywords(key_model)
+
 async def main():
     create_db_and_tables()
-    from mylib.drission_page import load_chrome_from_ini
-    
-    # Initialize the browser
-    page = load_chrome_from_ini()
-    manager = SearchManager(page)
-    
-    # Example usage
-    keyword = "Acalypha matsudae essential oil"
-    
+    # test_one()
+    # global page
+    # self = SearchManager(page)
+    # self.search_db_manager.get_search_results("python", 0)    
     # Search the keyword
-    res = manager.search_keyword(keyword, cache=True)
-    print(f"Found results: {res.model_dump_json(indent=4)} ")
+    # res = manager.search_keyword(keyword_model.key_word, cache=True)
+    # print(f"Found results: {res.model_dump_json(indent=4)} ")
 
 if __name__ == "__main__":
     asyncio.run(main())
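
A sketch of how the new pieces are meant to fit together: pull the oldest unfinished keyword from the Excel-imported table and walk its result pages until check_last_page() reports the end (a minimal sketch; assumes a Chromium profile configured for load_chrome_from_ini(), as in the module above):

# Walk one keyword end to end; check_last_page() raises ValueError on unexpected pages
# (e.g. a traffic-anomaly page), so callers may want to catch it and retry.
from mylib.drission_page import load_chrome_from_ini
from mylib.search_manager import SearchManager

page = load_chrome_from_ini()
manager = SearchManager(page)

keyword_model = manager.excel_db_manager.get_next_keyword()
if keyword_model is not None:
    try:
        manager.walk_search_one_keywords(keyword_model, start=0, pages_num=250)
    except ValueError as exc:
        print(f"Stopped on {keyword_model.key_word}: {exc}")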

+ 2 - 1
mylib/settings.py

@@ -1,4 +1,5 @@
 from pathlib import Path
 WORK_DIR = Path(__file__).parent.parent.absolute()
 OUTPUT_DIR = WORK_DIR / "output"
-CONFIG_DIR = WORK_DIR / "mylib" / "conf"
+CONFIG_DIR = WORK_DIR / "mylib" / "conf"
+GOOGLE_SEARCH_DIR = OUTPUT_DIR / "google_search"
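
With this layout, the modules that previously wrote under OUTPUT_DIR now save Google result pages under output/google_search/<keyword>/<start>.html, e.g. (a small illustrative sketch; the keyword value is made up):

from mylib.settings import GOOGLE_SEARCH_DIR

keyword = "Acalypha matsudae essential oil"    # illustrative keyword
print(GOOGLE_SEARCH_DIR / keyword / "0.html")  # <WORK_DIR>/output/google_search/<keyword>/0.html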

+ 63 - 45
search_keyward.py

@@ -1,44 +1,38 @@
 import asyncio
+import re
 from crawl4ai import *
 from pathlib import Path
 import json
 from lxml import html  # use the lxml.html module
 from sqlmodel import Session, select
 from mylib.base import (replace_space, save_to_file, save_all_result,
-                        OUTPUT_DIR,save_to_pickle,ensure_output_dir,
+                        save_to_pickle,ensure_output_dir,
                         save_base64_to_file,browser_config)
 from mylib.drission_page import load_chrome_from_ini
+from database.excel_import import ExcelDatabaseManager,KeywordModel
+from mylib.settings import GOOGLE_SEARCH_DIR
+from mylib.crawl_lib_func import filter_links,filter_local_domain
 
 page = load_chrome_from_ini()
 
 async def google_search(url:str, config=None)->CrawlResult:
+    run_config = CrawlerRunConfig(
+        magic=True,
+        simulate_user=True,
+        override_navigator=True,
+    )
     async with AsyncWebCrawler(config=browser_config) as crawler:
         result = await crawler.arun(
             url=url,
-            cache_mode=CacheMode.ENABLED,
+            cache_mode=CacheMode.DISABLED,
             user_agent='random',
+            config=run_config,
 
         )
-        # save_to_pickle(result, OUTPUT_DIR / f"{search_key}.pickle")
+        # save_to_pickle(result, GOOGLE_SEARCH_DIR / f"{search_key}.pickle")
         return result
 
-def filter_links(links):
-    '''
-    input: {
-        'internal': [{}],
-        'external': [
-            {
-                "href": "xx",
-                "text": "xxm",
-                "title": "",
-                "base_domain": "benlcollins.com"
-            }
-        ],
-    }
-    '''
-    external_links = links["external"]
-    filtered_links = [link for link in external_links if "google" not in link["base_domain"]]
-    return filtered_links
+    
 def is_search_result_empty(html_content: str) -> bool:
     '''
     Check whether the page contains an element with id="search"
@@ -49,19 +43,16 @@ def is_search_result_empty(html_content: str) -> bool:
     search_elements = tree.xpath('//*[@id="search"]/*')
     return len(search_elements) == 0
 
-def is_search_result_links_empty(result: CrawlResult) -> bool:
-    print
-
 def is_already_processed(keyword: str) -> bool:
     """检查关键词是否已处理"""
-    save_dir = OUTPUT_DIR / replace_space(keyword) / 'pkl'
+    save_dir = GOOGLE_SEARCH_DIR / replace_space(keyword) / 'pkl'
     return save_dir.exists() and any(save_dir.glob("*.pickle"))
 
 async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip_exist=True):
     global page
     """处理单个关键词"""
-    keyword = replace_space(keyword)
-    save_dir = OUTPUT_DIR / keyword 
+    # keyword = replace_space(keyword)
+    save_dir = GOOGLE_SEARCH_DIR / keyword 
     ensure_output_dir(save_dir)
     
     # # If the keyword has already been processed, return the save directory directly
@@ -71,24 +62,24 @@ async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip
         
     # Not processed yet, run the search
     for i in range(start, pages_num, 10):
-        save_html_path = OUTPUT_DIR / keyword / f"{i}.html"
+        save_html_path = GOOGLE_SEARCH_DIR / keyword / f"{i}.html"
         url = f"https://www.google.com/search?q={keyword}&start={i}"
         print(f"search url: {url}")
         
         # If a cached file exists, read it directly
-        if skip_exist and save_html_path.exists():
-            print(f"跳过缓存文件 {save_html_path}")
-            continue
-            print(f"读取缓存文件 {save_html_path}")
-        else:
-            page.get(url)
-            save_to_file(page.html,save_html_path)
+        # if skip_exist and save_html_path.exists():
+        #     print(f"跳过缓存文件 {save_html_path}")
+        #     continue
+        #     print(f"读取缓存文件 {save_html_path}")
+        # else:
+        #     page.get(url)
+        #     save_to_file(page.html,save_html_path)
 
-            # result: CrawlResult = await google_search(url)
-            # Save the HTML file
-            # save_to_file(result.html, save_html_path)
-            print(f"保存 HTML 文件 {save_html_path}")
-        url = f"file://{save_html_path}"
+        #     # result: CrawlResult = await google_search(url)
+        #     # Save the HTML file
+        #     # save_to_file(result.html, save_html_path)
+        #     print(f"保存 HTML 文件 {save_html_path}")
+        # url = f"file://{save_html_path}"
         result: CrawlResult = await google_search(url)
         
         # Pretty-print result.links
@@ -107,15 +98,42 @@ async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip
     return save_dir
 
 async def search_all():
     """Process all unfinished keywords."""
+    excel_db_manager = ExcelDatabaseManager()
-    keywords = await get_keywords_from_db()
-    for keyword in keywords:
+    key_model_list = excel_db_manager.get_keywords_by_status()
+    for keyword_model in key_model_list:
         # if is_already_processed(keyword):
         #     print(f"关键词 {keyword} 已处理,跳过")
         #     continue
-        await process_keyword(keyword)
+        await process_keyword(keyword_model.key_word)
 async def test_single_search():
-    await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)
+    # await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)
+    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\正常的搜索结果.html')
+    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\查询不到内容.html')
+    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\流量异常.html")
+    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\最后一页.html")
+    save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\只有一页.html")
+    save_dir = save_html_path.parent
+    file_name = save_html_path.name
+    url = f"file://{save_html_path}"
+    result: CrawlResult = await google_search(url)
+    # Pretty-print result.links
+    # print(json.dumps(result.links, indent=4))
+    save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{file_name}.json")
+    print(f"保存 result.links 文件 {save_json_path}")
+    links = filter_links(result.links)
+    # print('\n -----------------links:')
+    # print(json.dumps(links, indent=4))
+    save_json_path = save_to_file(json.dumps(links, indent=4), save_dir / f"links-{file_name}-filter.json")
+    links_not_local = filter_local_domain(links)
+    print(f"保存 links_not_local.json 文件 {save_json_path}")
+    # print('\n -----------------links_not_local:')
+    # print(json.dumps(links_not_local, indent=4))
+    # print(f"len links_not_local: {len(links_not_local)}")
+    save_links_not_local_path = save_to_file(json.dumps(links_not_local, indent=4), save_dir / f"links-{file_name}-filter-not-local.json")
+    print(f"保存 links-{file_name}-filter-not-local.json 文件 {save_links_not_local_path}")
+    # print(f"start: {i}, links: {links} \n len: {len(links)}")
+
     # result = await google_search("Acalypha malabarica essential oil", start=50)
     # print(f"result clean html:\n {result.cleaned_html}")
     # print(f"result.links\n {result.links['external']}")
@@ -126,8 +144,8 @@ async def test_single_search():
 
 
 async def main():
-    await search_all()
-    # await test_single_search()
+    # await search_all()
+    await test_single_search()
     
 if __name__ == "__main__":
     asyncio.run(main())
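
A compact sketch of the crawl-and-filter step that test_single_search exercises above, assuming a Google result page already saved to disk (the path below is illustrative):

import asyncio
import json
from pathlib import Path

from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult, CrawlerRunConfig
from mylib.base import browser_config
from mylib.crawl_lib_func import filter_links, filter_local_domain

async def extract_external_domains(saved_html: Path) -> list[str]:
    # Mirror google_search(): crawl the saved page with caching disabled, then filter its links.
    run_config = CrawlerRunConfig(magic=True, simulate_user=True, override_navigator=True)
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result: CrawlResult = await crawler.arun(
            url=f"file://{saved_html}",
            cache_mode=CacheMode.DISABLED,
            config=run_config,
        )
    links = filter_local_domain(filter_links(result.links))
    return sorted({link["base_domain"] for link in links})

if __name__ == "__main__":
    domains = asyncio.run(extract_external_domains(Path("output/debug/sample.html")))  # illustrative path
    print(json.dumps(domains, indent=4))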

+ 40 - 0
tests/test_dp_search_page.py

@@ -0,0 +1,40 @@
+import pytest
+from pathlib import Path
+from DrissionPage import ChromiumPage
+from mylib.drission_page import load_chrome_from_ini
+from mylib.search_manager import SearchManager
+
+# Use fixtures to initialize the browser and the SearchManager
+@pytest.fixture(scope="module")
+def page():
+    return load_chrome_from_ini()
+
+@pytest.fixture(scope="module")
+def search_manager(page):
+    return SearchManager(page)
+
+# Parametrized test cases
+@pytest.mark.parametrize("file_name, expected_result", [
+    ("只有一页.html", True),
+    ("最后一页.html", True),
+    ("查询不到内容 (2).html", True),
+    ("查询不到内容.html", True),
+    ("正常的搜索结果.html", False),
+    ("流量异常.html", pytest.raises(ValueError)),
+])
+def test_check_last_page(search_manager, file_name, expected_result):
+    test_dir = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug')
+    file_path = test_dir / file_name
+    
+    print(f"Testing file: {file_name}")  # 打印当前测试的文件名
+    
+    if isinstance(expected_result, type(pytest.raises(ValueError))):
+        print(f"Expecting ValueError for file: {file_name}")  # 打印预期会抛出异常的文件
+        with expected_result:
+            search_manager.page.get(file_path)
+            search_manager.check_last_page()
+    else:
+        search_manager.page.get(file_path)
+        result = search_manager.check_last_page()
+        print(f"Result for {file_name}: {result}")  # 打印实际的测试结果
+        assert result == expected_result, f"Expected {expected_result}, but got {result} for {file_name}"
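
The isinstance(expected_result, type(pytest.raises(ValueError))) check works, but a more conventional way to parametrize mixed value/exception cases is to give every row a context manager, so the test body needs no type check. A sketch under that assumption, reusing the fixtures and debug files above:

from contextlib import nullcontext
from pathlib import Path

import pytest

@pytest.mark.parametrize("file_name, expectation, expected", [
    ("只有一页.html", nullcontext(), True),
    ("正常的搜索结果.html", nullcontext(), False),
    ("流量异常.html", pytest.raises(ValueError), None),
])
def test_check_last_page_ctx(search_manager, file_name, expectation, expected):
    file_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug') / file_name
    with expectation:
        search_manager.page.get(file_path)
        # For the ValueError row the assert is never reached; pytest.raises verifies the exception.
        assert search_manager.check_last_page() == expected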