import logging
import time

import six
from six.moves import UserDict
from elasticsearch import Elasticsearch, helpers

from ._registry import register_output
from .base_output import OutputInterface
from topik.vectorizers.vectorizer_output import VectorizerOutput
from topik.models.base_model_output import ModelOutput


def es_setitem(key, value, doc_type, instance, index, batch_size=1000):
    """Load an iterable of (id, value) pairs into a new or existing field
    within existing documents, using batched bulk upserts."""
    batch = []
    for id, val in value:
        action = {'_op_type': 'update',
                  '_index': index,
                  '_type': doc_type,
                  '_id': id,
                  'doc': {key: val},
                  'doc_as_upsert': "true",
                  }
        batch.append(action)
        if len(batch) >= batch_size:
            helpers.bulk(client=instance, actions=batch, index=index)
            batch = []
    # flush any remaining partial batch
    if batch:
        helpers.bulk(client=instance, actions=batch, index=index)
    instance.indices.refresh(index)
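
# Example usage (illustrative sketch, not part of the original module):
# attach a "topic_weight" field to two existing documents. The host, index
# name, and doc_type here are hypothetical.
#
#     es = Elasticsearch(hosts="localhost:9200")
#     es_setitem("topic_weight", [(1, 0.25), (2, 0.75)],
#                "continuum", es, "my_index")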


def es_getitem(key, doc_type, instance, index, query=None):
    """Yield (id, value) pairs for the field `key` from all documents
    matching `query`, casting ids back to int where possible."""
    results = helpers.scan(instance, index=index,
                           query=query, doc_type=doc_type)
    for result in results:
        try:
            id = int(result["_id"])
        except ValueError:
            # non-numeric ids are passed through unchanged
            id = result["_id"]
        yield id, result['_source'][key]
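
# Example (sketch; same hypothetical names as above): read the field back.
#
#     for doc_id, weight in es_getitem("topic_weight", "continuum",
#                                      es, "my_index"):
#         print(doc_id, weight)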


class BaseElasticCorpora(UserDict):
    """Dict-like view onto a corpus field stored in an Elasticsearch index."""
    def __init__(self, instance, index, corpus_type, query=None,
                 batch_size=1000):
        self.instance = instance
        self.index = index
        self.corpus_type = corpus_type
        self.query = query
        self.batch_size = batch_size

    def __setitem__(self, key, value):
        es_setitem(key, value, self.corpus_type, self.instance, self.index,
                   self.batch_size)

    def __getitem__(self, key):
        return es_getitem(key, self.corpus_type, self.instance, self.index,
                          self.query)
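
# Example (sketch): the corpora objects behave like dicts keyed by field
# name; "tokens", the index name, and the pair iterable are hypothetical.
#
#     corpora = BaseElasticCorpora(es, "my_index", "tokenized")
#     corpora["tokens"] = iter_of_id_token_pairs   # bulk upsert
#     for doc_id, tokens in corpora["tokens"]:     # lazy scan
#         print(doc_id, tokens)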


class VectorizedElasticCorpora(BaseElasticCorpora):
    """Stores and retrieves VectorizerOutput objects field-by-field."""
    def __setitem__(self, key, value):
        es_setitem(key, value.id_term_map.items(), "term",
                   self.instance, self.index)
        es_setitem(key, value.document_term_counts.items(),
                   "document_term_count", self.instance, self.index)
        es_setitem(key, value.doc_lengths.items(), "document_length",
                   self.instance, self.index)
        es_setitem(key, value.term_frequency.items(), "term_frequency",
                   self.instance, self.index)
        # vectors could alternatively be uploaded via
        # super(VectorizedElasticCorpora, self).__setitem__(key, value)
        es_setitem(key, value.vectors.items(), "vector",
                   self.instance, self.index)

    def __getitem__(self, key):
        # TODO: each of these should be retrieved from a query. Populate the
        # VectorizerOutput object and return it. These can be iterators
        # instead of dicts; VectorizerOutput should not care.
        # id->term map for the full set of unique terms across all docs
        id_term_map = {int(term_id): term
                       for term_id, term in es_getitem(
                           key, "term", self.instance, self.index,
                           self.query)}
        # count of terms associated with each document
        document_term_count = {int(doc_id): doc_term_count
                               for doc_id, doc_term_count in es_getitem(
                                   key, "document_term_count", self.instance,
                                   self.index, self.query)}
        # e.g. {"doc1": 3, "doc2": 5}
        doc_lengths = {int(doc_id): doc_length
                       for doc_id, doc_length in es_getitem(
                           key, "document_length", self.instance, self.index,
                           self.query)}
        term_frequency = {int(term_id): global_frequency
                          for term_id, global_frequency in es_getitem(
                              key, "term_frequency", self.instance,
                              self.index, self.query)}
        # the vectorized representation of each document, e.g.
        # {"doc1": {1: 3, 2: 1}}; word id is key, word count is value
        # (for a bag-of-words model)
        vectors = {int(doc_id): {int(term_id): term_weight
                                 for term_id, term_weight
                                 in doc_term_weights.items()}
                   for doc_id, doc_term_weights in es_getitem(
                       key, "vector", self.instance, self.index, self.query)}
        return VectorizerOutput(id_term_map=id_term_map,
                                document_term_counts=document_term_count,
                                doc_lengths=doc_lengths,
                                term_frequency=term_frequency,
                                vectors=vectors)
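
# Example (sketch): store a VectorizerOutput under a name, then read it back.
# "tfidf" and vectorizer_output are hypothetical.
#
#     corpora = VectorizedElasticCorpora(es, "my_index", "vectorized")
#     corpora["tfidf"] = vectorizer_output     # writes five fields
#     restored = corpora["tfidf"]              # rebuilds a VectorizerOutput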


class ModeledElasticCorpora(BaseElasticCorpora):
    """Stores and retrieves ModelOutput objects field-by-field."""
    def __setitem__(self, key, value):
        es_setitem(key, value.vocab.items(), "term",
                   self.instance, self.index)
        es_setitem(key, value.term_frequency.items(), "term_frequency",
                   self.instance, self.index)
        es_setitem(key, value.topic_term_matrix.items(), "topic_term_dist",
                   self.instance, self.index)
        es_setitem(key, value.doc_lengths.items(), "doc_length",
                   self.instance, self.index)
        es_setitem(key, value.doc_topic_matrix.items(), "doc_topic_dist",
                   self.instance, self.index)

    def __getitem__(self, key):
        vocab = {int(term_id): term for term_id, term in
                 es_getitem(key, "term", self.instance, self.index,
                            self.query)}
        term_frequency = {int(term_id): tf for term_id, tf in
                          es_getitem(key, "term_frequency", self.instance,
                                     self.index, self.query)}
        topic_term_matrix = {topic_id: topic_term_dist
                             for topic_id, topic_term_dist in
                             es_getitem(key, "topic_term_dist", self.instance,
                                        self.index, self.query)}
        doc_lengths = {doc_id: doc_length for doc_id, doc_length in
                       es_getitem(key, "doc_length", self.instance,
                                  self.index, self.query)}
        doc_topic_matrix = {int(doc_id): doc_topic_dist
                            for doc_id, doc_topic_dist in
                            es_getitem(key, "doc_topic_dist", self.instance,
                                       self.index, self.query)}
        return ModelOutput(vocab=vocab, term_frequency=term_frequency,
                           topic_term_matrix=topic_term_matrix,
                           doc_lengths=doc_lengths,
                           doc_topic_matrix=doc_topic_matrix)


@register_output
class ElasticSearchOutput(OutputInterface):
    """Output backend that persists corpora, vectorizer output, and model
    output to an Elasticsearch index."""
    def __init__(self, source, index, hash_field=None, doc_type='continuum',
                 query=None, iterable=None, filter_expression="",
                 vectorized_corpora=None, tokenized_corpora=None,
                 modeled_corpora=None, **kwargs):
        super(ElasticSearchOutput, self).__init__()
        self.hosts = source
        self.instance = Elasticsearch(hosts=source, **kwargs)
        self.index = index
        self.doc_type = doc_type
        self.query = query
        self.hash_field = hash_field
        if iterable:
            self.import_from_iterable(iterable, hash_field)
        self.filter_expression = filter_expression

        self.tokenized_corpora = tokenized_corpora if tokenized_corpora else \
            BaseElasticCorpora(self.instance, self.index, 'tokenized',
                               self.query)
        self.vectorized_corpora = vectorized_corpora if vectorized_corpora else \
            VectorizedElasticCorpora(self.instance, self.index, 'vectorized',
                                     self.query)
        self.modeled_corpora = modeled_corpora if modeled_corpora else \
            ModeledElasticCorpora(self.instance, self.index, "models",
                                  self.query)

    @property
    def filter_string(self):
        return self.filter_expression

    def import_from_iterable(self, iterable, field_to_hash='text',
                             batch_size=500):
        """Load data into Elasticsearch from an iterable.

        iterable: generally a list of dicts, but possibly a list of strings.
            This is your data; your dictionary structure defines the schema
            of the Elasticsearch index.
        field_to_hash: string identifier of the field to hash for the content
            ID. For a list of dicts, a valid key in the dictionary is
            required. For a list of strings, a dictionary with the single key
            "text" is created and used.
        """
        if field_to_hash:
            self.hash_field = field_to_hash
            batch = []
            for item in iterable:
                if isinstance(item, six.string_types):
                    item = {field_to_hash: item}
                id = hash(item[field_to_hash])
                action = {'_op_type': 'update',
                          '_index': self.index,
                          '_type': self.doc_type,
                          '_id': id,
                          'doc': item,
                          'doc_as_upsert': "true",
                          }
                batch.append(action)
                if len(batch) >= batch_size:
                    helpers.bulk(client=self.instance, actions=batch,
                                 index=self.index)
                    batch = []
            if batch:
                helpers.bulk(client=self.instance, actions=batch,
                             index=self.index)
            self.instance.indices.refresh(self.index)
        else:
            raise ValueError("A field_to_hash is required for "
                             "import_from_iterable")
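
    # Example (illustrative sketch; host, index, and documents are
    # hypothetical):
    #
    #     output = ElasticSearchOutput("localhost:9200", "my_index")
    #     output.import_from_iterable(
    #         [{"text": "first doc", "date": "2001-01-01"},
    #          {"text": "second doc", "date": "2002-02-02"}],
    #         field_to_hash="text")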

    def convert_date_field_and_reindex(self, field):
        """Ensure `field` is mapped as a date, aliasing and reindexing when
        it is not, and return the name of the index to query."""
        index = self.index
        if self.instance.indices.get_field_mapping(fields=[field],
                                                   index=index,
                                                   doc_type=self.doc_type) != 'date':
            index = self.index + "_{}_alias_date".format(field)
            if not self.instance.indices.exists(index) or \
                    self.instance.indices.get_field_mapping(
                        fields=[field], index=index,
                        doc_type=self.doc_type) != 'date':
                mapping = self.instance.indices.get_mapping(
                    index=self.index, doc_type=self.doc_type)
                mapping[self.index]["mappings"][self.doc_type]["properties"][field] = \
                    {"type": "date"}
                self.instance.indices.put_alias(index=self.index,
                                                name=index,
                                                body=mapping)
            self.instance.indices.refresh(index)
            while self.instance.count(index=self.index) != \
                    self.instance.count(index=index):
                logging.info("Waiting for date-indexed data to be indexed...")
                time.sleep(1)
        return index

    # TODO: validate input data to ensure that it has valid year data
    def get_date_filtered_data(self, field_to_get, start, end,
                               filter_field="date"):
        """Yield (id, value) pairs for `field_to_get` from documents whose
        `filter_field` date falls between `start` and `end`, inclusive."""
        converted_index = self.convert_date_field_and_reindex(
            field=filter_field)

        results = helpers.scan(self.instance, index=converted_index,
                               doc_type=self.doc_type, query={
                                   "query": {"filtered": {"filter": {"range": {
                                       filter_field: {"gte": start,
                                                      "lte": end}}}}}})
        for result in results:
            yield result["_id"], result['_source'][field_to_get]
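
    # Example (sketch; assumes documents carrying a "date" field, as loaded
    # above):
    #
    #     for doc_id, text in output.get_date_filtered_data(
    #             "text", start="2000-01-01", end="2010-12-31"):
    #         print(doc_id, text)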

    def get_filtered_data(self, field_to_get, filter=""):
        """Yield (id, value) pairs for `field_to_get` from documents matching
        the stored query. The `filter` argument is currently unused."""
        results = helpers.scan(self.instance, index=self.index,
                               query=self.query, doc_type=self.doc_type)
        for result in results:
            yield result["_id"], result['_source'][field_to_get]

    def save(self, filename, saved_data=None):
        """Persist the connection parameters (not the indexed documents) so
        that this output can be reloaded later."""
        if saved_data is None:
            saved_data = {"source": self.hosts, "index": self.index,
                          "hash_field": self.hash_field,
                          "doc_type": self.doc_type, "query": self.query}
        return super(ElasticSearchOutput, self).save(filename, saved_data)

    def synchronize(self, max_wait, field):
        """Block until every document in the index has `field` populated.
        Note that `max_wait` is currently unused."""
        # TODO: change this to a more general condition for wider use,
        # including read_input; could just pass in a string condition and
        # then 'while not eval(condition)'
        count_not_yet_updated = -1
        while count_not_yet_updated != 0:
            count_not_yet_updated = self.instance.count(
                index=self.index,
                doc_type=self.doc_type,
                body={"query": {
                    "constant_score": {
                        "filter": {
                            "missing": {
                                "field": field}}}}})['count']
            logging.debug("Count not yet updated: {}".format(
                count_not_yet_updated))
            time.sleep(0.01)
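

# End-to-end sketch (requires a running Elasticsearch node; host and index
# names are hypothetical, and this block is illustrative rather than part of
# the original module):
#
#     output = ElasticSearchOutput("localhost:9200", "topik_demo")
#     output.import_from_iterable(["a first document", "a second document"])
#     output.synchronize(max_wait=10, field="text")
#     for doc_id, text in output.get_filtered_data("text"):
#         print(doc_id, text)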