Completed
Push — master ( a78279...04d974 )
by
unknown
10s
created

Indexer.remove_session()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 2
c 1
b 0
f 1
dl 0
loc 12
rs 9.4285
1
# -*- coding: utf-8 -*-
2
import logging
3
from rq import Connection, Queue
4
from sqlalchemy import event
5
from sqlalchemy.orm import object_session
6
from zope.interface import Interface
7
8
# Module-level logger. It is named after this module (instead of being the
# root logger) so that indexing messages can be filtered and configured
# separately; records still propagate to the root logger's handlers.
log = logging.getLogger(__name__)
9
10
11
class IIndexer(Interface):
    """Marker interface for indexers; it declares no attributes or methods."""
13
14
15
class Indexer(object):
    """
    Handles the indexing of the DB to ES.

    This object contains a number of listeners. While a transaction is
    running, the ids of the inserted, updated and deleted objects are kept in
    sets attached to the session. When the session commits, these changes are
    handed to the index operation; when it rolls back, they are discarded.
    """

    # Bookkeeping attributes that register_session() attaches to a session.
    _SESSION_ATTRIBUTES = ('redis', 'index_new', 'index_dirty', 'index_deleted')

    def __init__(self, settings, index_operation, index_operation_name, cls):
        """
        :param settings: settings mapping; ``settings['redis.queue_name']`` is
            read when a job is queued, and the mapping itself is passed on to
            the index operation.
        :param index_operation: callable invoked synchronously as
            ``index_operation(new, dirty, deleted, settings)`` when the
            session has no redis connection.
        :param index_operation_name: the operation handed to the queue when
            the session has a redis connection.
        :param cls: DB class whose inserts, updates and deletes are tracked.
        """
        self.settings = settings
        self.sessions = set()
        self.index_operation = index_operation
        self.index_operation_name = index_operation_name
        # Register last, so the indexer is fully initialised before any
        # listener can fire.
        self._register_event_listeners(cls)

    def _register_event_listeners(self, cls):
        """
        Listen for inserts, updates and deletes of instances of ``cls``.

        :param cls: DB class
        """
        event.listen(cls, 'after_insert', self._new_listener)
        event.listen(cls, 'after_update', self._update_listener)
        event.listen(cls, 'after_delete', self._delete_listener)

    @staticmethod
    def _update_listener(mapper, connection, target):
        """Record ``target`` as updated in its session."""
        _add_to_session_list(target, operation='UPDATE')

    @staticmethod
    def _new_listener(mapper, connection, target):
        """Record ``target`` as newly added in its session."""
        _add_to_session_list(target, operation='ADD')

    @staticmethod
    def _delete_listener(mapper, connection, target):
        """Record ``target`` as deleted in its session."""
        _add_to_session_list(target, operation='REMOVE')

    def register_session(self, session, redis=None):
        """
        Prepare a session for indexing and start listening to it.

        The session gets a ``redis`` attribute and three empty sets
        (``index_new``, ``index_dirty``, ``index_deleted``), is added to
        ``self.sessions`` and gets commit and rollback listeners.

        :param session: database session to track
        :param redis: redis connection used to queue indexing jobs; when
            ``None`` the index operation is executed synchronously on commit.
        """
        session.redis = redis
        session.index_new = set()
        session.index_dirty = set()
        session.index_deleted = set()
        self.sessions.add(session)
        event.listen(session, 'after_commit', self.after_commit_listener)
        event.listen(session, 'after_rollback', self.after_rollback_listener)

    def after_commit_listener(self, session):
        """
        Processing the changes.

        All new or changed items are now indexed. All deleted items are now
        removed from the index. The pending sets are cleared once the work
        has been queued or executed.

        Only a session without the bookkeeping attributes is treated as "not
        prepared" and skipped with a warning. Errors raised by the index
        operation or by queuing, including ``AttributeError``, propagate to
        the caller instead of being reported as missing indexing sets.

        :param session: the session that was committed
        """
        log.info('Commiting indexing orders for session %s', session)
        try:
            redis = session.redis
            index_new = session.index_new
            index_dirty = session.index_dirty
            index_deleted = session.index_deleted
        except AttributeError:
            log.warning('Trying to commit indexing orders, but indexing sets are not present.')
            return

        if redis is not None:
            self._queue_job(redis,
                            self.settings['redis.queue_name'],
                            self.index_operation_name,
                            index_new, index_dirty, index_deleted,
                            self.settings)
        else:
            log.info('Redis not found, falling back to indexing synchronously without redis')
            self.index_operation(index_new, index_dirty, index_deleted, self.settings)

        index_new.clear()
        index_dirty.clear()
        index_deleted.clear()

    @staticmethod
    def _queue_job(redis, queue_name, delegate, *args):
        """
        Creates a new job on the queue.

        :param redis: redis
        :param queue_name: name of the queue the job is put on
        :param delegate: method to be executed by the queue. Use fully
            qualified method name as String.
        :param args: arguments of the method
        :return: job_id
        """
        log.info('Queuing job...')
        with Connection(redis):
            q = Queue(queue_name)
            job = q.enqueue(delegate, *args)
            return job.id

    @staticmethod
    def after_rollback_listener(session):
        """
        Rollback of the transaction, undo the indexes.

        If our transaction is terminated, we will reset the
        indexing assignments.

        :param session: the session that was rolled back
        """
        log.info('Removing indexing orders.')
        try:
            session.index_new.clear()
            session.index_dirty.clear()
            session.index_deleted.clear()
        except AttributeError:
            log.warning('Trying to remove indexing orders, but indexing sets are not present.')

    def remove_session(self, session):
        """
        Detach the indexing bookkeeping from a session and forget the session.

        Every bookkeeping attribute is removed independently, so a session
        that only carries some of them is still cleaned up completely. A
        session that was never registered is tolerated: a warning is logged
        and no error is raised.

        :param sqlalchemy.session.Session session: Database session to remove
        """
        incomplete = False
        for attribute in self._SESSION_ATTRIBUTES:
            try:
                delattr(session, attribute)
            except AttributeError:
                incomplete = True
        if incomplete:
            log.warning('Removing a session that has no indexing sets.')
        # discard() rather than remove(): an unregistered session has already
        # been reported by the warning above and must not raise KeyError.
        self.sessions.discard(session)
123
124
125
def _add_to_session_list(target, operation):
    """
    Record the id of ``target`` in the pending-index set of its session.

    The session is looked up with ``object_session``. ``'ADD'`` stores the id
    in ``session.index_new``, ``'UPDATE'`` in ``session.index_dirty`` and
    ``'REMOVE'`` in ``session.index_deleted``; any other operation is only
    logged. When an attribute needed for this is missing (for example because
    the session does not carry the indexing sets), a warning is logged and
    nothing is recorded.

    :param target: mapped object that was inserted, updated or deleted
    :param operation: one of ``'ADD'``, ``'UPDATE'`` or ``'REMOVE'``
    """
    session = object_session(target)
    set_name_by_operation = {
        'ADD': 'index_new',
        'UPDATE': 'index_dirty',
        'REMOVE': 'index_deleted',
    }
    try:
        set_name = set_name_by_operation.get(operation)
        if set_name is not None:
            getattr(session, set_name).add(target.id)
        # Lazy logging arguments: the message is only built when INFO is enabled.
        log.info('%s: %s %s from index', operation, target, target.id)
    except AttributeError:
        log.warning(
            'Trying to register a %s for indexing %s, but indexing sets are not present.',
            target, operation)