test_pool.test_acquire_release()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 11
rs 9.85
c 0
b 0
f 0
cc 1
nop 1
1
# Copyright 2016 David M. Brown
2
#
3
# This file is part of Goblin.
4
#
5
# Goblin is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as published by
7
# the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# Goblin is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with Goblin.  If not, see <http://www.gnu.org/licenses/>.
17
18
import asyncio
19
import pytest
20
21
22
@pytest.mark.asyncio
async def test_pool_init(connection_pool):
    """Check that ``init_pool`` leaves one connection in ``_available``.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when the assertion fails.
    """
    try:
        await connection_pool.init_pool()
        assert len(connection_pool._available) == 1
    finally:
        await connection_pool.close()
27
28
29
@pytest.mark.asyncio
async def test_acquire_release(connection_pool):
    """Acquire one connection, release it, and check the pool bookkeeping.

    After ``acquire`` the connection must be counted in ``_acquired`` and
    absent from ``_available``; after ``release`` the reverse must hold and
    the connection's ``times_acquired`` counter must be falsy again.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when one of the assertions fails.
    """
    try:
        conn = await connection_pool.acquire()
        assert len(connection_pool._available) == 0
        assert len(connection_pool._acquired) == 1
        assert conn.times_acquired == 1

        connection_pool.release(conn)
        assert len(connection_pool._available) == 1
        assert len(connection_pool._acquired) == 0
        assert not conn.times_acquired
    finally:
        await connection_pool.close()
40
41
42
@pytest.mark.asyncio
async def test_acquire_multiple(connection_pool):
    """Check that two consecutive ``acquire`` calls yield distinct connections.

    Both connections must also be tracked in ``_acquired``.  The pool is
    closed in a ``finally`` clause so that it is shut down even when one of
    the assertions fails.
    """
    try:
        first = await connection_pool.acquire()
        second = await connection_pool.acquire()
        assert first is not second
        assert len(connection_pool._acquired) == 2
    finally:
        await connection_pool.close()
49
50
51
@pytest.mark.asyncio
async def test_share(connection_pool):
    """Check that a pool capped at one connection hands it out twice.

    With ``_max_conns`` forced to 1, the second ``acquire`` must return the
    same object as the first and its ``times_acquired`` counter must read 2.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when one of the assertions fails.
    """
    try:
        connection_pool._max_conns = 1
        first = await connection_pool.acquire()
        second = await connection_pool.acquire()
        assert first is second
        assert first.times_acquired == 2
    finally:
        await connection_pool.close()
59
60
61
@pytest.mark.asyncio
async def test_acquire_multiple_and_share(connection_pool):
    """Check the hand-out order once the pool starts sharing connections.

    With ``_max_conns`` and ``_max_times_acquired`` both forced to 2, the
    first two ``acquire`` calls must return distinct connections, and the
    third and fourth calls must return those same two connections again, in
    the same order.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when one of the assertions fails.
    """
    try:
        connection_pool._max_conns = 2
        connection_pool._max_times_acquired = 2

        conn1 = await connection_pool.acquire()
        conn2 = await connection_pool.acquire()
        assert conn1 is not conn2

        conn3 = await connection_pool.acquire()
        conn4 = await connection_pool.acquire()
        assert conn3 is not conn4
        assert conn3 is conn1
        assert conn4 is conn2
    finally:
        await connection_pool.close()
74
75
76
@pytest.mark.asyncio
async def test_max_acquired(connection_pool):
    """Check that ``acquire`` does not complete once the pool is saturated.

    With ``_max_conns`` and ``_max_times_acquired`` both forced to 2, four
    acquisitions are made; a fifth ``acquire`` wrapped in a 0.1 second
    ``asyncio.wait_for`` must then raise ``asyncio.TimeoutError``.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when the expected timeout does not occur.
    """
    try:
        connection_pool._max_conns = 2
        connection_pool._max_times_acquired = 2

        # Keep the acquired connections referenced for the duration of the
        # test, as the original named locals did.
        held = []
        for _ in range(4):
            held.append(await connection_pool.acquire())

        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(connection_pool.acquire(), timeout=0.1)
    finally:
        await connection_pool.close()
87
88
89
@pytest.mark.asyncio
async def test_release_notify(connection_pool):
    """Check that releasing a connection lets a pending ``acquire`` finish.

    With ``_max_conns`` and ``_max_times_acquired`` both forced to 2, four
    acquisitions are made.  A fifth ``acquire`` is then gathered together
    with a coroutine that releases the fourth acquisition; the connection
    returned by that fifth ``acquire`` must be the second connection.

    The pool is closed in a ``finally`` clause so that it is shut down even
    when the assertion fails or the gather raises.
    """
    try:
        connection_pool._max_conns = 2
        connection_pool._max_times_acquired = 2

        await connection_pool.acquire()
        conn2 = await connection_pool.acquire()
        await connection_pool.acquire()
        conn4 = await connection_pool.acquire()

        async def release(conn):
            # Coroutine wrapper so the release can be passed to gather
            # alongside the pending acquire.
            conn.release()

        # acquire() is listed first, matching the original argument order.
        results = await asyncio.gather(
            connection_pool.acquire(), release(conn4))
        reacquired = results[0]
        assert reacquired is conn2
    finally:
        await connection_pool.close()
106