|
|
@@ -0,0 +1,281 @@
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+from scrapling import Adaptor
|
|
|
+
|
|
|
class SmartElementSelector:
    """Score candidate elements against a list of weighted rules.

    Each rule is a dict with a ``type`` key (``tag``, ``attribute_contains``,
    ``ancestor`` or ``visible``) and a ``weight`` that is added to the
    element's score when the rule matches.  Elements are expected to expose
    ``tag``, ``attrib`` (a mapping) and ``parent`` (``None`` at the root).
    """

    # Class names commonly used to hide an element.
    _HIDDEN_CLASSES = frozenset({'hidden', 'd-none', 'invisible'})

    def __init__(self, rules):
        """Store the rule list used by :meth:`calculate_score`."""
        self.rules = rules

    def is_hidden(self, element):
        """Return True when the element's own attributes mark it as hidden.

        Three signals are checked: an inline ``display:none`` /
        ``visibility:hidden`` style, the ``hidden`` attribute, and a
        well-known hiding class name.  Only the element itself is inspected,
        not its ancestors or external stylesheets.
        """
        # Inline style: drop all whitespace first so that "display: none"
        # and "display:none" are treated the same.
        style = element.attrib.get('style')
        if style:
            compact_style = ''.join(style.lower().split())
            if 'display:none' in compact_style or 'visibility:hidden' in compact_style:
                return True

        # The boolean `hidden` attribute, written as `hidden` or `hidden="hidden"`.
        if element.attrib.get('hidden') in ('', 'hidden'):
            return True

        # Common hiding class names.
        class_attr = element.attrib.get('class')
        if class_attr and self._HIDDEN_CLASSES & set(class_attr.lower().split()):
            return True

        return False

    def check_ancestor(self, element, rule):
        """Return True if any ancestor of ``element`` satisfies ``rule``.

        The rule supplies ``tag``, ``name`` (attribute name), ``value`` and
        ``match_type`` (``'contains'`` or ``'exact'``; defaults to ``'exact'``
        when omitted).  Comparison is case-insensitive.  The parent chain is
        walked iteratively until a match is found or the root is passed.
        """
        target_tag = rule.get('tag', '').lower()
        attr_name = rule.get('name', '').lower()
        attr_value = rule.get('value', '').lower()
        match_type = rule.get('match_type', 'exact')

        current = element.parent
        # Compare against None explicitly: an element object may define its
        # own truthiness, which must not cut the walk short.
        while current is not None:
            if current.tag.lower() == target_tag:
                current_value = current.attrib.get(attr_name, '').lower()
                if match_type == 'contains':
                    if attr_value in current_value:
                        return True
                elif match_type == 'exact':
                    if current_value == attr_value:
                        return True
            current = current.parent
        return False

    def calculate_score(self, element):
        """Return the summed weight of all matching rules, or -1 if hidden.

        Hidden elements are excluded outright with a score of ``-1``.  Rules
        with an unrecognised ``type`` contribute nothing.
        """
        if self.is_hidden(element):
            return -1  # hidden elements are never candidates

        score = 0
        for rule in self.rules:
            rule_type = rule['type']

            if rule_type == 'tag':
                if element.tag.lower() == rule['value'].lower():
                    score += rule['weight']

            elif rule_type == 'attribute_contains':
                attr_value = element.attrib.get(rule['name'], '').lower()
                if rule['value'].lower() in attr_value:
                    score += rule['weight']

            elif rule_type == 'ancestor':
                if self.check_ancestor(element, rule):
                    score += rule['weight']

            elif rule_type == 'visible':
                # Visibility bonus: the element is known to be visible here
                # because hidden elements returned early above.
                score += rule['weight']

        return score
|
|
|
+
|
|
|
+
|
|
|
class LocatorGenerator:
    """Build candidate locators (CSS / XPath) for a single element."""

    # Locator kinds from most to least preferred.
    _PRIORITY = ('id', 'name', 'aria-combo', 'class', 'xpath')

    @staticmethod
    def _css_attr(name, value):
        """Return a CSS attribute selector with ``value`` safely quoted."""
        # Backslash and double quote must be escaped inside a CSS string.
        escaped = value.replace('\\', '\\\\').replace('"', '\\"')
        return f'[{name}="{escaped}"]'

    @staticmethod
    def _xpath_literal(value):
        """Return ``value`` as an XPath string literal.

        XPath 1.0 has no escape character, so the quote style is chosen to
        avoid the quotes present in the value; when both kinds occur the
        value is assembled with ``concat()``.
        """
        if '"' not in value:
            return f'"{value}"'
        if "'" not in value:
            return f"'{value}'"
        pieces = ', \'"\', '.join(f'"{part}"' for part in value.split('"'))
        return f'concat({pieces})'

    @staticmethod
    def generate_locators(element):
        """Return ``(kind, selector)`` tuples ordered by priority.

        Kinds, in order: ``id`` (``#value``), ``name``, ``aria-combo``
        (``aria-label`` and/or ``role``), ``class`` and ``xpath``.  A kind is
        omitted when the element lacks the corresponding attribute.
        """
        css_attr = LocatorGenerator._css_attr
        locators = []

        # 1. Unique identifier first.  The caller strips the leading '#',
        #    so the id is emitted verbatim.
        if element_id := element.attrib.get('id'):
            locators.append(('id', f'#{element_id}'))

        # 2. The name attribute.
        if name := element.attrib.get('name'):
            locators.append(('name', css_attr('name', name)))

        # 3. Combination of the key ARIA attributes.
        aria_attrs = []
        if aria_label := element.attrib.get('aria-label'):
            aria_attrs.append(css_attr('aria-label', aria_label))
        if role := element.attrib.get('role'):
            aria_attrs.append(css_attr('role', role))
        if aria_attrs:
            locators.append(('aria-combo', ''.join(aria_attrs)))

        # 4. Class names, skipping very short (<= 3 chars) or purely numeric
        #    ones.
        if classes := element.attrib.get('class', '').split():
            static_classes = [c for c in classes if len(c) > 3 and not c.isnumeric()]
            if static_classes:
                locators.append(('class', '.' + '.'.join(static_classes)))

        # 5. Relative XPath: the tag plus the first available attribute among
        #    name / role / placeholder.
        if element.tag:
            xpath = f'//{element.tag}'
            for attr in ('name', 'role', 'placeholder'):
                if value := element.attrib.get(attr):
                    xpath += f'[@{attr}={LocatorGenerator._xpath_literal(value)}]'
                    break
            locators.append(('xpath', xpath))

        # Order: id > name > aria > class > xpath; unknown kinds go last.
        priority = LocatorGenerator._PRIORITY
        return sorted(
            locators,
            key=lambda item: priority.index(item[0]) if item[0] in priority else len(priority),
        )
|
|
|
+
|
|
|
+
|
|
|
class EnhancedAdaptor:
    """Wrap a parsed page and add locator selection and verification."""

    def __init__(self, html_content):
        """Parse ``html_content`` with ``Adaptor`` and keep the raw HTML."""
        self.html_content = html_content
        # Adaptor is assumed to provide HTML parsing and element lookup.
        self.page = Adaptor(html_content)

    def find_all(self, tag_name, strategy=None):
        """Find elements by tag name or by an explicit locator strategy.

        Args:
            tag_name: A tag name when ``strategy`` is None; otherwise the
                selector text for the given strategy.
            strategy: ``None`` (tag lookup), ``'id'``, ``'css'`` or
                ``'xpath'``.

        Raises:
            ValueError: If ``strategy`` is not one of the supported values.
        """
        if strategy is None:
            return self.page.find_all(tag_name)
        if strategy == 'id':
            # An attribute selector avoids CSS identifier escaping rules
            # (for example ids that start with a digit).
            escaped = tag_name.replace('\\', '\\\\').replace('"', '\\"')
            return self.page.css(f'[id="{escaped}"]')
        if strategy == 'css':
            return self.page.css(tag_name)
        if strategy == 'xpath':
            return self.page.xpath(tag_name)
        raise ValueError(f'Unsupported locator strategy: {strategy!r}')

    def get_locator_strategy(self, element):
        """Return the preferred locator for ``element``.

        The result is a dict with ``strategy`` (``'id'``, ``'css'`` or
        ``'xpath'``) and ``selector``.  For the ``id`` strategy the selector
        is the bare id value without the leading ``#``.
        """
        locators = LocatorGenerator.generate_locators(element)

        # Locators arrive sorted by priority; take the first recognised one.
        for loc_type, selector in locators:
            if loc_type == 'id':
                return {'strategy': 'id', 'selector': selector[1:]}
            if loc_type in ('name', 'aria-combo', 'class'):
                return {'strategy': 'css', 'selector': selector}
            if loc_type == 'xpath':
                return {'strategy': 'xpath', 'selector': selector}

        # NOTE(review): fallback returns `element.xpath` unchanged; confirm
        # that this attribute is a selector string on the element type in use.
        return {'strategy': 'xpath', 'selector': element.xpath}

    def verify_locator(self, locator_info):
        """Check whether a locator matches exactly one element on the page.

        Returns a dict with ``is_unique`` and ``alternatives``.  When the
        locator matches several elements, ``alternatives`` holds fallback
        locators derived from the first match.  Verification is best-effort:
        if the lookup fails, the failure text is stored under ``error`` and
        the locator is reported as not unique.
        """
        results = {
            'is_unique': False,
            'alternatives': [],
        }

        try:
            elements = self.find_all(
                locator_info['selector'], strategy=locator_info['strategy']
            )
        except Exception as exc:  # invalid selector, unknown strategy, ...
            results['error'] = str(exc)
            return results

        if len(elements) == 1:
            results['is_unique'] = True
        elif elements:
            # Ambiguous match: offer alternatives based on the first hit.
            results['alternatives'] = self.generate_fallback_locators(elements[0])
        return results

    def generate_fallback_locators(self, element):
        """Return alternative CSS locators built from ``name`` and ``class``."""
        fallbacks = []
        if element.attrib.get('name'):
            fallbacks.append({'strategy': 'css', 'selector': f'[name="{element.attrib["name"]}"]'})
        # A whitespace-only class attribute yields no class names.
        class_names = (element.attrib.get('class') or '').split()
        if class_names:
            fallbacks.append({'strategy': 'css', 'selector': f'.{class_names[0]}'})
        return fallbacks
|
|
|
+
|
|
|
+
|
|
|
def get_search_rule():
    """Return the default weighted rule set for locating a search box.

    A fresh list is built on every call, so callers may extend or modify
    the result freely.
    """

    def attribute_rule(attr, weight):
        # The attribute's value must contain the word "search".
        return {"type": "attribute_contains", "name": attr, "value": "search", "weight": weight}

    def form_ancestor_rule(attr, expected, weight):
        # An enclosing <form> must carry attr == expected exactly.
        return {
            "type": "ancestor",
            "tag": "form",
            "name": attr,
            "value": expected,
            "match_type": "exact",
            "weight": weight,
        }

    # Basic trait: the element is a <textarea>.
    rule_set = [{"type": "tag", "value": "textarea", "weight": 20}]

    # Key attributes that mention "search".
    rule_set.append(attribute_rule("title", 25))
    rule_set.append(attribute_rule("aria-label", 25))
    rule_set.append(attribute_rule("role", 30))

    # Hierarchy: the element sits inside a search form.
    rule_set.append(form_ancestor_rule("action", "/search", 40))
    rule_set.append(form_ancestor_rule("role", "search", 35))

    # Visibility bonus.
    rule_set.append({"type": "visible", "weight": 20})

    return rule_set
|
|
|
+
|
|
|
+
|
|
|
def find_target_element(html_content, rules, base_all='textarea'):
    """Pick the best-scoring ``base_all`` element and return its locator.

    Args:
        html_content: HTML source of the page to analyse.
        rules: Weighted rule dicts consumed by ``SmartElementSelector``.
        base_all: Tag name of the candidate elements.

    Returns:
        A dict with ``strategy``, ``selector`` and ``xpath`` keys describing
        the chosen element, for example
        ``{'strategy': 'id', 'selector': 'APjFqb', 'xpath': '//textarea[@id="APjFqb"]'}``,
        or ``None`` when no candidate scores above zero.
    """
    selector = SmartElementSelector(rules)
    page = EnhancedAdaptor(html_content)

    # Score every candidate; hidden (-1) and non-matching (0) ones drop out.
    scored_elements = [
        (el, score)
        for el in page.find_all(base_all)
        if (score := selector.calculate_score(el)) > 0
    ]
    if not scored_elements:
        return None

    scored_elements.sort(key=lambda pair: pair[1], reverse=True)

    # Keep candidates within 80% of the top score.
    top_score = scored_elements[0][1]
    finalists = [el for el, s in scored_elements if s >= top_score * 0.8]

    # Tie-break several strong candidates by the length of `el.path`,
    # shortest first.
    if len(finalists) > 1:
        finalists.sort(key=lambda el: len(el.path))

    best_element = finalists[0]
    locator_info = page.get_locator_strategy(best_element)
    strategy = locator_info['strategy']
    chosen = locator_info['selector']

    if strategy == 'id':
        xpath = f'//{base_all}[@id="{chosen}"]'
    elif strategy == 'xpath':
        # The selector already is an XPath expression.
        xpath = chosen
    else:
        # A CSS selector cannot be embedded in an XPath predicate; use the
        # XPath locator generated for the same element instead.
        xpath = next(
            (sel for kind, sel in LocatorGenerator.generate_locators(best_element)
             if kind == 'xpath'),
            f'//{base_all}',
        )
    locator_info['xpath'] = xpath

    # Uniqueness can be checked separately with page.verify_locator(); the
    # function returns the locator dict directly.
    return locator_info
|
|
|
+
|
|
|
def get_search_ele(html_content: str, base_all='textarea'):
    """Locate the search input in ``html_content`` using the default rules.

    Thin convenience wrapper: it passes the rule set from
    ``get_search_rule()`` to ``find_target_element`` and returns that
    function's result unchanged.
    """
    return find_target_element(html_content, get_search_rule(), base_all=base_all)
|
|
|
def search_demo(html_file=None):
    """Run the search-box finder on a saved HTML page and print the result.

    Args:
        html_file: Path to a UTF-8 HTML file.  Defaults to the author's local
            debug fixture.
    """
    if html_file is None:
        # Other local fixtures used during development, in the same folder:
        #   查询不到搜索框-两个textarea.html
        #   智能选择A标签测试.html
        html_file = Path(r'G:\code\upwork\zhang_crawl_bio\output\debug\查询搜索框结果页面.html')

    html_content = Path(html_file).read_text(encoding='utf-8')
    rules = get_search_rule()
    target_element = find_target_element(html_content, rules)
    print(target_element)


if __name__ == "__main__":
    search_demo()
|