Passed
Push — develop ( a47354...ce4c01 )
by Bastien
01:39
created

FunctionalTest.connect_database()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
import logging
3
import typing
4
import unittest
5
6
import plaster
7
import requests
8
import transaction
9
from depot.manager import DepotManager
10
from pyramid import testing
11
from sqlalchemy.exc import IntegrityError
12
13
from tracim_backend.lib.core.content import ContentApi
14
from tracim_backend.lib.core.workspace import WorkspaceApi
15
from tracim_backend.models import get_engine
16
from tracim_backend.models import DeclarativeBase
17
from tracim_backend.models import get_session_factory
18
from tracim_backend.models import get_tm_session
19
from tracim_backend.app_models.contents import content_type_list
20
from tracim_backend.models.data import Workspace
21
from tracim_backend.models.data import ContentRevisionRO
22
from tracim_backend.models.data import Content
23
from tracim_backend.lib.utils.logger import logger
24
from tracim_backend.fixtures import FixturesLoader
25
from tracim_backend.fixtures.users_and_groups import Base as BaseFixture
26
from tracim_backend.config import CFG
27
from tracim_backend.extensions import hapic
28
from tracim_backend import web
29
from webtest import TestApp
30
from io import BytesIO
31
from PIL import Image
32
33
34
def eq_(a, b, msg=None) -> None:
    """
    Shorthand equality assertion kept for old nose-style test code.

    :param a: first value to compare
    :param b: second value to compare
    :param msg: optional failure message; when None or empty, the default
        message "<repr of a> != <repr of b>" is used
    :return: Nothing.
    :raises AssertionError: if ``a == b`` is false
    """
    # TODO - G.M - 05-04-2018 - Remove this when all old nose code is removed
    # Raise explicitly instead of using an ``assert`` statement: assert
    # statements are removed when Python runs with -O, which would turn this
    # helper into a silent no-op.
    if not a == b:
        raise AssertionError(msg or "%r != %r" % (a, b))
37
38
# TODO - G.M - 2018-06-179 - Refactor the slug change function below,
#  possibly as a pytest fixture.
40
41
42
def set_html_document_slug_to_legacy(session_factory) -> None:
    """
    Helper for functional tests: rewrite the type of every 'html-document'
    content revision stored in database to the legacy 'page' slug.

    The function asserts that content 6 has no 'page' revision before the
    update and at least one after it.

    :param session_factory: session factory of the test
    :return: Nothing.
    """
    session = get_tm_session(session_factory, transaction.manager)

    # Revisions of content 6 already using the legacy slug.
    legacy_revisions = session.query(ContentRevisionRO).filter(
        ContentRevisionRO.type == 'page'
    ).filter(
        ContentRevisionRO.content_id == 6
    )
    assert legacy_revisions.count() == 0

    # Bulk update of every revision currently typed 'html-document'.
    session.query(ContentRevisionRO).filter(
        ContentRevisionRO.type == 'html-document'
    ).update({ContentRevisionRO.type: 'page'})
    transaction.commit()

    assert legacy_revisions.count() > 0
59
60
61
def create_1000px_png_test_image() -> BytesIO:
    """
    Build an in-memory 1000x1000 PNG image for tests.

    :return: BytesIO containing the PNG data, with its ``name`` attribute set
        to 'test_image.png' and its position rewound to the start
    """
    png_buffer = BytesIO()
    image = Image.new('RGBA', size=(1000, 1000), color=(0, 0, 0))
    image.save(png_buffer, 'png')
    # NOTE(review): the name is presumably read by code that treats the
    # buffer like an uploaded file -- confirm against callers.
    png_buffer.name = 'test_image.png'
    # Rewind so that the next read starts at the first byte of the image.
    png_buffer.seek(0)
    return png_buffer
68
69
70
class FunctionalTest(unittest.TestCase):
    """
    Base class for functional tests run against the web application.

    For each test, setUp reads the ``config_section`` section of
    ``config_uri``, creates the database tables, loads ``fixtures`` and wraps
    the application in a webtest TestApp stored in ``self.testapp``.
    tearDown drops the tables and disposes of the database engine.
    """

    fixtures = [BaseFixture]
    config_uri = 'tests_configs.ini'
    config_section = 'functional_test'

    def _set_logger(self) -> None:
        """
        Set all logger to a high level to avoid getting too much noise for tests
        """
        logger._logger.setLevel('ERROR')
        logging.getLogger().setLevel('ERROR')
        for logger_name in ('sqlalchemy', 'txn', 'cliff', '_jb_pytest_runner'):
            logging.getLogger(logger_name).setLevel('ERROR')

    def setUp(self) -> None:
        """
        Prepare settings, database, file depot and application for one test.
        """
        self._set_logger()
        DepotManager._clear()
        settings = plaster.get_settings(
            self.config_uri,
            self.config_section
        )
        # Subclasses may alter the settings through override_settings.
        self.settings = self.override_settings(settings)
        hapic.reset_context()
        self.connect_database(create_tables=True)
        self.app_config = CFG(self.settings)
        self.app_config.configure_filedepot()
        self.init_database(self.settings)
        DepotManager._clear()
        self.run_app()

    def connect_database(self, create_tables: bool = False) -> None:
        """
        Create the engine, the session factory and the session of the test.

        :param create_tables: if True, create all tables known to
            DeclarativeBase before building the session factory
        """
        self.engine = get_engine(self.settings)
        if create_tables:
            DeclarativeBase.metadata.create_all(self.engine)
        self.session_factory = get_session_factory(self.engine)
        self.session = get_tm_session(self.session_factory, transaction.manager)

    def override_settings(self, settings: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]:  # nopep8
        """
        Allow to override some setting by code.
        by default : do nothing.

        :param settings: settings read from the config file
        :return: settings to use for the test
        """
        return settings

    def run_app(self) -> None:
        """
        Build the web application from self.settings and expose it as
        self.testapp (webtest TestApp).
        """
        app = web({}, **self.settings)
        self.testapp = TestApp(app)

    def init_database(self, settings: typing.Dict[str, typing.Any]) -> None:
        """
        Load self.fixtures into the database.

        An IntegrityError raised while loading is logged and the transaction
        is aborted; the error is not propagated.

        :param settings: settings of the test; not used by this
            implementation
        """
        with transaction.manager:
            try:
                fixtures_loader = FixturesLoader(self.session, self.app_config)
                fixtures_loader.loads(self.fixtures)
                transaction.commit()
                logger.info(self, "Database initialized.")
            except IntegrityError:
                import traceback
                logger.error(self, 'Warning, there was a problem when adding default data'  # nopep8
                                   ', it may have already been added:')
                logger.error(self, traceback.format_exc())
                transaction.abort()
                logger.error(self, 'Database initialization failed')

    def disconnect_database(self, remove_tables: bool = False) -> None:
        """
        Roll back pending work, close sessions and dispose of the engine.

        :param remove_tables: if True, drop all tables known to
            DeclarativeBase before the engine is disposed of
        """
        self.session.rollback()
        transaction.abort()
        self.session.close_all()
        # Drop the tables BEFORE disposing of the engine: an engine stays
        # usable after dispose(), so dropping afterwards would check out a
        # fresh connection that nothing disposes of.
        if remove_tables:
            DeclarativeBase.metadata.drop_all(self.engine)
        self.engine.dispose()
        DepotManager._clear()

    def tearDown(self) -> None:
        """
        Drop the tables, disconnect from the database and tear down pyramid
        testing.
        """
        logger.debug(self, 'TearDown Test...')
        self.disconnect_database(remove_tables=True)
        testing.tearDown()
149
150
151
class FunctionalTestEmptyDB(FunctionalTest):
    """
    FunctionalTest variant for which no fixtures are loaded.
    """

    fixtures = []
153
154
155
class FunctionalTestNoDB(FunctionalTest):
    """
    Special test case when sqlalchemy.url is not correct
    """

    config_section = 'functional_test_no_db'

    def override_settings(
            self,
            settings: typing.Dict[str, typing.Any]
    ) -> typing.Dict[str, typing.Any]:
        """
        Replace the value of sqlalchemy.url with 'sqlite://'.

        The given dict is modified in place.

        :param settings: settings read from the config file
        :return: the same settings dict, with sqlalchemy.url replaced
        """
        settings['sqlalchemy.url'] = 'sqlite://'
        return settings

    def init_database(self, settings: typing.Dict[str, typing.Any]) -> None:
        """
        Only build an engine from the given settings; nothing is loaded into
        the database.

        :param settings: settings used to create the engine
        """
        self.engine = get_engine(settings)
171
172
173
class CommandFunctionalTest(FunctionalTest):
    """
    FunctionalTest variant for command tests: the pyramid app is not run.
    """

    def run_app(self) -> None:
        """ Disable run pyramid app for command functional test"""
        return None
178
179
180
class BaseTest(unittest.TestCase):
    """
    Pyramid default test.

    setUp reads the ``config_section`` section of ``config_uri``, sets up
    pyramid testing, configures an in-memory file depot, creates the tables
    and loads ``fixtures``. tearDown drops the tables and releases the
    engine.
    """
    fixtures = []
    config_uri = 'tests_configs.ini'
    config_section = 'base_test'

    def _set_logger(self) -> None:
        """
        Set all logger to a high level to avoid getting too much noise for tests
        """
        logger._logger.setLevel('ERROR')
        logging.getLogger().setLevel('ERROR')
        for logger_name in ('sqlalchemy', 'txn', 'cliff', '_jb_pytest_runner'):
            logging.getLogger(logger_name).setLevel('ERROR')

    def init_database(self) -> None:
        """
        Create all tables and load self.fixtures into the database.

        An IntegrityError raised meanwhile is logged and the transaction is
        aborted; the error is not propagated.
        """
        session = get_tm_session(self.session_factory, transaction.manager)
        with transaction.manager:
            try:
                DeclarativeBase.metadata.create_all(self.engine)
                fixtures_loader = FixturesLoader(session, self.app_config)
                fixtures_loader.loads(self.fixtures)
                transaction.commit()
                logger.info(self, "Database initialized.")
            except IntegrityError:
                import traceback
                logger.error(self, 'Warning, there was a problem when adding default data'  # nopep8
                                   ', it may have already been added:')
                logger.error(self, traceback.format_exc())
                transaction.abort()
                logger.error(self, 'Database initialization failed')

    def setUp(self) -> None:
        """
        Prepare pyramid testing config, file depot, engine, session factory,
        database content and session for one test.
        """
        self._set_logger()
        logger.debug(self, 'Setup Test...')
        self.settings = plaster.get_settings(
            self.config_uri,
            self.config_section
        )
        self.config = testing.setUp(settings=self.settings)
        self.config.include('tracim_backend.models')
        DepotManager._clear()
        DepotManager.configure(
            'test', {'depot.backend': 'depot.io.memory.MemoryFileStorage'}
        )
        settings = self.config.get_settings()
        self.app_config = CFG(settings)
        # get_engine, get_session_factory and get_tm_session are the
        # module-level imports; no function-scope re-import is needed.
        self.engine = get_engine(settings)
        self.session_factory = get_session_factory(self.engine)
        self.init_database()
        self.session = get_tm_session(self.session_factory, transaction.manager)

    def tearDown(self) -> None:
        """
        Roll back pending work, drop all tables, dispose of the engine and
        tear down pyramid testing.
        """
        logger.debug(self, 'TearDown Test...')
        self.session.rollback()
        self.session.close_all()
        transaction.abort()
        # Same DeclarativeBase as the one init_database used for create_all.
        DeclarativeBase.metadata.drop_all(self.engine)
        self.engine.dispose()
        DepotManager._clear()
        testing.tearDown()
253
254
255
class StandardTest(BaseTest):
    """
    BaseTest whose database is filled with the default fixtures
    (BaseFixture).
    """

    fixtures = [BaseFixture]
260
261
262
class DefaultTest(StandardTest):
    """
    StandardTest with helpers which create a workspace, a content or a
    thread and check that exactly one matching row can be queried back.
    """

    def _create_workspace_and_test(self, name, user) -> Workspace:
        """
        Create a workspace through WorkspaceApi and check it is stored.

        :param name: label of the workspace to create
        :param user: user given to WorkspaceApi as current user
        :return: Created workspace instance
        """
        workspace_api = WorkspaceApi(
            current_user=user,
            session=self.session,
            config=self.app_config,
        )
        workspace_api.create_workspace(name, save_now=True)

        # Exactly one workspace must carry this label.
        workspaces_with_label = self.session.query(Workspace).filter(
            Workspace.label == name
        )
        eq_(1, workspaces_with_label.count())
        return workspaces_with_label.one()

    def _create_content_and_test(
            self,
            name,
            workspace,
            *args,
            **kwargs
    ) -> Content:
        """
        Create a content in the given workspace and check it is stored.

        All extra parameters (*args, **kwargs) are for Content init

        :param name: label given to the content
        :param workspace: workspace the content is attached to
        :return: Created Content instance
        """
        new_content = Content(*args, **kwargs)
        new_content.label = name
        new_content.workspace = workspace
        self.session.add(new_content)
        self.session.flush()

        content_api = ContentApi(
            current_user=None,
            session=self.session,
            config=self.app_config,
        )
        # Exactly one content must carry this label.
        has_label = Content.label == name
        eq_(1, content_api.get_canonical_query().filter(has_label).count())
        return content_api.get_canonical_query().filter(has_label).one()

    def _create_thread_and_test(self,
                                user,
                                workspace_name='workspace_1',
                                folder_name='folder_1',
                                thread_name='thread_1') -> Content:
        """
        Create a workspace, a folder in it and a thread in that folder.

        :param user: owner of the folder and thread, and current user for
            the workspace creation
        :param workspace_name: label of the workspace to create
        :param folder_name: label of the folder to create
        :param thread_name: label of the thread to create
        :return: Thread
        """
        workspace = self._create_workspace_and_test(workspace_name, user)
        parent_folder = self._create_content_and_test(
            folder_name,
            workspace,
            type=content_type_list.Folder.slug,
            owner=user,
        )
        return self._create_content_and_test(
            thread_name,
            workspace,
            type=content_type_list.Thread.slug,
            parent=parent_folder,
            owner=user,
        )
339
340
341
class MailHogTest(DefaultTest):
    """
    Theses test need a working mailhog
    """

    config_section = 'mail_test'
    # URL on which a DELETE request is sent after each test to empty the
    # MailHog message list.
    mailhog_messages_url = 'http://127.0.0.1:8025/api/v1/messages'
    # Maximum time, in seconds, to wait for MailHog during cleanup.
    mailhog_timeout = 10

    def tearDown(self) -> None:
        """
        Run the parent tearDown, then delete the messages held by MailHog.

        :raises requests.exceptions.RequestException: if MailHog cannot be
            reached or does not answer within ``mailhog_timeout`` seconds
        """
        super().tearDown()
        logger.debug(self, 'Cleanup MailHog list...')
        # Without an explicit timeout, requests waits indefinitely, so an
        # unresponsive MailHog would hang the whole test run.
        requests.delete(
            self.mailhog_messages_url,
            timeout=self.mailhog_timeout,
        )
352