Passed
Push — develop ( 9e1b8b...5d0b21 )
by Dean
03:00
created

Upgrade.current()   B

Complexity

Conditions 6

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 34.8262

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 20
ccs 1
cts 14
cp 0.0714
rs 8
cc 6
crap 34.8262
1 1
from plugin.core.constants import PLUGIN_VERSION_BASE, PLUGIN_VERSION_BRANCH
2 1
from plugin.core.environment import Environment
3 1
from plugin.managers.message import MessageManager
4 1
from plugin.models import Message
5 1
from plugin.modules.core.base import Module
6
7 1
import json
8 1
import logging
9 1
import os
10
11 1
# Absolute path of the ".version" file, located one directory above the
# plugin code path reported by the environment.
VERSION_PATH = os.path.abspath(os.path.join(Environment.path.code, '..', '.version'))

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
14
15
16 1
class Upgrade(Module):
    """Detect plugin version changes and record the installed version.

    The previously recorded version is read from ``VERSION_PATH`` and compared
    with the version built from ``PLUGIN_VERSION_BASE`` and
    ``PLUGIN_VERSION_BRANCH``. Upgrades and downgrades are reported through
    the module logger and ``MessageManager``.
    """

    __key__ = 'upgrade'

    def __init__(self):
        pass

    def start(self):
        """Compare the recorded version with the installed one and update the record.

        The version file is written on a new install (no readable record) or
        when `validate` reports a change that is valid. It is left untouched
        when nothing changed or when the change is not valid (for example a
        downgrade).
        """
        log.debug('Checking for version change...')

        current = self.current()
        installed = self.installed()

        # New install
        if current is None:
            self.update(installed)
            return

        # Validate
        changed, valid = self.validate(current, installed)

        if not changed or not valid:
            return

        # Update current version
        self.update(installed)

    @classmethod
    def validate(cls, current, installed):
        """Classify the transition from `current` to `installed`.

        :param current: recorded version dict with 'version' and 'branch' keys
        :param installed: installed version dict with 'version' and 'branch' keys
        :return: ``(changed, valid)`` tuple:
                 ``(False, True)`` when both dicts are equal,
                 ``(True, False)`` for a downgrade (a warning message is stored),
                 ``(True, True)`` for an upgrade (an info message is stored),
                 ``(False, False)`` when the dicts differ but the version
                 numbers compare equal (e.g. only the branch differs).
        """
        # No change
        if current == installed:
            log.debug('No change')
            return False, True

        # Build the comparable tuples once, both branches below need them
        installed_version = tuple(installed['version'])
        current_version = tuple(current['version'])

        # Downgrade
        if installed_version < current_version:
            cls.log(
                logging.WARN, Message.Code.DowngradeUnclean,
                message='Unclean downgrade detected, see http://bit.ly/TFPx100201 for more details',
                description="An unclean plugin downgrade can cause a number of errors, see http://bit.ly/TFPx100201 for more details"
            )
            return True, False

        # Upgrade
        if installed_version > current_version:
            cls.log(
                logging.INFO, Message.Code.UpgradePerformed,
                message='Plugin has been updated to v%s' % cls.format(installed)
            )
            return True, True

        # Invalid
        return False, False

    @classmethod
    def log(cls, level, code, message, description=None):
        """Write `message` to the module logger and store it via `MessageManager`.

        :param level: `logging` level used for both the log record and the stored message
        :param code: message code passed through to `MessageManager`
        :param message: text to log and store
        :param description: optional longer text; `message` is used when it is empty
        """
        # Write to log file (`log` here resolves to the module-level logger,
        # not to this method)
        log.log(level, message)

        # Store message in database
        MessageManager.get.from_message(
            level, message,

            code=code,
            description=(description or message)
        )

    @classmethod
    def current(cls):
        """Read the recorded version from ``VERSION_PATH``.

        :return: the decoded JSON data when it contains both 'version' and
                 'branch'; ``None`` when the file does not exist, lacks one of
                 those keys, or cannot be read/decoded (a warning is logged
                 in that last case).
        """
        if not os.path.exists(VERSION_PATH):
            return None

        try:
            # Text mode: `json.load` accepts it on every Python version
            with open(VERSION_PATH, 'r') as fp:
                data = json.load(fp)

            if 'version' not in data:
                return None

            if 'branch' not in data:
                return None

            return data
        except Exception as ex:
            # Best-effort read: any failure is treated as "no recorded version"
            log.warning('Unable to read current version: %s', ex, exc_info=True)

        return None

    @classmethod
    def installed(cls):
        """Return the installed version as a dict with 'version' (list) and 'branch' keys."""
        return {
            'version': list(PLUGIN_VERSION_BASE),
            'branch': PLUGIN_VERSION_BRANCH
        }

    @classmethod
    def update(cls, data):
        """Write `data` as JSON to ``VERSION_PATH``.

        :param data: JSON-serialisable version dict
        :return: ``True`` when the file was written, ``False`` when writing
                 failed (a warning is logged).
        """
        try:
            # Text mode: `json.dump` emits `str`, which a binary-mode file
            # rejects on Python 3
            with open(VERSION_PATH, 'w') as fp:
                json.dump(data, fp)

            log.debug('Version updated to %r', data)
            return True
        except Exception as ex:
            # Best-effort write: report the failure to the caller instead of raising
            log.warning('Unable to write current version: %s', ex, exc_info=True)

        return False

    @staticmethod
    def format(data):
        """Format a version dict as ``"<a>.<b>.<c>-<branch>"``.

        :param data: dict expected to hold 'version' (iterable) and 'branch'
        :return: the formatted string, or ``None`` when either key is missing
        """
        if 'version' not in data:
            return None

        if 'branch' not in data:
            return None

        return '%s-%s' % (
            '.'.join([
                str(x) for x in data['version']
            ]),
            data['branch']
        )
137