sopel.modules.safety.url_handler()   F
last analyzed

Complexity

Conditions 21

Size

Total Lines 80
Code Lines 66

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 66
dl 0
loc 80
rs 0
c 0
b 0
f 0
cc 21
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex code like the function sopel.modules.safety.url_handler() often does a lot of different things. To break it down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields, methods, or local variables that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding=utf-8
2
"""
3
safety.py - Alerts about malicious URLs
4
Copyright © 2014, Elad Alfassa, <[email protected]>
5
Licensed under the Eiffel Forum License 2.
6
7
This module uses virustotal.com
8
"""
9
from __future__ import unicode_literals, absolute_import, print_function, division
10
11
import logging
12
import os.path
13
import re
14
import sys
15
import time
16
17
import requests
18
19
from sopel.config.types import StaticSection, ValidatedAttribute, ListAttribute
20
from sopel.formatting import color, bold
21
from sopel.module import OP
22
import sopel.tools
23
24
try:
25
    # This is done separately from the below version if/else because JSONDecodeError
26
    # didn't appear until Python 3.5, but Sopel claims support for 3.3+
27
    # Redo this whole block of nonsense when dropping py2/old py3 support
28
    from json import JSONDecodeError as InvalidJSONResponse
29
except ImportError:
30
    InvalidJSONResponse = ValueError
31
32
if sys.version_info.major > 2:
33
    unicode = str
34
    from urllib.request import urlretrieve
35
    from urllib.parse import urlparse
36
else:
37
    from urllib import urlretrieve
38
    from urlparse import urlparse
39
40
41
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Base endpoint of the VirusTotal v2 URL API; 'report' is appended per request.
vt_base_api_url = 'https://www.virustotal.com/vtapi/v2/url/'
# Lower-cased entries of the malwaredomains list; filled in by setup().
malware_domains = set()
# Compiled "known good" domain patterns; filled in by setup().
known_good = []
46
47
48
class SafetySection(StaticSection):
    """Settings of the ``safety`` configuration section.

    Entries of ``known_good`` are compiled as case-insensitive regular
    expressions by ``setup()`` and matched against the network location of
    each posted URL.
    """

    enabled_by_default = ValidatedAttribute('enabled_by_default', bool, default=True)
    """Whether to enable URL safety in all channels where it isn't explicitly disabled."""
    known_good = ListAttribute('known_good')
    """List of "known good" domains to ignore."""
    vt_api_key = ValidatedAttribute('vt_api_key')
    """Optional VirusTotal API key (improves malicious URL detection)."""
55
56
57
def configure(config):
    """
    | name | example | purpose |
    | ---- | ------- | ------- |
    | enabled\\_by\\_default | True | Enable URL safety in all channels where it isn't explicitly disabled. |
    | known\\_good | sopel.chat,dftba.net | List of "known good" domains to ignore. |
    | vt\\_api\\_key | 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef | Optional VirusTotal API key to improve malicious URL detection |
    """
    # Register the section first so that ``config.safety`` is available below.
    config.define_section('safety', SafetySection)
    # Prompt for each setting in turn.
    config.safety.configure_setting(
        'enabled_by_default',
        "Enable URL safety in channels that don't specifically disable it?",
    )
    config.safety.configure_setting(
        'known_good',
        'Enter any domains to whitelist',
    )
    config.safety.configure_setting(
        'vt_api_key',
        "Optionally, enter a VirusTotal API key to improve malicious URL "
        "protection.\nOtherwise, only the Malwarebytes DB will be used."
    )
79
80
81
def setup(bot):
    """Prepare the module's state when it is loaded.

    Registers the ``safety`` config section, makes sure the
    ``safety_cache`` entry exists in ``bot.memory``, compiles the configured
    ``known_good`` patterns, and loads ``malwaredomains.txt`` from the bot's
    home directory into ``malware_domains`` (downloading the file first when
    it is missing or more than one week old).
    """
    bot.config.define_section('safety', SafetySection)

    if 'safety_cache' not in bot.memory:
        bot.memory['safety_cache'] = sopel.tools.SopelMemory()

    # Whitelist entries are treated as case-insensitive regular expressions.
    known_good.extend(
        re.compile(item, re.I) for item in bot.config.safety.known_good)

    loc = os.path.join(bot.config.homedir, 'malwaredomains.txt')
    one_week = 7 * 24 * 60 * 60
    # Download when there is no local copy, or refresh one older than a week.
    if not os.path.isfile(loc) or os.path.getmtime(loc) < time.time() - one_week:
        _download_malwaredomains_db(loc)

    with open(loc, 'r') as f:
        for line in f:
            clean_line = unicode(line).strip().lower()
            if clean_line:  # skip blank lines
                malware_domains.add(clean_line)
101
102
103
def shutdown(bot):
    """Remove the ``safety_cache`` entry from ``bot.memory``.

    A missing entry is silently ignored.
    """
    try:
        del bot.memory['safety_cache']
    except KeyError:
        pass  # Nothing cached, nothing to clean up
108
109
110
def _download_malwaredomains_db(path):
    """Download the malwaredomains "justdomains" list and store it at ``path``.

    :param path: filesystem location the downloaded list is written to
    """
    url = 'https://mirror1.malwaredomains.com/files/justdomains'
    LOGGER.info('Downloading malwaredomains db from %s', url)
    urlretrieve(url, path)
114
115
116
def _get_channel_settings(bot, channel):
    """Return ``(check, strict, use_vt)`` for ``channel``.

    ``check`` starts from the ``enabled_by_default`` config value (treated as
    ``True`` when unset); the per-channel ``safety`` value stored in the
    database overrides it. ``strict`` means offenders get kicked, ``use_vt``
    means VirusTotal may be queried.
    """
    check = bot.config.safety.enabled_by_default
    if check is None:
        # If not set, assume default
        check = True
    strict = False
    use_vt = True

    # DB overrides config:
    setting = bot.db.get_channel_value(channel, 'safety')
    if setting == 'off':
        check = False
    elif setting in ('on', 'strict', 'local', 'local strict'):
        check = True
        strict = setting in ('strict', 'local strict')
        use_vt = setting not in ('local', 'local strict')
    return check, strict, use_vt


def _query_virustotal(bot, url, apikey):
    """Return ``(positives, total)`` reported by VirusTotal for ``url``.

    Results are cached per URL in ``bot.memory['safety_cache']``. Any failure
    to obtain usable counts yields ``(0, 0)`` so that the malwaredomains
    check still runs.
    """
    cache = bot.memory['safety_cache']
    if url in cache:
        LOGGER.debug('[VirusTotal] Using cached result for %s', url)
        cached = cache[url]
        return cached['positives'], cached['total']

    payload = {'resource': url,
               'apikey': apikey,
               'scan': '1'}
    try:
        # The timeout keeps a stalled connection from blocking the handler;
        # timeouts are RequestException subclasses and are handled below.
        r = requests.post(vt_base_api_url + 'report', data=payload, timeout=10)
        r.raise_for_status()
        result = r.json()
    except requests.exceptions.RequestException:
        # Ignoring exceptions with VT so MalwareDomains will always work
        LOGGER.debug('[VirusTotal] Error obtaining response.', exc_info=True)
        return 0, 0
    except InvalidJSONResponse:
        # Ignoring exceptions with VT so MalwareDomains will always work
        LOGGER.debug('[VirusTotal] Malformed response (invalid JSON).', exc_info=True)
        return 0, 0

    try:
        positives = result['positives']
        total = result['total']
    except (KeyError, TypeError):
        # The response carried no counts (presumably no finished report
        # exists yet for this URL - confirm against the API docs). Do not
        # cache it, so a later lookup can pick up the real report.
        LOGGER.debug('[VirusTotal] Response contains no scan counts.')
        return 0, 0

    cache[url] = {'positives': positives,
                  'total': total,
                  'age': time.time()}
    if len(cache) > 1024:
        _clean_cache(bot)
    return positives, total


@sopel.module.rule(r'(?u).*(https?://\S+).*')
@sopel.module.priority('high')
def url_handler(bot, trigger):
    """Checks for malicious URLs"""
    check, strict, use_vt = _get_channel_settings(bot, trigger.sender)
    if not check:
        return  # Disabled for this channel (by DB setting or config default)

    # NOTE: the rule captures a single URL per message; because the leading
    # ``.*`` is greedy this is the last one in the line.
    url = trigger.group(1)
    try:
        netloc = urlparse(url).netloc
    except ValueError:
        return  # Invalid IPv6 URL

    if any(regex.search(netloc) for regex in known_good):
        return  # Whitelisted

    positives = 0   # Number of engines saying it's malicious
    total = 0       # Number of total engines
    apikey = bot.config.safety.vt_api_key
    if apikey is not None and use_vt:
        # Look up the matched URL itself, not the whole message text.
        positives, total = _query_virustotal(bot, unicode(url), apikey)

    if unicode(netloc).lower() in malware_domains:
        # malwaredomains is more trustworthy than some VT engines
        # therefore it gets a weight of 10 engines when calculating confidence
        positives += 10
        total += 10

    if positives > 1:
        # Possibly malicious URL detected!
        confidence = '{}%'.format(round((positives / total) * 100))
        msg = 'link posted by %s is possibly malicious ' % bold(trigger.nick)
        msg += '(confidence %s - %s/%s)' % (confidence, positives, total)
        bot.say('[' + bold(color('WARNING', 'red')) + '] ' + msg)
        if strict:
            bot.kick(trigger.nick, trigger.sender, 'Posted a malicious link')
196
197
198
@sopel.module.commands('safety')
def toggle_safety(bot, trigger):
    """Set safety setting for channel"""
    if not trigger.admin:
        try:
            is_op = not (bot.channels[trigger.sender].privileges[trigger.nick] < OP)
        except KeyError:
            # The sender is not a channel the bot tracks, or the nick is not
            # known there (e.g. the command came in a private message -
            # NOTE(review): confirm); treat that as "not an operator".
            is_op = False
        if not is_op:
            bot.reply('Only channel operators can change safety settings')
            return

    allowed_states = ['strict', 'on', 'off', 'local', 'local strict']
    requested = trigger.group(2)
    # Normalise once; the lower-cased form is what gets validated and stored.
    state = requested.lower() if requested else None
    if state not in allowed_states:
        options = ' / '.join(allowed_states)
        bot.reply('Available options: %s' % options)
        return

    channel = trigger.sender.lower()
    bot.db.set_channel_value(channel, 'safety', state)
    bot.reply('Safety is now set to "%s" on this channel' % requested)
213
214
215
# Runs once a day
# Code above also calls this if there are too many cache entries
@sopel.module.interval(24 * 60 * 60)
def _clean_cache(bot):
    """Remove the oldest entry from the URL cache.

    Each cache entry stores its insertion time under ``'age'``, so the
    oldest entry is the one with the smallest value. An empty cache is left
    untouched.
    """
    # TODO: probably should use locks here, to make sure stuff doesn't explode
    cache = bot.memory['safety_cache']
    if not cache:
        return
    # 'age' is a timestamp: smallest value == inserted longest ago.
    oldest_key = min(cache, key=lambda key: cache[key]['age'])
    del cache[oldest_key]
229