# -*- coding: utf-8 -*-
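"""Console command that populates or synchronizes the application's
Elasticsearch indexes (jobs, companies, geocomplete) from the PostgreSQL
database.

Typical invocations (the command name itself depends on how the
application registers this class in its entry points):

    <command> --jobs --companies
    <command> --geocomplete
"""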
import json
import logging

import elasticsearch_dsl
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import parallel_bulk
from elasticsearch_dsl.connections import connections
from tg import config

from pyjobsweb import model
from pyjobsweb.lib.sqlalchemy_ import current_server_timestamp
from pyjobsweb.lib.lock import acquire_inter_process_lock
from pyjobsweb.commands import AppContextCommand


class PopulateESCommand(AppContextCommand):
    """
    Populate (or synchronize) Elasticsearch indexes
    """

    def __init__(self, *args, **kwargs):
        super(PopulateESCommand, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger(__name__)

    def get_parser(self, prog_name):
        parser = super(PopulateESCommand, self).get_parser(prog_name)

        jobs_help_msg = 'synchronizes the jobs index of the Elasticsearch ' \
                        'database with the jobs table of the PostgreSQL ' \
                        'database'
        parser.add_argument('-j', '--jobs',
                            help=jobs_help_msg,
                            dest='synchronize_jobs_index',
                            action='store_const', const=True)

        companies_help_msg = 'synchronizes the companies index of the ' \
                             'Elasticsearch database with the companies ' \
                             'table of the PostgreSQL database'
        parser.add_argument('-co', '--companies',
                            help=companies_help_msg,
                            dest='synchronize_companies_index',
                            action='store_const', const=True)

        geocomplete_help_msg = \
            'populates the geocomplete index of the Elasticsearch database'
        parser.add_argument('-g', '--geocomplete',
                            help=geocomplete_help_msg,
                            dest='populate_geocomplete_index',
                            action='store_const', const=True)

        return parser

    def _logging(self, logging_level, message):
        self._logger.log(logging_level, message)

    def _job_id_logging(self, job_id, logging_level, message):
        log_msg = u'[Job offer id: %s] %s' % (job_id, message)
        self._logging(logging_level, log_msg)

    def _company_id_logging(self, company_id, logging_level, message):
        log_msg = u'[Company: %s] %s' % (company_id, message)
        self._logging(logging_level, log_msg)

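    # Note: a "dirty" row is one whose SQL data is out of sync with its
    # Elasticsearch document (see sql_table_cls.get_dirty_rows() and the
    # update_last_sync() call in _perform_index_sync() below).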
    def _compute_dirty_documents(self, sql_table_cls, doc_type):
        self._logging(logging.INFO,
                      'Computing out of sync %s documents.' % doc_type)

        dirty_rows = sql_table_cls.get_dirty_rows()

        for row in dirty_rows:
            yield row.to_elasticsearch_document()

    @staticmethod
    def _geocompletion_documents():
        with open(config.get('fr.geolocation_data.path')) as geolocation_data:
            json_dict = json.load(geolocation_data)

        for postal_code, places in json_dict.items():
            for place in places:
                yield model.Geocomplete(name=place['name'],
                                        complement=place['complement'],
                                        postal_code=postal_code,
                                        geolocation=dict(
                                            lat=float(place['lat']),
                                            lon=float(place['lon'])
                                        ),
                                        weight=place['weight'])

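    # The dicts yielded below follow the bulk action format of
    # elasticsearch.helpers.parallel_bulk: '_op_type' selects the operation
    # ('index' creates or replaces the whole document, while 'update'
    # applies only the partial body passed under the 'doc' key).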
    def _synchronisation_op(self, elasticsearch_doctype, pending_insertions):
        self._logging(logging.INFO,
                      'Computing required operations to synchronize documents.')

        for p in pending_insertions:
            doc_dict = p.to_dict(True)

            try:
                # The document already exists in the index: perform a
                # partial update rather than a full re-index.
                elasticsearch_doctype.get(p.id)
                update_op = doc_dict
                update_op['_op_type'] = 'update'
                update_op['doc'] = doc_dict['_source']
                del update_op['_source']
                sync_op = update_op
            except NotFoundError:
                # The document is missing from the index: insert it.
                add_op = doc_dict
                add_op['_op_type'] = 'index'
                sync_op = add_op

            yield sync_op

    def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        elasticsearch_conn = connections.get_connection()

        sync_timestamp = current_server_timestamp()

        pending_insertions = self._compute_dirty_documents(
            sql_table_cls, es_doc.doc_type)

        bulk_op = self._synchronisation_op(es_doc, pending_insertions)

        self._logging(logging.INFO, 'Performing synchronization.')

        for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
            obj_id = info['index']['_id'] \
                if 'index' in info else info['update']['_id']

            if ok:
                # Mark the row as handled so we don't process it again
                # on the next synchronization.
                self._logging(logging.INFO,
                              'Document %s has been synced successfully.'
                              % obj_id)

                sql_table_cls.update_last_sync(obj_id, sync_timestamp)
            else:
                id_logger(obj_id, logging.ERROR,
                          'Error while syncing document %s.' % obj_id)

        # Refresh the index so the synchronized documents become
        # searchable immediately.
        elasticsearch_dsl.Index(es_doc.index).refresh()

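    # The inter-process lock below guards against two processes
    # synchronizing the same index concurrently; if the lock is already
    # held, the synchronization is skipped rather than queued.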
    def _synchronise_index(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        self._logging(logging.INFO,
                      'Synchronizing %s index.' % es_doc.index)

        with acquire_inter_process_lock('sync_%s' % es_doc.index) as acquired:
            if not acquired:
                err_msg = 'Another process is already synchronizing the %s ' \
                          'index, aborting now.' % es_doc.index
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_index_sync(sql_table_cls, es_doc_cls, id_logger)

        self._logging(logging.INFO,
                      'Index %s is now synchronized.' % es_doc.index)

    def _synchronise_jobs_index(self):
        self._synchronise_index(model.JobAlchemy,
                                model.JobElastic, self._job_id_logging)

    def _synchronise_companies_index(self):
        self._synchronise_index(model.CompanyAlchemy,
                                model.CompanyElastic, self._company_id_logging)

    def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
        log_msg = 'Indexing documents.'
        self._logging(logging.INFO, log_msg)

        for ok, info in parallel_bulk(elasticsearch_conn, to_index):
            if not ok:
                doc_id = info['create']['_id']
                doc_type = info['create']['_type']
                doc_index = info['create']['_index']

                logging_level = logging.ERROR
                err_msg = "Couldn't index document: '%s', of type: %s, " \
                          "under index: %s." % (doc_id, doc_type, doc_index)

                self._logging(logging_level, err_msg)

    def _perform_geocomplete_index_population(self, max_doc):
        elasticsearch_conn = connections.get_connection()

        to_index = list()

        for i, document in enumerate(self._geocompletion_documents()):
            if i % max_doc == 0:
                log_msg = 'Computing required geoloc-entry documents.'
                self._logging(logging.INFO, log_msg)

            to_index.append(document.to_dict(True))

            if len(to_index) < max_doc:
                continue

            # A full batch is ready: flush it to Elasticsearch.
            self._geocomplete_index_batch(elasticsearch_conn, to_index)

            to_index = list()

        # Flush the last, partially filled batch, if any.
        if len(to_index) != 0:
            self._geocomplete_index_batch(elasticsearch_conn, to_index)

        # Refresh the index so the new documents become searchable.
        elasticsearch_dsl.Index('geocomplete').refresh()

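    # Geocompletion documents are indexed in batches of max_doc entries so
    # that each bulk request, and the memory held by the pending batch,
    # stays bounded.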
    def _populate_geocomplete_index(self, max_doc=1000):
        log_msg = 'Populating geocomplete index.'
        self._logging(logging.INFO, log_msg)

        with acquire_inter_process_lock('populate_geocomplete') as acquired:
            if not acquired:
                err_msg = 'Another process is already populating the ' \
                          'geocomplete index, aborting now.'
                self._logging(logging.WARNING, err_msg)
            else:
                self._perform_geocomplete_index_population(max_doc)

        log_msg = 'Geocomplete index populated and refreshed.'
        self._logging(logging.INFO, log_msg)

    def take_action(self, parsed_args):
        super(PopulateESCommand, self).take_action(parsed_args)

        if parsed_args.populate_geocomplete_index:
            self._populate_geocomplete_index()

        if parsed_args.synchronize_jobs_index:
            self._synchronise_jobs_index()

        if parsed_args.synchronize_companies_index:
            self._synchronise_companies_index()