Passed
Push — master ( 305ba4...2a620b )
by Máté
01:46
created

mapper.Mapper.__init__()   A

Complexity

Conditions 3

Size

Total Lines 69
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 33
nop 2
dl 0
loc 69
rs 9.0879
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
import contextlib
2
from datetime import datetime
3
import json
4
from typing import Any, Final, List
5
6
# future: remove the comment below when stubs for the library below are available
7
import geojson  # type: ignore
8
9
# future: remove the comment below when stubs for the library below are available
10
from overpy import Area, Element, Node, Overpass, Relation, Result, Way  # type: ignore
11
12
# noinspection PyPackageRequirements
13
from plum import dispatch
14
15
# future: remove the comment below when stubs for the library below are available
16
from pydeck import Deck, Layer, ViewState  # type: ignore
17
from pyproj import Geod
18
import requests
19
from requests import HTTPError
20
21
# future: remove the comment below when stubs for the library below are available
22
import shapely  # type: ignore
23
from shapely import distance, from_geojson, get_coordinates, intersection, snap
24
25
# future: remove the comment below when stubs for the library below are available
26
from shapely.geometry import shape  # type: ignore
27
28
# future: remove the comment below when stubs for the library below are available
29
from shapely.ops import split  # type: ignore
30
from sqlalchemy.sql import text
31
from tenacity import (
32
    retry,
33
    retry_if_exception_type,
34
    stop_after_attempt,
35
    wait_exponential,
36
)
37
38
from src.OSM_processors.map_data_helpers import (
39
    convert_way_to_gejson,
40
    convert_way_to_linestring,
41
    extract_operating_site_polygons,
42
    further_in_same_direction,
43
    get_distance_percentage_between_milestones,
44
    get_length,
45
    get_milestone_location,
46
    get_milestones,
47
    get_nearest_milestone,
48
    get_percentage,
49
    get_tolerance_for_linestring_length,
50
    merge_ways_into_linestring,
51
    point_on_line_if_you_squint,
52
    remove_irrelevant_duplicate_milestones,
53
    split_lines,
54
)
55
from src.SR import SR
56
from src.new_data_processors.common import DataProcessor
57
58
59
def get_nearest_milestones(
    milestones: list[Node],
    metre_post: int,
    sr: SR,
    on_ways: list[Way],
) -> list[Node]:
    """Collect the milestone(s) used to locate ``metre_post``.

    The best remaining candidate is requested from ``get_nearest_milestone``
    over and over; every inspected candidate is removed from the working
    list, so the list handed in is consumed in place.

    Returns:
        A single-element list when a milestone's location equals
        ``metre_post``; otherwise the first two accepted candidates.
    """
    candidates = milestones
    if sr.line == "146":
        candidates = remove_irrelevant_duplicate_milestones(candidates, sr)

    selected: list[Node] = []
    while len(selected) < 2:
        candidate = get_nearest_milestone(
            exact_location=metre_post,
            milestones=candidates,
            sr=sr,
            on_ways=on_ways,
        )
        if get_milestone_location(candidate) == metre_post:
            return [candidate]

        # The first candidate is always accepted; later ones are rejected when
        # `further_in_same_direction` reports them as such.
        rejected = bool(selected) and further_in_same_direction(
            milestone=candidate,
            current_nearest_milestones=selected,
            metre_post=metre_post,
        )
        if not rejected:
            selected.append(candidate)
        candidates.remove(candidate)
    return selected
88
89
90
def get_linestring_between_points(
    lines_split_first: shapely.MultiLineString,
    lines_split_second: shapely.MultiLineString,
    expected_length: int,
) -> shapely.LineString | shapely.Point:
    """Find the piece of line shared by two split results.

    The first/last part of ``lines_split_first`` is intersected with the
    first/last part of ``lines_split_second`` (four combinations). The first
    intersection whose measured length deviates from ``expected_length`` by no
    more than the tolerance is returned.

    When no combination fits and ``expected_length`` is below 50, the last
    intersection tried is returned anyway.

    Raises:
        ValueError: no combination fits and ``expected_length`` is 50 or more.
    """
    geod = Geod(ellps="WGS84")
    # future: report bug (false positive) to JetBrains developers
    # noinspection PyTypeChecker
    attempts: list[dict[str, int]] = []
    for first_index, second_index in ((0, 0), (0, -1), (-1, 0), (-1, -1)):
        candidate = intersection(
            lines_split_first.geoms[first_index],
            lines_split_second.geoms[second_index],
        )
        candidate_length = get_length(geod=geod, linestring=candidate)
        deviation = int(abs(candidate_length - expected_length))
        attempts.append({"length": int(candidate_length), "difference": deviation})

        allowed_deviation = expected_length * get_tolerance_for_linestring_length(
            expected_length
        )
        if deviation <= allowed_deviation:
            return candidate

    if expected_length < 50:
        # Short sections are accepted as (nearly) a point: hand back the last
        # intersection that was tried.
        return candidate

    assert attempts
    best_attempt = min(attempts, key=lambda attempt: attempt["difference"])
    best_length = best_attempt["length"]
    best_difference = best_attempt["difference"]
    raise ValueError(
        f"Line between two points not found! "
        f"The closest length to the expected length of {expected_length} m was "
        f"{best_length} m (±{best_difference} m / "
        f"±{get_percentage(best_difference, expected_length)}%)!"
    )
130
131
132
class Mapper(DataProcessor):
133
    def __init__(self, show_lines_with_no_data: bool) -> None:
        """Set up constants, the Overpass client and the query templates.

        Args:
            show_lines_with_no_data: stored as-is; `process_srs` reads it to
                decide whether way ids of SR lines are collected.
        """
        super().__init__()

        self.TODAY_SIMULATED: Final = datetime(2024, 1, 18, 21, 59, 59)
        self.COLOR_TAG: Final = "line_color"
        self.QUERY_MAIN_PARAMETERS: Final = """
            [out:json];
            
            area["ISO3166-1"="HU"]
            // area["admin_level"="8"]["name"="Hegyeshalom"]
              -> .country;
              
        """
        self.OPERATORS: Final = ["MÁV", "GYSEV"]
        self.OPERATING_SITE_TAG_VALUES: Final = [  # type: ignore
            # TODO: uncomment lines below when implementing stations
            # "station",
            # "halt",
            # "yard",
            # "service_station",
            # "junction",
            # "crossover",
            # "spur_junction",
            # "site",
        ]

        self._api: Final = Overpass()
        self._dowload_session: Final = requests.Session()

        self.show_lines_with_no_data = show_lines_with_no_data

        self.query_operating_site_elements = (
            self._build_operating_site_elements_query()
        )

        self.query_final: str = (
            self.QUERY_MAIN_PARAMETERS
            # future: replace lines below when https://github.com/drolbr/Overpass-API/issues/146 is closed
            #     relation["route"="railway"]["ref"]["operator"~"(^MÁV(?=;))|((?<=;)MÁV(?=;))|((?<=;)MÁV$)"](area.country);
            #     relation["route"="railway"]["ref"]["operator"~"(^GYSEV(?=;))|((?<=;)GYSEV(?=;))|((?<=;)GYSEV$)"](area.country);
            + """
            (
                relation["route"="railway"]["ref"]["operator"~"MÁV"](area.country);
                relation["route"="railway"]["ref"]["operator"~"GYSEV"](area.country);
            );
            >>;
            out;
            
        """
        )

        # Placeholders until `download_final` stores the downloaded data.
        self.osm_data_raw: dict = NotImplemented
        self.osm_data: Result = NotImplemented
        self.srs: list[SR] = []
        self.sr_ways: list[int] = []

    def _build_operating_site_elements_query(self) -> str:
        """Build the Overpass query for every operator / railway tag value pair."""
        fragments: list[str] = [
            self.QUERY_MAIN_PARAMETERS,
            """
            (
        """,
        ]
        for value in self.OPERATING_SITE_TAG_VALUES:
            fragments.extend(
                f"""
                    node["operator"="{operator}"]["railway"="{value}"]["name"](area.country);
                    area["operator"="{operator}"]["railway"="{value}"]["name"](area.country);
                    relation["type"="multipolygon"]["operator"="{operator}"]["railway"="{value}"]["name"](area.country);
                """
                for operator in self.OPERATORS
            )
            fragments.append("\n")
        fragments.append(
            """
            );
            out;
        """
        )
        return "".join(fragments)
203
204
    def run(self) -> None:
        """Run the three stages in order: download, process, visualise."""
        stages = (self.download_osm_data, self.process_srs, self.visualise_srs)
        for stage in stages:
            stage()
208
209
    @retry(
        retry=retry_if_exception_type(ConnectionResetError),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(2),
    )
    def download_osm_data(self) -> None:
        """Download operating sites, their surrounding polygons and the final data.

        Steps: run the operating-site query, build one sub-query per returned
        node to fetch `landuse=railway` geometry around it, then hand the
        results to `download_final`.

        The method is retried on `ConnectionResetError`. `download_final`
        appends to `self.query_final`, so the query is restored to its
        pre-attempt value on any failure; a retry therefore starts from the
        same query text instead of accumulating duplicate fragments.
        """
        query_final_before_attempt = self.query_final
        try:
            operating_site_elements = self.run_query(
                api=self._api,
                query_text=self.query_operating_site_elements,
            )

            node_polygons_query = self.QUERY_MAIN_PARAMETERS + "".join(
                f"""
                node({node.id}) -> .station;
                .station out;
                nwr(around.station:100)["landuse"="railway"];
                convert item ::geom=geom();
                out geom;
            """
                for node in operating_site_elements.nodes
            )
            operating_site_node_polygons = geojson.loads(
                self.run_query_raw(
                    api=self._api,
                    query_text=node_polygons_query,
                )
            )

            self.download_final(
                node_polygons=operating_site_node_polygons,
                areas=operating_site_elements.areas,
                multipolygons=operating_site_elements.relations,
            )
        except Exception:
            self.query_final = query_final_before_attempt
            raise
241
242
    def run_query(self, api: Overpass, query_text: str) -> Result:
        """Send ``query_text`` through ``api.query`` and return its result.

        A debug message is logged before and after the call.
        """
        log_debug = self.logger.debug
        log_debug("Short query started...")
        query_result = api.query(query_text)
        log_debug("...finished!")
        return query_result
247
248
    def run_query_raw(self, api: Overpass, query_text: str) -> bytes:
        """Request ``api.url`` with ``query_text`` as body and return the raw bytes.

        Raises:
            HTTPError: re-raised after logging when the response status is an
                error status.
        """
        endpoint = api.url
        request_headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1.2 Safari/605.1.15"
        }
        try:
            response = self._dowload_session.get(
                url=endpoint,
                headers=request_headers,
                data=query_text,
            )
            response.raise_for_status()
        except HTTPError:
            self.logger.critical(f"Failed to download file from {endpoint}!")
            raise
        else:
            self.logger.debug(f"File successfully downloaded from {endpoint}!")
            return bytes(response.content)
265
266
    def download_final(
        self,
        node_polygons: Any,
        areas: list[Area],
        multipolygons: list[Relation],
    ) -> None:
        """Extend ``self.query_final`` with operating-site parts and download it.

        The response is stored both as parsed JSON (``self.osm_data_raw``) and
        as an overpy ``Result`` (``self.osm_data``).
        """
        self.add_node_poly_elements(node_polygons)

        site_areas, site_mp_relations = extract_operating_site_polygons(
            areas=areas,
            multipolygons=multipolygons,
        )

        for site_area in site_areas:
            area_id = site_area["element_id"]
            self.query_final += f"""
            way({area_id}) -> .operatingSite;
            """
            self.add_area_or_mp_relation_elements(site_area)

        for site_relation in site_mp_relations:
            relation_id = site_relation["element_id"]
            self.query_final += f"""
            relation({relation_id});
            map_to_area -> .operatingSite;
            """
            self.add_area_or_mp_relation_elements(site_relation)

        self.logger.debug("Long query started...")
        raw_response = self.run_query_raw(
            api=self._api,
            query_text=self.query_final,
        )
        self.osm_data_raw = json.loads(raw_response)
        self.osm_data = Result.from_json(self.osm_data_raw)
        self.logger.debug("...finished!")
304
305
    def add_node_poly_elements(self, node_polygons: Any) -> None:
        """Append one rail-way sub-query per polygon element to ``self.query_final``.

        Elements without a ``"geometry"`` key are skipped. For the others the
        coordinates are written as a ``lat lon`` sequence into a ``poly:``
        filter. The layer is read from the tags of the element directly before
        the polygon; when that tag is missing, ways without a layer and ways on
        layer 0 are requested instead.

        Args:
            node_polygons: mapping with an ``"elements"`` sequence.
        """
        elements = node_polygons["elements"]
        for i, element in enumerate(elements):
            try:
                geometry = element["geometry"]
                assert geometry
                coordinates = get_coordinates(from_geojson(str(geometry)))
                polygon = " ".join(
                    f"{coordinate[1]} {coordinate[0]}" for coordinate in coordinates
                )
                try:
                    # For the very first element there is no preceding element;
                    # `elements[-1]` would silently wrap around to the last one,
                    # so an empty mapping is used to reach the "no layer" branch.
                    preceding_element = elements[i - 1] if i > 0 else {}
                    layer = preceding_element["tags"]["layer"]
                except KeyError:
                    way_filters = f"""
                        (
                            way["railway"="rail"][!"layer"](poly:"{polygon}");
                            way["disused:railway"="rail"][!"layer"](poly:"{polygon}");
                            way["abandoned:railway"="rail"][!"layer"](poly:"{polygon}");
                            
                            way["railway"="rail"]["layer"="0"](poly:"{polygon}");
                            way["disused:railway"="rail"]["layer"="0"](poly:"{polygon}");
                            way["abandoned:railway"="rail"]["layer"="0"](poly:"{polygon}");
                        );"""
                else:
                    way_filters = f"""
                        (
                            way["railway"="rail"]["layer"="{layer}"](poly:"{polygon}");
                            way["disused:railway"="rail"]["layer"="{layer}"](poly:"{polygon}");
                            way["abandoned:railway"="rail"]["layer"="{layer}"](poly:"{polygon}");
                        );"""
                self.query_final += way_filters
                self.query_final += """
                    (._;>;);
                    out;
                    node(1);
                    out ids;
                """
            except KeyError:
                # No geometry on this element: nothing to add for it.
                pass
343
344
    def add_area_or_mp_relation_elements(
345
        self, operating_site_area: dict[str, int | None]
346
    ) -> None:
347
        if operating_site_area["layer"]:
348
            self.query_final += f"""
349
                (
350
                    way["railway"="rail"]["layer"="{operating_site_area["layer"]}"](area.operatingSite);
351
                    way["disused:railway"="rail"]["layer"="{operating_site_area["layer"]}"](area.operatingSite);
352
                    way["abandoned:railway"="rail"]["layer"="{operating_site_area["layer"]}"](area.operatingSite);
353
                );
354
                """
355
        else:
356
            self.query_final += f"""
357
                (
358
                    way["railway"="rail"][!"layer"](area.operatingSite);
359
                    way["disused:railway"="rail"][!"layer"](area.operatingSite);
360
                    way["abandoned:railway"="rail"][!"layer"](area.operatingSite);
361
                    
362
                    way["railway"="rail"]["layer"="0"](area.operatingSite);
363
                    way["disused:railway"="rail"]["layer"="0"](area.operatingSite);
364
                    way["abandoned:railway"="rail"]["layer"="0"](area.operatingSite);
365
                );
366
                """
367
        self.query_final += f"""
368
            (._;>;);
369
            out;
370
            node(1);
371
            out ids;
372
        """
373
374
    def process_srs(self) -> None:
        """Load the SRs; additionally collect way ids when lines without data are shown."""
        self.get_all_srs_from_database()

        if not self.show_lines_with_no_data:
            return
        self.get_id_of_sr_main_track_ways()
379
380
    def get_all_srs_from_database(self) -> None:
        """Append every currently valid main-track SR from the database to ``self.srs``.

        "Currently" is evaluated against ``self.TODAY_SIMULATED``. Each row is
        turned into an ``SR`` with the first column as ``sr_id`` and the
        remaining columns as positional arguments.
        """
        # TODO: remove on_main_track filter when beginning station implementation
        # TODO: replace time filter with the line below in production
        #     time_from <= now() and (now() < time_to or time_to is null);
        query = """
            select *
            from speed_restrictions
            where
                on_main_track = 1 and
                time_from <= :now and (:now < time_to or time_to is null)
            order by line, metre_post_from, metre_post_to;
            """
        with self.database.engine.begin() as connection:
            # Fetch all rows while the connection is still open; the result
            # must not be read after the `with` block has released it.
            rows = connection.execute(
                text(query),
                {"now": self.TODAY_SIMULATED},
            ).fetchall()

        self.srs.extend(
            # future: report bug (false positive) to mypy developers
            SR(  # type: ignore
                *row[1:],
                sr_id=row[0],
            )
            for row in rows
        )
406
407
    def get_id_of_sr_main_track_ways(self) -> None:
        """Collect member refs of the railway relation of every main-track SR.

        For each SR on the main track, the first relation tagged
        ``route=railway`` whose ``ref`` equals the SR's line (case-insensitive)
        contributes the refs of all its members to ``self.sr_ways``. Relations
        missing either tag are ignored. A warning is logged when no relation
        matches.
        """
        for sr in self.srs:
            if not sr.on_main_track:
                continue

            line_ref = sr.line.upper()
            for relation in self.osm_data.relations:
                # Relations lacking a `route` or `ref` tag simply do not match.
                with contextlib.suppress(KeyError):
                    if (
                        relation.tags["route"] == "railway"
                        and relation.tags["ref"].upper() == line_ref
                    ):
                        self.sr_ways.extend(
                            member.ref for member in relation.members
                        )
                        break
            else:
                self.logger.warning(f"Relation with `ref={sr.line}` not found!")
424
425
    def visualise_srs(self) -> None:
        """Locate every SR on the OSM geometry and export the resulting map.

        For each SR the coordinates of both metre-post boundaries and the
        linestring between them are computed and stored on the SR object.
        `IndexError`, `ValueError` and `ZeroDivisionError` raised while doing
        so are ignored for lines that are not in the list of prepared lines;
        for prepared lines they are logged and re-raised.
        """
        features_to_visualise: list[geojson.Feature] = []

        self.add_all_nodes(features_to_visualise)
        self.add_all_ways(features_to_visualise)

        self.logger.info(f"Visualising {len(self.srs)} speed restrictions started...")
        notification_percentage_interval = 2
        # Indexes at which a progress message is logged (a set for O(1) lookup).
        notify_at_indexes: set[int] = {
            int(len(self.srs) * (notification_percentage_interval / 100) * i) + 1
            for i in range(1, int(100 / notification_percentage_interval) - 1)
        }
        prepared_lines = (
            "1",
            "1d",
            "8",
            "9",
            "17 (1)",
            "17 (2)",
            "18",
            "30",
            "113 (1)",
            "113 (2)",
            "146",
        )
        # future: implement multithreading for the loop below
        for sr_index, sr in enumerate(self.srs):
            try:
                ways_of_line = self.get_ways_of_corresponding_line(sr)
                nodes_of_line = [node for way in ways_of_line for node in way.nodes]
                milestones_of_line = get_milestones(nodes=nodes_of_line)

                # Both attributes are always assigned, also when the two metre
                # posts are equal (comparing the values instead would leave the
                # "to" coordinates unset in that case).
                # future: init `metre_post_from_coordinates` and `metre_post_to_coordinates` in the constructor
                sr.metre_post_from_coordinates = self._get_coordinate_of_metre_post(  # type: ignore
                    sr=sr,
                    metre_post=sr.metre_post_from,
                    milestones_of_line=milestones_of_line,
                    ways_of_line=ways_of_line,
                )
                sr.metre_post_to_coordinates = self._get_coordinate_of_metre_post(  # type: ignore
                    sr=sr,
                    metre_post=sr.metre_post_to,
                    milestones_of_line=milestones_of_line,
                    ways_of_line=ways_of_line,
                )

                # future: init `geometry` in the constructor
                sr.geometry = self.get_linestring_of_sr(sr, ways_of_line)  # type: ignore
            except (IndexError, ValueError, ZeroDivisionError) as exception:
                if sr.line in prepared_lines:
                    assert sr.id
                    self.logger.critical(
                        f"Fatal error with SR #{sr.id[-8:]}: {exception}"
                    )
                    raise
            if sr_index in notify_at_indexes:
                self.logger.info(f"⏳ {int(sr_index / len(self.srs) * 100)}% done...")
        self.logger.info("...finished visualising speed restrictions!")

        feature_collection_to_visualise = geojson.FeatureCollection(
            features_to_visualise
        )
        self.export_map(feature_collection_to_visualise)

    def _get_coordinate_of_metre_post(
        self,
        sr: SR,
        metre_post: int,
        milestones_of_line: list[Node],
        ways_of_line: list[Way],
    ) -> shapely.Point:
        """Return the point on the line that corresponds to ``metre_post``."""
        nearest_milestones = get_nearest_milestones(
            # A copy is passed because the callee removes entries from its list.
            milestones=milestones_of_line.copy(),
            metre_post=metre_post,
            sr=sr,
            on_ways=ways_of_line,
        )
        if len(nearest_milestones) < 2:
            return shapely.Point(
                (
                    float(nearest_milestones[0].lon),
                    float(nearest_milestones[0].lat),
                )
            )

        lower_milestone = nearest_milestones[0]
        greater_milestone = nearest_milestones[-1]

        at_percentage_between_milestones = get_distance_percentage_between_milestones(
            nearest_milestones=nearest_milestones,
            metre_post_boundary=metre_post,
        )
        way_of_lower_milestone, way_of_greater_milestone = (
            # future: use kwargs when https://github.com/beartype/plum/issues/40 is fixed
            self.get_ways_at_locations(nearest_milestones, ways_of_line)
        )
        ways_between_milestones = self.get_ways_between_milestones(
            way_of_greater_milestone=way_of_greater_milestone,
            way_of_lower_milestone=way_of_lower_milestone,
            ways_of_line=ways_of_line,
        )
        merged_ways_between_milestones = merge_ways_into_linestring(
            ways_between_milestones
        )

        lower_point = shapely.Point(
            (float(lower_milestone.lon), float(lower_milestone.lat))
        )
        greater_point = shapely.Point(
            (float(greater_milestone.lon), float(greater_milestone.lat))
        )
        lines_split_at_lower_milestone = split_lines(
            line=merged_ways_between_milestones,
            splitting_point=lower_point,
        )
        lines_split_at_greater_milestone = split_lines(
            line=merged_ways_between_milestones,
            splitting_point=greater_point,
        )
        length_between_milestones = abs(
            int(
                float(lower_milestone.tags["railway:position"]) * 1000
                - float(greater_milestone.tags["railway:position"]) * 1000
            )
        )
        line_between_milestones = get_linestring_between_points(
            lines_split_first=lines_split_at_lower_milestone,
            lines_split_second=lines_split_at_greater_milestone,
            expected_length=length_between_milestones,
        )

        coordinate_of_metre_post = line_between_milestones.interpolate(
            distance=at_percentage_between_milestones,
            normalized=True,
        )
        # When the interpolated point ends up farther from the first milestone
        # than from the last one, interpolate from the other end instead.
        if distance(coordinate_of_metre_post, lower_point) > distance(
            coordinate_of_metre_post, greater_point
        ):
            coordinate_of_metre_post = line_between_milestones.interpolate(
                distance=1 - at_percentage_between_milestones,
                normalized=True,
            )
        return coordinate_of_metre_post
576
577
    def get_linestring_of_sr(
        self, sr: SR, ways_of_line: list[Way]
    ) -> shapely.LineString:
        """Build the linestring between the two metre-post coordinates of ``sr``.

        The ways holding both coordinates are looked up, the ways between them
        are merged, and the result is cut with a list of snapping tolerances
        ordered from best to worst.
        """
        # future: init `metre_post_from_coordinates` and `metre_post_to_coordinates` in the constructor
        boundary_points = [
            sr.metre_post_from_coordinates,  # type: ignore
            sr.metre_post_to_coordinates,  # type: ignore
        ]
        # future: use kwargs when https://github.com/beartype/plum/issues/40 is fixed
        way_of_from, way_of_to = self.get_ways_at_locations(
            boundary_points, ways_of_line
        )
        merged_ways = merge_ways_into_linestring(
            self.get_ways_between_milestones(
                way_of_greater_milestone=way_of_to,
                way_of_lower_milestone=way_of_from,
                ways_of_line=ways_of_line,
            )
        )
        return self.try_to_get_linestring_of_sr(
            merged_ways_between_metre_posts=merged_ways,
            snapping_tolerances=[0.0004, 0.0003, 0.0005, 0.0009, 0.001, 0.0015],
            sr=sr,
        )
611
612
    def try_to_get_linestring_of_sr(
        self,
        merged_ways_between_metre_posts: shapely.LineString,
        snapping_tolerances: list[float],
        sr: SR,
    ) -> shapely.LineString:
        """Cut the SR's linestring out of the merged ways, trying each tolerance.

        Tolerances are taken from the front of ``snapping_tolerances`` (the list
        is consumed in place). The next tolerance is tried whenever an attempt
        raises `ValueError`; the `ValueError` of the last attempt propagates.
        """
        while True:
            try:
                tolerance = snapping_tolerances.pop(0)
                lines_split_at_from = split_lines(
                    line=merged_ways_between_metre_posts,
                    splitting_point=snap(
                        geometry=sr.metre_post_from_coordinates,  # type: ignore
                        reference=merged_ways_between_metre_posts,
                        tolerance=tolerance,
                    ),
                )
                lines_split_at_to = split_lines(
                    line=merged_ways_between_metre_posts,
                    splitting_point=snap(
                        geometry=sr.metre_post_to_coordinates,  # type: ignore
                        reference=merged_ways_between_metre_posts,
                        tolerance=tolerance,
                    ),
                )
                return get_linestring_between_points(
                    lines_split_first=lines_split_at_from,
                    lines_split_second=lines_split_at_to,
                    expected_length=abs(sr.metre_post_from - sr.metre_post_to),
                )
            except ValueError:
                if not snapping_tolerances:
                    raise
649
650
    def get_ways_of_corresponding_line(self, sr: SR) -> list[Way]:
        """Return the downloaded ways that are members of the SR's relation.

        The ways keep the order they have in ``self.osm_data.ways``.
        """
        relation = self.get_corresponding_relation(sr)
        # A set keeps the membership test constant-time for every downloaded way.
        member_refs = {member.ref for member in relation.members}
        return [way for way in self.osm_data.ways if way.id in member_refs]
655
656
    @dispatch
    # future: make `nearest_milestones` a two-element tuple?
    def get_ways_at_locations(
        self, locations: List[Node], ways_to_search_in: List[Way]
    ) -> tuple[Way, Way]:
        """Find the ways containing the first and the last node of ``locations``.

        Returns:
            ``(way of locations[0], way of locations[-1])`` as soon as both are
            known.

        Raises:
            ValueError: at least one of the two ways was not found; a critical
                message is logged for each missing one.
        """
        first_node = locations[0]
        last_node = locations[-1]
        way_of_first: Way | None = None
        way_of_last: Way | None = None

        for way in ways_to_search_in:
            for node in way.nodes:
                if node == first_node:
                    way_of_first = way
                elif node == last_node:
                    way_of_last = way

                if way_of_first and way_of_last:
                    return way_of_first, way_of_last

        for found_way, milestone in (
            (way_of_first, first_node),
            (way_of_last, last_node),
        ):
            if not found_way:
                self.logger.critical(
                    f"Way of https://www.openstreetmap.org/node/{milestone.id} "
                    f"at {milestone.lon}, {milestone.lat} not found!"
                )
        raise ValueError
684
685
    # future: request mypy support from plum developers
686
    @dispatch  # type: ignore
    def get_ways_at_locations(
        self, locations: List[shapely.Point], ways_to_search_in: List[Way]
    ) -> tuple[Way, Way]:
        """Find the ways on which the first and the last point of ``locations`` lie.

        A point belongs to a way when `point_on_line_if_you_squint` accepts it
        for the way's linestring.

        Raises:
            ValueError: at least one of the two ways was not found; a critical
                message is logged for each missing one.
        """
        first_point = locations[0]
        last_point = locations[-1]
        way_of_first: Way | None = None
        way_of_last: Way | None = None

        for way in ways_to_search_in:
            way_line = convert_way_to_linestring(way)
            if point_on_line_if_you_squint(point=first_point, line=way_line):
                way_of_first = way
            if point_on_line_if_you_squint(point=last_point, line=way_line):
                way_of_last = way

            if way_of_first and way_of_last:
                return way_of_first, way_of_last

        for found_way, point in (
            (way_of_first, first_point),
            (way_of_last, last_point),
        ):
            if not found_way:
                self.logger.critical(f"Way of point at {point.wkt} not found!")
        raise ValueError
707
708
    def get_ways_between_milestones(
        self,
        way_of_greater_milestone: Way,
        way_of_lower_milestone: Way,
        ways_of_line: list[Way],
    ) -> list[Way]:
        """Return the chain of ways leading from the lower to the greater milestone.

        When both milestones are on the very same way, that way alone is
        returned. Otherwise the search is delegated to
        `add_neighboring_ways`, starting from the way of the lower milestone
        in both of its branches.

        `ways_of_line` itself is left untouched: the search works on a
        shallow copy from which the starting way has been removed.

        Raises:
            ValueError: if `way_of_lower_milestone` is not in `ways_of_line`
                (from `list.remove`), or whatever `add_neighboring_ways`
                raises.
        """
        if way_of_lower_milestone is way_of_greater_milestone:
            return [way_of_lower_milestone]

        remaining_ways = ways_of_line.copy()
        remaining_ways.remove(way_of_lower_milestone)

        # Two independent lists, one per branch, both seeded with the start way.
        branches: tuple[list[Way], list[Way]] = (
            [way_of_lower_milestone],
            [way_of_lower_milestone],
        )
        return self.add_neighboring_ways(
            collection=branches,
            ways_to_search_in=remaining_ways,
            destination_way=way_of_greater_milestone,
            toggle=False,
        )
731
732
    def add_neighboring_ways(
        self,
        collection: tuple[list[Way], list[Way]],
        ways_to_search_in: list[Way],
        destination_way: Way,
        toggle: bool,
        one_side_is_dead_end: bool = False,
    ) -> list[Way]:
        """Grow two chains of connected ways until one reaches `destination_way`.

        `collection` holds two lists of ways ("branches"); `toggle` selects
        the one to extend next (`False` -> index 0, `True` -> index 1). A way
        from `ways_to_search_in` extends a branch when it contains the first
        or the last node of the branch's last way. The branches are extended
        alternately until one of them runs out of neighbours; from then on
        only the other one is extended.

        The search is a loop rather than a recursion so that long lines
        cannot exhaust the interpreter's recursion limit, and the dead-end
        state is kept for the rest of the search instead of being
        rediscovered after every step.

        Both `collection` and `ways_to_search_in` are modified in place:
        found ways are appended to a branch and removed from
        `ways_to_search_in`, which is also reversed on every branch switch.

        Args:
            collection: the two branches, each starting with at least one way.
            ways_to_search_in: candidate ways not yet part of any branch.
            destination_way: the way at which the search ends.
            toggle: index of the branch to extend first.
            one_side_is_dead_end: whether the other branch is already known
                to have no further neighbours.

        Returns:
            The branch (one of the lists in `collection`) whose last element
            is `destination_way`.

        Raises:
            ValueError: if neither branch can be extended any further and
                `destination_way` has not been reached.
        """
        while True:
            branch = collection[toggle]
            last_way = branch[-1]

            neighbor = None
            for way in ways_to_search_in:
                if last_way.nodes[0] in way.nodes or last_way.nodes[-1] in way.nodes:
                    neighbor = way
                    break

            if neighbor is None:
                if one_side_is_dead_end:
                    # Both branches are exhausted now.
                    raise ValueError(
                        f"Couldn't reach destination_way (https://openstreetmap.org/way/{destination_way.id}) "
                        f"from way_of_lower_milestone (https://openstreetmap.org/way/{collection[0][0].id})!"
                    )
                one_side_is_dead_end = True
            else:
                branch.append(neighbor)
                if neighbor is destination_way:
                    return branch
                ways_to_search_in.remove(neighbor)
                if one_side_is_dead_end:
                    # The other branch has nothing left; keep extending this one.
                    continue

            # Switch to the other branch and scan the candidates the other way round.
            ways_to_search_in.reverse()
            toggle = not toggle
774
775
    def add_all_ways(self, features_to_visualise: list[geojson.Feature]) -> None:
        """Append one GeoJSON feature per way in `self.osm_data.ways`.

        Every way gets a colour stored in its tags under `self.COLOR_TAG`:
        `[255, 255, 255]` when its id is in `self.sr_ways`, `[65, 65, 65]`
        otherwise. The tags of the way are modified in place and reused as
        the properties of the feature. `features_to_visualise` is extended
        in place; nothing is returned.
        """
        self.logger.info("Adding all ways started...")
        for way in self.osm_data.ways:
            geometry = convert_way_to_gejson(way)

            if way.id in self.sr_ways:
                colour = [255, 255, 255]
            else:
                colour = [65, 65, 65]
            way.tags |= {self.COLOR_TAG: colour}

            features_to_visualise.append(
                geojson.Feature(geometry=geometry, properties=way.tags)
            )
        self.logger.info("...finished!")
791
792
    def add_all_nodes(self, features_to_visualise: list[geojson.Feature]) -> None:
        """Append one GeoJSON point feature per node in `self.osm_data.nodes`.

        Each node's tags receive the colour `[0, 0, 0, 0]` under
        `self.COLOR_TAG` (modified in place) and are used as the properties
        of the feature. `features_to_visualise` is extended in place; nothing
        is returned.
        """
        self.logger.info("Adding all nodes started...")
        for node in self.osm_data.nodes:
            # NOTE(review): the node with id 1 is skipped; the reason is not
            # visible here (possibly a placeholder node) - confirm.
            if node.id == 1:
                continue

            position = (float(node.lon), float(node.lat))
            point = geojson.Point(position)
            node.tags |= {self.COLOR_TAG: [0, 0, 0, 0]}

            features_to_visualise.append(
                geojson.Feature(geometry=point, properties=node.tags)
            )
        self.logger.info("...finished!")
805
806
    def export_map(self, feature_collection: geojson.FeatureCollection) -> None:
        """Render `feature_collection` with pydeck and save it as an HTML file.

        The features are drawn in a single `GeoJsonLayer` whose line colour
        is read from the property named by `self.COLOR_TAG`. The result is
        written to `data/04_exported/map_pydeck_<self.TODAY>.html`.
        """
        deck = Deck(
            layers=[
                Layer(
                    "GeoJsonLayer",
                    data=feature_collection,
                    pickable=True,
                    line_width_min_pixels=2,
                    get_line_color=self.COLOR_TAG,
                    get_fill_color=[0, 0, 0],
                )
            ],
            # Fixed initial camera position and zoom level of the exported map.
            initial_view_state=ViewState(
                latitude=47.180833,
                longitude=19.503056,
                zoom=7,
            ),
        )

        self.logger.debug("Exporting map started...")
        output_path = f"data/04_exported/map_pydeck_{self.TODAY}.html"
        deck.to_html(output_path)
        self.logger.debug("...finished!")
827
828
    def get_corresponding_relation(self, sr: SR) -> Relation:
        """Return the first relation whose `ref` tag equals `sr.line`.

        The comparison ignores case. Relations that have no `ref` tag at all
        are skipped instead of aborting the lookup with a `KeyError`.

        Raises:
            IndexError: if no relation in `self.osm_data.relations` matches;
                a warning is logged before the exception is re-raised.
        """
        try:
            return [
                relation
                for relation in self.osm_data.relations
                if "ref" in relation.tags
                and relation.tags["ref"].upper() == sr.line.upper()
            ][0]
        except IndexError:
            # `Logger.warn` is a deprecated alias that newer Python versions
            # no longer provide; `warning` is the supported spelling.
            self.logger.warning(f"Relation with `ref={sr.line}` not found!")
            raise
839