import random import re import socket import struct from dns import resolver,exception from cachetools import TTLCache # 配置自定义 DNS 服务器 custom_resolver = resolver.Resolver() custom_resolver.nameservers = ['10.0.0.1', '10.0.0.32', '10.0.0.2'] cache = TTLCache(maxsize=100, ttl=90) # 设置端口号(这里假设所有服务器都使用 8600 端口) for server in custom_resolver.nameservers: custom_resolver.port = 8600 # 注意:通常不需要对每个服务器单独设置,除非它们使用不同端口 def extract_ip_prefix(encoded_address:str): match = encoded_address.split(".")[0] return match def decode_address(encoded_address): hex_address = extract_ip_prefix(encoded_address) binary_address = bytes.fromhex(hex_address) return socket.inet_ntoa(binary_address) def get_srv(srv_record_name): try: if srv_record_name not in cache: answer = custom_resolver.resolve(srv_record_name, 'SRV') cache[srv_record_name] = answer else: answer = cache[srv_record_name] selected_rdata = random.choice(answer) ip_address = decode_address(selected_rdata.target.to_text()) print(f"Service: {srv_record_name} - {ip_address}:{selected_rdata.port} ") return ip_address, selected_rdata.port except exception.DNSException as e: print(f"DNS exception: {e}") raise def del_cache(srv_record_name=""): if srv_record_name: cache.pop(srv_record_name, None) else: cache.clear() def main(): # 查询 SRV 记录 srv_record_name = 'video-get.service.consul' # srv_record_name = 'consul.service.consul' get_srv(srv_record_name) get_srv("prefect.service.consul") if __name__ == "__main__": main()