890 lines
29 KiB
Python
890 lines
29 KiB
Python
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
|
|
|
|
# Copyright (C) 2001-2017 Nominum, Inc.
|
|
#
|
|
# Permission to use, copy, modify, and distribute this software and its
|
|
# documentation for any purpose with or without fee is hereby granted,
|
|
# provided that the above copyright notice and this permission notice
|
|
# appear in all copies.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
|
|
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
|
|
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
|
|
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
"""DNS rdata."""
|
|
|
|
import base64
|
|
import binascii
|
|
import inspect
|
|
import io
|
|
import itertools
|
|
import random
|
|
from importlib import import_module
|
|
from typing import Any, Dict, Optional, Tuple, Union
|
|
|
|
import dns.exception
|
|
import dns.immutable
|
|
import dns.ipv4
|
|
import dns.ipv6
|
|
import dns.name
|
|
import dns.rdataclass
|
|
import dns.rdatatype
|
|
import dns.tokenizer
|
|
import dns.ttl
|
|
import dns.wire
|
|
|
|
_chunksize = 32
|
|
|
|
# We currently allow comparisons for rdata with relative names for backwards
|
|
# compatibility, but in the future we will not, as these kinds of comparisons
|
|
# can lead to subtle bugs if code is not carefully written.
|
|
#
|
|
# This switch allows the future behavior to be turned on so code can be
|
|
# tested with it.
|
|
_allow_relative_comparisons = True
|
|
|
|
|
|
class NoRelativeRdataOrdering(dns.exception.DNSException):
|
|
"""An attempt was made to do an ordered comparison of one or more
|
|
rdata with relative names. The only reliable way of sorting rdata
|
|
is to use non-relativized rdata.
|
|
|
|
"""
|
|
|
|
|
|
def _wordbreak(data, chunksize=_chunksize, separator=b" "):
|
|
"""Break a binary string into chunks of chunksize characters separated by
|
|
a space.
|
|
"""
|
|
|
|
if not chunksize:
|
|
return data.decode()
|
|
return separator.join(
|
|
[data[i : i + chunksize] for i in range(0, len(data), chunksize)]
|
|
).decode()
|
|
|
|
|
|
# pylint: disable=unused-argument
|
|
|
|
|
|
def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
|
|
"""Convert a binary string into its hex encoding, broken up into chunks
|
|
of chunksize characters separated by a separator.
|
|
"""
|
|
|
|
return _wordbreak(binascii.hexlify(data), chunksize, separator)
|
|
|
|
|
|
def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
|
|
"""Convert a binary string into its base64 encoding, broken up into chunks
|
|
of chunksize characters separated by a separator.
|
|
"""
|
|
|
|
return _wordbreak(base64.b64encode(data), chunksize, separator)
|
|
|
|
|
|
# pylint: enable=unused-argument
|
|
|
|
__escaped = b'"\\'
|
|
|
|
|
|
def _escapify(qstring):
|
|
"""Escape the characters in a quoted string which need it."""
|
|
|
|
if isinstance(qstring, str):
|
|
qstring = qstring.encode()
|
|
if not isinstance(qstring, bytearray):
|
|
qstring = bytearray(qstring)
|
|
|
|
text = ""
|
|
for c in qstring:
|
|
if c in __escaped:
|
|
text += "\\" + chr(c)
|
|
elif c >= 0x20 and c < 0x7F:
|
|
text += chr(c)
|
|
else:
|
|
text += "\\%03d" % c
|
|
return text
|
|
|
|
|
|
def _truncate_bitmap(what):
|
|
"""Determine the index of greatest byte that isn't all zeros, and
|
|
return the bitmap that contains all the bytes less than that index.
|
|
"""
|
|
|
|
for i in range(len(what) - 1, -1, -1):
|
|
if what[i] != 0:
|
|
return what[0 : i + 1]
|
|
return what[0:1]
|
|
|
|
|
|
# So we don't have to edit all the rdata classes...
|
|
_constify = dns.immutable.constify
|
|
|
|
|
|
@dns.immutable.immutable
|
|
class Rdata:
|
|
"""Base class for all DNS rdata types."""
|
|
|
|
__slots__ = ["rdclass", "rdtype", "rdcomment"]
|
|
|
|
def __init__(self, rdclass, rdtype):
|
|
"""Initialize an rdata.
|
|
|
|
*rdclass*, an ``int`` is the rdataclass of the Rdata.
|
|
|
|
*rdtype*, an ``int`` is the rdatatype of the Rdata.
|
|
"""
|
|
|
|
self.rdclass = self._as_rdataclass(rdclass)
|
|
self.rdtype = self._as_rdatatype(rdtype)
|
|
self.rdcomment = None
|
|
|
|
def _get_all_slots(self):
|
|
return itertools.chain.from_iterable(
|
|
getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
|
|
)
|
|
|
|
def __getstate__(self):
|
|
# We used to try to do a tuple of all slots here, but it
|
|
# doesn't work as self._all_slots isn't available at
|
|
# __setstate__() time. Before that we tried to store a tuple
|
|
# of __slots__, but that didn't work as it didn't store the
|
|
# slots defined by ancestors. This older way didn't fail
|
|
# outright, but ended up with partially broken objects, e.g.
|
|
# if you unpickled an A RR it wouldn't have rdclass and rdtype
|
|
# attributes, and would compare badly.
|
|
state = {}
|
|
for slot in self._get_all_slots():
|
|
state[slot] = getattr(self, slot)
|
|
return state
|
|
|
|
def __setstate__(self, state):
|
|
for slot, val in state.items():
|
|
object.__setattr__(self, slot, val)
|
|
if not hasattr(self, "rdcomment"):
|
|
# Pickled rdata from 2.0.x might not have a rdcomment, so add
|
|
# it if needed.
|
|
object.__setattr__(self, "rdcomment", None)
|
|
|
|
def covers(self) -> dns.rdatatype.RdataType:
|
|
"""Return the type a Rdata covers.
|
|
|
|
DNS SIG/RRSIG rdatas apply to a specific type; this type is
|
|
returned by the covers() function. If the rdata type is not
|
|
SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when
|
|
creating rdatasets, allowing the rdataset to contain only RRSIGs
|
|
of a particular type, e.g. RRSIG(NS).
|
|
|
|
Returns a ``dns.rdatatype.RdataType``.
|
|
"""
|
|
|
|
return dns.rdatatype.NONE
|
|
|
|
def extended_rdatatype(self) -> int:
|
|
"""Return a 32-bit type value, the least significant 16 bits of
|
|
which are the ordinary DNS type, and the upper 16 bits of which are
|
|
the "covered" type, if any.
|
|
|
|
Returns an ``int``.
|
|
"""
|
|
|
|
return self.covers() << 16 | self.rdtype
|
|
|
|
def to_text(
|
|
self,
|
|
origin: Optional[dns.name.Name] = None,
|
|
relativize: bool = True,
|
|
**kw: Dict[str, Any]
|
|
) -> str:
|
|
"""Convert an rdata to text format.
|
|
|
|
Returns a ``str``.
|
|
"""
|
|
|
|
raise NotImplementedError # pragma: no cover
|
|
|
|
def _to_wire(
|
|
self,
|
|
file: Optional[Any],
|
|
compress: Optional[dns.name.CompressType] = None,
|
|
origin: Optional[dns.name.Name] = None,
|
|
canonicalize: bool = False,
|
|
) -> bytes:
|
|
raise NotImplementedError # pragma: no cover
|
|
|
|
def to_wire(
|
|
self,
|
|
file: Optional[Any] = None,
|
|
compress: Optional[dns.name.CompressType] = None,
|
|
origin: Optional[dns.name.Name] = None,
|
|
canonicalize: bool = False,
|
|
) -> bytes:
|
|
"""Convert an rdata to wire format.
|
|
|
|
Returns a ``bytes`` or ``None``.
|
|
"""
|
|
|
|
if file:
|
|
return self._to_wire(file, compress, origin, canonicalize)
|
|
else:
|
|
f = io.BytesIO()
|
|
self._to_wire(f, compress, origin, canonicalize)
|
|
return f.getvalue()
|
|
|
|
def to_generic(
|
|
self, origin: Optional[dns.name.Name] = None
|
|
) -> "dns.rdata.GenericRdata":
|
|
"""Creates a dns.rdata.GenericRdata equivalent of this rdata.
|
|
|
|
Returns a ``dns.rdata.GenericRdata``.
|
|
"""
|
|
return dns.rdata.GenericRdata(
|
|
self.rdclass, self.rdtype, self.to_wire(origin=origin)
|
|
)
|
|
|
|
def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
|
|
"""Convert rdata to a format suitable for digesting in hashes. This
|
|
is also the DNSSEC canonical form.
|
|
|
|
Returns a ``bytes``.
|
|
"""
|
|
|
|
return self.to_wire(origin=origin, canonicalize=True)
|
|
|
|
def __repr__(self):
|
|
covers = self.covers()
|
|
if covers == dns.rdatatype.NONE:
|
|
ctext = ""
|
|
else:
|
|
ctext = "(" + dns.rdatatype.to_text(covers) + ")"
|
|
return (
|
|
"<DNS "
|
|
+ dns.rdataclass.to_text(self.rdclass)
|
|
+ " "
|
|
+ dns.rdatatype.to_text(self.rdtype)
|
|
+ ctext
|
|
+ " rdata: "
|
|
+ str(self)
|
|
+ ">"
|
|
)
|
|
|
|
def __str__(self):
|
|
return self.to_text()
|
|
|
|
def _cmp(self, other):
|
|
"""Compare an rdata with another rdata of the same rdtype and
|
|
rdclass.
|
|
|
|
For rdata with only absolute names:
|
|
Return < 0 if self < other in the DNSSEC ordering, 0 if self
|
|
== other, and > 0 if self > other.
|
|
For rdata with at least one relative names:
|
|
The rdata sorts before any rdata with only absolute names.
|
|
When compared with another relative rdata, all names are
|
|
made absolute as if they were relative to the root, as the
|
|
proper origin is not available. While this creates a stable
|
|
ordering, it is NOT guaranteed to be the DNSSEC ordering.
|
|
In the future, all ordering comparisons for rdata with
|
|
relative names will be disallowed.
|
|
"""
|
|
try:
|
|
our = self.to_digestable()
|
|
our_relative = False
|
|
except dns.name.NeedAbsoluteNameOrOrigin:
|
|
if _allow_relative_comparisons:
|
|
our = self.to_digestable(dns.name.root)
|
|
our_relative = True
|
|
try:
|
|
their = other.to_digestable()
|
|
their_relative = False
|
|
except dns.name.NeedAbsoluteNameOrOrigin:
|
|
if _allow_relative_comparisons:
|
|
their = other.to_digestable(dns.name.root)
|
|
their_relative = True
|
|
if _allow_relative_comparisons:
|
|
if our_relative != their_relative:
|
|
# For the purpose of comparison, all rdata with at least one
|
|
# relative name is less than an rdata with only absolute names.
|
|
if our_relative:
|
|
return -1
|
|
else:
|
|
return 1
|
|
elif our_relative or their_relative:
|
|
raise NoRelativeRdataOrdering
|
|
if our == their:
|
|
return 0
|
|
elif our > their:
|
|
return 1
|
|
else:
|
|
return -1
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Rdata):
|
|
return False
|
|
if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
|
|
return False
|
|
our_relative = False
|
|
their_relative = False
|
|
try:
|
|
our = self.to_digestable()
|
|
except dns.name.NeedAbsoluteNameOrOrigin:
|
|
our = self.to_digestable(dns.name.root)
|
|
our_relative = True
|
|
try:
|
|
their = other.to_digestable()
|
|
except dns.name.NeedAbsoluteNameOrOrigin:
|
|
their = other.to_digestable(dns.name.root)
|
|
their_relative = True
|
|
if our_relative != their_relative:
|
|
return False
|
|
return our == their
|
|
|
|
def __ne__(self, other):
|
|
if not isinstance(other, Rdata):
|
|
return True
|
|
if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
|
|
return True
|
|
return not self.__eq__(other)
|
|
|
|
def __lt__(self, other):
|
|
if (
|
|
not isinstance(other, Rdata)
|
|
or self.rdclass != other.rdclass
|
|
or self.rdtype != other.rdtype
|
|
):
|
|
return NotImplemented
|
|
return self._cmp(other) < 0
|
|
|
|
def __le__(self, other):
|
|
if (
|
|
not isinstance(other, Rdata)
|
|
or self.rdclass != other.rdclass
|
|
or self.rdtype != other.rdtype
|
|
):
|
|
return NotImplemented
|
|
return self._cmp(other) <= 0
|
|
|
|
def __ge__(self, other):
|
|
if (
|
|
not isinstance(other, Rdata)
|
|
or self.rdclass != other.rdclass
|
|
or self.rdtype != other.rdtype
|
|
):
|
|
return NotImplemented
|
|
return self._cmp(other) >= 0
|
|
|
|
def __gt__(self, other):
|
|
if (
|
|
not isinstance(other, Rdata)
|
|
or self.rdclass != other.rdclass
|
|
or self.rdtype != other.rdtype
|
|
):
|
|
return NotImplemented
|
|
return self._cmp(other) > 0
|
|
|
|
def __hash__(self):
|
|
return hash(self.to_digestable(dns.name.root))
|
|
|
|
@classmethod
|
|
def from_text(
|
|
cls,
|
|
rdclass: dns.rdataclass.RdataClass,
|
|
rdtype: dns.rdatatype.RdataType,
|
|
tok: dns.tokenizer.Tokenizer,
|
|
origin: Optional[dns.name.Name] = None,
|
|
relativize: bool = True,
|
|
relativize_to: Optional[dns.name.Name] = None,
|
|
) -> "Rdata":
|
|
raise NotImplementedError # pragma: no cover
|
|
|
|
@classmethod
|
|
def from_wire_parser(
|
|
cls,
|
|
rdclass: dns.rdataclass.RdataClass,
|
|
rdtype: dns.rdatatype.RdataType,
|
|
parser: dns.wire.Parser,
|
|
origin: Optional[dns.name.Name] = None,
|
|
) -> "Rdata":
|
|
raise NotImplementedError # pragma: no cover
|
|
|
|
def replace(self, **kwargs: Any) -> "Rdata":
|
|
"""
|
|
Create a new Rdata instance based on the instance replace was
|
|
invoked on. It is possible to pass different parameters to
|
|
override the corresponding properties of the base Rdata.
|
|
|
|
Any field specific to the Rdata type can be replaced, but the
|
|
*rdtype* and *rdclass* fields cannot.
|
|
|
|
Returns an instance of the same Rdata subclass as *self*.
|
|
"""
|
|
|
|
# Get the constructor parameters.
|
|
parameters = inspect.signature(self.__init__).parameters # type: ignore
|
|
|
|
# Ensure that all of the arguments correspond to valid fields.
|
|
# Don't allow rdclass or rdtype to be changed, though.
|
|
for key in kwargs:
|
|
if key == "rdcomment":
|
|
continue
|
|
if key not in parameters:
|
|
raise AttributeError(
|
|
"'{}' object has no attribute '{}'".format(
|
|
self.__class__.__name__, key
|
|
)
|
|
)
|
|
if key in ("rdclass", "rdtype"):
|
|
raise AttributeError(
|
|
"Cannot overwrite '{}' attribute '{}'".format(
|
|
self.__class__.__name__, key
|
|
)
|
|
)
|
|
|
|
# Construct the parameter list. For each field, use the value in
|
|
# kwargs if present, and the current value otherwise.
|
|
args = (kwargs.get(key, getattr(self, key)) for key in parameters)
|
|
|
|
# Create, validate, and return the new object.
|
|
rd = self.__class__(*args)
|
|
# The comment is not set in the constructor, so give it special
|
|
# handling.
|
|
rdcomment = kwargs.get("rdcomment", self.rdcomment)
|
|
if rdcomment is not None:
|
|
object.__setattr__(rd, "rdcomment", rdcomment)
|
|
return rd
|
|
|
|
# Type checking and conversion helpers. These are class methods as
|
|
# they don't touch object state and may be useful to others.
|
|
|
|
@classmethod
|
|
def _as_rdataclass(cls, value):
|
|
return dns.rdataclass.RdataClass.make(value)
|
|
|
|
@classmethod
|
|
def _as_rdatatype(cls, value):
|
|
return dns.rdatatype.RdataType.make(value)
|
|
|
|
@classmethod
|
|
def _as_bytes(
|
|
cls,
|
|
value: Any,
|
|
encode: bool = False,
|
|
max_length: Optional[int] = None,
|
|
empty_ok: bool = True,
|
|
) -> bytes:
|
|
if encode and isinstance(value, str):
|
|
bvalue = value.encode()
|
|
elif isinstance(value, bytearray):
|
|
bvalue = bytes(value)
|
|
elif isinstance(value, bytes):
|
|
bvalue = value
|
|
else:
|
|
raise ValueError("not bytes")
|
|
if max_length is not None and len(bvalue) > max_length:
|
|
raise ValueError("too long")
|
|
if not empty_ok and len(bvalue) == 0:
|
|
raise ValueError("empty bytes not allowed")
|
|
return bvalue
|
|
|
|
@classmethod
|
|
def _as_name(cls, value):
|
|
# Note that proper name conversion (e.g. with origin and IDNA
|
|
# awareness) is expected to be done via from_text. This is just
|
|
# a simple thing for people invoking the constructor directly.
|
|
if isinstance(value, str):
|
|
return dns.name.from_text(value)
|
|
elif not isinstance(value, dns.name.Name):
|
|
raise ValueError("not a name")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_uint8(cls, value):
|
|
if not isinstance(value, int):
|
|
raise ValueError("not an integer")
|
|
if value < 0 or value > 255:
|
|
raise ValueError("not a uint8")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_uint16(cls, value):
|
|
if not isinstance(value, int):
|
|
raise ValueError("not an integer")
|
|
if value < 0 or value > 65535:
|
|
raise ValueError("not a uint16")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_uint32(cls, value):
|
|
if not isinstance(value, int):
|
|
raise ValueError("not an integer")
|
|
if value < 0 or value > 4294967295:
|
|
raise ValueError("not a uint32")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_uint48(cls, value):
|
|
if not isinstance(value, int):
|
|
raise ValueError("not an integer")
|
|
if value < 0 or value > 281474976710655:
|
|
raise ValueError("not a uint48")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_int(cls, value, low=None, high=None):
|
|
if not isinstance(value, int):
|
|
raise ValueError("not an integer")
|
|
if low is not None and value < low:
|
|
raise ValueError("value too small")
|
|
if high is not None and value > high:
|
|
raise ValueError("value too large")
|
|
return value
|
|
|
|
@classmethod
|
|
def _as_ipv4_address(cls, value):
|
|
if isinstance(value, str):
|
|
# call to check validity
|
|
dns.ipv4.inet_aton(value)
|
|
return value
|
|
elif isinstance(value, bytes):
|
|
return dns.ipv4.inet_ntoa(value)
|
|
else:
|
|
raise ValueError("not an IPv4 address")
|
|
|
|
@classmethod
|
|
def _as_ipv6_address(cls, value):
|
|
if isinstance(value, str):
|
|
# call to check validity
|
|
dns.ipv6.inet_aton(value)
|
|
return value
|
|
elif isinstance(value, bytes):
|
|
return dns.ipv6.inet_ntoa(value)
|
|
else:
|
|
raise ValueError("not an IPv6 address")
|
|
|
|
@classmethod
|
|
def _as_bool(cls, value):
|
|
if isinstance(value, bool):
|
|
return value
|
|
else:
|
|
raise ValueError("not a boolean")
|
|
|
|
@classmethod
|
|
def _as_ttl(cls, value):
|
|
if isinstance(value, int):
|
|
return cls._as_int(value, 0, dns.ttl.MAX_TTL)
|
|
elif isinstance(value, str):
|
|
return dns.ttl.from_text(value)
|
|
else:
|
|
raise ValueError("not a TTL")
|
|
|
|
@classmethod
|
|
def _as_tuple(cls, value, as_value):
|
|
try:
|
|
# For user convenience, if value is a singleton of the list
|
|
# element type, wrap it in a tuple.
|
|
return (as_value(value),)
|
|
except Exception:
|
|
# Otherwise, check each element of the iterable *value*
|
|
# against *as_value*.
|
|
return tuple(as_value(v) for v in value)
|
|
|
|
# Processing order
|
|
|
|
@classmethod
|
|
def _processing_order(cls, iterable):
|
|
items = list(iterable)
|
|
random.shuffle(items)
|
|
return items
|
|
|
|
|
|
@dns.immutable.immutable
|
|
class GenericRdata(Rdata):
|
|
|
|
"""Generic Rdata Class
|
|
|
|
This class is used for rdata types for which we have no better
|
|
implementation. It implements the DNS "unknown RRs" scheme.
|
|
"""
|
|
|
|
__slots__ = ["data"]
|
|
|
|
def __init__(self, rdclass, rdtype, data):
|
|
super().__init__(rdclass, rdtype)
|
|
self.data = data
|
|
|
|
def to_text(
|
|
self,
|
|
origin: Optional[dns.name.Name] = None,
|
|
relativize: bool = True,
|
|
**kw: Dict[str, Any]
|
|
) -> str:
|
|
return r"\# %d " % len(self.data) + _hexify(self.data, **kw)
|
|
|
|
@classmethod
|
|
def from_text(
|
|
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
|
|
):
|
|
token = tok.get()
|
|
if not token.is_identifier() or token.value != r"\#":
|
|
raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
|
|
length = tok.get_int()
|
|
hex = tok.concatenate_remaining_identifiers(True).encode()
|
|
data = binascii.unhexlify(hex)
|
|
if len(data) != length:
|
|
raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
|
|
return cls(rdclass, rdtype, data)
|
|
|
|
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
|
|
file.write(self.data)
|
|
|
|
@classmethod
|
|
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
|
|
return cls(rdclass, rdtype, parser.get_remaining())
|
|
|
|
|
|
_rdata_classes: Dict[
|
|
Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any
|
|
] = {}
|
|
_module_prefix = "dns.rdtypes"
|
|
|
|
|
|
def get_rdata_class(rdclass, rdtype):
|
|
cls = _rdata_classes.get((rdclass, rdtype))
|
|
if not cls:
|
|
cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype))
|
|
if not cls:
|
|
rdclass_text = dns.rdataclass.to_text(rdclass)
|
|
rdtype_text = dns.rdatatype.to_text(rdtype)
|
|
rdtype_text = rdtype_text.replace("-", "_")
|
|
try:
|
|
mod = import_module(
|
|
".".join([_module_prefix, rdclass_text, rdtype_text])
|
|
)
|
|
cls = getattr(mod, rdtype_text)
|
|
_rdata_classes[(rdclass, rdtype)] = cls
|
|
except ImportError:
|
|
try:
|
|
mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
|
|
cls = getattr(mod, rdtype_text)
|
|
_rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
|
|
_rdata_classes[(rdclass, rdtype)] = cls
|
|
except ImportError:
|
|
pass
|
|
if not cls:
|
|
cls = GenericRdata
|
|
_rdata_classes[(rdclass, rdtype)] = cls
|
|
return cls
|
|
|
|
|
|
def from_text(
|
|
rdclass: Union[dns.rdataclass.RdataClass, str],
|
|
rdtype: Union[dns.rdatatype.RdataType, str],
|
|
tok: Union[dns.tokenizer.Tokenizer, str],
|
|
origin: Optional[dns.name.Name] = None,
|
|
relativize: bool = True,
|
|
relativize_to: Optional[dns.name.Name] = None,
|
|
idna_codec: Optional[dns.name.IDNACodec] = None,
|
|
) -> Rdata:
|
|
"""Build an rdata object from text format.
|
|
|
|
This function attempts to dynamically load a class which
|
|
implements the specified rdata class and type. If there is no
|
|
class-and-type-specific implementation, the GenericRdata class
|
|
is used.
|
|
|
|
Once a class is chosen, its from_text() class method is called
|
|
with the parameters to this function.
|
|
|
|
If *tok* is a ``str``, then a tokenizer is created and the string
|
|
is used as its input.
|
|
|
|
*rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
|
|
|
|
*rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
|
|
|
|
*tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.
|
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``), the
|
|
origin to use for relative names.
|
|
|
|
*relativize*, a ``bool``. If true, name will be relativized.
|
|
|
|
*relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
|
|
when relativizing names. If not set, the *origin* value will be used.
|
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
|
|
encoder/decoder to use if a tokenizer needs to be created. If
|
|
``None``, the default IDNA 2003 encoder/decoder is used. If a
|
|
tokenizer is not created, then the codec associated with the tokenizer
|
|
is the one that is used.
|
|
|
|
Returns an instance of the chosen Rdata subclass.
|
|
|
|
"""
|
|
if isinstance(tok, str):
|
|
tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
|
|
rdclass = dns.rdataclass.RdataClass.make(rdclass)
|
|
rdtype = dns.rdatatype.RdataType.make(rdtype)
|
|
cls = get_rdata_class(rdclass, rdtype)
|
|
with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
|
|
rdata = None
|
|
if cls != GenericRdata:
|
|
# peek at first token
|
|
token = tok.get()
|
|
tok.unget(token)
|
|
if token.is_identifier() and token.value == r"\#":
|
|
#
|
|
# Known type using the generic syntax. Extract the
|
|
# wire form from the generic syntax, and then run
|
|
# from_wire on it.
|
|
#
|
|
grdata = GenericRdata.from_text(
|
|
rdclass, rdtype, tok, origin, relativize, relativize_to
|
|
)
|
|
rdata = from_wire(
|
|
rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
|
|
)
|
|
#
|
|
# If this comparison isn't equal, then there must have been
|
|
# compressed names in the wire format, which is an error,
|
|
# there being no reasonable context to decompress with.
|
|
#
|
|
rwire = rdata.to_wire()
|
|
if rwire != grdata.data:
|
|
raise dns.exception.SyntaxError(
|
|
"compressed data in "
|
|
"generic syntax form "
|
|
"of known rdatatype"
|
|
)
|
|
if rdata is None:
|
|
rdata = cls.from_text(
|
|
rdclass, rdtype, tok, origin, relativize, relativize_to
|
|
)
|
|
token = tok.get_eol_as_token()
|
|
if token.comment is not None:
|
|
object.__setattr__(rdata, "rdcomment", token.comment)
|
|
return rdata
|
|
|
|
|
|
def from_wire_parser(
|
|
rdclass: Union[dns.rdataclass.RdataClass, str],
|
|
rdtype: Union[dns.rdatatype.RdataType, str],
|
|
parser: dns.wire.Parser,
|
|
origin: Optional[dns.name.Name] = None,
|
|
) -> Rdata:
|
|
"""Build an rdata object from wire format
|
|
|
|
This function attempts to dynamically load a class which
|
|
implements the specified rdata class and type. If there is no
|
|
class-and-type-specific implementation, the GenericRdata class
|
|
is used.
|
|
|
|
Once a class is chosen, its from_wire() class method is called
|
|
with the parameters to this function.
|
|
|
|
*rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
|
|
|
|
*rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
|
|
|
|
*parser*, a ``dns.wire.Parser``, the parser, which should be
|
|
restricted to the rdata length.
|
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
|
|
then names will be relativized to this origin.
|
|
|
|
Returns an instance of the chosen Rdata subclass.
|
|
"""
|
|
|
|
rdclass = dns.rdataclass.RdataClass.make(rdclass)
|
|
rdtype = dns.rdatatype.RdataType.make(rdtype)
|
|
cls = get_rdata_class(rdclass, rdtype)
|
|
with dns.exception.ExceptionWrapper(dns.exception.FormError):
|
|
return cls.from_wire_parser(rdclass, rdtype, parser, origin)
|
|
|
|
|
|
def from_wire(
|
|
rdclass: Union[dns.rdataclass.RdataClass, str],
|
|
rdtype: Union[dns.rdatatype.RdataType, str],
|
|
wire: bytes,
|
|
current: int,
|
|
rdlen: int,
|
|
origin: Optional[dns.name.Name] = None,
|
|
) -> Rdata:
|
|
"""Build an rdata object from wire format
|
|
|
|
This function attempts to dynamically load a class which
|
|
implements the specified rdata class and type. If there is no
|
|
class-and-type-specific implementation, the GenericRdata class
|
|
is used.
|
|
|
|
Once a class is chosen, its from_wire() class method is called
|
|
with the parameters to this function.
|
|
|
|
*rdclass*, an ``int``, the rdataclass.
|
|
|
|
*rdtype*, an ``int``, the rdatatype.
|
|
|
|
*wire*, a ``bytes``, the wire-format message.
|
|
|
|
*current*, an ``int``, the offset in wire of the beginning of
|
|
the rdata.
|
|
|
|
*rdlen*, an ``int``, the length of the wire-format rdata
|
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
|
|
then names will be relativized to this origin.
|
|
|
|
Returns an instance of the chosen Rdata subclass.
|
|
"""
|
|
parser = dns.wire.Parser(wire, current)
|
|
with parser.restrict_to(rdlen):
|
|
return from_wire_parser(rdclass, rdtype, parser, origin)
|
|
|
|
|
|
class RdatatypeExists(dns.exception.DNSException):
|
|
"""DNS rdatatype already exists."""
|
|
|
|
supp_kwargs = {"rdclass", "rdtype"}
|
|
fmt = (
|
|
"The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
|
|
+ "already exists."
|
|
)
|
|
|
|
|
|
def register_type(
|
|
implementation: Any,
|
|
rdtype: int,
|
|
rdtype_text: str,
|
|
is_singleton: bool = False,
|
|
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
|
|
) -> None:
|
|
"""Dynamically register a module to handle an rdatatype.
|
|
|
|
*implementation*, a module implementing the type in the usual dnspython
|
|
way.
|
|
|
|
*rdtype*, an ``int``, the rdatatype to register.
|
|
|
|
*rdtype_text*, a ``str``, the textual form of the rdatatype.
|
|
|
|
*is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
|
|
RRsets of the type can have only one member.)
|
|
|
|
*rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
|
|
it applies to all classes.
|
|
"""
|
|
|
|
rdtype = dns.rdatatype.RdataType.make(rdtype)
|
|
existing_cls = get_rdata_class(rdclass, rdtype)
|
|
if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
|
|
raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
|
|
_rdata_classes[(rdclass, rdtype)] = getattr(
|
|
implementation, rdtype_text.replace("-", "_")
|
|
)
|
|
dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)
|