|
1
|
|
|
# -*- coding: utf-8 -*- |
|
2
|
|
|
import os |
|
3
|
|
|
import unittest |
|
4
|
|
|
|
|
5
|
|
|
from sqlalchemy import func, engine_from_config |
|
6
|
|
|
from sqlalchemy.orm import sessionmaker |
|
7
|
|
|
|
|
8
|
|
|
from oe_utils.data.data_managers import AtomFeedManager, FeedArchiveNotFound |
|
9
|
|
|
try: |
|
10
|
|
|
from initfeedgen import fill_db_feeds, Feed, FeedEntry |
|
11
|
|
|
except: |
|
12
|
|
|
from tests.initfeedgen import fill_db_feeds, Feed, FeedEntry |
|
13
|
|
|
try: |
|
14
|
|
|
from unittest.mock import Mock, patch, MagicMock, call, PropertyMock |
|
15
|
|
|
except ImportError: |
|
16
|
|
|
from mock import Mock, patch, MagicMock, call, PropertyMock # pragma: no cover |
|
17
|
|
|
import tempfile |
|
18
|
|
|
try: |
|
19
|
|
|
import configparser |
|
20
|
|
|
except ImportError: |
|
21
|
|
|
import ConfigParser as configparser |
|
22
|
|
|
|
|
23
|
|
|
|
|
24
|
|
|
here = os.path.dirname(os.path.abspath(__file__)) |
|
25
|
|
|
settings = {"sqlalchemy.url": "sqlite:///{}/test.sqlite".format(here)} |
|
26
|
|
|
engine = engine_from_config(settings, prefix='sqlalchemy.') |
|
27
|
|
|
here = os.path.dirname(os.path.abspath(__file__)) |
|
28
|
|
|
fixture_directory = tempfile.gettempdir() |
|
29
|
|
|
|
|
30
|
|
|
|
|
31
|
|
|
def setUpModule(): |
|
32
|
|
|
fill_db_feeds(engine) |
|
33
|
|
|
|
|
34
|
|
|
|
|
35
|
|
|
class TestAtomFeedManager(unittest.TestCase): |
|
36
|
|
|
def setUp(self): |
|
37
|
|
|
self.session = sessionmaker(bind=engine)() |
|
38
|
|
|
self.atom_feed_manager = AtomFeedManager(self.session, fixture_directory, Feed, FeedEntry) |
|
39
|
|
|
self.files_to_clean = [] |
|
40
|
|
|
|
|
41
|
|
|
def tearDown(self): |
|
42
|
|
|
for file_to_clean in self.files_to_clean: |
|
43
|
|
|
if os.path.isfile(file_to_clean): |
|
44
|
|
|
os.remove(file_to_clean) |
|
45
|
|
|
self.session.rollback() |
|
46
|
|
|
self.session.close() |
|
47
|
|
|
|
|
48
|
|
|
def test_current_feed(self): |
|
49
|
|
|
current_feed_id = self.session.query(func.max(Feed.id)).scalar() |
|
50
|
|
|
self.assertEqual(current_feed_id, self.atom_feed_manager.current_feed.id) |
|
51
|
|
|
|
|
52
|
|
|
def test_get_from_archive(self): |
|
53
|
|
|
feed_id = self.atom_feed_manager.current_feed.id |
|
54
|
|
|
archive_content = 'some xml string' |
|
55
|
|
|
feed_path = fixture_directory + '/' + str(feed_id) + '.xml' |
|
56
|
|
|
with open(feed_path, 'w') as rec_file: |
|
57
|
|
|
rec_file.write(archive_content) |
|
58
|
|
|
self.files_to_clean.append(feed_path) |
|
59
|
|
|
response = self.atom_feed_manager.get_from_archive(feed_id) |
|
60
|
|
|
self.assertEqual(archive_content, response) |
|
61
|
|
|
|
|
62
|
|
|
def test_fail_to_not_existing_from_archive(self): |
|
63
|
|
|
feed_id = '999' |
|
64
|
|
|
with self.assertRaises(FeedArchiveNotFound): |
|
65
|
|
|
self.atom_feed_manager.get_from_archive(feed_id) |
|
66
|
|
|
|