Completed: Push to master (4d0741...57a274), by Roy, created 01:06

pyspider.database.elasticsearch.TaskDB (Rating: A)

Complexity

Total Complexity 26

Size/Duplication

Total Lines 108
Duplicated Lines 0%

Metric                          Value
dl  (duplicated lines)          0
loc (lines of code)             108
rs  (rating score)              10
wmc (weighted method count)     26

11 Methods

Rating   Name             Duplication   Size   Complexity
A        insert()         0             8      1
A        drop()           0             7      2
A        refresh()        0             7      1
A        _stringify()     0             5      3
A        get_task()       0             6      2
B        load_tasks()     0             13     5
B        _parse()         0             10     5
A        update()         0             7      1
A        status_count()   0             11     2
A        __init__()       0             12     2
A        projects()       0             7      2
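
As a check, the per-method complexities above sum to the reported class total: 1 + 2 + 1 + 3 + 2 + 5 + 5 + 1 + 2 + 2 + 2 = 26, matching Total Complexity (wmc).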
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2016-01-20 20:20:55


import time
import json

import elasticsearch.helpers
from elasticsearch import Elasticsearch

from pyspider.database.base.taskdb import TaskDB as BaseTaskDB


class TaskDB(BaseTaskDB):
    __type__ = 'task'

    def __init__(self, hosts, index='pyspider'):
        self.index = index
        self._changed = False
        self.es = Elasticsearch(hosts=hosts)

        # ignore=400 tolerates "index already exists" on repeated startups
        self.es.indices.create(index=self.index, ignore=400)
        if not self.es.indices.get_mapping(index=self.index, doc_type=self.__type__):
            # string/not_analyzed is the Elasticsearch 2.x way to store an
            # exact-match field; later versions use the keyword type instead
            self.es.indices.put_mapping(index=self.index, doc_type=self.__type__, body={
                "_all": {"enabled": False},
                "properties": {
                    "project": {"type": "string", "index": "not_analyzed"},
                    "status": {"type": "byte"},
                }
            })

    def _parse(self, data):
        # decode the JSON-encoded dict fields of a stored task document
        if not data:
            return data
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                if data[each]:
                    data[each] = json.loads(data[each])
                else:
                    data[each] = {}
        return data

    def _stringify(self, data):
        # inverse of _parse(): JSON-encode the dict fields before indexing
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                data[each] = json.dumps(data[each])
        return data

    @property
    def projects(self):
        # distinct project names via a terms aggregation; no source documents needed
        ret = self.es.search(index=self.index, doc_type=self.__type__,
                             body={"aggs": {"projects": {
                                 "terms": {"field": "project"}
                             }}}, _source=False)
        return [each['key'] for each in ret['aggregations']['projects'].get('buckets', [])]

    def load_tasks(self, status, project=None, fields=None):
        self.refresh()
        if project is None:
            # no project given: recurse over every known project
            for project in self.projects:
                for each in self.load_tasks(status, project, fields):
                    yield each
        else:
            for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                     query={'query': {'bool': {
                                                         'must': {'term': {'project': project}},
                                                         'filter': {'term': {'status': status}},
                                                     }}}, _source_include=fields or []):
                yield self._parse(record['_source'])

    def get_task(self, project, taskid, fields=None):
        # refresh only when a write has happened since the last refresh
        if self._changed:
            self.refresh()
        ret = self.es.get(index=self.index, doc_type=self.__type__, id="%s:%s" % (project, taskid),
                          _source_include=fields or [], ignore=404)
        return self._parse(ret.get('_source', None))

    def status_count(self, project):
        self.refresh()
        ret = self.es.search(index=self.index, doc_type=self.__type__,
                             body={"query": {'term': {'project': project}},
                                   "aggs": {"status": {
                                       "terms": {"field": "status"}
                                   }}}, _source=False)
        result = {}
        for each in ret['aggregations']['status'].get('buckets', []):
            result[each['key']] = each['doc_count']
        return result

    def insert(self, project, taskid, obj=None):
        # default to None rather than a shared mutable {}; the dict is
        # copied before mutation either way, so behavior is unchanged
        self._changed = True
        obj = dict(obj or {})
        obj['taskid'] = taskid
        obj['project'] = project
        obj['updatetime'] = time.time()
        return self.es.index(index=self.index, doc_type=self.__type__,
                             body=self._stringify(obj), id='%s:%s' % (project, taskid))

    def update(self, project, taskid, obj=None, **kwargs):
        self._changed = True
        obj = dict(obj or {})
        obj.update(kwargs)
        obj['updatetime'] = time.time()
        return self.es.update(index=self.index, doc_type=self.__type__, id='%s:%s' % (project, taskid),
                              body={"doc": self._stringify(obj)}, ignore=404)

    def drop(self, project):
        # delete every task document belonging to the project
        self.refresh()
        for record in elasticsearch.helpers.scan(self.es, index=self.index, doc_type=self.__type__,
                                                 query={'query': {'term': {'project': project}}},
                                                 _source=False):
            self.es.delete(index=self.index, doc_type=self.__type__, id=record['_id'])
        self.refresh()

    def refresh(self):
        """
        Explicitly refresh the index, making all operations performed
        since the last refresh available for search.
        """
        self._changed = False
        self.es.indices.refresh(index=self.index)
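
As context for the listing above, here is a minimal usage sketch. The Elasticsearch host, project name, and task payload are illustrative assumptions, not taken from the file; the status constants (ACTIVE, SUCCESS, ...) are inherited from pyspider's BaseTaskDB.

from pyspider.database.elasticsearch.taskdb import TaskDB

# hypothetical host and index, for illustration only
taskdb = TaskDB(hosts='localhost:9200', index='pyspider')

# store a task; _stringify() JSON-encodes the nested dict fields
taskdb.insert('demo_project', 'task_1', {
    'status': TaskDB.ACTIVE,
    'schedule': {'priority': 1},
    'fetch': {'url': 'http://example.com/'},
})

# get_task() triggers a refresh here because insert() set _changed
task = taskdb.get_task('demo_project', 'task_1')
print(task['fetch']['url'])                   # nested dicts decoded by _parse()

print(taskdb.status_count('demo_project'))    # e.g. {1: 1}
for task in taskdb.load_tasks(TaskDB.ACTIVE, 'demo_project'):
    print(task['taskid'])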