Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

pyspider.database.sqlalchemy.TaskDB   B

Complexity

Total Complexity 40

Size/Duplication

Total Lines 138
Duplicated Lines 0 %
Metric Value
dl 0
loc 138
rs 8.2608
wmc 40

How to fix   Complexity   

Complex Class

Complex classes like pyspider.database.sqlalchemy.TaskDB often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-12-04 22:33:43
7
8
import re
9
import six
10
import time
11
import json
12
import sqlalchemy.exc
13
14
from sqlalchemy import (create_engine, MetaData, Table, Column, Index,
15
                        Integer, String, Float, LargeBinary, func)
16
from sqlalchemy.engine.url import make_url
17
from pyspider.libs import utils
18
from pyspider.database.base.taskdb import TaskDB as BaseTaskDB
19
from .sqlalchemybase import SplitTableMixin, result2dict
20
21
22
class TaskDB(SplitTableMixin, BaseTaskDB):
    """SQLAlchemy-backed task database that keeps each project in its own table.

    A single ``Table`` template is stored in ``self.table``. Every operation
    renames it (``self.table.name = self._tablename(project)``) before
    building a statement, so all project tables share one column layout.

    NOTE(review): because ``self.table`` is shared and renamed on every call,
    interleaving calls from several threads looks unsafe -- confirm the
    intended concurrency model before relying on it.
    """

    __tablename__ = ''

    def __init__(self, url):
        """Build the table template, try to create the database, then connect.

        :param url: SQLAlchemy database URL for the task store.
        """
        self.table = Table('__tablename__', MetaData(),
                           Column('taskid', String(64), primary_key=True, nullable=False),
                           Column('project', String(64)),
                           Column('url', String(1024)),
                           Column('status', Integer),
                           Column('schedule', LargeBinary),
                           Column('fetch', LargeBinary),
                           Column('process', LargeBinary),
                           Column('track', LargeBinary),
                           Column('lastcrawltime', Float(32)),
                           Column('updatetime', Float(32)),
                           mysql_engine='InnoDB',
                           mysql_charset='utf8'
                           )

        self.url = make_url(url)
        if self.url.database:
            self._create_database()
        self.engine = create_engine(url)

        self._list_project()

    def _create_database(self):
        """Best-effort ``CREATE DATABASE`` for the database named in ``self.url``.

        Connects to the server with no database selected and issues the
        statement. Any ``SQLAlchemyError`` is ignored, for example when the
        database already exists. The temporary engine and connection are
        always released, and ``self.url.database`` is always restored.
        """
        database = self.url.database
        self.url.database = None
        engine = None
        try:
            engine = create_engine(self.url)
            with engine.connect() as conn:
                # Presumably ends any implicit transaction the driver opened,
                # since some servers (e.g. PostgreSQL) refuse CREATE DATABASE
                # inside a transaction block.
                conn.execute("commit")
                # The name comes from the URL given to __init__ and is
                # interpolated unquoted.
                conn.execute("CREATE DATABASE %s" % database)
        except sqlalchemy.exc.SQLAlchemyError:
            # Deliberately best-effort: the database may already exist or the
            # account may lack the privilege; connecting below will surface
            # any real problem.
            pass
        finally:
            if engine is not None:
                engine.dispose()
            self.url.database = database

    def _project_known(self, project):
        """Return whether *project* has a table, refreshing the project list once if not."""
        if project not in self.projects:
            self._list_project()
        return project in self.projects

    def _select_columns(self, fields):
        """Map requested field names to columns of ``self.table``.

        All columns are returned when *fields* is empty or ``None``. Names that
        are not columns of the table are passed through unchanged.
        """
        if not fields:
            return self.table.c
        return [getattr(self.table.c, f, f) for f in fields]

    def _create_project(self, project):
        """Create the table, with an index on ``status``, for *project* unless it is already known.

        :raises AssertionError: if *project* is not made only of word characters.
        """
        # Explicit check instead of ``assert`` so it still runs under
        # ``python -O``; the name becomes part of a table name.
        if re.match(r'^\w+$', project) is None:
            raise AssertionError('invalid project name: %r' % (project,))
        if project in self.projects:
            return
        self.table.name = self._tablename(project)
        # Constructing the Index attaches it to the shared table; it must be
        # detached again (even if creation fails) so later tables built from
        # this template do not inherit it.
        Index('status_%s_index' % self.table.name, self.table.c.status)
        try:
            self.table.create(self.engine, checkfirst=True)
        finally:
            self.table.indexes.clear()

    @staticmethod
    def _parse(data):
        """Decode a row dict in place and return it.

        ``bytes`` values are converted to text. The JSON columns (schedule,
        fetch, process, track) are decoded into objects, and empty values
        become ``{}``.
        """
        for key, value in list(six.iteritems(data)):
            if isinstance(value, six.binary_type):
                data[key] = utils.text(value)
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each not in data:
                continue
            raw = data[each]
            if not raw:
                data[each] = {}
                continue
            if isinstance(raw, bytearray):
                # A bytearray is not converted by the loop above. Calling
                # str() on it yields "bytearray(b'...')" on Python 3, so
                # decode the underlying bytes explicitly instead.
                raw = bytes(raw).decode('utf-8')
            data[each] = json.loads(raw)
        return data

    @staticmethod
    def _stringify(data):
        """JSON-encode the JSON columns of *data* in place and return it."""
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                data[each] = utils.utf8(json.dumps(data[each]))
        return data

    def load_tasks(self, status, project=None, fields=None):
        """Yield parsed tasks whose ``status`` equals *status*.

        :param status: status value to match.
        :param project: restrict to one project; all known projects when falsy.
        :param fields: optional list of field names to select.
        Yields nothing when *project* is given but unknown.
        """
        if project and not self._project_known(project):
            return

        # Snapshot the project names: this is a generator, so the project
        # set could be refreshed or changed between yields.
        projects = [project] if project else list(self.projects)
        columns = self._select_columns(fields)
        for name in projects:
            self.table.name = self._tablename(name)
            query = (self.table.select()
                     .with_only_columns(columns)
                     .where(self.table.c.status == status))
            for task in self.engine.execute(query):
                yield self._parse(result2dict(columns, task))

    def get_task(self, project, taskid, fields=None):
        """Return the parsed task *taskid* of *project*, or ``None`` if missing.

        :param fields: optional list of field names to select.
        """
        if not self._project_known(project):
            return None

        self.table.name = self._tablename(project)
        columns = self._select_columns(fields)
        query = (self.table.select()
                 .with_only_columns(columns)
                 .limit(1)
                 .where(self.table.c.taskid == taskid))
        # first() fetches one row and closes the result set.
        row = self.engine.execute(query).first()
        if row is None:
            return None
        return self._parse(result2dict(columns, row))

    def status_count(self, project):
        """Return a ``{status: count}`` dict for *project* (empty if unknown)."""
        result = {}
        if not self._project_known(project):
            return result

        self.table.name = self._tablename(project)
        query = (self.table.select()
                 .with_only_columns((self.table.c.status, func.count(1)))
                 .group_by(self.table.c.status))
        for status, count in self.engine.execute(query):
            result[status] = count
        return result

    def insert(self, project, taskid, obj=None):
        """Insert task *taskid* into *project*, creating the project table if needed.

        :param obj: task fields; copied, never mutated. ``taskid``, ``project``
            and ``updatetime`` are set automatically.
        :returns: the result of ``engine.execute`` for the INSERT.
        """
        if not self._project_known(project):
            self._create_project(project)
            self._list_project()
        obj = dict(obj) if obj is not None else {}
        obj['taskid'] = taskid
        obj['project'] = project
        obj['updatetime'] = time.time()
        self.table.name = self._tablename(project)
        return self.engine.execute(self.table.insert()
                                   .values(**self._stringify(obj)))

    def update(self, project, taskid, obj=None, **kwargs):
        """Update fields of task *taskid* in *project*.

        :param obj: fields to set; merged with *kwargs* (kwargs win). Copied,
            never mutated. ``updatetime`` is set automatically.
        :raises LookupError: if *project* has no table.
        :returns: the result of ``engine.execute`` for the UPDATE.
        """
        if not self._project_known(project):
            raise LookupError(project)
        self.table.name = self._tablename(project)
        obj = dict(obj) if obj is not None else {}
        obj.update(kwargs)
        obj['updatetime'] = time.time()
        return self.engine.execute(self.table.update()
                                   .where(self.table.c.taskid == taskid)
                                   .values(**self._stringify(obj)))
160