import requests from dns import resolver, query, message, exception import socket # Replace with your DNS master DNS_MASTER = "ns1.it53.nl" # Replace with your API key API_KEY = "YXJkQU1VVkRoSGZyV0FP" def get_zones(api_key): url = "https://dns.it53.nl:8443/api/v1/servers/localhost/zones" headers = {"x-api-key": api_key} response = requests.get(url, headers=headers) zones = response.json() return zones def get_ns_records(zone_name): try: ns_records = resolver.resolve(zone_name, 'NS') return [str(record) for record in ns_records] except resolver.NXDOMAIN: print(f"❌ Error: Zone '{zone_name}' does not exist.") return [] except resolver.NoAnswer: print(f"❌ Error: No NS records found for zone '{zone_name}'.") return [] def check_soa_consistency(zone_name, ns_records, master_ip): master_soa_query = message.make_query(zone_name, 'SOA') try: master_soa_response = query.tcp(master_soa_query, where=master_ip) print(f"Master SOA response for {zone_name}: {master_soa_response}") except exception.SyntaxError: print(f"❌ Malformed text input for master NS in zone '{zone_name}'.") return for ns_record in ns_records: try: ns_soa_query = message.make_query(zone_name, 'SOA') ns_soa_response = query.tcp(ns_soa_query, where=ns_record) print(f"NS '{ns_record}' SOA response for {zone_name}: {ns_soa_response}") # Extract serial from SOA records ns_serial = ns_soa_response.answer[0].serial master_serial = master_soa_response.answer[0].serial print(f"NS serial {ns_serial}") # Compare serial values if ns_serial != master_serial: print(f"❌ Inconsistency: Serial for NS '{ns_record}' in zone '{zone_name}'.") else: print(f"✅ Consistency: Serial for NS '{ns_record}' in zone '{zone_name}'.") except exception.SyntaxError: print(f"❌ Malformed text input for NS '{ns_record}' in zone '{zone_name}'.") except resolver.NoAnswer: print(f"❌ No SOA record found for NS '{ns_record}' in zone '{zone_name}'.") except resolver.NXDOMAIN: print(f"❌ Zone '{zone_name}' does not exist for NS '{ns_record}'.") except exception.Timeout: print(f"❌ Timeout while checking SOA for NS '{ns_record}' in zone '{zone_name}'.") except Exception as e: print(f"❌ Error checking SOA for NS '{ns_record}' in zone '{zone_name}': {str(e)}") def main(): zones = get_zones(API_KEY) for zone in zones: zone_name = zone["name"] print(f"Checking zone: {zone_name}") ns_records = get_ns_records(zone_name) if ns_records: # Resolve IP address for DNS_MASTER try: master_ip = socket.gethostbyname(DNS_MASTER) check_soa_consistency(zone_name, ns_records, master_ip) except socket.error as e: print(f"❌ Error resolving IP address for '{DNS_MASTER}': {str(e)}") exit() print("\n" + "=" * 40 + "\n") if __name__ == "__main__": main()