1
|
|
|
#!/usr/bin/env python |
2
|
|
|
# -*- encoding: utf-8 -*- |
3
|
|
|
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: |
4
|
|
|
# Author: Binux<[email protected]> |
5
|
|
|
# http://binux.me |
6
|
|
|
# Created on 2016-01-18 19:41:24 |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
import time |
10
|
|
|
|
11
|
|
|
import elasticsearch.helpers |
12
|
|
|
from elasticsearch import Elasticsearch |
13
|
|
|
from pyspider.database.base.resultdb import ResultDB as BaseResultDB |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
class ResultDB(BaseResultDB):
    """Result storage backed by an Elasticsearch index.

    Documents are stored under one index (default ``'pyspider'``) with a
    single doc type; the document id is ``'<project>:<taskid>'`` so saves
    are idempotent upserts per task.
    """

    __type__ = 'result'

    def __init__(self, hosts, index='pyspider'):
        self.index = index
        self.es = Elasticsearch(hosts=hosts)

        # Create the index if missing; ignore=400 swallows the
        # "index already exists" error on subsequent startups.
        self.es.indices.create(index=self.index, ignore=400)

        # Install the type mapping only if this doc type has none yet.
        mapping = self.es.indices.get_mapping(index=self.index, doc_type=self.__type__)
        if not mapping:
            self.es.indices.put_mapping(
                index=self.index, doc_type=self.__type__,
                body={
                    "_all": {"enabled": True},
                    "properties": {
                        # taskid/url are stored but not indexed for search;
                        # project is kept as an exact (not analyzed) keyword
                        # so term queries and aggregations match verbatim.
                        "taskid": {"enabled": False},
                        "project": {"type": "string", "index": "not_analyzed"},
                        "url": {"enabled": False},
                    }
                })

    @property
    def projects(self):
        """Return the list of distinct project names that have results."""
        resp = self.es.search(index=self.index, doc_type=self.__type__,
                              body={"aggs": {"projects": {
                                  "terms": {"field": "project"}
                              }}}, _source=False)
        buckets = resp['aggregations']['projects'].get('buckets', [])
        return [bucket['key'] for bucket in buckets]

    def save(self, project, taskid, url, result):
        """Index (upsert) one task result; returns the ES index response."""
        doc = dict(
            taskid=taskid,
            project=project,
            url=url,
            result=result,
            updatetime=time.time(),
        )
        return self.es.index(index=self.index, doc_type=self.__type__,
                             id='%s:%s' % (project, taskid), body=doc)

    def select(self, project, fields=None, offset=0, limit=0):
        """Yield result documents for *project*, newest first.

        With a truthy *limit*, a single paged search is issued; otherwise
        the scan helper streams every match. *fields* restricts the
        ``_source`` fields returned.
        """
        offset = offset or 0
        limit = limit or 0
        if limit:
            resp = self.es.search(index=self.index, doc_type=self.__type__,
                                  body={'query': {'term': {'project': project}}},
                                  _source_include=fields or [], from_=offset, size=limit,
                                  sort="updatetime:desc")
            hits = resp.get('hits', {}).get('hits', [])
        else:
            hits = elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                              query={'query': {'term': {'project': project}}},
                                              _source_include=fields or [], from_=offset,
                                              sort="updatetime:desc")
        for hit in hits:
            yield hit['_source']

    def count(self, project):
        """Return how many results are stored for *project*."""
        resp = self.es.count(index=self.index, doc_type=self.__type__,
                             body={'query': {'term': {'project': project}}})
        return resp.get('count', 0)

    def get(self, project, taskid, fields=None):
        """Fetch a single result document, or None if it does not exist."""
        # ignore=404 turns a missing document into a response without
        # '_source' instead of raising, hence the .get() fallback below.
        resp = self.es.get(index=self.index, doc_type=self.__type__,
                           id="%s:%s" % (project, taskid),
                           _source_include=fields or [], ignore=404)
        return resp.get('_source', None)

    def drop(self, project):
        """Delete every stored result belonging to *project*."""
        # Refresh first so recently indexed documents are visible to scan.
        self.refresh()
        matches = elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                             query={'query': {'term': {'project': project}}},
                                             _source=False)
        for hit in matches:
            self.es.delete(index=self.index, doc_type=self.__type__, id=hit['_id'])

    def refresh(self):
        """
        Explicitly refresh one or more index, making all operations
        performed since the last refresh available for search.
        """
        self.es.indices.refresh(index=self.index)
93
|
|
|
|