src.heuristics   A
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 167
Duplicated Lines 79.64 %

Importance

Changes 0
Metric Value
eloc 106
dl 133
loc 167
rs 9.52
c 0
b 0
f 0
wmc 36

13 Methods

Rating   Name   Duplication   Size   Complexity  
A Heuristic.match() 5 5 2
A HeuristicsRegistry._load_from_yaml() 23 23 4
A HeuristicsRegistry._get_heuristic_in_grouping() 17 17 5
A HeuristicsRegistry._get_heuristic_in_all_groupings() 13 13 4
A HeuristicsRegistry.check_heuristic() 5 5 1
A HeuristicsRegistry.__init__() 2 2 1
A Heuristic.__init__() 9 9 2
A Heuristic.__repr__() 3 3 1
A HeuristicsRegistry.all_tables() 5 5 3
A HeuristicsRegistry.get_heuristic() 5 5 2
B HeuristicsRegistry.load_heuristics() 14 14 6
A HeuristicsRegistry.groupings() 4 4 2
A Heuristic.__call__() 12 12 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# MIT License
2
#
3
# Copyright (c) 2017 Matt Boyer
4
#
5
# Permission is hereby granted, free of charge, to any person obtaining a copy
6
# of this software and associated documentation files (the "Software"), to deal
7
# in the Software without restriction, including without limitation the rights
8
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
# copies of the Software, and to permit persons to whom the Software is
10
# furnished to do so, subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be included in
13
# all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
# SOFTWARE.
22
23
import os
24
from pkg_resources import resource_stream
25
import re
26
import yaml
27
28
from . import _LOGGER
29
from . import PROJECT_NAME, USER_YAML_PATH, BUILTIN_YAML
30
31
32 View Code Duplication
class Heuristic:
    """Find candidate record-header offsets for one table in a byte buffer.

    A heuristic couples a compiled "magic" pattern with the distance
    (``offset``) between the start of a record header and the position
    where that pattern matches. Calling the instance on a buffer yields
    the header offsets derived from every match of the pattern.
    """

    def __init__(self, magic, offset, grouping, table, name_regex=None):
        """Compile the patterns and remember which table they belong to.

        Args:
            magic: Regular expression locating the record. It must be of
                the same type (``bytes`` or ``str``) as the data later
                passed to ``__call__``, as required by ``re``.
            offset: Distance subtracted from each match start to obtain
                the header start.
            grouping: Name of the grouping this heuristic belongs to;
                only used by ``__repr__`` here.
            table: Table name, compared for equality in ``match`` when no
                ``name_regex`` is supplied.
            name_regex: Optional regular expression that replaces the
                exact-name comparison in ``match``.
        """
        self._offset = offset
        self._table_name = table
        self._grouping = grouping
        self._magic_re = re.compile(magic)

        self._table_name_regex = None
        if name_regex is not None:
            self._table_name_regex = re.compile(name_regex)

    def __repr__(self):
        return "<Record heuristic for table \"{0}\"({1})>".format(
            self._table_name, self._grouping
        )

    def __call__(self, freeblock_bytes):
        """Yield header start offsets, from the last match to the first.

        Args:
            freeblock_bytes: The data to scan with the magic pattern.

        Yields:
            ``match.start() - offset`` for each match, in decreasing
            order of match position. Iteration stops at the first
            negative result.
        """
        # finditer() can only be consumed front to back, so the matches
        # are collected first and then walked in reverse.
        all_matches = list(self._magic_re.finditer(freeblock_bytes))
        for magic_match in reversed(all_matches):
            header_start = magic_match.start() - self._offset
            if header_start < 0:
                # Match positions only decrease from here on, so every
                # remaining header start would be negative as well.
                _LOGGER.debug("Header start outside of freeblock!")
                break
            yield header_start

    def match(self, table):
        """Tell whether this heuristic applies to ``table``.

        Args:
            table: Any object exposing a ``name`` attribute.

        Returns:
            ``True`` when ``table.name`` matches ``name_regex`` from its
            beginning (``re.match`` semantics) if one was given,
            otherwise when it equals the table name given at
            construction.
        """
        if self._table_name_regex is not None:
            return bool(self._table_name_regex.match(table.name))
        return self._table_name == table.name
66
67
68 View Code Duplication
class HeuristicsRegistry(dict):
    """Two-level mapping: grouping name -> {table name -> Heuristic}.

    The registry is filled from YAML documents whose top-level keys are
    grouping names and whose values map table names to the properties
    ``magic``, ``offset`` and, optionally, ``name_regex``.
    """

    def __init__(self):
        # dict.__init__ treats a positional argument as initial contents;
        # the registry starts empty, so nothing is passed.
        super().__init__()

    @staticmethod
    def check_heuristic(magic, offset):
        """Sanity-check the raw ``magic`` and ``offset`` of a heuristic.

        Raises:
            AssertionError: If ``magic`` is not ``bytes`` or ``offset``
                is not a non-negative ``int``.
        """
        # NOTE(review): assert statements are removed when Python runs
        # with -O, which disables this validation. They are kept so the
        # exception type seen by existing callers does not change.
        assert isinstance(magic, bytes), "heuristic magic must be bytes"
        assert isinstance(offset, int), "heuristic offset must be an int"
        assert offset >= 0, "heuristic offset must not be negative"

    def _load_from_yaml(self, yaml_string):
        """Parse a YAML document and register the heuristics it defines.

        A grouping found in the document replaces any grouping of the
        same name already present in the registry. An empty document is
        ignored.

        Args:
            yaml_string: YAML text, as ``str`` or UTF-8 encoded ``bytes``.

        Raises:
            KeyError: If a table entry lacks ``magic`` or ``offset``.
            AssertionError: If ``check_heuristic`` rejects an entry.
        """
        if isinstance(yaml_string, bytes):
            yaml_string = yaml_string.decode('utf-8')

        # CLoader is only present when PyYAML was built against libyaml;
        # yaml.Loader is its pure-Python counterpart.
        # NOTE(review): both are PyYAML's full loaders and can construct
        # arbitrary Python objects from tagged YAML. The user-supplied
        # file is parsed through this path too; consider a safe loader
        # once it is confirmed that the magic values in the YAML files
        # only rely on standard tags.
        loader = getattr(yaml, 'CLoader', yaml.Loader)
        raw_yaml = yaml.load(yaml_string, Loader=loader)
        if raw_yaml is None:
            # An empty document parses to None: nothing to register.
            return

        # TODO Find a more descriptive term than "table grouping"
        for table_grouping, tables in raw_yaml.items():
            _LOGGER.debug(
                "Loading YAML data for table grouping \"%s\"",
                table_grouping
            )
            grouping_tables = {}
            for table_name, table_props in tables.items():
                self.check_heuristic(
                    table_props['magic'], table_props['offset']
                )
                grouping_tables[table_name] = Heuristic(
                    table_props['magic'], table_props['offset'],
                    table_grouping, table_name,
                    name_regex=table_props.get('name_regex')
                )
                _LOGGER.debug("Loaded heuristics for \"%s\"", table_name)
            self[table_grouping] = grouping_tables

    def load_heuristics(self):
        """Load the packaged heuristics, then the user's file if present.

        The user file is read second, so a grouping it defines replaces
        the packaged grouping of the same name.

        Raises:
            SystemError: If either YAML source triggers a ``KeyError``
                while being loaded.
        """
        with resource_stream(PROJECT_NAME, BUILTIN_YAML) as builtin:
            try:
                self._load_from_yaml(builtin.read())
            except KeyError as ex:
                raise SystemError("Malformed builtin magic file") from ex

        if not os.path.exists(USER_YAML_PATH):
            return
        with open(USER_YAML_PATH, 'r', encoding='UTF8') as user_yaml:
            try:
                self._load_from_yaml(user_yaml.read())
            except KeyError as ex:
                raise SystemError("Malformed user magic file") from ex

    @property
    def groupings(self):
        """Iterate over the registered grouping names in sorted order."""
        yield from sorted(self)

    @property
    def all_tables(self):
        """Iterate over ``(grouping, table_name)`` pairs.

        Groupings come in sorted order; tables keep the order in which
        they were stored within their grouping.
        """
        for db in self.groupings:
            for table in self[db]:
                yield (db, table)

    def _get_heuristic_in_grouping(self, db_table, grouping):
        """Return the first heuristic of ``grouping`` matching ``db_table``.

        Raises:
            ValueError: If the grouping is unknown or none of its
                heuristics matches the table.
        """
        if grouping not in self:
            raise ValueError(
                "No heuristic defined for table \"%s\" in grouping \"%s\"" %
                (db_table.name, grouping)
            )

        for heuristic in self[grouping].values():
            if heuristic.match(db_table):
                return heuristic

        # We haven't found a match within the grouping... what
        # shall we do?
        raise ValueError("No heuristic found")

    def _get_heuristic_in_all_groupings(self, db_table):
        """Return the first heuristic of any grouping matching ``db_table``.

        Candidates are tried in the order produced by ``all_tables``.

        Raises:
            ValueError: If no registered heuristic matches the table.
        """
        for grouping, heuristic_name in self.all_tables:
            heuristic = self[grouping][heuristic_name]
            if heuristic.match(db_table):
                return heuristic

        raise ValueError(
            "No heuristic defined for table \"%s\" in any grouping" %
            (db_table.name,)
        )

    def get_heuristic(self, db_table, grouping):
        """Look up the heuristic applicable to ``db_table``.

        Args:
            db_table: Object exposing a ``name`` attribute; it is handed
                to each candidate heuristic's ``match`` method.
            grouping: Grouping name to restrict the search to, or
                ``None`` to search every grouping.

        Raises:
            ValueError: If no suitable heuristic is found.
        """
        if grouping is not None:
            return self._get_heuristic_in_grouping(db_table, grouping)
        return self._get_heuristic_in_all_groupings(db_table)
167