Test Failed
Push — develop ( a80574...5ed66c )
by Dean
02:36
created

db_connect()   B

Complexity

Conditions 6

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 24
rs 7.6129
cc 6
1
from plugin.core.database.wrapper import APSWConnectionWrapper, APSWDatabaseWrapper
2
3
from playhouse.apsw_ext import APSWDatabase
4
import apsw
0 ignored issues
show
Configuration introduced by
The import apsw could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
5
import logging
6
7
BUSY_TIMEOUT = 3000
8
9
log = logging.getLogger(__name__)
10
11
12
def db_connect(path, type, name=None, wrapper=True):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in type.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
13
    # Connect to new database
14
    if type == 'peewee':
15
        # Retrieve database class
16
        cls = APSWDatabaseWrapper if wrapper else APSWDatabase
17
18
        # Construct database
19
        db = cls(path, autorollback=True, journal_mode='WAL', timeout=BUSY_TIMEOUT)
20
    elif type == 'raw':
21
        # Retrieve connection class
22
        cls = APSWConnectionWrapper if wrapper else apsw.Connection
23
24
        # Construct connection
25
        db = cls(path, flags=apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE | apsw.SQLITE_OPEN_WAL)
26
        db.setbusytimeout(BUSY_TIMEOUT)
27
    else:
28
        raise ValueError('Unknown database type: %r' % type)
29
30
    # Set database name
31
    if wrapper:
32
        db.name = name
33
34
    log.debug('Connected to database at %r', path)
35
    return db
36
37
38
def db_connection(database):
39
    if isinstance(database, APSWDatabase):
40
        return database.get_conn()
41
42
    if isinstance(database, apsw.Connection):
43
        return database
44
45
    raise ValueError('Unknown "database" parameter provided')
46