90 lines
3.2 KiB
Python
90 lines
3.2 KiB
Python
|
import os
import socket

import requests
from dns import resolver, query, message, exception
|
||
|
|
||
|
# Hostname of the DNS master whose SOA serial is the reference value.
# Replace with your DNS master, or set the DNS_MASTER environment variable.
DNS_MASTER = os.environ.get("DNS_MASTER", "ns1.it53.nl")

# API key sent to the zone-listing HTTP API.
# Replace with your API key, or (preferably) set the DNS_API_KEY environment
# variable so the key does not have to be committed to source control.
API_KEY = os.environ.get("DNS_API_KEY", "YXJkQU1VVkRoSGZyV0FP")
|
||
|
|
||
|
def get_zones(api_key, timeout=10):
    """Fetch the zone list from the DNS server's HTTP API.

    Args:
        api_key: Value sent in the ``x-api-key`` request header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    url = "https://dns.it53.nl:8443/api/v1/servers/localhost/zones"
    headers = {"x-api-key": api_key}
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # server.
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail loudly on auth/server errors instead of handing back an error
    # document that the caller would iterate as if it were a zone list.
    response.raise_for_status()
    return response.json()
|
||
|
|
||
|
def get_ns_records(zone_name):
    """Look up the NS records of a zone with the default resolver.

    Args:
        zone_name: Name of the zone to query.

    Returns:
        A list with the text form of each NS record, or an empty list when
        the lookup fails (the failure is reported on stdout).
    """
    try:
        ns_records = resolver.resolve(zone_name, 'NS')
    except resolver.NXDOMAIN:
        print(f"❌ Error: Zone '{zone_name}' does not exist.")
        return []
    except resolver.NoAnswer:
        print(f"❌ Error: No NS records found for zone '{zone_name}'.")
        return []
    except (resolver.NoNameservers, exception.Timeout) as e:
        # A resolver failure on one zone must not abort the checks for the
        # remaining zones.
        print(f"❌ Error: Could not look up NS records for zone '{zone_name}': {e}")
        return []
    return [str(record) for record in ns_records]
|
||
|
|
||
|
def _resolve_server_ip(server):
    """Return an IP address string for *server* (hostname or IP literal)."""
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, server)
        except OSError:
            continue
        return server  # already an IP literal; use it unchanged
    return socket.gethostbyname(server)


def _first_soa_serial(response):
    """Return the serial of the first answer record carrying one, else None."""
    for rrset in response.answer:
        # Each entry of ``answer`` is an RRset; the serial lives on the
        # individual records inside it, not on the RRset itself.
        for rdata in rrset:
            serial = getattr(rdata, "serial", None)
            if serial is not None:
                return serial
    return None


def check_soa_consistency(zone_name, ns_records, master_ip, timeout=5.0):
    """Compare each name server's SOA serial with the master's serial.

    Queries the master and every server in *ns_records* over TCP for the SOA
    record of *zone_name* and prints, per server, whether its serial matches
    the master's. All failures are reported on stdout; nothing is raised for
    per-server errors.

    Args:
        zone_name: Zone whose SOA record is checked.
        ns_records: Name servers to check, as hostnames or IP addresses.
        master_ip: IP address of the master server.
        timeout: Seconds to wait for each DNS query.

    Returns:
        None.
    """
    master_soa_query = message.make_query(zone_name, 'SOA')
    try:
        master_soa_response = query.tcp(
            master_soa_query, where=master_ip, timeout=timeout
        )
        print(f"Master SOA response for {zone_name}: {master_soa_response}")
    except exception.SyntaxError:
        print(f"❌ Malformed text input for master NS in zone '{zone_name}'.")
        return
    except (exception.DNSException, OSError) as e:
        print(f"❌ Error querying master SOA for zone '{zone_name}': {e}")
        return

    # The reference serial is the same for every server: extract it once.
    master_serial = _first_soa_serial(master_soa_response)
    if master_serial is None:
        print(f"❌ No SOA record returned by master for zone '{zone_name}'.")
        return

    for ns_record in ns_records:
        try:
            # query.tcp() needs an IP address, while NS records are hostnames.
            ns_ip = _resolve_server_ip(ns_record)
            ns_soa_query = message.make_query(zone_name, 'SOA')
            ns_soa_response = query.tcp(
                ns_soa_query, where=ns_ip, timeout=timeout
            )
            print(f"NS '{ns_record}' SOA response for {zone_name}: {ns_soa_response}")

            # Extract serial from SOA records
            ns_serial = _first_soa_serial(ns_soa_response)
            if ns_serial is None:
                print(f"❌ No SOA record found for NS '{ns_record}' in zone '{zone_name}'.")
                continue
            print(f"NS serial {ns_serial}")

            # Compare serial values
            if ns_serial != master_serial:
                print(f"❌ Inconsistency: Serial for NS '{ns_record}' in zone '{zone_name}'.")
            else:
                print(f"✅ Consistency: Serial for NS '{ns_record}' in zone '{zone_name}'.")
        except socket.gaierror as e:
            print(f"❌ Error resolving IP address for NS '{ns_record}': {e}")
        except exception.SyntaxError:
            print(f"❌ Malformed text input for NS '{ns_record}' in zone '{zone_name}'.")
        except exception.Timeout:
            print(f"❌ Timeout while checking SOA for NS '{ns_record}' in zone '{zone_name}'.")
        except Exception as e:
            # Best-effort boundary: one broken server must not stop the
            # checks for the remaining servers.
            print(f"❌ Error checking SOA for NS '{ns_record}' in zone '{zone_name}': {str(e)}")
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def main():
    """Check SOA serial consistency for every zone returned by the API.

    Fetches the zone list, resolves the master's address once, then for each
    zone looks up its NS records and compares every name server's SOA serial
    against the master. Progress and results are printed to stdout.

    Raises:
        SystemExit: With status 1 when the master hostname cannot be resolved.
    """
    zones = get_zones(API_KEY)

    # The master address does not depend on the zone, so resolve it once
    # rather than on every loop iteration.
    try:
        master_ip = socket.gethostbyname(DNS_MASTER)
    except socket.error as e:
        print(f"❌ Error resolving IP address for '{DNS_MASTER}': {str(e)}")
        # Exit with a non-zero status so callers/cron can detect the failure.
        raise SystemExit(1) from e

    for zone in zones:
        zone_name = zone["name"]
        print(f"Checking zone: {zone_name}")

        ns_records = get_ns_records(zone_name)
        if ns_records:
            try:
                check_soa_consistency(zone_name, ns_records, master_ip)
            except OSError as e:
                # A network failure while checking one zone is reported as
                # such and does not stop the remaining zones.
                print(f"❌ Network error while checking zone '{zone_name}': {e}")
        print("\n" + "=" * 40 + "\n")
|
||
|
|
||
|
# Run the consistency check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
||
|
|