| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- from pathlib import Path
- from scrapling import Adaptor
class SmartElementSelector:
    """Score candidate elements against a list of weighted matching rules.

    Each rule is a dict with a ``type`` key (``tag``, ``attribute_contains``,
    ``ancestor`` or ``visible``) and a ``weight`` that is added to the
    element's score whenever the rule matches.
    """

    # Class names commonly used to hide an element.
    _HIDDEN_CLASSES = frozenset({'hidden', 'd-none', 'invisible'})

    def __init__(self, rules):
        """Store the rule list consulted by :meth:`calculate_score`."""
        self.rules = rules

    def is_hidden(self, element):
        """Return True when the element's own attributes mark it as hidden.

        Three signals are checked on the element itself (ancestors are not
        inspected): an inline ``display:none`` / ``visibility:hidden`` style,
        the presence of the ``hidden`` attribute, and a well-known hiding
        class name.
        """
        attrib = element.attrib

        # Inline style: drop all whitespace first so that "display: none"
        # and "display:none" are treated alike.
        if 'style' in attrib:
            style = ''.join(attrib['style'].lower().split())
            if 'display:none' in style or 'visibility:hidden' in style:
                return True

        # ``hidden`` hides the element whatever its value is (even
        # hidden="false"), so only its presence matters.
        if 'hidden' in attrib:
            return True

        # Well-known hiding class names.
        if 'class' in attrib:
            class_names = attrib['class'].lower().split()
            if self._HIDDEN_CLASSES.intersection(class_names):
                return True

        return False

    def check_ancestor(self, element, rule):
        """Return True if any ancestor of *element* satisfies *rule*.

        The rule supplies ``tag`` (ancestor tag name), ``name`` (attribute to
        read), ``value`` (expected attribute text) and ``match_type``
        (``'contains'`` or ``'exact'``; defaults to ``'exact'`` when omitted).
        All comparisons are case-insensitive.
        """
        target_tag = rule.get('tag', '').lower()
        attr_name = rule.get('name', '').lower()
        attr_value = rule.get('value', '').lower()
        match_type = rule.get('match_type', 'exact')

        # Compare against None explicitly: a node object may define its own
        # truthiness, which must not cut the walk short.
        current = element.parent
        while current is not None:
            if current.tag.lower() == target_tag:
                current_value = current.attrib.get(attr_name, '').lower()
                if match_type == 'contains' and attr_value in current_value:
                    return True
                if match_type == 'exact' and current_value == attr_value:
                    return True
            current = current.parent
        return False

    def calculate_score(self, element):
        """Return the summed weight of all matching rules, or -1 if hidden.

        Hidden elements are excluded outright with a score of -1; otherwise
        the score starts at 0 and each matching rule adds its ``weight``.
        """
        if self.is_hidden(element):
            return -1

        score = 0
        for rule in self.rules:
            rule_type = rule['type']

            if rule_type == 'tag':
                matched = element.tag.lower() == rule['value'].lower()
            elif rule_type == 'attribute_contains':
                attr_value = element.attrib.get(rule['name'], '').lower()
                matched = rule['value'].lower() in attr_value
            elif rule_type == 'ancestor':
                matched = self.check_ancestor(element, rule)
            elif rule_type == 'visible':
                # Hidden elements already returned above, so any element
                # reaching this point earns the visibility bonus.
                matched = True
            else:
                # Unknown rule types contribute nothing.
                matched = False

            if matched:
                score += rule['weight']

        return score
class LocatorGenerator:
    """Build candidate locators (CSS selectors and an XPath) for one element."""

    # Lower rank means higher priority: id > name > aria > class > xpath.
    _PRIORITY = {'id': 0, 'name': 1, 'aria-combo': 2, 'class': 3, 'xpath': 4}

    @staticmethod
    def _css_string(value):
        """Return *value* as a double-quoted CSS string literal."""
        escaped = value.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _xpath_string(value):
        """Return *value* as an XPath 1.0 string literal.

        XPath 1.0 has no escape syntax, so the quote style is chosen to avoid
        the quotes in the value; when both kinds occur the pieces are glued
        together with ``concat()``.
        """
        if '"' not in value:
            return f'"{value}"'
        if "'" not in value:
            return f"'{value}'"
        pieces = ', \'"\', '.join(f'"{part}"' for part in value.split('"'))
        return f'concat({pieces})'

    @staticmethod
    def generate_locators(element):
        """Return ``(kind, selector)`` tuples for *element*, best first.

        Kinds, in priority order:

        * ``id``         - ``#<id>`` (the raw id value, not escaped)
        * ``name``       - ``[name="..."]``
        * ``aria-combo`` - ``[aria-label="..."]`` and/or ``[role="..."]``
        * ``class``      - ``.a.b`` built from class names longer than three
          characters that are not purely numeric
        * ``xpath``      - ``//tag`` plus one predicate on the first of
          ``name``, ``role`` or ``placeholder`` that has a value

        Attribute values are emitted as properly quoted string literals so a
        value containing a quote character still yields a valid selector.
        """
        attrib = element.attrib
        css_string = LocatorGenerator._css_string
        locators = []

        # 1. Unique identifier first.
        if element_id := attrib.get('id'):
            locators.append(('id', f'#{element_id}'))

        # 2. The name attribute.
        if name := attrib.get('name'):
            locators.append(('name', f'[name={css_string(name)}]'))

        # 3. Combination of key ARIA attributes.
        aria_parts = [
            f'[{attr}={css_string(value)}]'
            for attr in ('aria-label', 'role')
            if (value := attrib.get(attr))
        ]
        if aria_parts:
            locators.append(('aria-combo', ''.join(aria_parts)))

        # 4. Class names, skipping very short and purely numeric tokens.
        static_classes = [
            c for c in attrib.get('class', '').split()
            if len(c) > 3 and not c.isnumeric()
        ]
        if static_classes:
            locators.append(('class', '.' + '.'.join(static_classes)))

        # 5. Relative XPath: tag plus the first distinguishing attribute.
        if element.tag:
            xpath = f'//{element.tag}'
            for attr in ('name', 'role', 'placeholder'):
                if value := attrib.get(attr):
                    xpath += f'[@{attr}={LocatorGenerator._xpath_string(value)}]'
                    break
            locators.append(('xpath', xpath))

        priority = LocatorGenerator._PRIORITY
        return sorted(locators, key=lambda item: priority.get(item[0], len(priority)))
class EnhancedAdaptor:
    """Wrap a parsed page and add locator selection and verification helpers."""

    def __init__(self, html_content):
        """Parse *html_content* with ``Adaptor`` and keep both around."""
        self.html_content = html_content
        # Adaptor is assumed to provide HTML parsing and element lookup.
        self.page = Adaptor(html_content)

    @staticmethod
    def _css_string(value):
        """Return *value* as a double-quoted CSS string literal."""
        escaped = value.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    def find_all(self, tag_name):
        """Return every element of the page with the given tag name."""
        return self.page.find_all(tag_name)

    def _select(self, selector, strategy):
        """Run *selector* against the page using *strategy*.

        ``strategy`` is one of ``'css'``, ``'xpath'`` or ``'id'`` (where the
        selector is the bare id value). Raises ``ValueError`` for any other
        strategy.
        """
        if strategy == 'css':
            return self.page.css(selector)
        if strategy == 'xpath':
            return self.page.xpath(selector)
        if strategy == 'id':
            # An attribute selector avoids having to escape the id as a CSS
            # identifier.
            return self.page.css(f'[id={self._css_string(selector)}]')
        raise ValueError(f'unsupported locator strategy: {strategy!r}')

    def get_locator_strategy(self, element):
        """Return the recommended locator for *element*.

        The result is a dict with ``strategy`` (``'id'``, ``'css'`` or
        ``'xpath'``) and ``selector``. The highest-priority locator produced
        by ``LocatorGenerator.generate_locators`` wins.
        """
        locators = LocatorGenerator.generate_locators(element)

        for loc_type, selector in locators:
            if loc_type == 'id':
                # Drop the leading '#' so the bare id value is returned.
                return {'strategy': 'id', 'selector': selector[1:]}
            if loc_type in ('name', 'aria-combo', 'class'):
                return {'strategy': 'css', 'selector': selector}
            if loc_type == 'xpath':
                return {'strategy': 'xpath', 'selector': selector}

        # Only reached when no locator could be generated.
        # NOTE(review): this hands back ``element.xpath`` as-is; confirm that
        # attribute is a path string on the element type in use and not a
        # query method.
        return {'strategy': 'xpath', 'selector': element.xpath}

    def verify_locator(self, locator_info):
        """Check whether *locator_info* matches exactly one element.

        Returns ``{'is_unique': bool, 'alternatives': list}``. When several
        elements match, ``alternatives`` holds fallback locators derived from
        the first match. Verification is best-effort: a malformed locator, an
        unsupported strategy or a selector the page rejects yields the
        default "not unique, no alternatives" result instead of raising.
        """
        results = {
            'is_unique': False,
            'alternatives': []
        }

        try:
            elements = self._select(locator_info['selector'], locator_info['strategy'])
            match_count = len(elements)
        except Exception:
            # Deliberately best-effort (see docstring).
            return results

        if match_count == 1:
            results['is_unique'] = True
        elif match_count > 1:
            results['alternatives'] = self.generate_fallback_locators(elements[0])
        return results

    def generate_fallback_locators(self, element):
        """Return alternative CSS locators built from ``name`` and ``class``.

        Only the first class name is used; a missing or whitespace-only
        ``class`` attribute contributes nothing.
        """
        fallbacks = []
        if name := element.attrib.get('name'):
            fallbacks.append({'strategy': 'css', 'selector': f'[name={self._css_string(name)}]'})
        class_names = (element.attrib.get('class') or '').split()
        if class_names:
            fallbacks.append({'strategy': 'css', 'selector': f'.{class_names[0]}'})
        return fallbacks
def get_search_rule():
    """Return the weighted rule set used to recognise a search input box.

    The list holds, in order: one tag rule, three attribute-contains rules,
    two ancestor ``form`` rules and one visibility bonus rule. Extend it as
    requirements grow.
    """
    # Basic feature: the element's own tag.
    tag_rules = [{"type": "tag", "value": "textarea", "weight": 20}]

    # Key attributes whose value should mention "search".
    attribute_rules = [
        {"type": "attribute_contains", "name": attr, "value": "search", "weight": weight}
        for attr, weight in (("title", 25), ("aria-label", 25), ("role", 30))
    ]

    # Hierarchy: an enclosing <form> whose attribute equals the given value.
    ancestor_rules = [
        {
            "type": "ancestor",
            "tag": "form",
            "name": attr,
            "value": expected,
            "match_type": "exact",
            "weight": weight,
        }
        for attr, expected, weight in (("action", "/search", 40), ("role", "search", 35))
    ]

    # Bonus for visible elements.
    visibility_rule = {"type": "visible", "weight": 20}

    return [*tag_rules, *attribute_rules, *ancestor_rules, visibility_rule]
def _locator_to_xpath(element, locator_info, base_all):
    """Express *locator_info* for *element* as an XPath string."""
    strategy = locator_info['strategy']
    selector = locator_info['selector']

    if strategy == 'xpath':
        # Already an XPath expression.
        return selector
    if strategy == 'id':
        # e.g. {'strategy': 'id', 'selector': 'APjFqb'}
        return f'//{base_all}[@id="{selector}"]'

    # A CSS selector cannot be embedded in an XPath predicate, so fall back
    # to the XPath locator generated for the same element.
    for loc_type, loc_selector in LocatorGenerator.generate_locators(element):
        if loc_type == 'xpath':
            return loc_selector
    return f'//{base_all}'


def find_target_element(html_content, rules, base_all='textarea'):
    """Find the best-scoring *base_all* element and describe how to locate it.

    Every ``base_all`` element in *html_content* is scored against *rules*;
    elements scoring 0 or less are dropped. Candidates within 80% of the top
    score are kept as finalists and the one with the shortest ``path`` wins.

    Returns the locator dict for the winner with the keys ``strategy``,
    ``selector`` and ``xpath``, or ``None`` when no candidate scores above 0.
    """
    selector = SmartElementSelector(rules)
    page = EnhancedAdaptor(html_content)

    # Collect all candidates (could be extended to several tags).
    candidates = page.find_all(base_all)

    # Score and keep only positively scored elements.
    scored_elements = [
        (el, score)
        for el in candidates
        if (score := selector.calculate_score(el)) > 0
    ]
    if not scored_elements:
        return None

    # Highest score first; the sort is stable, so ties keep document order.
    scored_elements.sort(key=lambda pair: pair[1], reverse=True)
    top_score = scored_elements[0][1]

    # Drop candidates that score clearly lower than the best one.
    finalists = [el for el, s in scored_elements if s >= top_score * 0.8]

    # Tie-break among several strong candidates by the length of their path.
    if len(finalists) > 1:
        finalists.sort(key=lambda el: len(el.path))

    best_element = finalists[0]
    locator_info = page.get_locator_strategy(best_element)
    locator_info['xpath'] = _locator_to_xpath(best_element, locator_info, base_all)
    return locator_info
def get_search_ele(html_content: str, base_all='textarea'):
    """Locate the search element in *html_content* using the default rules.

    Convenience wrapper: feeds the rule set from ``get_search_rule`` into
    ``find_target_element`` and returns its result unchanged.
    """
    return find_target_element(html_content, get_search_rule(), base_all=base_all)
def search_demo(file=None):
    """Run the search-box detection on a saved HTML page and print the result.

    Args:
        file: Path of the HTML file to analyse. When omitted, the developer's
            local debug capture below is used.
    """
    if file is None:
        # Other local captures used while debugging:
        #   G:\code\upwork\zhang_crawl_bio\output\debug\查询不到搜索框-两个textarea.html
        #   G:\code\upwork\zhang_crawl_bio\output\debug\智能选择A标签测试.html
        file = Path(r'G:\code\upwork\zhang_crawl_bio\output\debug\查询搜索框结果页面.html')

    html_content = Path(file).read_text(encoding='utf-8')
    rules = get_search_rule()
    target_element = find_target_element(html_content, rules)
    print(target_element)
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    search_demo()
|