dnslink   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 93
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 37
dl 0
loc 93
ccs 33
cts 33
cp 1
rs 10
c 0
b 0
f 0
wmc 17

2 Functions

Rating   Name   Duplication   Size   Complexity  
D _get_and_parse_record() 0 51 12
A resolve() 0 32 5
1
"""Python implementation of DNSLink protocol."""
2
3
# pylint: disable=multiple-statements
4
5 1
import dns.resolver
6
7
8 1
def resolve(domain, protocol=None, depth=16, resolver=None):
9
    """
10
    Resolve a DNSLink TXT record.
11
12
    :param str domain: a domain to resolve
13
    :param str protocol: a record protocol
14
    :param int depth: a recursion depth
15
    :param dns.resolver.Resolver: a DNSPython resolver
16
17
    :return: the DNSLink records
18
    :rtype: list[str]
19
    """
20
21
    # With `_dnslink` subdomain
22
23 1
    if not domain.startswith('_dnslink.'): name = '_dnslink.' + domain
24 1
    else: name = domain
25
26 1
    records = _get_and_parse_record(name, protocol, depth, resolver)
27 1
    if records: return records
28
29
    # Without `_dnslink` subdomain
30
31 1
    if domain.startswith('_dnslink.'): name = domain[9:]
32 1
    else: name = domain
33
34 1
    records = _get_and_parse_record(name, protocol, depth, resolver)
35 1
    if records: return records
36
37
    # Not found
38
39 1
    return []
40
41
42 1
def _get_and_parse_record(domain, protocol=None, depth=16, resolver=None):
43
    """
44
    Get and parse DNSLink TXT record.
45
46
    :param str domain: a domain to resolve
47
    :param str protocol: a record protocol
48
    :param int depth: a recursion depth
49
    :param dns.resolver.Resolver: a DNSPython resolver
50
51
    :return: the DNSLink records
52
    :rtype: list[str]
53
    """
54
55 1
    if not isinstance(resolver, dns.resolver.Resolver):
56 1
        resolver = dns.resolver.get_default_resolver()
57
58 1
    try:
59 1
        answers = resolver.query(domain, 'TXT')
60 1
    except dns.exception.DNSException:
61 1
        return []
62
63 1
    records = []
64
65 1
    for rdata in answers:
66 1
        for txt in rdata.strings:
67 1
            record = txt.decode('utf-8')
68
69
            # Not valid TXT record
70 1
            if not record.count('=') == 1:
71 1
                continue
72
73 1
            content = record.split('/', 3)
74
75
            # Not valid DNSLink record
76 1
            if not len(content) >= 3 or not content[0] == 'dnslink=':
77 1
                continue
78
79
            # Chaining record
80 1
            if content[1] == 'dnslink' and depth > 1:
81 1
                records.extend([
82
                    record + ('/' + content[3] if len(content) > 3 else '')
83
                    for record
84
                    in resolve(content[2], protocol, depth - 1, resolver)
85
                ])
86
87
            # Normal record
88 1
            elif protocol in (content[1], None):
89 1
                content[0] = ''
90 1
                records.append('/'.join(content))
91
92
    return records
93