Passed
Push — master ( c5c1f2...f30c75 )
by KAMI
02:43
created

osm_poi_matchmaker.create_db   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 188
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 149
dl 0
loc 188
rs 10
c 0
b 0
f 0
wmc 17
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
4
__author__ = 'kami911'
5
__program__ = 'create_db'
6
__version__ = '0.7.0'
7
8
try:
    import os
    import traceback
    import logging
    import logging.config
    import sys
    import numpy as np
    import pandas as pd
    import multiprocessing
    from osm_poi_matchmaker.utils import config, timing
    from osm_poi_matchmaker.libs.osm import timestamp_now
    from osm_poi_matchmaker.libs.online_poi_matching import online_poi_matching
    from osm_poi_matchmaker.libs.import_poi_data_module import import_poi_data_module
    from osm_poi_matchmaker.libs.export import export_raw_poi_data, export_raw_poi_data_xml, export_grouped_poi_data, \
        export_grouped_poi_data_with_postcode_groups
    from sqlalchemy.orm import scoped_session, sessionmaker
    from osm_poi_matchmaker.dao.poi_base import POIBase
except ImportError as err:
    # Fix: the original line had an unbalanced parenthesis (a syntax error) and
    # passed str.format-style '{}' placeholders to logging, which interpolates
    # with %-style formatting.
    logging.error('Error %s import module: %s', __name__, err)
    # Fix: traceback.print_exc() writes to stderr and returns None;
    # format_exc() returns the traceback text so it actually gets logged.
    logging.error(traceback.format_exc())
    sys.exit(128)
29
30
# Column order used for the POI address data frames built from the database.
POI_COLS = [
    'poi_code',
    'poi_postcode',
    'poi_city',
    'poi_name',
    'poi_branch',
    'poi_website',
    'original',
    'poi_addr_street',
    'poi_addr_housenumber',
    'poi_conscriptionnumber',
    'poi_ref',
    'poi_geom',
]
# Number of attempts for retryable operations.
RETRY = 3
34
35
36
def init_log():
    """Configure the logging subsystem from the 'log.conf' file."""
    conf_file = 'log.conf'
    logging.config.fileConfig(conf_file)
38
39
40
def import_basic_data(session):
    """Import the basic Hungarian reference datasets into the database.

    Downloads and processes the city/postcode and street-type XML files
    published by the Hungarian Post web service.

    :param session: database session handed to the data-provider workers
    """
    logging.info('Importing cities ...')
    from osm_poi_matchmaker.dataproviders.hu_generic import hu_city_postcode_from_xml
    city_importer = hu_city_postcode_from_xml(
        session,
        'http://httpmegosztas.posta.hu/PartnerExtra/OUT/ZipCodes.xml',
        config.get_directory_cache_url())
    city_importer.process()

    logging.info('Importing street types ...')
    from osm_poi_matchmaker.dataproviders.hu_generic import hu_street_types_from_xml
    street_importer = hu_street_types_from_xml(
        session,
        'http://httpmegosztas.posta.hu/PartnerExtra/OUT/StreetTypes.xml',
        config.get_directory_cache_url())
    street_importer.process()
52
53
54
def load_poi_data(database):
    """Load the POI address dataset from the database into a dataframe.

    Also ensures the configured output directory exists for later exports.

    :param database: POIBase-like object exposing ``query_all_gpd_in_order``
    :return: dataframe of the 'poi_address' table with NaN city/postcode
             values replaced and cast to int
    """
    logging.info('Loading POI_data from database ...')
    output_dir = config.get_directory_output()
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Build Dataframe from our POI database
    addr_data = database.query_all_gpd_in_order('poi_address')
    # NOTE(review): casting 'poi_addr_city' to int looks suspicious for a
    # city column — confirm the stored values are numeric identifiers.
    cols = ['poi_addr_city', 'poi_postcode']
    addr_data[cols] = addr_data[cols].fillna('0').astype(int)
    return addr_data
62
63
64
def load_common_data(database):
    """Fetch the 'poi_common' table as a dataframe.

    :param database: POIBase-like object exposing ``query_all_pd``
    :return: dataframe of common POI metadata
    """
    logging.info('Loading common data from database ...')
    common_data = database.query_all_pd('poi_common')
    return common_data
67
68
69
class WorkflowManager(object):
    """Drives the parallel harvest / export / matching steps on a
    multiprocessing pool sized to the number of CPU cores."""

    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue()
        self.NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
        self.items = 0
        self.pool = None
        self.results = []

    def start_poi_harvest(self):
        """Run every enabled data-provider module on the worker pool."""
        for m in config.get_dataproviders_modules_enable():
            self.queue.put(m)
        try:
            # Start multiprocessing in case multiple cores
            # Fix: logging interpolates with %-style formatting; the original
            # '{}' placeholder was never substituted.
            logging.info('Starting processing on %s cores.', self.NUMBER_OF_PROCESSES)
            self.results = []
            self.pool = multiprocessing.Pool(processes=self.NUMBER_OF_PROCESSES)
            self.results = self.pool.map_async(import_poi_data_module, config.get_dataproviders_modules_enable())
            self.pool.close()
        except Exception as e:
            logging.error(e)
            # Fix: print_exc() returns None; format_exc() returns the text.
            logging.error(traceback.format_exc())

    def start_exporter(self, data, postfix: str = '', to_do=export_grouped_poi_data):
        """Export per-poi_code filesets in parallel.

        :param data: pandas DataFrame of POI addresses (the original ``list``
            annotation was wrong — the 'poi_code' column access needs a frame)
        :param postfix: text inserted into the output file name
        :param to_do: export callable run for each poi_code group
        """
        poi_codes = data['poi_code'].unique()
        modules = [[config.get_directory_output(), 'poi_address_{}{}'.format(postfix, c), data[data.poi_code == c],
                    'poi_address'] for c in poi_codes]
        try:
            # Start multiprocessing in case multiple cores
            logging.info('Starting processing on %s cores.', self.NUMBER_OF_PROCESSES)
            self.results = []
            self.pool = multiprocessing.Pool(processes=self.NUMBER_OF_PROCESSES)
            self.results = self.pool.map_async(to_do, modules)
            self.pool.close()
        except Exception as e:
            logging.error(e)
            logging.error(traceback.format_exc())

    def start_matcher(self, data, comm_data):
        """Match POI rows against online OSM data across worker processes.

        Splits ``data`` into one chunk per worker and concatenates the
        per-chunk results.

        :return: concatenated DataFrame of matched rows, or None on error
        """
        try:
            workers = self.NUMBER_OF_PROCESSES
            self.pool = multiprocessing.Pool(processes=self.NUMBER_OF_PROCESSES)
            self.results = self.pool.map_async(online_poi_matching,
                                               [(d, comm_data) for d in np.array_split(data, workers)])
            self.pool.close()
            return pd.concat(list(self.results.get()), sort=False)
        except Exception as e:
            logging.error(e)
            logging.error(traceback.format_exc())

    def join(self):
        """Block until all workers of the current pool have finished."""
        self.pool.join()
124
125
126
def main():
    """Entry point: build the database connection, harvest POI data,
    match it against OSM, and export the result filesets."""
    # Fix: logging interpolates with %-style formatting; the original
    # '{0}' placeholder was never substituted.
    logging.info('Starting %s ...', __program__)
    db = POIBase('{}://{}:{}@{}:{}/{}'.format(config.get_database_type(), config.get_database_writer_username(),
                                              config.get_database_writer_password(),
                                              config.get_database_writer_host(),
                                              config.get_database_writer_port(),
                                              config.get_database_poi_database()))
    pgsql_pool = db.pool
    session_factory = sessionmaker(pgsql_pool)
    Session = scoped_session(session_factory)
    # NOTE(review): 'session' is never used below — import_basic_data() is
    # given db.session instead; confirm which session is intended.
    session = Session()
    try:
        import_basic_data(db.session)
        manager = WorkflowManager()
        manager.start_poi_harvest()
        manager.join()
        # Load basic dataset from database
        poi_addr_data = load_poi_data(db)
        # Download and load POI dataset to database
        poi_common_data = load_common_data(db)
        logging.info('Merging dataframes ...')
        poi_addr_data = pd.merge(poi_addr_data, poi_common_data, left_on='poi_common_id', right_on='pc_id', how='inner')
        # Add additional empty fields
        poi_addr_data['osm_id'] = None
        poi_addr_data['osm_node'] = None
        poi_addr_data['osm_version'] = None
        poi_addr_data['osm_changeset'] = None
        poi_addr_data['osm_timestamp'] = timestamp_now()
        poi_addr_data['osm_live_tags'] = None
        # Export non-transformed data
        export_raw_poi_data(poi_addr_data, poi_common_data)
        # export_raw_poi_data_xml(poi_addr_data)
        logging.info('Saving poi_code grouped filesets...')
        # Export non-transformed filesets
        manager.start_exporter(poi_addr_data)
        manager.join()
        logging.info('Merging with OSM datasets ...')
        poi_addr_data['osm_nodes'] = None
        poi_addr_data['poi_distance'] = None
        # Enrich POI datasets from online OpenStreetMap database
        logging.info('Starting online POI matching part...')
        poi_addr_data = manager.start_matcher(poi_addr_data, poi_common_data)
        manager.join()
        # Export filesets
        export_raw_poi_data(poi_addr_data, poi_common_data, '_merge')
        manager.start_exporter(poi_addr_data, 'merge_')
        manager.start_exporter(poi_addr_data, 'merge_', export_grouped_poi_data_with_postcode_groups)
        manager.join()

    except (KeyboardInterrupt, SystemExit):
        logging.info('Interrupt signal received')
        sys.exit(1)
    except Exception:
        # Fix: 'raise err' re-raises from the handler line; a bare 'raise'
        # preserves the original traceback unchanged.
        raise
180
181
182
if __name__ == '__main__':
    config.set_mode(config.Mode.matcher)
    init_log()
    timer = timing.Timing()
    main()
    # Fix: logging interpolates with %-style formatting; the original
    # '{}' placeholder was never substituted.
    logging.info('Total duration of process: %s. Finished, exiting and go home ...', timer.end())
188