1 | #!/usr/bin/env python |
||
2 | # -*- encoding: utf-8 -*- |
||
3 | # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: |
||
4 | # Author: Binux<[email protected]> |
||
5 | # http://binux.me |
||
6 | # Created on 2016-01-18 19:41:24 |
||
7 | |||
8 | |||
9 | import time |
||
10 | |||
11 | import elasticsearch.helpers |
||
12 | from elasticsearch import Elasticsearch |
||
13 | from pyspider.database.base.resultdb import ResultDB as BaseResultDB |
||
14 | |||
15 | |||
class ResultDB(BaseResultDB):
    """Result storage backed by Elasticsearch.

    Each result is one document of type ``result``; the document id is
    ``"<project>:<taskid>"`` so that saving the same task twice overwrites
    the earlier result instead of duplicating it.
    """
    __type__ = 'result'

    def __init__(self, hosts, index='pyspider'):
        """Connect to Elasticsearch and ensure the index and mapping exist.

        :param hosts: host (or list of hosts) handed straight to ``Elasticsearch``.
        :param index: name of the index the result documents live in.
        """
        self.index = index
        self.es = Elasticsearch(hosts=hosts)

        # ignore=400: an "index already exists" response is expected on
        # every start-up after the first and must not raise.
        self.es.indices.create(index=self.index, ignore=400)
        if not self.es.indices.get_mapping(index=self.index, doc_type=self.__type__):
            self.es.indices.put_mapping(index=self.index, doc_type=self.__type__, body={
                "_all": {"enabled": True},
                "properties": {
                    # taskid/url are stored but not indexed; project stays
                    # un-analyzed so term queries and the terms aggregation
                    # in `projects` match the exact project name.
                    "taskid": {"enabled": False},
                    "project": {"type": "string", "index": "not_analyzed"},
                    "url": {"enabled": False},
                }
            })

    @property
    def projects(self):
        """Return the list of project names that have at least one result."""
        ret = self.es.search(index=self.index, doc_type=self.__type__,
                             body={"aggs": {"projects": {
                                 "terms": {"field": "project"}
                             }}}, _source=False)
        return [each['key'] for each in ret['aggregations']['projects'].get('buckets', [])]

    def save(self, project, taskid, url, result):
        """Insert or overwrite the result of one task.

        The fixed document id ``"<project>:<taskid>"`` makes repeated saves
        of the same task idempotent (last write wins).
        """
        obj = {
            'taskid': taskid,
            'project': project,
            'url': url,
            'result': result,
            'updatetime': time.time(),
        }
        return self.es.index(index=self.index, doc_type=self.__type__,
                             body=obj, id='%s:%s' % (project, taskid))

    def select(self, project, fields=None, offset=0, limit=0):
        """Yield result documents of *project*, newest first.

        :param fields: source fields to include (all fields when falsy).
        :param offset: number of leading documents to skip.
        :param limit: maximum number of documents to yield (falsy = no limit).
        """
        offset = offset or 0
        limit = limit or 0
        if not limit:
            # Unbounded select goes through the scan helper (scroll API).
            # BUG FIX: a scroll context does not accept ``from``, so the
            # previous ``from_=offset`` argument broke (or silently ignored)
            # any non-zero offset here; skip the offset client-side instead.
            skipped = 0
            for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                     query={'query': {'term': {'project': project}}},
                                                     _source_include=fields or [],
                                                     sort="updatetime:desc"):
                if skipped < offset:
                    skipped += 1
                    continue
                yield record['_source']
        else:
            for record in self.es.search(index=self.index, doc_type=self.__type__,
                                         body={'query': {'term': {'project': project}}},
                                         _source_include=fields or [], from_=offset, size=limit,
                                         sort="updatetime:desc"
                                         ).get('hits', {}).get('hits', []):
                yield record['_source']

    def count(self, project):
        """Return the number of results stored for *project*."""
        return self.es.count(index=self.index, doc_type=self.__type__,
                             body={'query': {'term': {'project': project}}}
                             ).get('count', 0)

    def get(self, project, taskid, fields=None):
        """Return the stored result document for one task, or None.

        ignore=404 keeps a missing document from raising; the response then
        has no ``_source`` key, so ``.get`` falls through to None.
        """
        ret = self.es.get(index=self.index, doc_type=self.__type__, id="%s:%s" % (project, taskid),
                          _source_include=fields or [], ignore=404)
        return ret.get('_source', None)

    def drop(self, project):
        """Delete every result of *project*, one document at a time.

        The explicit refresh first makes documents indexed since the last
        refresh visible to the scan, so they are not missed.
        """
        self.refresh()
        for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                 query={'query': {'term': {'project': project}}},
                                                 _source=False):
            self.es.delete(index=self.index, doc_type=self.__type__, id=record['_id'])

    def refresh(self):
        """
        Explicitly refresh one or more index, making all operations
        performed since the last refresh available for search.
        """
        self.es.indices.refresh(index=self.index)