ResultDB.projects() (rating: A)

Complexity:   2 conditions
Size:         7 total lines
Duplication:  0 lines (0 % ratio)
Importance:   0 changes

Metric   Value
cc       2
dl       0
loc      7
rs       9.4285
c        0
b        0
f        0
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2016-01-18 19:41:24

import time

import elasticsearch.helpers
from elasticsearch import Elasticsearch

from pyspider.database.base.resultdb import ResultDB as BaseResultDB


class ResultDB(BaseResultDB):
    __type__ = 'result'

    def __init__(self, hosts, index='pyspider'):
        self.index = index
        self.es = Elasticsearch(hosts=hosts)

        self.es.indices.create(index=self.index, ignore=400)
        # the "project" field is mapped not_analyzed so that the term queries and
        # the terms aggregation used elsewhere in this class match exact project names
        if not self.es.indices.get_mapping(index=self.index, doc_type=self.__type__):
            self.es.indices.put_mapping(index=self.index, doc_type=self.__type__, body={
                "_all": {"enabled": True},
                "properties": {
                    "taskid": {"enabled": False},
                    "project": {"type": "string", "index": "not_analyzed"},
                    "url": {"enabled": False},
                }
            })

    @property
    def projects(self):
        # distinct project names, collected via a terms aggregation on "project"
        ret = self.es.search(index=self.index, doc_type=self.__type__,
                             body={"aggs": {"projects": {
                                 "terms": {"field": "project"}
                             }}}, _source=False)
        return [each['key'] for each in ret['aggregations']['projects'].get('buckets', [])]

    def save(self, project, taskid, url, result):
        obj = {
            'taskid': taskid,
            'project': project,
            'url': url,
            'result': result,
            'updatetime': time.time(),
        }
        # the document id '%s:%s' % (project, taskid) means saving the same task
        # again replaces its previous result
        return self.es.index(index=self.index, doc_type=self.__type__,
                             body=obj, id='%s:%s' % (project, taskid))

    def select(self, project, fields=None, offset=0, limit=0):
        offset = offset or 0
        limit = limit or 0
        if not limit:
            # no limit: stream every matching result with the scan/scroll helper
            for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                     query={'query': {'term': {'project': project}}},
                                                     _source_include=fields or [], from_=offset,
                                                     sort="updatetime:desc"):
                yield record['_source']
        else:
            # bounded request: a regular search with from/size paging
            for record in self.es.search(index=self.index, doc_type=self.__type__,
                                         body={'query': {'term': {'project': project}}},
                                         _source_include=fields or [], from_=offset, size=limit,
                                         sort="updatetime:desc"
                                         ).get('hits', {}).get('hits', []):
                yield record['_source']

    def count(self, project):
        return self.es.count(index=self.index, doc_type=self.__type__,
                             body={'query': {'term': {'project': project}}}
                             ).get('count', 0)

    def get(self, project, taskid, fields=None):
        ret = self.es.get(index=self.index, doc_type=self.__type__, id="%s:%s" % (project, taskid),
                          _source_include=fields or [], ignore=404)
        return ret.get('_source', None)

    def drop(self, project):
        self.refresh()
        for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                 query={'query': {'term': {'project': project}}},
                                                 _source=False):
            self.es.delete(index=self.index, doc_type=self.__type__, id=record['_id'])

    def refresh(self):
        """
        Explicitly refresh one or more indices, making all operations
        performed since the last refresh available for search.
        """
        self.es.indices.refresh(index=self.index)
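
A minimal usage sketch for the ResultDB class above, assuming a reachable Elasticsearch node and the older elasticsearch-py client API that this module targets (string mappings with "not_analyzed" and doc_type arguments). The host, project name, taskid, and URL below are illustrative placeholders, not values taken from the analyzed file.

# Hypothetical example values; only the method calls come from the class above.
resultdb = ResultDB(hosts='127.0.0.1:9200', index='pyspider')

# store one crawl result; the document id becomes "demo_project:task-1",
# so saving the same (project, taskid) pair again overwrites the earlier result
resultdb.save(project='demo_project', taskid='task-1',
              url='http://example.com/', result={'title': 'example'})

# make the freshly indexed document visible to search before querying
resultdb.refresh()

print(resultdb.projects)                        # e.g. ['demo_project']
print(resultdb.count('demo_project'))           # e.g. 1
print(resultdb.get('demo_project', 'task-1'))   # the stored _source document

for row in resultdb.select('demo_project', limit=10):
    print(row['url'], row['result'])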