
Tested prefect, but the results were not ideal

mrh 1 year ago
parent
commit
a8f84b34ae

+ 0 - 0
fontend/tsconfig.app.json


+ 0 - 0
fontend/tsconfig.json


+ 0 - 0
fontend/tsconfig.node.json


File diff suppressed because it is too large
+ 826 - 5
poetry.lock


+ 1 - 0
pyproject.toml

@@ -18,6 +18,7 @@ dependencies = [
     "flower (>=2.0.1,<3.0.0)",
     "docling (>=2.21.0,<3.0.0)",
     "pywebview (>=5.4,<6.0)",
+    "prefect (>=3.2.7,<4.0.0)",
 ]
 
 

+ 14 - 0
tests/mytest/demo/dagster_task.py

@@ -0,0 +1,14 @@
+import dagster as dg
+
+
+@dg.asset
+def hello(context: dg.AssetExecutionContext):
+    context.log.info("Hello!")
+
+
+@dg.asset(deps=[hello])
+def world(context: dg.AssetExecutionContext):
+    context.log.info("World!")
+
+
+defs = dg.Definitions(assets=[hello, world])
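
The Dagster demo above can be exercised locally by materializing both assets in-process. The snippet below is a minimal sketch; it assumes the module is importable as `tests.mytest.demo.dagster_task` from the repository root.

```python
# Minimal sketch: materialize the two demo assets in-process.
# Assumes tests.mytest.demo.dagster_task is importable from the repo root.
import dagster as dg

from tests.mytest.demo.dagster_task import hello, world

if __name__ == "__main__":
    # materialize() runs the assets with an ephemeral instance,
    # honouring the deps=[hello] edge so hello runs before world.
    result = dg.materialize([hello, world])
    assert result.success
```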

+ 140 - 0
tests/mytest/demo/prefect_task.py

@@ -0,0 +1,140 @@
+import datetime
+from pathlib import Path
+from typing import Dict, List, Optional
+from pydantic import BaseModel
+from prefect import flow, task
+from worker.search_engine.drission_google_search import search_keyword_drission
+from worker.html_convert.pandoc import process_single_example, process_all_results
+from DrissionPage import ChromiumPage
+from mylib.drission_page import load_chrome_from_ini
+from mylib.logu import logger
+from config.settings import OUTPUT_DIR
+from prefect.runtime import task_run, flow_run
+from prefect.filesystems import LocalFileSystem
+from prefect.cache_policies import INPUTS, DEFAULT
+from datetime import timedelta
+from worker.crawl_pages.crawl_urls import URLCrawler
+from mylib.logu import get_logger
+from crawl4ai import BrowserConfig
+from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
+
+
+local_storage = LocalFileSystem(basepath=OUTPUT_DIR/"prefect_data")
+logger.info(f"{local_storage.basepath}")
+# local_storage.save('crawl', overwrite=True)
+local_storage = local_storage.load('crawl')
+
+class Keyword(BaseModel):
+    keyword: str
+
+class GoogleSearchInput(Keyword):
+    max_result_items: int = 200
+    skip_existing: bool = True
+    browser_config: Optional[Dict] = None
+
+class CrawlKeywordUrlsInput(Keyword):
+    browser_config: Optional[Dict] = {
+                    'headless': True,
+                    'verbose': False,
+                    'extra_args': ["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
+                    'proxy': None  # Let worker get its own proxy
+                }
+    overwrite: Optional[bool] = False
+    proxies: Optional[List[Dict]] = []
+
+def get_search_task_runtime_name():
+    parameters = flow_run.parameters
+    keyword = parameters.get('keyword', '')
+    task_run_name = task_run.name
+    logger.info(f"{task_run_name}_{keyword}")
+    return f"search_{keyword}"
+
+
+def cache_key_fn_search_keyword_task(context, parameters):
+    # Key on the keyword for search tasks; fall back to page_id for crawl tasks sharing this fn
+    google_search_input = parameters.get('google_search_input')
+    if google_search_input is not None:
+        return f"search-{google_search_input.keyword}"
+    return f"crawl-{parameters.get('page_id', '')}"
+# @task(result_storage=local_storage)
+@task(persist_result=True, 
+      result_storage=local_storage, 
+      result_serializer='json',
+      task_run_name=get_search_task_runtime_name,
+      cache_policy=INPUTS, 
+      cache_key_fn=cache_key_fn_search_keyword_task,
+      )
+def search_keyword_task(google_search_input: GoogleSearchInput):
+    """异步关键词搜索任务"""
+    print(f"开始处理关键词搜索任务: {google_search_input.keyword} {google_search_input.skip_existing}")
+    result = search_keyword_drission(google_search_input.keyword, max_result_items=google_search_input.max_result_items, skip_existing=google_search_input.skip_existing, browser_config=google_search_input.browser_config)
+    print(f"search_keyword_task: ", result)
+    assert result['error'] == 0
+    # return {"keyword": keyword, "result": {'error': 0, 'msg': '', 'data': {'max_result_items': max_result_items}}}
+    return result
+
+@task(persist_result=True, 
+      result_storage=local_storage, 
+      result_serializer='json',
+      cache_key_fn=cache_key_fn_search_keyword_task,
+      )
+def crawl_page_urls_task(page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False):
+    result = URLCrawler().crawl_page_urls(page_id, browser_config=browser_config, overwrite=overwrite)
+    return result
+
+@flow(log_prints=True, flow_run_name="{keyword}",persist_result=True)
+def google_crawl(google_search_input: GoogleSearchInput):
+    print(f"Hello {google_search_input.keyword} from Prefect! 🤗")
+    search_task = search_keyword_task.with_options(
+        refresh_cache=not google_search_input.skip_existing)
+    result_submit = search_task.submit(google_search_input)
+    result = result_submit.result()
+    keyword_model: KeywordTask = result['data']
+    return keyword_model
+
+@flow(log_prints=True, flow_run_name="crawl_{keyword}",persist_result=True)
+def crawl_keyword_urls(keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
+    crawler = URLCrawler()
+    keyword_model = crawler.db_manager.get_keyword_task(keyword)
+    assert keyword_model and keyword_model.is_completed
+    print(f"crawl_keyword_urls {keyword}")
+    crawl_task = crawl_page_urls_task.with_options(
+        task_run_name='crawl_urls-{page_id}',
+        refresh_cache=overwrite)
+    default_config = CrawlKeywordUrlsInput(keyword=keyword).browser_config
+    for page in keyword_model.pages:
+        crawl_submit = crawl_task.submit(page.id, browser_config=browser_config or default_config, overwrite=False)
+        crawl_submit.wait()
+        page_result = crawl_submit.result()
+        print(f"page.id {page.id} result: {page_result}")
+
+@flow(log_prints=True, flow_run_name="convert_{keyword}",persist_result=True)
+def convert_results(keyword: str):
+    crawler = URLCrawler()
+    keyword_model = crawler.db_manager.get_keyword_task(keyword)
+    assert keyword_model and keyword_model.is_completed
+    print(f"convert_results {keyword}")
+    for item in keyword_model.items:
+        print(f"item.id {item.id}")
+        res = process_single_example(item.id)
+        print(f"process_single_example res {res}")
+        # crawler.convert_result_to_docx(item.id)
+        # crawler.convert_result_to_html(item.id)
+
+
+@flow(log_prints=True)
+def search_and_convert(google_search_input: GoogleSearchInput, browser_config: BrowserConfig = None, overwrite: bool = False):
+    pass
+
+if __name__ == "__main__":
+    # creates a deployment and stays running to monitor for work instructions 
+    # generated on the server
+
+    # hello_world.deploy(
+    #     name="my-first-deployment",
+    #     tags=["onboarding"],
+    #     parameters={"goodbye": True},
+    #     work_pool_name="default-pool",
+    # )
+    google_crawl.serve(
+        name="google_crawl_to_docx_flow",
+        tags=["default"],
+        parameters={"google_search_input": {"keyword": "python like this"}},
+    )
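
Besides serving a deployment, a Prefect flow can be called like a regular function for a quick local run. The sketch below assumes the import path matches the file added in this commit.

```python
# Sketch: run the flow directly instead of serving a deployment.
# The import path mirrors the file added in this commit and is assumed.
from tests.mytest.demo.prefect_task import GoogleSearchInput, google_crawl

if __name__ == "__main__":
    # Calling a @flow-decorated function executes it in-process and
    # returns its result (here, the KeywordTask from result['data']).
    keyword_task = google_crawl(GoogleSearchInput(keyword="python like this"))
    print(keyword_task)
```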

+ 1 - 1
worker/crawl_pages/crawl_urls.py

@@ -180,7 +180,7 @@ class URLCrawler:
                 task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                 tasks.append(task)
                 
-            await asyncio.gather(*tasks, return_exceptions=True)
+            return await asyncio.gather(*tasks, return_exceptions=True)
 
     async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
         """Crawl all URLs for a specific keyword"""

+ 1 - 1
worker/html_convert/pandoc.py

@@ -191,7 +191,7 @@ def process_single_example(result_id: int, skip_existing=True):
         logger.info(f"Successfully processed result {result_id}")
     else:
         logger.error(f"Failed to process result {result_id}")
-
+    return success
 def process_all_results():
     # Process all results in the database
     db_manager = SearchResultManager()
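
With `process_single_example` now returning `success`, callers such as `convert_results` in `prefect_task.py` can act on the outcome, for example by collecting the IDs that failed (a sketch; the result IDs are illustrative):

```python
# Sketch: collect the result IDs whose conversion failed.
failed_ids = [rid for rid in (1, 2, 3) if not process_single_example(rid)]
```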

+ 9 - 0
worker/readme.md

@@ -1,3 +1,12 @@
+# prefect 
+```shell
+prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
+prefect server start
+
+```
+
+# celery
+
 To use this system you would:
 
  1 Start Celery worker with: celery -A worker.celery.app worker --loglevel=info --concurrency=1

Some files were not shown because too many files changed in this diff