# -*- coding: utf-8 -*-

import json
import logging

import elasticsearch_dsl
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import parallel_bulk
from elasticsearch_dsl.connections import connections
from tg import config

from pyjobsweb import model
from pyjobsweb.lib.sqlalchemy_ import current_server_timestamp
from pyjobsweb.lib.lock import acquire_inter_process_lock
from pyjobsweb.commands import AppContextCommand


class PopulateESCommand(AppContextCommand):
    """
    Populate (or synchronize) Elasticsearch indexes
    """

    def __init__(self, *args, **kwargs):
        super(PopulateESCommand, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger(__name__)

    def get_parser(self, prog_name):
        parser = super(PopulateESCommand, self).get_parser(prog_name)

        jobs_help_msg = 'synchronizes the jobs index of the Elasticsearch ' \
                        'database with the jobs table of the PostgreSQL ' \
                        'database'
        parser.add_argument('-j', '--jobs',
                            help=jobs_help_msg,
                            dest='synchronize_jobs_index',
                            action='store_const', const=True)

        companies_help_msg = 'synchronizes the companies index of the ' \
                             'Elasticsearch database with the companies ' \
                             'table of the PostgreSQL database'
        parser.add_argument('-co', '--companies',
                            help=companies_help_msg,
                            dest='synchronize_companies_index',
                            action='store_const', const=True)

        geocomplete_help_msg = \
            'populates the geocomplete index of the Elasticsearch database'
        parser.add_argument('-g', '--geocomplete',
                            help=geocomplete_help_msg,
                            dest='populate_geocomplete_index',
                            action='store_const', const=True)

        return parser

    def _logging(self, logging_level, message):
        self._logger.log(logging_level, message)

    def _job_id_logging(self, job_id, logging_level, message):
        log_msg = u'[Job offer id: %s] %s' % (job_id, message)
        self._logging(logging_level, log_msg)

    def _company_id_logging(self, company_id, logging_level, message):
        log_msg = u'[Company: %s] %s' % (company_id, message)
        self._logging(logging_level, log_msg)

    def _compute_dirty_documents(self, sql_table_cls, doc_type):
        self._logging(logging.INFO,
                      'Computing out of sync %s documents.' % doc_type)

        dirty_rows = sql_table_cls.get_dirty_rows()

        for row in dirty_rows:
            yield row.to_elasticsearch_document()

    @staticmethod
    def _geocompletion_documents():
        # Close the geolocation data file as soon as it has been parsed
        with open(config.get('fr.geolocation_data.path')) as geolocation_data:
            json_dict = json.load(geolocation_data)
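
        # Expected layout of the geolocation JSON (a sketch inferred from the
        # keys read below, not an authoritative schema):
        # {
        #     "<postal code>": [
        #         {"name": ..., "complement": ..., "lat": ..., "lon": ...,
        #          "weight": ...},
        #         ...
        #     ],
        #     ...
        # }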
        for postal_code, places in json_dict.items():
            for place in places:
                yield model.Geocomplete(name=place['name'],
                                        complement=place['complement'],
                                        postal_code=postal_code,
                                        geolocation=dict(
                                            lat=float(place['lat']),
                                            lon=float(place['lon'])
                                        ),
                                        weight=place['weight'])

    def _synchronisation_op(self, elasticsearch_doctype, pending_insertions):
        self._logging(logging.INFO,
                      'Computing required operations to synchronize documents.')

        for p in pending_insertions:
            doc_dict = p.to_dict(include_meta=True)
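
            # Each yielded action follows the bulk-helpers format: a document
            # already present in Elasticsearch becomes an 'update' action whose
            # payload is moved under 'doc', while a missing document becomes an
            # 'index' action that keeps its payload in '_source'.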
            try:
                elasticsearch_doctype.get(p.id)
                update_op = doc_dict
                update_op['_op_type'] = 'update'
                update_op['doc'] = doc_dict['_source']
                del update_op['_source']
                sync_op = update_op
            except NotFoundError:
                add_op = doc_dict
                add_op['_op_type'] = 'index'
                sync_op = add_op

            yield sync_op

    def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        elasticsearch_conn = connections.get_connection()

        sync_timestamp = current_server_timestamp()

        pending_insertions = self._compute_dirty_documents(
            sql_table_cls, es_doc.doc_type)

        bulk_op = self._synchronisation_op(es_doc, pending_insertions)

        self._logging(logging.INFO, 'Performing synchronization.')

        for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
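            # parallel_bulk yields an (ok, result) pair per action; the result
            # dict is keyed by the operation that was performed ('index' or
            # 'update' here), hence the lookup below.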
            obj_id = info['index']['_id'] \
                if 'index' in info else info['update']['_id']

            if ok:
                # Mark the row as synchronized so it is not processed again
                # on the next run
                self._logging(logging.INFO,
                              'Document %s has been synced successfully.'
                              % obj_id)

                sql_table_cls.update_last_sync(obj_id, sync_timestamp)
            else:
                id_logger(obj_id, logging.ERROR,
                          'Error while syncing document %s.' % obj_id)

        # Refresh the index so the synchronized documents become searchable
        # right away
        elasticsearch_dsl.Index(es_doc.index).refresh()

    def _synchronise_index(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        self._logging(logging.INFO,
                      'Synchronizing %s index.' % es_doc.index)

        with acquire_inter_process_lock('sync_%s' % es_doc.index) as acquired:
            if not acquired:
                err_msg = 'Another process is already synchronizing the %s ' \
                          'index, aborting now.' % es_doc.index
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_index_sync(sql_table_cls, es_doc_cls, id_logger)

        self._logging(logging.INFO,
                      'Index %s is now synchronized.' % es_doc.index)

    def _synchronise_jobs_index(self):
        self._synchronise_index(model.JobAlchemy,
                                model.JobElastic, self._job_id_logging)

    def _synchronise_companies_index(self):
        self._synchronise_index(model.CompanyAlchemy,
                                model.CompanyElastic, self._company_id_logging)

    def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
        log_msg = 'Indexing documents.'
        self._logging(logging.INFO, log_msg)

        for ok, info in parallel_bulk(elasticsearch_conn, to_index):
            if not ok:
                doc_id = info['create']['_id']
                doc_type = info['create']['_type']
                doc_index = info['create']['_index']

                logging_level = logging.ERROR
                err_msg = "Couldn't index document: '%s', of type: %s, " \
                          "under index: %s." % (doc_id, doc_type, doc_index)

                self._logging(logging_level, err_msg)

    def _perform_geocomplete_index_population(self, max_doc):
        elasticsearch_conn = connections.get_connection()

        to_index = list()
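
        # Stream the geocompletion documents and flush them to Elasticsearch
        # in batches of max_doc actions so memory usage stays bounded.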
        for i, document in enumerate(self._geocompletion_documents()):
            if i % max_doc == 0:
                log_msg = 'Computing required geoloc-entry documents.'
                self._logging(logging.INFO, log_msg)

            to_index.append(document.to_dict(include_meta=True))

            if len(to_index) < max_doc:
                continue

            self._geocomplete_index_batch(elasticsearch_conn, to_index)

            to_index = list()

        if len(to_index) != 0:
            self._geocomplete_index_batch(elasticsearch_conn, to_index)

        elasticsearch_dsl.Index('geocomplete').refresh()

    def _populate_geocomplete_index(self, max_doc=1000):
        log_msg = 'Populating geocomplete index.'
        self._logging(logging.INFO, log_msg)

        with acquire_inter_process_lock('populate_geocomplete') as acquired:
            if not acquired:
                err_msg = 'Another process is already populating the ' \
                          'geocomplete index, aborting now.'
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_geocomplete_index_population(max_doc)

        log_msg = 'Geocomplete index populated and refreshed.'
        self._logging(logging.INFO, log_msg)

    def take_action(self, parsed_args):
        super(PopulateESCommand, self).take_action(parsed_args)

        if parsed_args.populate_geocomplete_index:
            self._populate_geocomplete_index()

        if parsed_args.synchronize_jobs_index:
            self._synchronise_jobs_index()

        if parsed_args.synchronize_companies_index:
            self._synchronise_companies_index()