Total Complexity | 40 |
Total Lines | 138 |
Duplicated Lines | 0 % |
Complex classes like pyspider.database.sqlalchemy.TaskDB often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
class TaskDB(SplitTableMixin, BaseTaskDB):
    """Task storage on top of SQLAlchemy, using one table per project.

    A single ``Table`` object (``self.table``) is shared by every project:
    before each statement is built, ``self.table.name`` is re-pointed at the
    project's table name obtained from ``self._tablename(project)``.

    ``self.projects``, ``self._list_project()`` and ``self._tablename()`` are
    not defined in this class; presumably they come from ``SplitTableMixin``
    -- TODO confirm against the mixin.
    """

    __tablename__ = ''

    def __init__(self, url):
        """Build the shared table definition and connect to the database.

        If ``url`` names a database, a best-effort ``CREATE DATABASE`` is
        issued first through a temporary engine that has no database
        selected; any SQLAlchemy error from that attempt is ignored.

        :param url: database URL accepted by ``make_url``/``create_engine``.
        """
        # 'the table name here is only a placeholder; it is overwritten with
        # the per-project name before every statement.
        self.table = Table('__tablename__', MetaData(),
                           Column('taskid', String(64), primary_key=True, nullable=False),
                           Column('project', String(64)),
                           Column('url', String(1024)),
                           Column('status', Integer),
                           Column('schedule', LargeBinary),
                           Column('fetch', LargeBinary),
                           Column('process', LargeBinary),
                           Column('track', LargeBinary),
                           Column('lastcrawltime', Float(32)),
                           Column('updatetime', Float(32)),
                           mysql_engine='InnoDB',
                           mysql_charset='utf8'
                           )

        self.url = make_url(url)
        if self.url.database:
            database = self.url.database
            # Connect without a database selected so that it can be created.
            self.url.database = None
            try:
                engine = create_engine(self.url)
                conn = engine.connect()
                try:
                    # NOTE(review): presumably ends the implicit transaction so
                    # that CREATE DATABASE is allowed to run -- TODO confirm.
                    conn.execute("commit")
                    # The name comes from the configured URL, not request data.
                    conn.execute("CREATE DATABASE %s" % database)
                finally:
                    # Release the bootstrap connection deterministically.
                    conn.close()
            except sqlalchemy.exc.SQLAlchemyError:
                # Deliberate best-effort: creation is attempted on every start
                # and is expected to fail when the database already exists.
                pass
            self.url.database = database
        self.engine = create_engine(url)

        self._list_project()

    def _create_project(self, project):
        """Create the table and status index for ``project`` if it is unknown.

        :param project: project name; must consist of ``\\w`` characters only.
        :raises AssertionError: if the project name contains other characters.
        """
        # Raised explicitly (instead of via ``assert``) so that the check is
        # still enforced under ``python -O``; the name ends up in a table name.
        if re.match(r'^\w+$', project) is None:
            raise AssertionError('invalid project name: %r' % (project,))
        if project in self.projects:
            return
        self.table.name = self._tablename(project)
        # Constructing the Index with a column of self.table registers it on
        # the table, so table.create() below emits it as well.
        Index('status_%s_index' % self.table.name, self.table.c.status)
        self.table.create(self.engine, checkfirst=True)
        # Drop the index from the shared Table so that the next project does
        # not inherit an index carrying this project's name.
        self.table.indexes.clear()

    @staticmethod
    def _parse(data):
        """Convert a stored row into a task dict (modifies ``data`` in place).

        Binary values are converted to text with ``utils.text``; the
        ``schedule``/``fetch``/``process``/``track`` fields are decoded from
        JSON, and become ``{}`` when present but empty.

        :param data: dict of column name -> stored value.
        :returns: the same dict object, converted.
        """
        # list() takes a snapshot of the items because values are reassigned
        # while looping.
        for key, value in list(six.iteritems(data)):
            if isinstance(value, six.binary_type):
                data[key] = utils.text(value)
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                if data[each]:
                    if isinstance(data[each], bytearray):
                        # str(bytearray) on Python 3 yields the repr
                        # "bytearray(b'...')", which is not valid JSON; decode
                        # the raw bytes instead (works on Python 2 and 3).
                        data[each] = bytes(data[each]).decode('utf8')
                    data[each] = json.loads(data[each])
                else:
                    data[each] = {}
        return data

    @staticmethod
    def _stringify(data):
        """Serialize the JSON fields of a task dict for storage (in place).

        :param data: task dict; ``schedule``/``fetch``/``process``/``track``
            are replaced by ``utils.utf8(json.dumps(value))`` when present.
        :returns: the same dict object.
        """
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                data[each] = utils.utf8(json.dumps(data[each]))
        return data

    def load_tasks(self, status, project=None, fields=None):
        """Yield the tasks that have the given ``status``.

        This is a generator. When ``project`` is given but is not in
        ``self.projects`` it yields nothing; when ``project`` is falsy, every
        project in ``self.projects`` is scanned.

        :param status: value compared against the ``status`` column.
        :param project: optional project name to restrict the scan to.
        :param fields: optional iterable of column names to select; all
            columns are selected when it is empty or ``None``.
        """
        if project and project not in self.projects:
            return

        if project:
            projects = [project, ]
        else:
            projects = self.projects

        # Names that are not columns of the table are passed through unchanged.
        columns = [getattr(self.table.c, f, f) for f in fields] if fields else self.table.c
        for each_project in projects:
            self.table.name = self._tablename(each_project)
            for task in self.engine.execute(self.table.select()
                                            .with_only_columns(columns)
                                            .where(self.table.c.status == status)):
                yield self._parse(result2dict(columns, task))

    def get_task(self, project, taskid, fields=None):
        """Return one task of ``project`` by id.

        :param project: project name.
        :param taskid: value compared against the ``taskid`` column.
        :param fields: optional iterable of column names to select; all
            columns are selected when it is empty or ``None``.
        :returns: the parsed task dict, or ``None`` when the project is
            unknown (even after refreshing the project list) or no row matches.
        """
        if project not in self.projects:
            # Refresh the project list once before giving up.
            self._list_project()
        if project not in self.projects:
            return None

        self.table.name = self._tablename(project)
        columns = [getattr(self.table.c, f, f) for f in fields] if fields else self.table.c
        for each in self.engine.execute(self.table.select()
                                        .with_only_columns(columns)
                                        .limit(1)
                                        .where(self.table.c.taskid == taskid)):
            return self._parse(result2dict(columns, each))
        # No matching row.
        return None

    def status_count(self, project):
        """Count the tasks of ``project`` grouped by status.

        :param project: project name.
        :returns: dict mapping status value -> number of tasks; empty when the
            project is unknown (even after refreshing the project list).
        """
        result = dict()
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            return result

        self.table.name = self._tablename(project)
        for status, count in self.engine.execute(
                self.table.select()
                .with_only_columns((self.table.c.status, func.count(1)))
                .group_by(self.table.c.status)):
            result[status] = count
        return result

    def insert(self, project, taskid, obj={}):
        """Insert a task, creating the project's table first when needed.

        :param project: project name.
        :param taskid: task id stored in the ``taskid`` column.
        :param obj: mapping of further column values. It is copied before use,
            so neither the caller's mapping nor the shared default is mutated.
        :returns: the result of ``engine.execute`` for the INSERT statement.
        """
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            self._create_project(project)
            self._list_project()
        obj = dict(obj)
        obj['taskid'] = taskid
        obj['project'] = project
        obj['updatetime'] = time.time()
        self.table.name = self._tablename(project)
        return self.engine.execute(self.table.insert()
                                   .values(**self._stringify(obj)))

    def update(self, project, taskid, obj={}, **kwargs):
        """Update the columns of an existing task.

        :param project: project name.
        :param taskid: id of the task to update.
        :param obj: mapping of column values to set. It is copied before use,
            so neither the caller's mapping nor the shared default is mutated.
        :param kwargs: additional column values; they override ``obj``.
        :returns: the result of ``engine.execute`` for the UPDATE statement.
        :raises LookupError: if the project is unknown even after refreshing
            the project list.
        """
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            raise LookupError
        self.table.name = self._tablename(project)
        obj = dict(obj)
        obj.update(kwargs)
        obj['updatetime'] = time.time()
        return self.engine.execute(self.table.update()
                                   .where(self.table.c.taskid == taskid)
                                   .values(**self._stringify(obj)))
||
160 |