1 | #!/usr/bin/env python |
||
2 | # -*- encoding: utf-8 -*- |
||
3 | # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: |
||
4 | # Author: Binux<[email protected]> |
||
5 | # http://binux.me |
||
6 | # Created on 2014-12-04 18:48:15 |
||
7 | |||
8 | import re |
||
9 | import six |
||
10 | import time |
||
11 | import json |
||
12 | import sqlalchemy.exc |
||
13 | |||
14 | from sqlalchemy import (create_engine, MetaData, Table, Column, |
||
15 | String, Float, LargeBinary) |
||
16 | from sqlalchemy.engine.url import make_url |
||
17 | from pyspider.database.base.resultdb import ResultDB as BaseResultDB |
||
18 | from pyspider.libs import utils |
||
19 | from .sqlalchemybase import SplitTableMixin, result2dict |
||
20 | |||
21 | |||
class ResultDB(SplitTableMixin, BaseResultDB):
    """Result storage on top of SQLAlchemy, one table per project.

    A single ``Table`` definition is shared by all projects; before every
    statement its ``name`` is switched to ``self._tablename(project)``.

    NOTE(review): ``self.projects``, ``self._list_project()`` and
    ``self._tablename()`` are not defined in this class -- presumably they
    come from ``SplitTableMixin``; confirm there.
    """
    __tablename__ = ''

    def __init__(self, url):
        """Prepare the table definition and the engine for ``url``.

        If ``url`` names a database, a best-effort ``CREATE DATABASE`` is
        attempted first through a temporary engine that has the database
        part of the URL removed. Any SQLAlchemy error raised by that attempt
        is ignored.

        :param url: database URL accepted by ``make_url``/``create_engine``.
        """
        self.table = Table('__tablename__', MetaData(),
                           Column('taskid', String(64), primary_key=True, nullable=False),
                           Column('url', String(1024)),
                           Column('result', LargeBinary),
                           Column('updatetime', Float(32)),
                           mysql_engine='InnoDB',
                           mysql_charset='utf8'
                           )

        self.url = make_url(url)
        if self.url.database:
            database = self.url.database
            # Connect without a database selected so it can be created.
            self.url.database = None
            try:
                engine = create_engine(self.url, convert_unicode=True, pool_recycle=3600)
                try:
                    conn = engine.connect()
                    try:
                        conn.execute("commit")
                        # The name comes from the configured URL, not from
                        # request data; it is interpolated as-is.
                        conn.execute("CREATE DATABASE %s" % database)
                    finally:
                        # Release the bootstrap connection explicitly
                        # instead of leaving it to garbage collection.
                        conn.close()
                finally:
                    # The temporary engine is never used again.
                    engine.dispose()
            except sqlalchemy.exc.SQLAlchemyError:
                # Best effort only: creation is allowed to fail.
                pass
            self.url.database = database
        self.engine = create_engine(url, convert_unicode=True,
                                    pool_recycle=3600)

        self._list_project()

    def _create_project(self, project):
        """Create the table for ``project`` unless it is already listed.

        :raises AssertionError: if ``project`` is not made only of ``\\w``
            characters.
        """
        # Raised explicitly (instead of ``assert``) so the check also runs
        # under ``python -O``; the name is used to build a table name.
        if re.match(r'^\w+$', project) is None:
            raise AssertionError('invalid project name: %r' % (project,))
        if project in self.projects:
            return
        self.table.name = self._tablename(project)
        self.table.create(self.engine)

    def _project_known(self, project):
        """Return True if ``project`` is listed, refreshing the list once."""
        if project not in self.projects:
            self._list_project()
        return project in self.projects

    def _columns_for(self, fields):
        """Map ``fields`` to table columns; all columns when ``fields`` is empty.

        A name that is not an attribute of ``self.table.c`` is passed
        through unchanged.
        """
        if fields:
            return [getattr(self.table.c, f, f) for f in fields]
        return self.table.c

    @staticmethod
    def _parse(data):
        """Convert a fetched row dict in place and return it.

        Binary values are converted with ``utils.text`` and the ``result``
        entry, when present, is decoded from JSON.
        """
        for key, value in list(six.iteritems(data)):
            if isinstance(value, six.binary_type):
                data[key] = utils.text(value)
        if 'result' in data:
            if isinstance(data['result'], bytearray):
                # ``str(bytearray)`` is the repr on Python 3, which is not
                # valid JSON; decode the raw bytes instead.
                data['result'] = bytes(data['result']).decode('utf8')
            data['result'] = json.loads(data['result'])
        return data

    @staticmethod
    def _stringify(data):
        """JSON-encode the ``result`` entry of ``data`` in place and return it."""
        if 'result' in data:
            data['result'] = utils.utf8(json.dumps(data['result']))
        return data

    def save(self, project, taskid, url, result):
        """Insert or update the result row of ``taskid`` in ``project``.

        The project table is created first when the project is not listed.
        An existing row (looked up by ``taskid``) is updated, otherwise a new
        row is inserted. ``updatetime`` is set to ``time.time()``.

        :return: whatever ``self.engine.execute`` returns for the statement.
        """
        if project not in self.projects:
            self._create_project(project)
            self._list_project()
        self.table.name = self._tablename(project)
        obj = {
            'taskid': taskid,
            'url': url,
            'result': result,
            'updatetime': time.time(),
        }
        if self.get(project, taskid, ('taskid', )):
            # The key is used in the WHERE clause, not rewritten.
            del obj['taskid']
            return self.engine.execute(self.table.update()
                                       .where(self.table.c.taskid == taskid)
                                       .values(**self._stringify(obj)))
        else:
            return self.engine.execute(self.table.insert()
                                       .values(**self._stringify(obj)))

    def select(self, project, fields=None, offset=0, limit=None):
        """Yield parsed result dicts of ``project``, newest ``updatetime`` first.

        Yields nothing when the project is not listed.

        :param fields: column names to fetch; all columns when empty/None.
        :param offset: passed to the statement's ``offset()``.
        :param limit: passed to the statement's ``limit()``.
        """
        if not self._project_known(project):
            return
        self.table.name = self._tablename(project)

        columns = self._columns_for(fields)
        for task in self.engine.execute(self.table.select()
                                        .with_only_columns(columns=columns)
                                        .order_by(self.table.c.updatetime.desc())
                                        .offset(offset).limit(limit)
                                        .execution_options(autocommit=True)):
            yield self._parse(result2dict(columns, task))

    def count(self, project):
        """Return the row count of ``project``; 0 when it is not listed."""
        if not self._project_known(project):
            return 0
        self.table.name = self._tablename(project)

        # The first row of the count query carries the value.
        for count, in self.engine.execute(self.table.count()):
            return count

    def get(self, project, taskid, fields=None):
        """Return the parsed row of ``taskid`` in ``project``.

        Returns None when the project is not listed or no row matches.

        :param fields: column names to fetch; all columns when empty/None.
        """
        if not self._project_known(project):
            return
        self.table.name = self._tablename(project)

        columns = self._columns_for(fields)
        for task in self.engine.execute(self.table.select()
                                        .with_only_columns(columns=columns)
                                        .where(self.table.c.taskid == taskid)
                                        .limit(1)):
            return self._parse(result2dict(columns, task))
||
134 |