IIndexer   A
last analyzed

Complexity

Total Complexity 0

Size/Duplication

Total Lines 2
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 2
rs 10
wmc 0
1
# -*- coding: utf-8 -*-
2
import logging
3
from rq import Connection, Queue
4
from sqlalchemy import event
5
from sqlalchemy.orm import object_session
6
from zope.interface import Interface
7
8
log = logging.getLogger()
9
10
11
class IIndexer(Interface):
12
    pass
13
14
15
class Indexer(object):
16
    """
17
    Handles the indexing of the DB to ES.
18
    This object contains a number listeners.
19
    First a list of the changes to be made will be kept.
20
    In case of a commit, these changes will also be executed.
21
    """
22
23
    def __init__(self, settings, index_operation, index_operation_name, cls):
24
        self.sessions = set()
25
        self.index_operation = index_operation
26
        self.index_operation_name = index_operation_name
27
        self._register_event_listeners(cls)
28
        self.settings = settings
29
        self.cls_name = cls.__name__
30
31
    def _register_event_listeners(self, cls):
32
        """
33
        :param cls: DB class
34
        """
35
        event.listen(cls, 'after_insert', self._new_listener)
36
        event.listen(cls, 'after_update', self._update_listener)
37
        event.listen(cls, 'after_delete', self._delete_listener)
38
39
    @staticmethod
40
    def _update_listener(mapper, connection, target):
41
        _add_to_session_list(target, operation='UPDATE')
42
43
    @staticmethod
44
    def _new_listener(mapper, connection, target):
45
        _add_to_session_list(target, operation='ADD')
46
47
    @staticmethod
48
    def _delete_listener(mapper, connection, target):
49
        _add_to_session_list(target, operation='REMOVE')
50
51
    def register_session(self, session, redis=None):
52
        session.redis = redis
53
        session.index_new = session.index_new if hasattr(session, 'index_new') else {}
54
        session.index_new[self.cls_name] = set()
55
        session.index_dirty = session.index_dirty if hasattr(session, 'index_dirty') else {}
56
        session.index_dirty[self.cls_name] = set()
57
        session.index_deleted = session.index_deleted if hasattr(session, 'index_deleted') else {}
58
        session.index_deleted[self.cls_name] = set()
59
        self.sessions.add(session)
60
        event.listen(session, 'after_commit', self.after_commit_listener)
61
        event.listen(session, 'after_rollback', self.after_rollback_listener)
62
63
    def after_commit_listener(self, session):
64
        """
65
        Processing the changes.
66
        All new or changed items are now indexed. All deleted items are now removed from the index.
67
        """
68
        log.info('Commiting indexing orders for session %s' % session)
69
        try:
70
            if session.redis is not None:
71
                self._queue_job(session.redis,
72
                                self.settings['redis.queue_name'],
73
                                self.index_operation_name,
74
                                session.index_new[self.cls_name],
75
                                session.index_dirty[self.cls_name],
76
                                session.index_deleted[self.cls_name],
77
                                self.settings)
78
            else:
79
                log.info('Redis not found, falling back to indexing synchronously without redis')
80
                self.index_operation(
81
                    session.index_new[self.cls_name],
82
                    session.index_dirty[self.cls_name],
83
                    session.index_deleted[self.cls_name],
84
                    self.settings
85
                )
86
            session.index_new[self.cls_name].clear()
87
            session.index_dirty[self.cls_name].clear()
88
            session.index_deleted[self.cls_name].clear()
89
        except AttributeError:
90
            log.warning('Trying to commit indexing orders, but indexing sets are not present.')
91
92
    @staticmethod
93
    def _queue_job(redis, queue_name, delegate, *args):
94
        """
95
            creates a new job on the queue
96
            :param redis: redis
97
            :param delegate: method to be executed by the queue. Use fully qualified method name as String.
98
            :param args:  arguments of the method
99
            :return: job_id
100
            """
101
        log.info('Queuing job...')
102
        with Connection(redis):
103
            q = Queue(queue_name)
104
            job = q.enqueue(delegate, *args)
105
            return job.id
106
107
    def after_rollback_listener(self, session):
108
        """
109
        Rollback of the transaction, undo the indexes.
110
        If our transaction is terminated, we will reset the
111
        indexing assignments.
112
        """
113
        log.info('Removing indexing orders.')
114
        try:
115
            session.index_new[self.cls_name].clear()
116
            session.index_dirty[self.cls_name].clear()
117
            session.index_deleted[self.cls_name].clear()
118
        except (AttributeError, KeyError):
119
            log.warning('Trying to remove indexing orders, but indexing sets are not present.')
120
121
    def remove_session(self, session):
122
        """
123
        :param sqlalchemy.session.Session session: Database session to remove
124
        """
125
        try:
126
            del session.redis
127
            del session.index_new[self.cls_name]
128
            del session.index_dirty[self.cls_name]
129
            del session.index_deleted[self.cls_name]
130
        except (AttributeError, KeyError):
131
            log.warning('Removing a session that has no indexing sets.')
132
        self.sessions.remove(session)
133
134
135
def _add_to_session_list(target, operation):
136
    session = object_session(target)
137
    try:
138
        if operation == 'ADD':
139
            session.index_new[target.__class__.__name__].add(target.id)
140
        elif operation == 'UPDATE':
141
            session.index_dirty[target.__class__.__name__].add(target.id)
142
        elif operation == 'REMOVE':
143
            session.index_deleted[target.__class__.__name__].add(target.id)
144
        log.info(operation + ': ' + str(target) + ' {0} from index'.format(target.id))
145
    except (AttributeError, KeyError):
146
        log.warning(
147
            'Trying to register a ' + str(target) +
148
            ' for indexing ' + operation + ', but indexing sets are not present.')