Passed
Pull Request — develop (#97)
by inkhey
01:36
created

DefaultTest._create_content_and_test()   A

Complexity

Conditions 1

Size

Total Lines 31
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 31
rs 9.328
c 0
b 0
f 0
cc 1
nop 5
1
# -*- coding: utf-8 -*-
2
import logging
3
import unittest
4
5
import plaster
6
import requests
7
import transaction
8
from depot.manager import DepotManager
9
from pyramid import testing
10
from sqlalchemy.exc import IntegrityError
11
12
from tracim_backend.lib.core.content import ContentApi
13
from tracim_backend.lib.core.workspace import WorkspaceApi
14
from tracim_backend.models import get_engine
15
from tracim_backend.models import DeclarativeBase
16
from tracim_backend.models import get_session_factory
17
from tracim_backend.models import get_tm_session
18
from tracim_backend.app_models.contents import content_type_list
19
from tracim_backend.models.data import Workspace
20
from tracim_backend.models.data import ContentRevisionRO
21
from tracim_backend.models.data import Content
22
from tracim_backend.lib.utils.logger import logger
23
from tracim_backend.fixtures import FixturesLoader
24
from tracim_backend.fixtures.users_and_groups import Base as BaseFixture
25
from tracim_backend.config import CFG
26
from tracim_backend.extensions import hapic
27
from tracim_backend import web
28
from webtest import TestApp
29
from io import BytesIO
30
from PIL import Image
31
32
33
def eq_(a, b, msg=None):
34
    # TODO - G.M - 05-04-2018 - Remove this when all old nose code is removed
35
    assert a == b, msg or "%r != %r" % (a, b)
36
37
# TODO - G.M - 2018-06-179 - Refactor slug change function
38
#  as a kind of pytest fixture ?
39
40
41
def set_html_document_slug_to_legacy(session_factory) -> None:
42
    """
43
    Simple function to help some functional test. This modify "html-documents"
44
    type content in database to legacy "page" slug.
45
    :param session_factory: session factory of the test
46
    :return: Nothing.
47
    """
48
    dbsession = get_tm_session(
49
        session_factory,
50
        transaction.manager
51
    )
52
    content_query = dbsession.query(ContentRevisionRO).filter(ContentRevisionRO.type == 'page').filter(ContentRevisionRO.content_id == 6)  # nopep8
53
    assert content_query.count() == 0
54
    html_documents_query = dbsession.query(ContentRevisionRO).filter(ContentRevisionRO.type == 'html-document')  # nopep8
55
    html_documents_query.update({ContentRevisionRO.type: 'page'})
56
    transaction.commit()
57
    assert content_query.count() > 0
58
59
60
def create_1000px_png_test_image():
61
    file = BytesIO()
62
    image = Image.new('RGBA', size=(1000, 1000), color=(0, 0, 0))
63
    image.save(file, 'png')
64
    file.name = 'test_image.png'
65
    file.seek(0)
66
    return file
67
68
69
class FunctionalTest(unittest.TestCase):
70
71
    fixtures = [BaseFixture]
72
    config_uri = 'tests_configs.ini'
73
    config_section = 'functional_test'
74
75
    def _set_logger(self):
76
        """
77
        Set all logger to a high level to avoid getting too much noise for tests
78
        """
79
        logger._logger.setLevel('ERROR')
80
        logging.getLogger().setLevel('ERROR')
81
        logging.getLogger('sqlalchemy').setLevel('ERROR')
82
        logging.getLogger('txn').setLevel('ERROR')
83
        logging.getLogger('cliff').setLevel('ERROR')
84
        logging.getLogger('_jb_pytest_runner').setLevel('ERROR')
85
86
    def setUp(self):
87
        self._set_logger()
88
        DepotManager._clear()
89
        settings = plaster.get_settings(
90
            self.config_uri,
91
            self.config_section
92
        )
93
        self.settings = self.override_settings(settings)
94
        hapic.reset_context()
95
        self.connect_database(create_tables=True)
96
        self.app_config = CFG(self.settings)
97
        self.app_config.configure_filedepot()
98
        self.init_database(self.settings)
99
        DepotManager._clear()
100
        self.run_app()
101
102
    def connect_database(self, create_tables: bool = False):
103
        self.engine = get_engine(self.settings)
104
        if create_tables:
105
            DeclarativeBase.metadata.create_all(self.engine)
106
        self.session_factory = get_session_factory(self.engine)
107
        self.session = get_tm_session(self.session_factory, transaction.manager)
108
109
110
    def override_settings(self, settings):
111
        return settings
112
113
    def run_app(self):
114
        app = web({}, **self.settings)
115
        self.testapp = TestApp(app)
116
117
    def init_database(self, settings):
118
        with transaction.manager:
119
            try:
120
                fixtures_loader = FixturesLoader(self.session, self.app_config)
121
                fixtures_loader.loads(self.fixtures)
122
                transaction.commit()
123
                logger.info(self,"Database initialized.")
124
            except IntegrityError:
125
                logger.error(self,'Warning, there was a problem when adding default data'  # nopep8
126
                               ', it may have already been added:')
127
                import traceback
128
                logger.error(self, traceback.format_exc())
129
                transaction.abort()
130
                logger.error(self, 'Database initialization failed')
131
132
    def disconnect_database(self, remove_tables: bool = False):
133
        self.session.rollback()
134
        transaction.abort()
135
        self.session.close_all()
136
        self.engine.dispose()
137
        if remove_tables:
138
            DeclarativeBase.metadata.drop_all(self.engine)
139
        DepotManager._clear()
140
141
    def tearDown(self):
142
        logger.debug(self, 'TearDown Test...')
143
        from tracim_backend.models.meta import DeclarativeBase
144
        self.disconnect_database(remove_tables=True)
145
        testing.tearDown()
146
147
148
class FunctionalTestEmptyDB(FunctionalTest):
149
    fixtures = []
150
151
152
class FunctionalTestNoDB(FunctionalTest):
153
    config_section = 'functional_test_no_db'
154
155
    def override_settings(self, settings):
156
        settings['sqlalchemy.url'] = 'sqlite://'
157
        return settings
158
159
    def init_database(self, settings):
160
        self.engine = get_engine(settings)
161
162
163
class CommandFunctionalTest(FunctionalTest):
164
165
    def run_app(self):
166
        pass
167
168
169
class BaseTest(unittest.TestCase):
170
    """
171
    Pyramid default test.
172
    """
173
    fixtures = []
174
    config_uri = 'tests_configs.ini'
175
    config_section = 'base_test'
176
177
    def _set_logger(self):
178
        """
179
        Set all logger to a high level to avoid getting too much noise for tests
180
        """
181
        logger._logger.setLevel('ERROR')
182
        logging.getLogger().setLevel('ERROR')
183
        logging.getLogger('sqlalchemy').setLevel('ERROR')
184
        logging.getLogger('txn').setLevel('ERROR')
185
        logging.getLogger('cliff').setLevel('ERROR')
186
        logging.getLogger('_jb_pytest_runner').setLevel('ERROR')
187
188
    def init_database(self):
189
        session = get_tm_session(self.session_factory, transaction.manager)
190
        with transaction.manager:
191
            try:
192
                DeclarativeBase.metadata.create_all(self.engine)
193
                fixtures_loader = FixturesLoader(session, self.app_config)
194
                fixtures_loader.loads(self.fixtures)
195
                transaction.commit()
196
                logger.info(self,"Database initialized.")
197
            except IntegrityError:
198
                logger.error(self,'Warning, there was a problem when adding default data'  # nopep8
199
                               ', it may have already been added:')
200
                import traceback
201
                logger.error(self, traceback.format_exc())
202
                transaction.abort()
203
                logger.error(self, 'Database initialization failed')
204
205
    def setUp(self):
206
        self._set_logger()
207
        logger.debug(self, 'Setup Test...')
208
        self.settings = plaster.get_settings(
209
            self.config_uri,
210
            self.config_section
211
        )
212
        self.config = testing.setUp(settings = self.settings)
213
        self.config.include('tracim_backend.models')
214
        DepotManager._clear()
215
        DepotManager.configure(
216
            'test', {'depot.backend': 'depot.io.memory.MemoryFileStorage'}
217
        )
218
        settings = self.config.get_settings()
219
        self.app_config = CFG(settings)
220
        from tracim_backend.models import (
221
            get_engine,
222
            get_session_factory,
223
            get_tm_session,
224
        )
225
226
        self.engine = get_engine(settings)
227
        self.session_factory = get_session_factory(self.engine)
228
        self.init_database()
229
        self.session = get_tm_session(self.session_factory, transaction.manager)
230
231
    def tearDown(self):
232
        logger.debug(self, 'TearDown Test...')
233
        from tracim_backend.models.meta import DeclarativeBase
234
235
        self.session.rollback()
236
        self.session.close()
237
        transaction.abort()
238
        DeclarativeBase.metadata.drop_all(self.engine)
239
        self.engine.dispose()
240
        DepotManager._clear()
241
        testing.tearDown()
242
243
244
class StandardTest(BaseTest):
245
    """
246
    BaseTest with default fixtures
247
    """
248
    fixtures = [BaseFixture]
249
250
251
class DefaultTest(StandardTest):
252
253
    def _create_workspace_and_test(self, name, user) -> Workspace:
254
        """
255
        All extra parameters (*args, **kwargs) are for Workspace init
256
        :return: Created workspace instance
257
        """
258
        WorkspaceApi(
259
            current_user=user,
260
            session=self.session,
261
            config=self.app_config,
262
        ).create_workspace(name, save_now=True)
263
264
        eq_(
265
            1,
266
            self.session.query(Workspace).filter(
267
                Workspace.label == name
268
            ).count()
269
        )
270
        return self.session.query(Workspace).filter(
271
            Workspace.label == name
272
        ).one()
273
274
    def _create_content_and_test(
275
            self,
276
            name,
277
            workspace,
278
            *args,
279
            **kwargs
280
    ) -> Content:
281
        """
282
        All extra parameters (*args, **kwargs) are for Content init
283
        :return: Created Content instance
284
        """
285
        content = Content(*args, **kwargs)
286
        content.label = name
287
        content.workspace = workspace
288
        self.session.add(content)
289
        self.session.flush()
290
291
        content_api = ContentApi(
292
            current_user=None,
293
            session=self.session,
294
            config=self.app_config,
295
        )
296
        eq_(
297
            1,
298
            content_api.get_canonical_query().filter(
299
                Content.label == name
300
            ).count()
301
        )
302
        return content_api.get_canonical_query().filter(
303
            Content.label == name
304
        ).one()
305
306
    def _create_thread_and_test(self,
307
                                user,
308
                                workspace_name='workspace_1',
309
                                folder_name='folder_1',
310
                                thread_name='thread_1') -> Content:
311
        """
312
        :return: Thread
313
        """
314
        workspace = self._create_workspace_and_test(workspace_name, user)
315
        folder = self._create_content_and_test(
316
            folder_name, workspace,
317
            type=content_type_list.Folder.slug,
318
            owner=user
319
        )
320
        thread = self._create_content_and_test(
321
            thread_name,
322
            workspace,
323
            type=content_type_list.Thread.slug,
324
            parent=folder,
325
            owner=user
326
        )
327
        return thread
328
329
330
class MailHogTest(DefaultTest):
331
    """
332
    Theses test need a working mailhog
333
    """
334
335
    config_section = 'mail_test'
336
337
    def tearDown(self):
338
        super().tearDown()
339
        logger.debug(self, 'Cleanup MailHog list...')
340
        requests.delete('http://127.0.0.1:8025/api/v1/messages')
341