Completed
Push — master (ebb800...87337e) · by Roy · 27s

TaskDB._count_for_status() · rating: A

Complexity: Conditions 2
Size: Total Lines 3
Duplication: Lines 0 · Ratio 0 %
Importance: Changes 1 · Bugs 0 · Features 0
Metric                   Value
cc (conditions)          2
c (changes)              1
b (bugs)                 0
f (features)             0
dl (duplicated lines)    0
loc (total lines)        3
rs                       10
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-10-11 23:54:50

import json
import time

from pymongo import MongoClient

from pyspider.database.base.taskdb import TaskDB as BaseTaskDB
from .mongodbbase import SplitTableMixin


class TaskDB(SplitTableMixin, BaseTaskDB):
    collection_prefix = ''

    def __init__(self, url, database='taskdb'):
        self.conn = MongoClient(url)
        # fail fast if the server is unreachable
        self.conn.admin.command("ismaster")
        self.database = self.conn[database]
        self.projects = set()

        self._list_project()
        # We suggest building the indexes manually in advance instead of
        # indexing during the startup process:
        # for project in self.projects:
        #     collection_name = self._collection_name(project)
        #     self.database[collection_name].ensure_index('status')
        #     self.database[collection_name].ensure_index('taskid')
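        # A possible one-off script for that (a hypothetical sketch, not
        # part of this module; assumes pymongo>=3.6, where create_index
        # replaces the deprecated ensure_index):
        #     from pymongo import MongoClient
        #     db = MongoClient('mongodb://localhost:27017/')['taskdb']
        #     for name in db.list_collection_names():
        #         db[name].create_index('status')
        #         db[name].create_index('taskid')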

    def _create_project(self, project):
        collection_name = self._collection_name(project)
        self.database[collection_name].ensure_index('status')
        self.database[collection_name].ensure_index('taskid')
        self._list_project()

    def _parse(self, data):
        # drop the MongoDB-internal id and JSON-decode the nested fields
        # that _stringify() serialized
        if '_id' in data:
            del data['_id']
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                if data[each]:
                    if isinstance(data[each], bytearray):
                        data[each] = str(data[each])
                    data[each] = json.loads(data[each], encoding='utf8')
                else:
                    data[each] = {}
        return data

    def _stringify(self, data):
        # JSON-encode the nested dict fields before writing them to MongoDB
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                data[each] = json.dumps(data[each])
        return data

    def load_tasks(self, status, project=None, fields=None):
        if not project:
            self._list_project()

        if project:
            projects = [project, ]
        else:
            projects = self.projects

        for project in projects:
            collection_name = self._collection_name(project)
            for task in self.database[collection_name].find({'status': status}, fields):
                yield self._parse(task)

    def get_task(self, project, taskid, fields=None):
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            return
        collection_name = self._collection_name(project)
        ret = self.database[collection_name].find_one({'taskid': taskid}, fields)
        if not ret:
            return ret
        return self._parse(ret)

    def status_count(self, project):
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            return {}
        collection_name = self._collection_name(project)

        # When the task collection holds a lot of data, the aggregate
        # operation takes a very long time, which makes the scheduler
        # module particularly slow to start up:
        # ret = self.database[collection_name].aggregate([
        #     {'$group': {
        #         '_id'  : '$status',
        #         'total': {
        #             '$sum': 1
        #         }
        #     }
        #     }])

        # Instead of aggregate, use find-count on the indexed 'status' field.
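        # Note (not in the original code): cursor.count() was deprecated in
        # pymongo 3.7 and removed in 4.0; on newer drivers the equivalent
        # query would be collection.count_documents({'status': status}),
        # which also uses the index on 'status'.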
        def _count_for_status(collection, status):
            total = collection.find({'status': status}).count()
            return {'total': total, '_id': status} if total else None

        c = self.database[collection_name]
        ret = filter(
            lambda x: x,
            map(
                lambda s: _count_for_status(c, s), [self.ACTIVE, self.SUCCESS, self.FAILED]
            )
        )

        result = {}
        # kept from the aggregate version: older pymongo wrapped aggregate
        # results in a dict under the 'result' key
        if isinstance(ret, dict):
            ret = ret.get('result', [])
        for each in ret:
            result[each['_id']] = each['total']
        return result

    def insert(self, project, taskid, obj={}):
        if project not in self.projects:
            self._create_project(project)
        obj = dict(obj)
        obj['taskid'] = taskid
        obj['project'] = project
        obj['updatetime'] = time.time()
        return self.update(project, taskid, obj=obj)

    def update(self, project, taskid, obj={}, **kwargs):
        obj = dict(obj)
        obj.update(kwargs)
        obj['updatetime'] = time.time()
        collection_name = self._collection_name(project)
        return self.database[collection_name].update(
            {'taskid': taskid},
            {"$set": self._stringify(obj)},
            upsert=True
        )
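
For reference, a minimal usage sketch of this class (an illustration, not part of the reviewed change; it assumes the module lives at pyspider.database.mongodb.taskdb, that a MongoDB server is reachable locally, and it uses a hypothetical project name 'demo'):

    from pyspider.database.mongodb.taskdb import TaskDB

    taskdb = TaskDB('mongodb://localhost:27017/')
    taskdb.insert('demo', 'task_1', {'url': 'http://example.com/', 'status': TaskDB.ACTIVE})
    print(taskdb.get_task('demo', 'task_1'))
    print(taskdb.status_count('demo'))  # e.g. {1: 1}; ACTIVE == 1 in the base class
    for task in taskdb.load_tasks(TaskDB.ACTIVE, project='demo'):
        print(task['taskid'])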