Passed
Pull Request — master (#1621)
by dgw
01:45
created

test_db.test_set_plugin_value()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# coding=utf-8
2
"""Tests for the new database functionality.

TODO: Most of these tests assume functionality tested in other tests. This is
enough to get everything working (and is better than nothing), but best
practice would probably be not to do that."""
7
from __future__ import unicode_literals, absolute_import, print_function, division
8
9
import json
10
import os
11
import sqlite3
12
import sys
13
import tempfile
14
15
import pytest
16
17
from sopel.db import SopelDB
18
from sopel.test_tools import MockConfig
19
from sopel.tools import Identifier
20
21
# ``tempfile.mkstemp()`` returns an already-open OS-level file descriptor
# together with the path. Only the path is used by this module, so the
# descriptor is closed right away instead of being leaked for the lifetime of
# the test process (an open descriptor can also block deleting the file on
# platforms that refuse to remove open files).
_db_fd, db_filename = tempfile.mkstemp()
os.close(_db_fd)
del _db_fd

# Python 2/3 compatibility aliases.
if sys.version_info.major >= 3:
    unicode = str
    basestring = str
    iteritems = dict.items
    itervalues = dict.values
    iterkeys = dict.keys
else:
    iteritems = dict.iteritems
    itervalues = dict.itervalues
    iterkeys = dict.iterkeys
32
33
34
@pytest.fixture
def db():
    """Build a ``SopelDB`` whose config points at ``db_filename``."""
    settings = MockConfig()
    settings.core.db_filename = db_filename
    # TODO add tests to ensure db creation works properly, too.
    return SopelDB(settings)
41
42
43
def teardown_function(function):
    """Delete the database file named by ``db_filename``.

    The ``function`` argument is accepted but not used.
    """
    os.unlink(db_filename)
45
46
47
def test_get_nick_id(db):
    """Check that ``db.get_nick_id`` registers nicks and returns stable IDs.

    Each nick is passed to ``db.get_nick_id`` and the matching ``nicknames``
    row is read back through a separate ``sqlite3`` connection. The test then
    verifies that every nick received a different ID, that repeating the
    lookup returns the same ID, and that an upper-cased form of the nick also
    resolves to that ID.
    """
    conn = sqlite3.connect(db_filename)
    tests = [
        [None, 'embolalia', Identifier('Embolalia')],
        # Ensures case conversion is handled properly
        [None, '[][]', Identifier('[]{}')],
        # Unicode, just in case
        [None, 'embölaliå', Identifier('EmbölaliÅ')],
    ]

    try:
        for test in tests:
            test[0] = db.get_nick_id(test[2])
            nick_id, slug, nick = test
            # ``with conn`` only manages the transaction; it does not close
            # the connection, hence the explicit ``finally`` below.
            with conn:
                cursor = conn.cursor()
                registered = cursor.execute(
                    'SELECT nick_id, slug, canonical FROM nicknames WHERE canonical IS ?', [nick]
                ).fetchall()
                assert len(registered) == 1
                # Separate asserts so a failure shows which column mismatched.
                assert registered[0][1] == slug
                assert registered[0][2] == nick
    finally:
        conn.close()

    # Check that each nick ended up with a different id
    assert len(set(test[0] for test in tests)) == len(tests)

    # Check that the retrieval actually is idempotent
    for test in tests:
        nick_id = test[0]
        new_id = db.get_nick_id(test[2])
        assert nick_id == new_id

    # Even if the case is different
    for test in tests:
        nick_id = test[0]
        new_id = db.get_nick_id(Identifier(test[2].upper()))
        assert nick_id == new_id
82
83
84
def test_alias_nick(db):
    """Check that aliases resolve to the original nick's ID.

    Also checks that aliasing two previously unseen nicks does not raise,
    and that ``ValueError`` is raised both when the alias argument is an
    already-known nick and when a nick is aliased to itself.
    """
    primary = 'Embolalia'
    alternates = ('EmbölaliÅ', 'Embo`work', 'Embo')
    expected_id = db.get_nick_id(primary)

    for alternate in alternates:
        db.alias_nick(primary, alternate)

    for alternate in alternates:
        resolved_id = db.get_nick_id(alternate)
        assert resolved_id == expected_id

    # Shouldn't fail.
    db.alias_nick('both', 'arenew')

    for first, second in (('Eve', primary), (primary, primary)):
        with pytest.raises(ValueError):
            db.alias_nick(first, second)
102
103
104
def test_set_nick_value(db):
    """Check that ``db.set_nick_value`` stores JSON-decodable values.

    Values are written through ``db.set_nick_value`` and read back from the
    ``nick_values`` table through a separate ``sqlite3`` connection. The
    check runs twice: once for the initial data, and once after two of the
    values have been replaced, to cover updates.
    """
    conn = sqlite3.connect(db_filename)
    try:
        cursor = conn.cursor()
        nick = 'Embolalia'
        nick_id = db.get_nick_id(nick)
        data = {
            'key': 'value',
            'number_key': 1234,
            'unicode': 'EmbölaliÅ',
        }

        def check():
            # Write everything first, then verify each stored row.
            for key, value in iteritems(data):
                db.set_nick_value(nick, key, value)

            for key, value in iteritems(data):
                found_value = cursor.execute(
                    'SELECT value FROM nick_values WHERE nick_id = ? AND key = ?',
                    [nick_id, key]
                ).fetchone()[0]
                assert json.loads(unicode(found_value)) == value

        check()

        # Test updates
        data['number_key'] = 'not a number anymore!'
        data['unicode'] = 'This is different toö!'
        check()
    finally:
        # The raw connection is independent of ``db``; close it explicitly so
        # it is not left open when the test ends (or fails).
        conn.close()
131
132
133
def test_get_nick_value(db):
    """Check that ``db.get_nick_value`` decodes rows inserted directly.

    JSON-encoded values are inserted into ``nick_values`` through a separate
    ``sqlite3`` connection, then read back through ``db.get_nick_value`` and
    compared with the original Python values.
    """
    nick = 'Embolalia'
    nick_id = db.get_nick_id(nick)
    data = {
        'key': 'value',
        'number_key': 1234,
        'unicode': 'EmbölaliÅ',
    }

    conn = sqlite3.connect(db_filename)
    try:
        cursor = conn.cursor()
        for key, value in iteritems(data):
            cursor.execute('INSERT INTO nick_values VALUES (?, ?, ?)',
                           [nick_id, key, json.dumps(value, ensure_ascii=False)])
        conn.commit()
    finally:
        # Rows are committed; the raw connection is no longer needed.
        conn.close()

    for key, value in iteritems(data):
        found_value = db.get_nick_value(nick, key)
        assert found_value == value
152
153
154
def test_delete_nick_value(db):
    """Store a nick value, confirm it reads back, delete it, expect ``None``."""
    owner, key = 'Embolalia', 'wasd'

    db.set_nick_value(owner, key, 'uldr')
    before = db.get_nick_value(owner, key)
    assert before == 'uldr'

    db.delete_nick_value(owner, key)
    after = db.get_nick_value(owner, key)
    assert after is None
160
161
162
def test_unalias_nick(db):
    """Check that ``db.unalias_nick`` removes aliases from a nick group.

    One nick and three aliases are inserted into ``nicknames`` under the same
    ``nick_id`` through a separate ``sqlite3`` connection. After every alias
    has been passed to ``db.unalias_nick``, exactly one row with that
    ``nick_id`` must remain.
    """
    conn = sqlite3.connect(db_filename)
    try:
        nick = 'Embolalia'
        nick_id = 42
        conn.execute('INSERT INTO nicknames VALUES (?, ?, ?)',
                     [nick_id, Identifier(nick).lower(), nick])
        aliases = ['EmbölaliÅ', 'Embo`work', 'Embo']
        for alias in aliases:
            conn.execute('INSERT INTO nicknames VALUES (?, ?, ?)',
                         [nick_id, Identifier(alias).lower(), alias])
        conn.commit()

        for alias in aliases:
            db.unalias_nick(alias)

        # The query does not depend on the individual alias, so it is run
        # once rather than once per alias.
        found = conn.execute(
            'SELECT * FROM nicknames WHERE nick_id = ?',
            [nick_id]).fetchall()
        assert len(found) == 1
    finally:
        conn.close()
182
183
184
def test_delete_nick_group(db):
    """Check that ``db.delete_nick_group`` removes nicks and their values.

    Two nicks are inserted under one ``nick_id`` through a separate
    ``sqlite3`` connection and each gets a value via ``db.set_nick_value``.
    After deleting the group through one of the nicks, both ``nicknames`` and
    ``nick_values`` are expected to be empty.
    """
    conn = sqlite3.connect(db_filename)
    try:
        aliases = ['Embolalia', 'Embo']
        nick_id = 42
        for alias in aliases:
            conn.execute('INSERT INTO nicknames VALUES (?, ?, ?)',
                         [nick_id, Identifier(alias).lower(), alias])
        conn.commit()

        db.set_nick_value(aliases[0], 'foo', 'bar')
        db.set_nick_value(aliases[1], 'spam', 'eggs')

        db.delete_nick_group(aliases[0])

        # Nothing else has created values, so we know the tables are empty
        nicks = conn.execute('SELECT * FROM nicknames').fetchall()
        assert len(nicks) == 0
        data = conn.execute('SELECT * FROM nick_values').fetchone()
        assert data is None
    finally:
        conn.close()
203
204
205
def test_merge_nick_groups(db):
    """Check that ``db.merge_nick_groups`` unifies IDs and values.

    Two nicks are inserted with different ``nick_id`` values (0 and 1) through
    a separate ``sqlite3`` connection, and values are stored for both,
    including a conflicting ``'foo'`` key. After the merge, both ``nicknames``
    rows must share one ``nick_id``, and the values stored under that ID must
    match ``finals`` -- which expects the first nick's ``'foo'`` value.
    """
    conn = sqlite3.connect(db_filename)
    try:
        aliases = ['Embolalia', 'Embo']
        for nick_id, alias in enumerate(aliases):
            conn.execute('INSERT INTO nicknames VALUES (?, ?, ?)',
                         [nick_id, Identifier(alias).lower(), alias])
        conn.commit()

        finals = (('foo', 'bar'), ('bar', 'blue'), ('spam', 'eggs'))

        db.set_nick_value(aliases[0], finals[0][0], finals[0][1])
        db.set_nick_value(aliases[0], finals[1][0], finals[1][1])
        db.set_nick_value(aliases[1], 'foo', 'baz')
        db.set_nick_value(aliases[1], finals[2][0], finals[2][1])

        db.merge_nick_groups(aliases[0], aliases[1])

        # Two rows were inserted above; read the ID of each.
        nick_ids = conn.execute('SELECT nick_id FROM nicknames')
        nick_id = nick_ids.fetchone()[0]
        alias_id = nick_ids.fetchone()[0]
        assert nick_id == alias_id

        for key, value in finals:
            found = conn.execute(
                'SELECT value FROM nick_values WHERE nick_id = ? AND key = ?',
                [nick_id, key]).fetchone()[0]
            assert json.loads(unicode(found)) == value
    finally:
        conn.close()
232
233
234
def test_set_channel_value(db):
    """Check that ``db.set_channel_value`` stores the value JSON-encoded.

    The row is read back from ``channel_values`` through a separate
    ``sqlite3`` connection, which is closed once the row has been fetched.
    """
    conn = sqlite3.connect(db_filename)
    try:
        db.set_channel_value('#asdf', 'qwer', 'zxcv')
        result = conn.execute(
            'SELECT value FROM channel_values WHERE channel = ? and key = ?',
            ['#asdf', 'qwer']).fetchone()[0]
    finally:
        conn.close()
    assert result == '"zxcv"'
241
242
243
def test_delete_channel_value(db):
    """Store a channel value, confirm it reads back, delete it, expect ``None``."""
    channel, key = '#asdf', 'wasd'

    db.set_channel_value(channel, key, 'uldr')
    before = db.get_channel_value(channel, key)
    assert before == 'uldr'

    db.delete_channel_value(channel, key)
    after = db.get_channel_value(channel, key)
    assert after is None
248
249
250
def test_get_channel_value(db):
    """Check that ``db.get_channel_value`` decodes a row inserted directly.

    A JSON-encoded string is inserted into ``channel_values`` through a
    separate ``sqlite3`` connection, which is closed after the commit; the
    value is then read back through ``db.get_channel_value``.
    """
    conn = sqlite3.connect(db_filename)
    try:
        conn.execute("INSERT INTO channel_values VALUES ('#asdf', 'qwer', '\"zxcv\"')")
        conn.commit()
    finally:
        conn.close()
    result = db.get_channel_value('#asdf', 'qwer')
    assert result == 'zxcv'
256
257
258
def test_get_nick_or_channel_value(db):
    """Check ``get_nick_or_channel_value`` for a nick name and a channel name.

    The same key is stored once as a nick value and once as a channel value;
    each name must return the value stored for it.
    """
    key = 'qwer'
    db.set_nick_value('asdf', key, 'poiu')
    db.set_channel_value('#asdf', key, '/.,m')

    for name, expected in (('asdf', 'poiu'), ('#asdf', '/.,m')):
        assert db.get_nick_or_channel_value(name, key) == expected
263
264
265
def test_get_preferred_value(db):
    """Check ``get_preferred_value`` over a nick name followed by a channel.

    For a key stored on both names the nick's value is expected; for a key
    stored only on the channel, the channel's value is expected.
    """
    db.set_nick_value('asdf', 'qwer', 'poiu')
    db.set_channel_value('#asdf', 'qwer', '/.,m')
    db.set_channel_value('#asdf', 'lkjh', '1234')

    candidates = ['asdf', '#asdf']
    for key, expected in (('qwer', 'poiu'), ('lkjh', '1234')):
        assert db.get_preferred_value(candidates, key) == expected
272
273
274
def test_set_plugin_value(db):
    """Check that ``db.set_plugin_value`` stores the value JSON-encoded.

    The row is read back from ``plugin_values`` through a separate
    ``sqlite3`` connection, which is closed once the row has been fetched.
    """
    conn = sqlite3.connect(db_filename)
    try:
        db.set_plugin_value('plugname', 'qwer', 'zxcv')
        result = conn.execute(
            'SELECT value FROM plugin_values WHERE plugin = ? and key = ?',
            ['plugname', 'qwer']).fetchone()[0]
    finally:
        conn.close()
    assert result == '"zxcv"'
281
282
283
def test_get_plugin_value(db):
    """Check that ``db.get_plugin_value`` decodes a row inserted directly.

    A JSON-encoded string is inserted into ``plugin_values`` through a
    separate ``sqlite3`` connection, which is closed after the commit; the
    value is then read back through ``db.get_plugin_value``.
    """
    conn = sqlite3.connect(db_filename)
    try:
        conn.execute("INSERT INTO plugin_values VALUES ('plugname', 'qwer', '\"zxcv\"')")
        conn.commit()
    finally:
        conn.close()
    result = db.get_plugin_value('plugname', 'qwer')
    assert result == 'zxcv'
289
290
291
def test_delete_plugin_value(db):
    """Store a plugin value, confirm it reads back, delete it, expect ``None``."""
    plugin, key = 'plugin', 'wasd'

    db.set_plugin_value(plugin, key, 'uldr')
    before = db.get_plugin_value(plugin, key)
    assert before == 'uldr'

    db.delete_plugin_value(plugin, key)
    after = db.get_plugin_value(plugin, key)
    assert after is None
296