1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
import os |
3
|
|
|
import unittest |
4
|
|
|
|
5
|
|
|
from sqlalchemy import func, engine_from_config |
6
|
|
|
from sqlalchemy.orm import sessionmaker |
7
|
|
|
|
8
|
|
|
from oe_utils.data.data_managers import AtomFeedManager, FeedArchiveNotFound |
9
|
|
|
try: |
10
|
|
|
from initfeedgen import fill_db_feeds, Feed, FeedEntry |
11
|
|
|
except: |
12
|
|
|
from tests.initfeedgen import fill_db_feeds, Feed, FeedEntry |
13
|
|
|
try: |
14
|
|
|
from unittest.mock import Mock, patch, MagicMock, call, PropertyMock |
15
|
|
|
except ImportError: |
16
|
|
|
from mock import Mock, patch, MagicMock, call, PropertyMock # pragma: no cover |
17
|
|
|
import tempfile |
18
|
|
|
try: |
19
|
|
|
import configparser |
20
|
|
|
except ImportError: |
21
|
|
|
import ConfigParser as configparser |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
here = os.path.dirname(os.path.abspath(__file__)) |
25
|
|
|
settings = {"sqlalchemy.url": "sqlite:///{}/test.sqlite".format(here)} |
26
|
|
|
engine = engine_from_config(settings, prefix='sqlalchemy.') |
27
|
|
|
here = os.path.dirname(os.path.abspath(__file__)) |
28
|
|
|
fixture_directory = tempfile.gettempdir() |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
def setUpModule(): |
32
|
|
|
fill_db_feeds(engine) |
33
|
|
|
|
34
|
|
|
|
35
|
|
|
class TestAtomFeedManager(unittest.TestCase): |
36
|
|
|
def setUp(self): |
37
|
|
|
self.session = sessionmaker(bind=engine)() |
38
|
|
|
self.atom_feed_manager = AtomFeedManager(self.session, fixture_directory, Feed, FeedEntry) |
39
|
|
|
self.files_to_clean = [] |
40
|
|
|
|
41
|
|
|
def tearDown(self): |
42
|
|
|
for file_to_clean in self.files_to_clean: |
43
|
|
|
if os.path.isfile(file_to_clean): |
44
|
|
|
os.remove(file_to_clean) |
45
|
|
|
self.session.rollback() |
46
|
|
|
self.session.close() |
47
|
|
|
|
48
|
|
|
def test_current_feed(self): |
49
|
|
|
current_feed_id = self.session.query(func.max(Feed.id)).scalar() |
50
|
|
|
self.assertEqual(current_feed_id, self.atom_feed_manager.current_feed.id) |
51
|
|
|
|
52
|
|
|
def test_get_from_archive(self): |
53
|
|
|
feed_id = self.atom_feed_manager.current_feed.id |
54
|
|
|
archive_content = 'some xml string' |
55
|
|
|
feed_path = fixture_directory + '/' + str(feed_id) + '.xml' |
56
|
|
|
with open(feed_path, 'w') as rec_file: |
57
|
|
|
rec_file.write(archive_content) |
58
|
|
|
self.files_to_clean.append(feed_path) |
59
|
|
|
response = self.atom_feed_manager.get_from_archive(feed_id) |
60
|
|
|
self.assertEqual(archive_content, response) |
61
|
|
|
|
62
|
|
|
def test_fail_to_not_existing_from_archive(self): |
63
|
|
|
feed_id = '999' |
64
|
|
|
with self.assertRaises(FeedArchiveNotFound): |
65
|
|
|
self.atom_feed_manager.get_from_archive(feed_id) |
66
|
|
|
|