Completed
Push — master ( a387ce...6273fb )
by Roy
01:15
created

pyspider.database.elasticsearch.ResultDB.count()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 4
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2016-01-18 19:41:24
7
8
9
import time
10
11
import elasticsearch.helpers
12
from elasticsearch import Elasticsearch
13
from pyspider.database.base.resultdb import ResultDB as BaseResultDB
14
15
16
class ResultDB(BaseResultDB):
    """Result database backed by a single Elasticsearch index.

    Every task result is stored as one document of type ``result`` in
    ``self.index``; the document id is ``"<project>:<taskid>"``.
    """
    __type__ = 'result'

    def __init__(self, hosts, index='pyspider'):
        """Connect to Elasticsearch and make sure the index and mapping exist.

        :param hosts: passed unchanged to ``Elasticsearch(hosts=...)``.
        :param index: name of the index that holds the results.
        """
        self.index = index
        self.es = Elasticsearch(hosts=hosts)

        # ignore=400: creating an index that already exists is not an error here.
        self.es.indices.create(index=self.index, ignore=400)
        # Install the mapping only when this doc type has none yet.
        if not self.es.indices.get_mapping(index=self.index, doc_type=self.__type__):
            self.es.indices.put_mapping(index=self.index, doc_type=self.__type__, body={
                "_all": {"enabled": True},
                "properties": {
                    "taskid": {"enabled": False},
                    # not_analyzed keeps the project name as one token, which
                    # the `term` queries in select/count/drop rely on.
                    "project": {"type": "string", "index": "not_analyzed"},
                    "url": {"enabled": False},
                }
            })

    def save(self, project, taskid, url, result):
        """Index the result of one task and return the raw ``index`` response.

        The document id is ``"<project>:<taskid>"``, so saving the same task
        again overwrites the previous document. ``updatetime`` is set to the
        current ``time.time()`` and is the key ``select`` sorts on.
        """
        obj = {
            'taskid': taskid,
            'project': project,
            'url': url,
            'result': result,
            'updatetime': time.time(),
        }
        return self.es.index(index=self.index, doc_type=self.__type__,
                             body=obj, id='%s:%s' % (project, taskid))

    def select(self, project, fields=None, offset=0, limit=0):
        """Yield the stored ``_source`` of ``project``'s results, newest first.

        :param project: project whose results are returned.
        :param fields: if given, only these source fields are included.
        :param offset: number of results to skip.
        :param limit: maximum number of results. A falsy value means "all";
            the results are then streamed with a scroll instead of being
            fetched by a single search request.
        """
        query = {'query': {'term': {'project': project}}}
        if not limit:
            # preserve_order=True: without it helpers.scan returns hits in no
            # particular order, silently dropping sort="updatetime:desc" and
            # disagreeing with the ordering of the limited branch below.
            for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                     query=query, preserve_order=True,
                                                     _source_include=fields or [], from_=offset,
                                                     sort="updatetime:desc"):
                yield record['_source']
        else:
            for record in self.es.search(index=self.index, doc_type=self.__type__,
                                         body=query,
                                         _source_include=fields or [], from_=offset, size=limit,
                                         sort="updatetime:desc"
                                         ).get('hits', {}).get('hits', []):
                yield record['_source']

    def count(self, project):
        """Return the number of stored results of ``project`` (0 if the response has no count)."""
        return self.es.count(index=self.index, doc_type=self.__type__,
                             body={'query': {'term': {'project': project}}}
                             ).get('count', 0)

    def get(self, project, taskid, fields=None):
        """Return the stored ``_source`` of one task, or ``None`` if it is missing.

        :param fields: if given, only these source fields are included.
        """
        # ignore=404: a missing document yields a response without '_source'.
        ret = self.es.get(index=self.index, doc_type=self.__type__, id="%s:%s" % (project, taskid),
                          _source_include=fields or [], ignore=404)
        return ret.get('_source', None)

    def drop(self, project):
        """Delete every stored result of ``project``.

        The index is refreshed before scrolling, so documents saved just before
        are found. It is refreshed again afterwards, so the search-based
        ``select`` and ``count`` stop returning the deleted documents right away.
        """
        self.refresh()
        hits = elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                          query={'query': {'term': {'project': project}}},
                                          _source=False)
        actions = (
            {
                '_op_type': 'delete',
                '_index': self.index,
                '_type': self.__type__,
                '_id': record['_id'],
            }
            for record in hits
        )
        # Batched bulk requests instead of one DELETE round trip per document;
        # the generator is consumed lazily while the scroll advances.
        elasticsearch.helpers.bulk(self.es, actions)
        self.refresh()

    def refresh(self):
        """
        Explicitly refresh the index, making all operations performed
        since the last refresh visible to search requests.
        """
        self.es.indices.refresh(index=self.index)
83