Completed
Push — dev-0.5.2 ( b6a9d6...3bdb98 )
by Felipe A.
01:10
created

WathdogEventSource.watch()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 13
rs 9.4285
c 0
b 0
f 0
1
2
import threading
3
import collections
4
import warnings
5
import os.path
6
7
import watchdog.observers
8
import watchdog.events
9
10
11
class Event(list):
12
    '''
13
    Event subscription list.
14
15
    A list of callable objects. Calling an instance of this will cause a
16
    call to each item in the list in ascending order by index.
17
18
    Usage:
19
    >>> def f(x):
20
    ...     print 'f(%s)' % x
21
    >>> def g(x):
22
    ...     print 'g(%s)' % x
23
    >>> e = Event()
24
    >>> e()
25
    >>> e.append(f)
26
    >>> e(123)
27
    f(123)
28
    >>> e.remove(f)
29
    >>> e()
30
    >>> e += (f, g)
31
    >>> e(10)
32
    f(10)
33
    g(10)
34
    >>> del e[0]
35
    >>> e(2)
36
    g(2)
37
    '''
38
39
    lock_class = threading.Lock
40
    queue_class = collections.deque
41
42
    def __init__(self, iterable=()):
43
        self._lock = self.lock_class()
44
        self._queue = self.queue_class()
45
        super(Event, self).__init__(iterable)
46
47
    def __call__(self, *args, **kwargs):
48
        self._queue.append((args, kwargs))
49
        while self._queue:
50
            if self._lock.acquire(False):
51
                try:
52
                    args, kwargs = self._queue.popleft()
53
                    for f in self:
54
                        f(*args, **kwargs)
55
                finally:
56
                    self._lock.release()
57
            else:
58
                break
59
60
    def __repr__(self):
61
        return "Event(%s)" % list.__repr__(self)
62
63
64
class EventManager(collections.defaultdict):
65
    '''
66
    Attribute-dict creating :class:`Event` objects on demand.
67
68
    Usage:
69
    >>> def f(x):
70
    ...     print 'f(%s)' % x
71
    >>> def g(x):
72
    ...     print 'g(%s)' % x
73
    >>> m = EventManager()
74
    >>> 'e' in m
75
    False
76
    >>> m.e.append(f)
77
    >>> 'e' in m
78
    True
79
    >>> m.e(123)
80
    f(123)
81
    >>> m.e.remove(f)
82
    >>> m.e()
83
    >>> m.e += (f, g)
84
    >>> m.e(10)
85
    f(10)
86
    g(10)
87
    >>> del m.e[0]
88
    >>> m.e(2)
89
    g(2)
90
    '''
91
    def __init__(self, app=None):
92
        self.app = app
93
        super(EventManager, self).__init__(Event)
94
95
    def __getattr__(self, name):
96
        if name.startswith('_'):
97
            return super(Event, self).__getattr__(name)
98
        return self[name]
99
100
101
class WathdogEventSource(object):
102
    observer_class = watchdog.observers.Observer
103
    event_class = collections.namedtuple(
104
        'FSEvent', ('type', 'path', 'source', 'is_directory')
105
        )
106
    event_map = {
107
        watchdog.events.EVENT_TYPE_MOVED: 'fs_move',
108
        watchdog.events.EVENT_TYPE_DELETED: 'fs_remove',
109
        watchdog.events.EVENT_TYPE_CREATED: 'fs_create',
110
        watchdog.events.EVENT_TYPE_MODIFIED: 'fs_modify'
111
        }
112
    _observer = None
113
114
    def __init__(self, manager, app=None):
115
        self.manager = manager
116
        self.app = app
117
        self.reload()
118
119
    def dispatch(self, wevent):
120
        event = self.event_class(
121
            self.event_map[wevent.event_type],
122
            wevent.dest_path if type == 'fs_move' else wevent.src_path,
123
            wevent.src_path,
124
            wevent.is_directory
125
            )
126
        event_type_specific = '%s_%s' % (
127
            event.type,
128
            'file' if event.is_directory else 'directory'
129
            )
130
        self.manager['fs_any'](event)
131
        self.manager[event.type](event)
132
        self.manager[event_type_specific](event)
133
134
    def reload(self):
135
        self.clear()
136
        path = self.app.config.get('base_directory') if self.app else None
137
        if not path:
138
            return
139
        if not os.path.isdir(path):
140
            warnings.warn(
141
                'Path {0!r} is not observable.'.format(path),
142
                category=RuntimeWarning,
143
                stacklevel=2
144
                    )
145
            return
146
        observer = self.observer_class()
147
        observer.schedule(self, path, recursive=True)
148
        observer.start()
149
        self._observer = observer
150
151
    def clear(self):
152
        observer = self._observer
153
        if observer:
154
            observer.unschedule_all()
155
            observer.stop()
156
            observer.join()
157
            self._observer = None
158