| Total Complexity | 41 |
| Total Lines | 181 |
| Duplicated Lines | 11.6 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like menderbot.antlr_generated.PythonLexerBase often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import sys |
||
| 2 | from enum import Enum |
||
| 3 | from pkgutil import get_data |
||
| 4 | from typing import Dict, List, Optional, TextIO |
||
| 5 | |||
| 6 | from antlr4 import InputStream, Lexer, Parser, Token, TokenStream |
||
| 7 | from antlr4.Token import CommonToken |
||
| 8 | |||
| 9 | |||
class PythonLexerBase(Lexer):
    """Base class for the ANTLR-generated Python lexer.

    Layers Python's indentation-sensitive tokenization on top of the
    generated lexer: synthesized tokens (NEWLINE/WS on the hidden channel,
    LINE_BREAK/INDENT/DEDENT on the default channel) are queued in a
    circular buffer and drained by nextToken() before the next real token
    is returned.
    """

    # Column width used when expanding tabs into an indentation count.
    # NOTE(review): the comment in HandleSpaces cites CPython's rule of
    # eight-column tab stops, but this class uses 4 — confirm intended.
    tab_size = 4

    def __init__(self, input_stream: InputStream, output: TextIO = sys.stdout):
        super().__init__(input_stream, output)

        self.__opened: int = 0          # depth of currently open (), [], {} pairs
        self.__indents: List[int] = []  # stack of active indentation widths
        # Circular buffer of pending synthesized/real tokens, bounded by
        # __first_tokens_ind (oldest) and __last_tokens_ind (newest).
        self.__first_tokens_ind: int = 0
        self.__last_tokens_ind: int = 0
        self.__buffer: List[Optional[Token]] = [None for _ in range(32)]
        # Most recently emitted token, used to detect line starts.
        self.__last_token: Optional[Token] = None

    def emitToken(self, token: Token) -> None:
        """Queue *token* in the circular buffer, growing the buffer if full."""
        self._token = token

        if self.__buffer[self.__first_tokens_ind] is not None:
            self.__last_tokens_ind = self.__inc_token_ind(self.__last_tokens_ind)

            if self.__last_tokens_ind == self.__first_tokens_ind:
                # Buffer is full: double it, moving the wrapped-around tail
                # (first..end) to the end of the new array so ordering holds.
                new_array: List[Optional[Token]] = [None for _ in range(len(self.__buffer) * 2)]
                dest_ind = len(new_array) - (len(self.__buffer) - self.__first_tokens_ind)

                new_array[0:self.__first_tokens_ind] = self.__buffer[0:self.__first_tokens_ind]
                new_array[dest_ind:dest_ind + len(self.__buffer) - self.__first_tokens_ind] = \
                    self.__buffer[self.__first_tokens_ind:len(self.__buffer)]

                self.__first_tokens_ind = dest_ind
                self.__buffer = new_array

        self.__buffer[self.__last_tokens_ind] = token
        self.__last_token = token

    def nextToken(self) -> Token:
        """Return the next token, draining queued synthesized tokens first."""
        # Check if the end-of-file is ahead and there are still some DEDENTS expected.
        if self._input.LA(1) == Token.EOF and self.__indents:
            # BUG FIX: the condition previously read `is not None or`, which
            # dereferenced a None buffer slot (AttributeError) and, when the
            # slot held a token, emitted LINE_BREAK unconditionally — even
            # right after an existing LINE_BREAK. The intended logic (as in
            # upstream ANTLR grammars-v4) is: emit the extra LINE_BREAK only
            # when there is no buffered token yet, or the newest one is not
            # already a LINE_BREAK.
            if (self.__buffer[self.__last_tokens_ind] is None or
                    self.__buffer[self.__last_tokens_ind].type != self.LINE_BREAK):
                # First emit an extra line break that serves as the end of the statement.
                self.__emit_token_type(self.LINE_BREAK)

            # Now emit as much DEDENT tokens as needed.
            while self.__indents:
                self.__emit_token_type(self.DEDENT)
                self.__indents.pop()

        next_token: Token = super().nextToken()

        # Nothing queued: pass the real token straight through.
        if self.__buffer[self.__first_tokens_ind] is None:
            return next_token

        # Pop the oldest queued token; the real token remains buffered.
        result: Token = self.__buffer[self.__first_tokens_ind]
        self.__buffer[self.__first_tokens_ind] = None

        if self.__first_tokens_ind != self.__last_tokens_ind:
            self.__first_tokens_ind = self.__inc_token_ind(self.__first_tokens_ind)

        return result

    def HandleNewLine(self) -> None:
        """Grammar action: handle a physical newline in the input."""
        self.__emit_token_type_on_channel(self.NEWLINE, self.HIDDEN, self.text)

        c = self._input.LA(1)
        if c == -1:
            # EOF directly after the newline; nothing further to do here.
            return
        next_char: str = chr(c)

        # If the next line is not indented, handle the indentation change
        # now; otherwise whitespace processing happens in HandleSpaces.
        if next_char != ' ' and next_char != '\t' and self.__is_not_new_line_or_comment(next_char):
            self.__process_new_line(0)

    def HandleSpaces(self) -> None:
        """Grammar action: handle a run of leading/inline whitespace."""
        next_char: str = chr(self._input.LA(1))

        if ((self.__last_token is None or self.__last_token.type == self.NEWLINE) and
                self.__is_not_new_line_or_comment(next_char)):
            # Calculates the indentation of the provided spaces, taking the
            # following rules into account:
            #
            # "Tabs are replaced (from left to right) by one to eight spaces
            # such that the total number of characters up to and including
            # the replacement is a multiple of eight [...]"
            #
            # -- https://docs.python.org/3.1/reference/lexical_analysis.html#indentation

            indent: int = 0

            for i in range(0, len(self.text)):
                indent += PythonLexerBase.tab_size - indent % PythonLexerBase.tab_size if self.text[i] == '\t' else 1

            self.__process_new_line(indent)

        self.__emit_token_type_on_channel(self.WS, self.HIDDEN, self.text)

    def IncIndentLevel(self) -> None:
        """Grammar action: an opening bracket suspends indentation handling."""
        self.__opened += 1

    def DecIndentLevel(self) -> None:
        """Grammar action: a closing bracket; never drops below zero."""
        if self.__opened:
            self.__opened -= 1

    def __is_not_new_line_or_comment(self, next_char: str) -> bool:
        """True when indentation is active and *next_char* starts real code."""
        return (self.__opened == 0 and
                next_char != '\r' and
                next_char != '\n' and
                next_char != '\f' and
                next_char != '#')

    def __process_new_line(self, indent: int) -> None:
        """Emit LINE_BREAK plus INDENT/DEDENT tokens for the new *indent*."""
        self.__emit_token_type(self.LINE_BREAK)

        previous: int = 0 if not self.__indents else self.__indents[-1]

        if indent > previous:
            self.__indents.append(indent)
            self.__emit_token_type(self.INDENT)
        else:
            # Possibly emit more than 1 DEDENT token.
            while self.__indents and self.__indents[-1] > indent:
                self.__emit_token_type(self.DEDENT)
                self.__indents.pop()

    def __inc_token_ind(self, ind: int) -> int:
        """Advance a circular-buffer index by one, wrapping around."""
        return (ind + 1) % len(self.__buffer)

    def __emit_token_type(self, token_type: int) -> None:
        """Emit a zero-width synthesized token on the default channel."""
        self.__emit_token_type_on_channel(token_type, self.DEFAULT_TOKEN_CHANNEL, "")

    def __emit_token_type_on_channel(self, token_type: int, channel: int, text: str) -> None:
        """Build a CommonToken at the current position and queue it."""
        char_index: int = self.getCharIndex()
        token: CommonToken = CommonToken(
            self._tokenFactorySourcePair,
            token_type,
            channel,
            char_index - len(text),
            char_index - 1)
        token.line = self.line
        token.column = self.column
        token.text = text

        self.emitToken(token)
||
| 152 | |||
| 153 | |||
# Python language versions the parser can target. Autodetect (0) defers
# the decision until version-specific syntax is encountered; the member
# values 2 and 3 mirror the major version numbers they represent.
PythonVersion = Enum(
    "PythonVersion",
    [("Autodetect", 0), ("Python2", 2), ("Python3", 3)],
)
||
| 158 | |||
| 159 | |||
class PythonParserBase(Parser):
    """Base class for the ANTLR-generated Python parser.

    Keeps track of which Python language version the parse targets; the
    generated grammar consults _check_version() in semantic predicates and
    pins the version via set_version() once version-specific syntax is seen.
    """

    def __init__(self, input_stream: TokenStream):
        super().__init__(input_stream)
        self.__version = PythonVersion.Autodetect

    @property
    def version(self) -> PythonVersion:
        """The currently targeted Python version."""
        return self.__version

    @version.setter
    def version(self, version):
        # Accept either a PythonVersion member or a raw enum value (e.g. 2, 3).
        self.__version = (
            version if isinstance(version, PythonVersion) else PythonVersion(version)
        )

    def _check_version(self, version: int) -> bool:
        """Return True when *version* is admissible under the current setting."""
        if self.__version is PythonVersion.Autodetect:
            return True
        return version == self.__version.value

    def set_version(self, required_version: int) -> None:
        """Pin the target version to *required_version* (a PythonVersion value)."""
        self.__version = PythonVersion(required_version)
||
| 181 |