operating_sites.OperatingSitesUpdater.add_data()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 65
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 1
dl 0
loc 65
rs 9.9
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

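Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.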

1
import re
2
from typing import ClassVar, Final
3
4
from bs4 import BeautifulSoup
5
from sqlalchemy import (
6
    Boolean,
7
    Column,
8
    Integer,
9
    MetaData,
10
    SmallInteger,
11
    String,
12
    Table,
13
    text,
14
)
15
16
from src.new_data_processors.common import (
17
    DataDownloader,
18
)
19
from src.new_data_processors.common_excel_processors import ExcelProcessorSimple
20
21
22
def _translate_operating_site_type(operating_site_type: str) -> str:
23
    dictionary = {
24
        "állomás": "station",
25
        "egyéb": "other",
26
        "elágazás": "spur",
27
        "eldöntő pont": "decision_point",
28
        "forgalmi kitérő": "crossover",
29
        "iparvágány": "industrial_track",
30
        "iparvágány kiágazás": "industrial_track_spur",
31
        "keresztezés": "crossing",
32
        "megálló-elágazóhely": "spur_halt",
33
        "megálló-rakodóhely": "loading_halt",
34
        "megállóhely": "halt",
35
        "megállóhely-iparvágány kiágazás": "industrial_track_spur_halt",
36
        "nem definiált": "undefined",
37
        "országhatár": "border_crossing",
38
        "pályavasúti határpont": "railway_border_crossing",
39
        "rakodóhely": "loading_point",
40
        "vágányfonódás-elágazás": "gauntlet_spur",
41
    }
42
    return dictionary[operating_site_type]
43
44
45
class OperatingSitesUpdater(ExcelProcessorSimple, DataDownloader):
    """Download the operating-site export from kapella2.hu and store it in `operating_sites`.

    The download is a three-step walk through the website: the splash page
    yields an `infra_id`, the list page requested with that id contains the
    link to the Excel export, and that link is fetched last.

    NOTE(review): `self.TODAY`, `self.logger`, `self.database` and `self.data`
    are not defined in this class; they are presumably provided by the base
    classes - confirm against `ExcelProcessorSimple` / `DataDownloader`.
    """

    TABLE_NAME: ClassVar[str] = "operating_sites"
    database_metadata: ClassVar[MetaData] = MetaData()

    table: ClassVar[Table] = Table(
        TABLE_NAME,
        database_metadata,
        Column(name="name", type_=String(255), nullable=False),
        Column(name="name_shortened", type_=String(255)),
        Column(name="name_short", type_=String(255)),
        Column(name="operator", type_=String(255)),
        Column(name="type", type_=String(255)),
        Column(
            name="code_uic", type_=Integer, nullable=False, index=True, primary_key=True
        ),
        Column(name="code_telegraph", type_=String(4)),
        Column(name="category_passenger", type_=SmallInteger),
        Column(name="category_freight", type_=SmallInteger),
        Column(name="traffic_passenger", type_=Boolean),
        Column(name="traffic_freight", type_=Boolean),
        Column(name="terminus", type_=Boolean),
        Column(name="request_stop", type_=Boolean),
        Column(name="train_meeting", type_=Boolean),
        Column(name="open_to_train_operators", type_=Boolean),
    )

    # Adds rows that are not yet in the table; `insert ignore` leaves rows
    # with an already existing key untouched (MySQL/MariaDB-style syntax).
    _INSERT_QUERY: ClassVar[str] = """
        insert ignore into operating_sites (
            name,
            name_shortened,
            name_short,
            operator,
            type,
            code_uic,
            code_telegraph,
            category_passenger,
            category_freight,
            traffic_passenger,
            traffic_freight,
            terminus,
            request_stop,
            train_meeting,
            open_to_train_operators
        )
        values (
            :name,
            :name_shortened,
            :name_short,
            :operator,
            :type,
            :code_uic,
            :code_telegraph,
            :category_passenger,
            :category_freight,
            :traffic_passenger,
            :traffic_freight,
            :terminus,
            :request_stop,
            :train_meeting,
            :open_to_train_operators
        )
    """

    # Refreshes every non-key column of the row identified by `code_uic`.
    _UPDATE_QUERY: ClassVar[str] = """
        update operating_sites
        set
            name = :name,
            name_shortened = :name_shortened,
            name_short = :name_short,
            operator = :operator,
            type = :type,
            code_telegraph = :code_telegraph,
            category_passenger = :category_passenger,
            category_freight = :category_freight,
            traffic_passenger = :traffic_passenger,
            traffic_freight = :traffic_freight,
            terminus = :terminus,
            request_stop = :request_stop,
            train_meeting = :train_meeting,
            open_to_train_operators = :open_to_train_operators
        where code_uic = :code_uic
    """

    def __init__(self) -> None:
        """Set up the download URLs and fetch the export immediately.

        Note that constructing an instance performs the downloads: the result
        of `get_data` is stored in `self._data_to_process`.
        """
        super().__init__()

        self.WEBSITE_DOMAIN: Final = "https://www.kapella2.hu"
        self.WEBSITE_URL: Final = (
            "/ehuszfelulet/szolgalatihelyek?vizsgalt_idopont="
            f"{self.TODAY}&vizsgalt_idoszak_kezdo={self.TODAY}&vizsgalt_idoszak_veg={self.TODAY}"
        )
        # Placeholders; each is filled in by the matching download step below.
        self.INFRA_ID: int = NotImplemented
        self.INFRA_ID_URL: str = NotImplemented
        self.XLS_URL: str = NotImplemented

        self._data_to_process = self.get_data(self.WEBSITE_DOMAIN + self.WEBSITE_URL)

        self.logger.info(f"{self.__class__.__name__} initialized!")

    def get_data(self, url: str) -> bytes:
        """Run the splash page -> list page -> Excel export download chain.

        Args:
            url: Address of the operating-site page without the `infra_id`
                query parameter.

        Returns:
            Whatever the parent `get_data` returns for the export link
            (annotated as bytes).
        """
        splash_page_soup = self.get_splash_page(url)
        self.get_infra_id(splash_page_soup, url)
        list_page = self.download_list_page(url)
        return self.download_xls_file(list_page)

    def get_splash_page(self, url: str) -> BeautifulSoup:
        """Download `url` through the parent `get_data` and parse it with lxml."""
        splash_page = super().get_data(url)
        return BeautifulSoup(
            markup=splash_page,
            features="lxml",
        )

    def get_infra_id(self, splash_page_soup: BeautifulSoup, url: str) -> None:
        """Read the `infra_id` from the splash page and store it in `self.INFRA_ID`.

        The id is the `value` of the first `<option>` inside the
        `<select name="infra_id">` element.

        Args:
            splash_page_soup: Parsed splash page.
            url: Address the page was downloaded from; used in error messages only.

        Raises:
            ValueError: If the `select` element or its first `option` is
                missing, or if the option value is not an integer.
        """
        select_tag = splash_page_soup.find(
            name="select",
            attrs={"name": "infra_id"},
        )
        if select_tag is None:
            message = f"No `select` tag found on the splash page at {url}!"
            self.logger.critical(message)
            raise ValueError(message)

        # future: report bug (false positive) to mypy developers
        option_tag = select_tag.find("option")  # type: ignore
        if option_tag is None:
            message = (
                f"No `option` tag found in the `infra_id` select on the splash page at {url}!"
            )
            self.logger.critical(message)
            raise ValueError(message)

        self.INFRA_ID = int(option_tag["value"])  # type: ignore

    def download_list_page(self, url: str) -> bytes:
        """Download the list page for the stored `infra_id`.

        Also records the appended query fragment in `self.INFRA_ID_URL`.
        """
        self.INFRA_ID_URL = f"&infra_id={self.INFRA_ID}"
        return super().get_data(url + self.INFRA_ID_URL)

    def download_xls_file(self, list_page: bytes) -> bytes:
        """Find the Excel export link in the list page and download it.

        The relative link is stored in `self.XLS_URL`.

        Args:
            list_page: Content of the list page.

        Raises:
            IndexError: If the page contains no export link. The exception
                type is kept from the earlier `findall(...)[0]` lookup so
                existing callers are unaffected.
        """
        # `str()` of a bytes object is its repr; searching it still works
        # because the link pattern consists of ASCII characters only.
        match = re.search(
            pattern=r"/ehuszfelulet/excelexport\?id_xls=\w+",
            string=str(list_page),
        )
        if match is None:
            message = "No Excel export link found on the operating sites list page!"
            self.logger.critical(message)
            raise IndexError(message)

        self.XLS_URL = match.group(0)
        return super().get_data(self.WEBSITE_DOMAIN + self.XLS_URL)

    def rename_columns_manually(self) -> None:
        """Rename the Hungarian column headers of `self.data` to the table's column names."""
        # future: report wrong display and copying of hyphen (e.g. Fil'akovo) to pandas and JetBrains developers
        self.data.rename(
            columns={
                "Hosszú név": "name",
                "Rövid név": "name_shortened",
                "Polgári név": "name_short",
                "Társaság": "operator",
                "Szolgálati hely típus": "type",
                "PLC kód": "code_uic",
                "Távíró kód": "code_telegraph",
                "Állomáskategória személyvonatok számára": "category_passenger",
                "Állomáskategória tehervonatok számára": "category_freight",
                "Személy szállításra megnyitva": "traffic_passenger",
                "Áru szállításra megnyitva": "traffic_freight",
                "Menetvonal kezdő/végpontja": "terminus",
                "Feltételes megállás lehetséges": "request_stop",
                "Vonattalálkozásra alkalmas": "train_meeting",
                "Szolg. hely nyílt": "open_to_train_operators",
            },
            inplace=True,
        )

    def correct_data_manually(self) -> None:
        """Translate the `type` column to English and make `code_uic` values numeric-only."""
        self.data["type"] = self.data["type"].apply(
            lambda x: _translate_operating_site_type(str(x))
        )
        self.replace_code_uic_letters()

    def replace_code_uic_letters(self) -> None:
        """Replace ISO country letters in `code_uic` with the countries' UIC codes.

        The UIC code of each listed country is looked up in the database and
        substituted for every occurrence of the two-letter ISO code.
        """
        country_codes_iso = ["HU", "AT", "SK", "UA", "RO", "RS", "HR", "SI"]
        for country_code_iso in country_codes_iso:
            country_code_uic = self.get_uic_code(country_code_iso)
            # `regex=False` is spelled out because the default of this
            # parameter differs between pandas versions; the codes are plain
            # letters, so a literal replacement is what is wanted either way.
            self.data["code_uic"] = self.data["code_uic"].str.replace(
                pat=country_code_iso,
                repl=country_code_uic,
                regex=False,
            )

    def get_uic_code(self, country_code_iso: str) -> str:
        """Look up the UIC code of a country in the `countries` table.

        Args:
            country_code_iso: Value matched against the `code_iso` column.

        Returns:
            The `code_uic` value of the first matching row, as a string.

        Raises:
            AssertionError: If no row matches. Raised explicitly (not via
                `assert`) so the check also runs under `python -O`; the
                exception type is unchanged for existing callers.
        """
        query = """
            select code_uic
            from countries
            where code_iso = :country_code_iso
        """
        with self.database.engine.begin() as connection:
            result = connection.execute(
                text(query),
                {"country_code_iso": country_code_iso},
            ).fetchone()

            if result is None:
                message = f"No UIC code found for ISO country code {country_code_iso!r}!"
                self.logger.critical(message)
                raise AssertionError(message)
            return str(result[0])

    def correct_boolean_values(self) -> None:
        """Turn the Hungarian yes/no flag columns into booleans.

        A cell is true exactly when it equals "igen"; anything else is false.
        """
        boolean_columns = [
            "traffic_passenger",
            "traffic_freight",
            "terminus",
            "request_stop",
            "train_meeting",
            "open_to_train_operators",
        ]
        for column in boolean_columns:
            self.data[column] = self.data[column] == "igen"

    def add_data(self) -> None:
        """Write `self.data` to the `operating_sites` table inside one transaction.

        Every row is first inserted (ignored when its key already exists) and
        then used to update the stored row with the same `code_uic`, so both
        new and already known operating sites end up with the current values.
        """
        # Build the statement objects once instead of once per row.
        statements = [text(self._INSERT_QUERY), text(self._UPDATE_QUERY)]

        with self.database.engine.begin() as connection:
            for _, row in self.data.iterrows():
                parameters = row.to_dict()
                for statement in statements:
                    connection.execute(statement, parameters)
263