Issues (47)

pyspider/database/sqlalchemy/resultdb.py (2 issues)

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-12-04 18:48:15
7
8
import re
9
import six
10
import time
11
import json
12
import sqlalchemy.exc
13
14
from sqlalchemy import (create_engine, MetaData, Table, Column,
15
                        String, Float, LargeBinary)
16
from sqlalchemy.engine.url import make_url
17
from pyspider.database.base.resultdb import ResultDB as BaseResultDB
18
from pyspider.libs import utils
19
from .sqlalchemybase import SplitTableMixin, result2dict
20
21
22
class ResultDB(SplitTableMixin, BaseResultDB):
    """Result database backed by SQLAlchemy, using one table per project.

    A single ``Table`` object is shared by every project: before each
    operation its ``name`` is rebound to the table name of the project being
    accessed (``self._tablename(project)``).

    NOTE(review): ``self.projects``, ``self._list_project()`` and
    ``self._tablename()`` are not defined in this class; presumably they are
    provided by ``SplitTableMixin`` -- confirm against ``sqlalchemybase``.
    """
    __tablename__ = ''

    def __init__(self, url):
        """Open the result database described by *url*.

        If the URL names a database, a best-effort ``CREATE DATABASE`` is
        issued first through a temporary engine that is not bound to any
        database; SQLAlchemy errors raised by that attempt are ignored.

        :param url: SQLAlchemy connection URL.
        """
        # The literal table name is only a placeholder: ``self.table.name``
        # is reassigned before every statement is built.
        self.table = Table('__tablename__', MetaData(),
                           Column('taskid', String(64), primary_key=True, nullable=False),
                           Column('url', String(1024)),
                           Column('result', LargeBinary),
                           Column('updatetime', Float(32)),
                           mysql_engine='InnoDB',
                           mysql_charset='utf8'
                           )

        self.url = make_url(url)
        if self.url.database:
            database = self.url.database
            # Connect without selecting a database so it can be created.
            self.url.database = None
            try:
                engine = create_engine(self.url, convert_unicode=True, pool_recycle=3600)
                try:
                    conn = engine.connect()
                    try:
                        conn.execute("commit")
                        conn.execute("CREATE DATABASE %s" % database)
                    finally:
                        # Release the bootstrap connection even on failure.
                        conn.close()
                finally:
                    # The temporary engine is never used again; drop its pool.
                    engine.dispose()
            except sqlalchemy.exc.SQLAlchemyError:
                # Deliberate best-effort: any SQLAlchemy error raised while
                # trying to create the database is ignored.
                pass
            self.url.database = database
        self.engine = create_engine(url, convert_unicode=True,
                                    pool_recycle=3600)

        self._list_project()

    def _has_project(self, project):
        """Return True if *project* is known.

        When the project is missing from ``self.projects`` the project list
        is refreshed once via ``self._list_project()`` before re-checking.
        """
        if project not in self.projects:
            self._list_project()
        return project in self.projects

    def _create_project(self, project):
        """Create the result table for *project* unless it is already known.

        :param project: project name; must match ``^\\w+$``.
        :raises AssertionError: if the project name does not match.
        """
        if re.match(r'^\w+$', project) is None:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError('invalid project name: %r' % (project, ))
        if project in self.projects:
            return
        self.table.name = self._tablename(project)
        self.table.create(self.engine)

    @staticmethod
    def _parse(data):
        """Convert a raw row dict into its application form, in place.

        Binary values are converted with ``utils.text`` and the ``result``
        field, when present, is decoded from JSON.

        :param data: dict of column name to stored value (modified in place).
        :returns: the same dict.
        """
        # Iterate over a snapshot because values are reassigned in the loop.
        for key, value in list(six.iteritems(data)):
            if isinstance(value, six.binary_type):
                data[key] = utils.text(value)
        if 'result' in data:
            if isinstance(data['result'], bytearray):
                # ``str(bytearray)`` yields the repr "bytearray(b'...')" on
                # Python 3, which is not valid JSON. Go through ``bytes`` and
                # the same conversion used for the other binary values.
                data['result'] = utils.text(bytes(data['result']))
            data['result'] = json.loads(data['result'])
        return data

    @staticmethod
    def _stringify(data):
        """Serialize the ``result`` field of *data* to JSON, in place.

        The JSON text is passed through ``utils.utf8`` before being stored.

        :param data: dict of column name to value (modified in place).
        :returns: the same dict.
        """
        if 'result' in data:
            data['result'] = utils.utf8(json.dumps(data['result']))
        return data

    def save(self, project, taskid, url, result):
        """Insert or update the result of a task.

        The project table is created first if the project is unknown. An
        existing row with the same *taskid* is updated, otherwise a new row
        is inserted; ``updatetime`` is set to the current time.

        :param project: project name.
        :param taskid: task identifier (primary key).
        :param url: URL the result belongs to.
        :param result: JSON-serializable result object.
        :returns: whatever ``self.engine.execute`` returns for the statement.
        """
        if project not in self.projects:
            self._create_project(project)
            self._list_project()
        self.table.name = self._tablename(project)
        obj = {
            'taskid': taskid,
            'url': url,
            'result': result,
            'updatetime': time.time(),
        }
        if self.get(project, taskid, ('taskid', )):
            # The primary key is used in the WHERE clause, not updated.
            del obj['taskid']
            return self.engine.execute(self.table.update()
                                       .where(self.table.c.taskid == taskid)
                                       .values(**self._stringify(obj)))
        else:
            return self.engine.execute(self.table.insert()
                                       .values(**self._stringify(obj)))

    def select(self, project, fields=None, offset=0, limit=None):
        """Iterate over the results of *project*, newest ``updatetime`` first.

        :param project: project name; an unknown project yields nothing.
        :param fields: iterable of column names to fetch, or a false value
            for all columns.
        :param offset: number of rows to skip.
        :param limit: maximum number of rows, or ``None``.
        :returns: generator of parsed row dicts (see ``_parse``).
        """
        if not self._has_project(project):
            return
        self.table.name = self._tablename(project)

        # Names that are not table columns are passed through unchanged.
        columns = [getattr(self.table.c, f, f) for f in fields] if fields else self.table.c
        for task in self.engine.execute(self.table.select()
                                        .with_only_columns(columns=columns)
                                        .order_by(self.table.c.updatetime.desc())
                                        .offset(offset).limit(limit)
                                        .execution_options(autocommit=True)):
            yield self._parse(result2dict(columns, task))

    def count(self, project):
        """Return the number of stored results for *project*.

        :param project: project name.
        :returns: the row count, or ``0`` when the project is unknown.
        """
        if not self._has_project(project):
            return 0
        self.table.name = self._tablename(project)

        # The count query is iterated and its first row's value returned.
        for count, in self.engine.execute(self.table.count()):
            return count

    def get(self, project, taskid, fields=None):
        """Fetch the result stored for a single task.

        :param project: project name.
        :param taskid: task identifier to look up.
        :param fields: iterable of column names to fetch, or a false value
            for all columns.
        :returns: the parsed row dict (see ``_parse``), or ``None`` when the
            project is unknown or no row matches.
        """
        if not self._has_project(project):
            return
        self.table.name = self._tablename(project)

        # Names that are not table columns are passed through unchanged.
        columns = [getattr(self.table.c, f, f) for f in fields] if fields else self.table.c
        for task in self.engine.execute(self.table.select()
                                        .with_only_columns(columns=columns)
                                        .where(self.table.c.taskid == taskid)
                                        .limit(1)):
            return self._parse(result2dict(columns, task))
134