
PopulateESCommand (rating: B)

Complexity

Total Complexity 37

Size/Duplication

Total Lines 218
Duplicated Lines 12.39 %

Importance

Changes 3
Bugs 0, Features 0
Metric                              Value
c (changes)                         3
b (bugs)                            0
f (features)                        0
dl (duplicated lines)               27
loc (lines of code)                 218
rs                                  8.6
wmc (weighted methods per class)    37

16 Methods

Rating   Name   Duplication   Size   Complexity  
B _perform_geocomplete_index_population() 0 23 5
A _company_id_logging() 0 3 1
A _synchronise_companies_index() 0 3 1
A _compute_dirty_documents() 0 8 2
A _synchronise_jobs_index() 0 3 1
A _geocomplete_index_batch() 0 15 3
B _perform_index_sync() 0 31 4
A _geocompletion_documents() 0 16 3
A __init__() 0 3 1
A _populate_geocomplete_index() 0 14 3
B get_parser() 27 27 1
A _synchronisation_op() 0 20 3
A _logging() 0 2 1
A take_action() 0 11 4
A _job_id_logging() 0 3 1
A _synchronise_index() 0 17 3
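
Note that the wmc value of 37 in the metric table is simply the sum of the per-method complexities listed above: 5 + 1 + 1 + 2 + 1 + 3 + 4 + 3 + 1 + 3 + 1 + 3 + 1 + 4 + 1 + 3 = 37.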

How to fix: Duplicated Code

Duplicate code is one of the most pungent code smells. A rule of thumb that is often used is to restructure code once it is duplicated in three or more places. The usual fix is to extract the duplicated logic into a shared helper or data structure, as sketched below.
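
For instance, the 27 duplicated lines flagged in get_parser() below lend themselves to a data-driven loop. The following is an illustrative sketch only, using plain argparse; the INDEX_FLAGS table, the build_parser() name, and the shortened help strings are hypothetical and not part of the project:

import argparse

# Hypothetical flag table; names and help strings are illustrative only.
INDEX_FLAGS = [
    ('-j', '--jobs', 'synchronize_jobs_index',
     'synchronize the jobs index with the jobs table'),
    ('-co', '--companies', 'synchronize_companies_index',
     'synchronize the companies index with the companies table'),
    ('-g', '--geocomplete', 'populate_geocomplete_index',
     'populate the geocomplete index'),
]

def build_parser():
    # One loop replaces the three near-identical add_argument() calls.
    parser = argparse.ArgumentParser(prog='populate_es')
    for short_flag, long_flag, dest, help_msg in INDEX_FLAGS:
        parser.add_argument(short_flag, long_flag, help=help_msg,
                            dest=dest, action='store_const', const=True)
    return parser

if __name__ == '__main__':
    args = build_parser().parse_args(['-j'])
    print(args.synchronize_jobs_index)  # True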

# -*- coding: utf-8 -*-
import json
import logging

import elasticsearch_dsl
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import parallel_bulk
from elasticsearch_dsl.connections import connections
from tg import config

from pyjobsweb import model
from pyjobsweb.lib.sqlalchemy_ import current_server_timestamp
from pyjobsweb.lib.lock import acquire_inter_process_lock
from pyjobsweb.commands import AppContextCommand


class PopulateESCommand(AppContextCommand):
    """
    Populate (or synchronize) Elasticsearch indexes
    """
    def __init__(self, *args, **kwargs):
        super(PopulateESCommand, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger(__name__)

    # Flagged by the analyzer: this method duplicates code found elsewhere
    # in the project (27 duplicated lines, per the method table above).
    def get_parser(self, prog_name):
        parser = super(PopulateESCommand, self).get_parser(prog_name)

        jobs_help_msg = 'synchronizes the jobs index from the Elasticsearch ' \
                        'database with the jobs table from the Postgresql ' \
                        'database'
        parser.add_argument('-j', '--jobs',
                            help=jobs_help_msg,
                            dest='synchronize_jobs_index',
                            action='store_const', const=True)

        companies_help_msg = 'synchronizes the companies index from the ' \
                             'Elasticsearch database with the companies ' \
                             'table from the Postgresql database'
        parser.add_argument('-co', '--companies',
                            help=companies_help_msg,
                            dest='synchronize_companies_index',
                            action='store_const', const=True)

        geocomplete_help_msg = \
            'populates the geocomplete index of the elasticsearch database'
        parser.add_argument('-g', '--geocomplete',
                            help=geocomplete_help_msg,
                            dest='populate_geocomplete_index',
                            action='store_const', const=True)

        return parser

    def _logging(self, logging_level, message):
        self._logger.log(logging_level, message)

    def _job_id_logging(self, job_id, logging_level, message):
        log_msg = u'[Job offer id: %s] %s' % (job_id, message)
        self._logging(logging_level, log_msg)

    def _company_id_logging(self, company_id, logging_level, message):
        log_msg = u'[Company: %s] %s' % (company_id, message)
        self._logging(logging_level, log_msg)

    def _compute_dirty_documents(self, sql_table_cls, doc_type):
        self._logging(logging.INFO,
                      'Computing out of sync %s documents.' % doc_type)

        dirty_rows = sql_table_cls.get_dirty_rows()

        for row in dirty_rows:
            yield row.to_elasticsearch_document()

    @staticmethod
    def _geocompletion_documents():
        # Use a context manager so the geolocation data file is closed
        # once it has been parsed.
        with open(config.get('fr.geolocation_data.path')) as geolocation_data:
            json_dict = json.loads(geolocation_data.read())

        for postal_code, places in json_dict.items():
            for place in places:
                yield model.Geocomplete(name=place['name'],
                                        complement=place['complement'],
                                        postal_code=postal_code,
                                        geolocation=dict(
                                            lat=float(place['lat']),
                                            lon=float(place['lon'])
                                        ),
                                        weight=place['weight'])

    def _synchronisation_op(self, elasticsearch_doctype, pending_insertions):
        self._logging(logging.INFO,
                      'Computing required operations to synchronize documents.')

        for p in pending_insertions:
            doc_dict = p.to_dict(True)

            try:
                # The document already exists: emit a partial update
                # operation, moving the payload from '_source' to 'doc'.
                elasticsearch_doctype.get(p.id)
                update_op = doc_dict
                update_op['_op_type'] = 'update'
                update_op['doc'] = doc_dict['_source']
                del update_op['_source']
                sync_op = update_op
            except NotFoundError:
                # The document is missing: emit an index (insert) operation.
                add_op = doc_dict
                add_op['_op_type'] = 'index'
                sync_op = add_op

            yield sync_op

    def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        elasticsearch_conn = connections.get_connection()

        sync_timestamp = current_server_timestamp()

        pending_insertions = self._compute_dirty_documents(
            sql_table_cls, es_doc.doc_type)

        bulk_op = self._synchronisation_op(es_doc, pending_insertions)

        self._logging(logging.INFO, 'Performing synchronization.')

        for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
            obj_id = info['index']['_id'] \
                if 'index' in info else info['update']['_id']

            if ok:
                # Mark the document as synced so it isn't processed again
                # on the next run.
                self._logging(logging.INFO,
                              'Document %s has been synced successfully.'
                              % obj_id)

                sql_table_cls.update_last_sync(obj_id, sync_timestamp)
            else:
                id_logger(obj_id, logging.ERROR,
                          'Error while syncing document %s index.' % obj_id)

        # Refresh the index so the new documents are immediately searchable
        elasticsearch_dsl.Index(es_doc.index).refresh()

    def _synchronise_index(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        self._logging(logging.INFO,
                      'Synchronizing %s index.' % es_doc.index)

        with acquire_inter_process_lock('sync_%s' % es_doc.index) as acquired:
            if not acquired:
                err_msg = 'Another process is already synchronizing the %s ' \
                          'index, aborting now.' % es_doc.index
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_index_sync(sql_table_cls, es_doc_cls, id_logger)

                self._logging(logging.INFO,
                              'Index %s is now synchronized.' % es_doc.index)

    def _synchronise_jobs_index(self):
        self._synchronise_index(model.JobAlchemy,
                                model.JobElastic, self._job_id_logging)

    def _synchronise_companies_index(self):
        self._synchronise_index(model.CompanyAlchemy,
                                model.CompanyElastic, self._company_id_logging)

    def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
        log_msg = 'Indexing documents.'
        self._logging(logging.INFO, log_msg)

        for ok, info in parallel_bulk(elasticsearch_conn, to_index):
            if not ok:
                doc_id = info['create']['_id']
                doc_type = info['create']['_type']
                doc_index = info['create']['_index']

                logging_level = logging.ERROR
                err_msg = "Couldn't index document: '%s', of type: %s, " \
                          "under index: %s." % (doc_id, doc_type, doc_index)

                self._logging(logging_level, err_msg)

    def _perform_geocomplete_index_population(self, max_doc):
        elasticsearch_conn = connections.get_connection()

        to_index = list()

        for i, document in enumerate(self._geocompletion_documents()):
            if i % max_doc == 0:
                log_msg = 'Computing required geoloc-entry documents.'
                self._logging(logging.INFO, log_msg)

            to_index.append(document.to_dict(True))

            # Flush the batch once max_doc documents have been accumulated.
            if len(to_index) < max_doc:
                continue

            self._geocomplete_index_batch(elasticsearch_conn, to_index)

            to_index = list()

        # Index whatever remains in the last, possibly smaller, batch.
        if len(to_index) != 0:
            self._geocomplete_index_batch(elasticsearch_conn, to_index)

        elasticsearch_dsl.Index('geocomplete').refresh()

    def _populate_geocomplete_index(self, max_doc=1000):
        log_msg = 'Populating geocomplete index.'
        self._logging(logging.INFO, log_msg)

        with acquire_inter_process_lock('populate_geocomplete') as acquired:
            if not acquired:
                err_msg = 'Another process is already populating the ' \
                          'geocomplete index, aborting now.'
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_geocomplete_index_population(max_doc)

                log_msg = 'Geocomplete index populated and refreshed.'
                self._logging(logging.INFO, log_msg)

    def take_action(self, parsed_args):
        super(PopulateESCommand, self).take_action(parsed_args)

        if parsed_args.populate_geocomplete_index:
            self._populate_geocomplete_index()

        if parsed_args.synchronize_jobs_index:
            self._synchronise_jobs_index()

        if parsed_args.synchronize_companies_index:
            self._synchronise_companies_index()
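
As a reading aid for _synchronisation_op() and _perform_index_sync() above, the sketch below spells out the two action shapes that the elasticsearch.helpers bulk functions accept. The index name, document type, id, and field values are invented for illustration; only the key layout ('_op_type' plus '_source' for insertions, 'doc' for updates) follows the bulk conventions of the elasticsearch Python client:

# Shapes of the two bulk actions emitted by _synchronisation_op().
# All concrete values below are illustrative.

# 'index' action: insert the document, or overwrite it wholesale.
index_op = {
    '_op_type': 'index',
    '_index': 'jobs',        # hypothetical index name
    '_type': 'job-offer',    # mapping types existed in the ES versions of this era
    '_id': 42,
    '_source': {'title': 'Python developer', 'company': 'ACME'},
}

# 'update' action: patch an existing document. The payload moves from
# '_source' to 'doc', which is exactly the rewrite _synchronisation_op()
# performs when the document already exists.
update_op = {
    '_op_type': 'update',
    '_index': 'jobs',
    '_type': 'job-offer',
    '_id': 42,
    'doc': {'title': 'Senior Python developer', 'company': 'ACME'},
}

parallel_bulk() then streams back one (ok, info) tuple per action, which is why _perform_index_sync() can update each row's last-sync timestamp as results arrive.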